trackdirect2/server/trackdirect/collector/PacketBatchInserter.py

426 lines
19 KiB
Python

import logging
import psycopg2
import psycopg2.extras
from trackdirect.collector.StationLatestPacketModifier import StationLatestPacketModifier
from trackdirect.collector.PacketMapIdModifier import PacketMapIdModifier
from trackdirect.database.PacketTableCreator import PacketTableCreator
from trackdirect.database.PacketPathTableCreator import PacketPathTableCreator
from trackdirect.database.PacketWeatherTableCreator import PacketWeatherTableCreator
from trackdirect.database.PacketTelemetryTableCreator import PacketTelemetryTableCreator
from trackdirect.database.PacketOgnTableCreator import PacketOgnTableCreator
class PacketBatchInserter():
"""PacketBatchInserter is used to add a list of packets to the database
"""
def __init__(self, db, dbNoAutoCommit):
"""The __init__ method.
Args:
db (psycopg2.Connection): Database connection (with autocommit)
dbNoAutoCommit (psycopg2.Connection): Database connection (without autocommit)
"""
self.db = db
self.dbNoAutoCommit = dbNoAutoCommit
self.logger = logging.getLogger(__name__)
self.packetIdList = []
self.weatherPacketIdList = []
self.ognPacketIdList = []
self.telemetryPacketIdList = []
self.pathPacketIdList = []
self.positionPacketIdList = []
self.confirmedPositionPacketIdList = []
def insert(self, packets, sourceId):
"""Insert this packets into database
Args:
packets (array): Packets to insert
sourceId (int): Id that corresponds to id in source-table
"""
cur = self.dbNoAutoCommit.cursor()
try:
# Make sure needed tables exists before starting the main database transaction
self._makeSureTablesExists(packets)
# insert telemetry data
self._insertTelemetryDefinitions(packets)
# Update mapId on previous packets
packetTableCreator = PacketTableCreator(self.db)
packetMapIdModifier = PacketMapIdModifier(cur, packetTableCreator)
packetMapIdModifier.execute(packets)
# insert into packet (and packet_path ...)
self._insertIntoPacketTables(packets, cur)
self.dbNoAutoCommit.commit()
cur.close()
except psycopg2.InterfaceError as e:
# Connection to database is lost, better just exit
self.dbNoAutoCommit.rollback()
cur.close()
raise e
except Exception as e:
# Something went wrong
self.logger.error(e, exc_info=1)
self.dbNoAutoCommit.rollback()
cur.close()
return
self._performPostInsertActions(packets)
def _makeSureTablesExists(self, packets):
"""Make sures all tables needed to insert specified packets exists
Args:
packets (array): Packets to insert
"""
timestamp = packets[0].timestamp # All packets is known to be on the same date
packetTableCreator = PacketTableCreator(self.db)
packetTable = packetTableCreator.getPacketTable(timestamp)
packetPathTableCreator = PacketPathTableCreator(self.db)
packetPathTable = packetPathTableCreator.getPacketPathTable(timestamp)
packetWeatherTableCreator = PacketWeatherTableCreator(self.db)
packetWeatherTable = packetWeatherTableCreator.getPacketWeatherTable(
timestamp)
packetTelemetryTableCreator = PacketTelemetryTableCreator(self.db)
packetTelemetryTable = packetTelemetryTableCreator.getPacketTelemetryTable(
timestamp)
packetOgnTableCreator = PacketOgnTableCreator(self.db)
packetOgnTable = packetOgnTableCreator.getPacketOgnTable(timestamp)
def _performPostInsertActions(self, packets):
"""Perform post insert updates like updating station latest packet and related
Args:
packets (array): Packets to insert
"""
timestamp = packets[0].timestamp
latestPacketModifier = StationLatestPacketModifier(self.db)
latestPacketModifier.updateStationLatestPacket(
self.packetIdList, timestamp)
latestPacketModifier.updateStationLatestTelemetryPacket(
self.telemetryPacketIdList, timestamp)
latestPacketModifier.updateStationLatestWeatherPacket(
self.weatherPacketIdList, timestamp)
latestPacketModifier.updateStationLatestOgnPacket(
self.ognPacketIdList, timestamp)
latestPacketModifier.updateStationLatestLocationPacket(
self.positionPacketIdList, timestamp)
latestPacketModifier.updateStationLatestConfirmedPacket(
self.confirmedPositionPacketIdList, timestamp)
def _insertIntoPacketTables(self, packets, cur):
"""Insert packets into the correct packet tables
Args:
packets (array): Packets to insert
cur (cursor): Database curser to use
"""
self._insertIntoPacketTable(packets, cur)
self._insertIntoPacketPathTable(packets, cur)
self._insertIntoPacketWeatherTable(packets, cur)
self._insertIntoPacketOgnTable(packets, cur)
self._insertIntoPacketTelemetryTable(packets, cur)
def _insertIntoPacketTable(self, packets, cur):
"""Insert packets into the correct packet table
Args:
packets (array): Packets to insert
cur (cursor): Database curser to use
"""
timestamp = packets[0].timestamp
packetTableCreator = PacketTableCreator(self.db)
packetTable = packetTableCreator.getPacketTable(timestamp)
datePacketTuples = []
for packet in packets:
datePacketTuples.append((packet.stationId,
packet.senderId,
packet.mapId,
packet.sourceId,
packet.packetTypeId,
packet.latitude,
packet.longitude,
packet.posambiguity,
packet.symbol,
packet.symbolTable,
packet.mapSector,
packet.relatedMapSectors,
packet.markerId,
packet.markerCounter,
packet.speed,
packet.course,
packet.altitude,
packet.rng,
packet.phg,
packet.latestPhgTimestamp,
packet.latestRngTimestamp,
packet.timestamp,
packet.packetTailTimestamp,
packet.isMoving,
packet.reportedTimestamp,
packet.positionTimestamp,
packet.comment,
packet.rawPath,
packet.raw))
try:
# insert into packetYYYYMMDD
argString = b','.join(cur.mogrify(
"(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", x) for x in datePacketTuples)
sql = "insert into " + packetTable + "(station_id, sender_id, map_id, source_id, packet_type_id, latitude, longitude, posambiguity, symbol, symbol_table, map_sector, related_map_sectors, marker_id, marker_counter, speed, course, altitude, rng, phg, latest_phg_timestamp, latest_rng_timestamp, timestamp, packet_tail_timestamp, is_moving, reported_timestamp, position_timestamp, comment, raw_path, raw) values " + argString.decode() + " RETURNING id"
cur.execute(sql)
except psycopg2.InterfaceError as e:
# Connection to database is lost, better just exit
raise e
except Exception as e:
# Something went wrong, log error so we can fix problem
self.logger.error(e, exc_info=1)
self.logger.error(sql)
return
i = 0
for record in cur:
if packets[i]:
packets[i].id = record["id"]
if packets[i].mapId in [1]:
self.confirmedPositionPacketIdList.append(record["id"])
elif packets[i].mapId in [1, 5, 7, 9]:
self.positionPacketIdList.append(record["id"])
else:
# We only need to add the packet to the packetIdList if not in the positionPacketIdList or confirmedPositionPacketIdList array's
self.packetIdList.append(record["id"])
i += 1
def _insertIntoPacketPathTable(self, packets, cur):
"""Insert packets into the correct packet path table
Args:
packets (array): Packets to insert
cur (cursor): Database curser to use
"""
timestamp = packets[0].timestamp
packetPathTableCreator = PacketPathTableCreator(self.db)
packetPathTable = packetPathTableCreator.getPacketPathTable(timestamp)
i = 0
pathTuples = []
for packet in packets:
if (packet.stationIdPath):
self.pathPacketIdList.append(packet.id)
number = 0
for stationId in packet.stationIdPath:
if (packet.stationLocationPath
and packet.stationLocationPath[number]):
latitude = packet.stationLocationPath[number][0]
longitude = packet.stationLocationPath[number][1]
distance = packet.getTransmitDistance()
pathTuples.append(
(packet.id, stationId, latitude, longitude, packet.timestamp, distance, number, packet.stationId, packet.latitude, packet.longitude))
number += 1
i += 1
# insert into packetYYYYMMDD_path
if pathTuples:
try:
argString = ','.join(cur.mogrify(
"(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", x) for x in pathTuples)
cur.execute("insert into " + packetPathTable +
"(packet_id, station_id, latitude, longitude, timestamp, distance, number, sending_station_id, sending_latitude, sending_longitude) values " + argString)
except psycopg2.InterfaceError as e:
# Connection to database is lost, better just exit
raise e
except Exception as e:
# Something went wrong, log error so we can fix problem
self.logger.error(e, exc_info=1)
self.logger.error(argString)
def _insertIntoPacketWeatherTable(self, packets, cur):
"""Insert packets into the correct packet weather table
Args:
packets (array): Packets to insert
cur (cursor): Database curser to use
"""
timestamp = packets[0].timestamp
packetWeatherTableCreator = PacketWeatherTableCreator(self.db)
packetWeatherTable = packetWeatherTableCreator.getPacketWeatherTable(
timestamp)
i = 0
weatherTuples = []
for packet in packets:
if (packet.weather):
self.weatherPacketIdList.append(packet.id)
weatherTuples.append((packet.id,
packet.stationId,
packet.timestamp,
packet.weather.humidity,
packet.weather.pressure,
packet.weather.rain1h,
packet.weather.rain24h,
packet.weather.rainSinceMidnight,
packet.weather.temperature,
packet.weather.windDirection,
packet.weather.windGust,
packet.weather.windSpeed,
packet.weather.luminosity,
packet.weather.snow,
packet.weather.wxRawTimestamp))
i += 1
# insert into packetYYYYMMDD_weather
if weatherTuples:
try:
argString = ','.join(cur.mogrify(
"(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", x) for x in weatherTuples)
cur.execute("insert into " + packetWeatherTable +
"(packet_id, station_id, timestamp, humidity, pressure, rain_1h, rain_24h, rain_since_midnight, temperature, wind_direction, wind_gust, wind_speed, luminosity, snow, wx_raw_timestamp) values " + argString)
except psycopg2.InterfaceError as e:
# Connection to database is lost, better just exit
raise e
except Exception as e:
# Something went wrong, log error so we can fix problem
self.logger.error(e, exc_info=1)
self.logger.error(argString)
def _insertIntoPacketOgnTable(self, packets, cur):
"""Insert packets into the correct packet OGN table
Args:
packets (array): Packets to insert
cur (cursor): Database curser to use
"""
timestamp = packets[0].timestamp
packetOgnTableCreator = PacketOgnTableCreator(self.db)
packetOgnTable = packetOgnTableCreator.getPacketOgnTable(timestamp)
i = 0
ognTuples = []
for packet in packets:
if (packet.ogn):
self.ognPacketIdList.append(packet.id)
ognTuples.append((packet.id,
packet.stationId,
packet.timestamp,
packet.ogn.ognSenderAddress,
packet.ogn.ognAddressTypeId,
packet.ogn.ognAircraftTypeId,
packet.ogn.ognClimbRate,
packet.ogn.ognTurnRate,
packet.ogn.ognSignalToNoiseRatio,
packet.ogn.ognBitErrorsCorrected,
packet.ogn.ognFrequencyOffset))
i += 1
# insert into packetYYYYMMDD_ogn
if ognTuples:
try:
argString = ','.join(cur.mogrify(
"(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", x) for x in ognTuples)
cur.execute("insert into " + packetOgnTable + "(packet_id, station_id, timestamp, ogn_sender_address, ogn_address_type_id, ogn_aircraft_type_id, ogn_climb_rate, ogn_turn_rate, ogn_signal_to_noise_ratio, ogn_bit_errors_corrected, ogn_frequency_offset) values " + argString)
except psycopg2.InterfaceError as e:
# Connection to database is lost, better just exit
raise e
except Exception as e:
# Something went wrong, log error so we can fix problem
self.logger.error(e, exc_info=1)
self.logger.error(argString)
def _insertIntoPacketTelemetryTable(self, packets, cur):
"""Insert packets into the correct packet telemetry table
Args:
packets (array): Packets to insert
cur (cursor): Database curser to use
"""
timestamp = packets[0].timestamp
packetTelemetryTableCreator = PacketTelemetryTableCreator(self.db)
packetTelemetryTable = packetTelemetryTableCreator.getPacketTelemetryTable(
timestamp)
i = 0
telemetryTuples = []
for packet in packets:
if (packet.telemetry):
self.telemetryPacketIdList.append(packet.id)
if (not packet.telemetry.isDuplicate()):
telemetryTuples.append((packet.id,
packet.stationId,
packet.timestamp,
packet.telemetry.val1,
packet.telemetry.val2,
packet.telemetry.val3,
packet.telemetry.val4,
packet.telemetry.val5,
packet.telemetry.bits,
packet.telemetry.seq))
i += 1
# insert into packetYYYYMMDD_telemetry
if telemetryTuples:
try:
argString = ','.join(cur.mogrify(
"(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", x) for x in telemetryTuples)
cur.execute("insert into " + packetTelemetryTable +
"(packet_id, station_id, timestamp, val1, val2, val3, val4, val5, bits, seq) values " + argString + " returning id")
except psycopg2.InterfaceError as e:
# Connection to database is lost, better just exit
raise e
except Exception as e:
# Something went wrong, log error so we can fix problem
self.logger.error(e, exc_info=1)
packetTelemetryIds = []
record = cur.fetchone()
while record:
packetTelemetryIds.append(record['id'])
record = cur.fetchone()
try:
cur.execute("""update """ + packetTelemetryTable + """ packet_telemetry set
station_telemetry_param_id = (select id from station_telemetry_param where station_id = packet_telemetry.station_id and valid_to_ts is null),
station_telemetry_unit_id = (select id from station_telemetry_unit where station_id = packet_telemetry.station_id and valid_to_ts is null),
station_telemetry_eqns_id = (select id from station_telemetry_eqns where station_id = packet_telemetry.station_id and valid_to_ts is null),
station_telemetry_bits_id = (select id from station_telemetry_bits where station_id = packet_telemetry.station_id and valid_to_ts is null)
where id in %s""", (tuple(packetTelemetryIds), ))
except psycopg2.InterfaceError as e:
# Connection to database is lost, better just exit
raise e
except Exception as e:
# Something went wrong, log error so we can fix problem
self.logger.error(e, exc_info=1)
def _insertTelemetryDefinitions(self, packets):
"""Insert telemetry definitions (PARAM, UNIT, EQNS, BITS) if any exists in current packets
Args:
packets (array): Packets to insert
"""
for packet in packets:
if (packet.stationTelemetryBits):
packet.stationTelemetryBits.save()
if (packet.stationTelemetryEqns):
packet.stationTelemetryEqns.save()
if (packet.stationTelemetryParam):
packet.stationTelemetryParam.save()
if (packet.stationTelemetryUnit):
packet.stationTelemetryUnit.save()