378 lines
18 KiB
Python
378 lines
18 KiB
Python
import logging
|
|
from twisted.python import log
|
|
|
|
from math import floor, ceil
|
|
import datetime, time
|
|
|
|
import psycopg2, psycopg2.extras
|
|
|
|
from trackdirect.repositories.PacketRepository import PacketRepository
|
|
from trackdirect.repositories.StationRepository import StationRepository
|
|
from trackdirect.repositories.PacketWeatherRepository import PacketWeatherRepository
|
|
from trackdirect.repositories.PacketOgnRepository import PacketOgnRepository
|
|
from trackdirect.repositories.OgnDeviceRepository import OgnDeviceRepository
|
|
|
|
from trackdirect.database.DatabaseObjectFinder import DatabaseObjectFinder
|
|
from trackdirect.database.DatabaseConnection import DatabaseConnection
|
|
|
|
from trackdirect.websocket.queries.MostRecentPacketsQuery import MostRecentPacketsQuery
|
|
from trackdirect.websocket.queries.MissingPacketsQuery import MissingPacketsQuery
|
|
|
|
|
|
class ResponseDataConverter():
|
|
"""An ResponseDataConverter instance is used to create response content data based on packet objects
|
|
"""
|
|
|
|
|
|
def __init__(self, state, db):
|
|
"""The __init__ method.
|
|
|
|
Args:
|
|
state (WebsocketConnectionState):
|
|
db (psycopg2.Connection): Database connection (with autocommit)
|
|
"""
|
|
self.state = state
|
|
self.logger = logging.getLogger('trackdirect')
|
|
|
|
self.db = db
|
|
self.packetRepository = PacketRepository(db)
|
|
self.stationRepository = StationRepository(db)
|
|
self.packetWeatherRepository = PacketWeatherRepository(db)
|
|
self.dbObjectFinder = DatabaseObjectFinder(db)
|
|
self.packetOgnRepository = PacketOgnRepository(db)
|
|
self.ognDeviceRepository = OgnDeviceRepository(db)
|
|
|
|
|
|
def getResponseData(self, packets, mapSectorList = None, flags = [], iterationCounter = 0) :
|
|
"""Create response data based on specified packets
|
|
|
|
Args:
|
|
packets (array): An array of the Packet's that should be converted to packet dict responses
|
|
mapSectorList (array): An array of the current handled map sectors
|
|
flags (array): An array with additional flags (like "realtime", "latest")
|
|
iterationCounter (int) This functionality will call itself to find related packets, this argument is used to remember the number of iterations
|
|
|
|
Returns:
|
|
An array of packet dicts
|
|
"""
|
|
responseData = []
|
|
for index, packet in enumerate(packets) :
|
|
packetDict = packet.getDict(True)
|
|
packetDict['packet_order_id'] = self._getPacketOrderId(packets, index, flags)
|
|
|
|
self._updateState(packet, mapSectorList, flags)
|
|
self._addOverwriteStatus(packetDict)
|
|
if (("latest" not in flags and "realtime" not in flags) or self.state.isStationHistoryOnMap(packet.stationId)) :
|
|
if (packetDict['packet_order_id'] == 1) :
|
|
self._addStationWeatherData(packetDict)
|
|
self._addStationTelemetryData(packetDict)
|
|
if (packet.sourceId == 5) :
|
|
self._addStationOgnData(packetDict)
|
|
if (packet.sourceId == 5) :
|
|
self._addStationOgnDeviceData(packetDict)
|
|
self._addPacketPhgRng(packetDict)
|
|
|
|
if ("realtime" not in flags) :
|
|
self._addStationIdPath(packetDict)
|
|
|
|
self._setFlags(packetDict, flags)
|
|
responseData.append(packetDict)
|
|
return self._extendResponseWithMorePackets(responseData, flags, iterationCounter)
|
|
|
|
|
|
def _getPacketOrderId(self, packets, index, flags) :
|
|
"""Returns the order id of the packet at specified index
|
|
|
|
Args:
|
|
packets (array): An array of the Packet's that should be converted to packet dict responses
|
|
index (int): Index of the packet that we want an order id for
|
|
flags (array): An array with additional flags (like "realtime", "latest")
|
|
|
|
Returns:
|
|
int
|
|
"""
|
|
if ("realtime" in flags) :
|
|
return 1
|
|
elif (len(packets) -1 == index):
|
|
# This is the last packet of all
|
|
return 1 # Last packet in response for this marker
|
|
elif (packets[index].markerId != packets[index + 1].markerId) :
|
|
# This is the last packet for this marker
|
|
return 1 # Last packet in response for this marker
|
|
elif (index == 0 or packets[index].markerId != packets[index - 1].markerId) :
|
|
# This is the first packet for this marker
|
|
return 3 # First packet in response for this marker
|
|
else :
|
|
return 2 # Middle packet in response for this marker
|
|
|
|
|
|
def _updateState(self, packet, mapSectorList, flags) :
|
|
"""Update connection state based on packet on the way to client
|
|
|
|
Args:
|
|
packet (Packet): The packet that is on the way to client
|
|
mapSectorList (array): An array of the current handled map sectors
|
|
flags (array): An array with additional flags (like "realtime", "latest")
|
|
"""
|
|
self.state.setStationLatestTimestamp(packet.stationId, packet.timestamp)
|
|
|
|
if (packet.stationId not in self.state.stationsOnMapDict) :
|
|
# Station should be added to stationsOnMapDict even if only latest packet is added
|
|
self.state.stationsOnMapDict[packet.stationId] = True
|
|
|
|
# self.state.setCompleteStationLatestTimestamp
|
|
# Note that we depend on that the real-time aprs-is sender make sure to send previous missing packets when a new is sent
|
|
if (self.state.isStationHistoryOnMap(packet.stationId)) :
|
|
self.state.setCompleteStationLatestTimestamp(packet.stationId, packet.timestamp)
|
|
elif ("latest" not in flags and "realtime" not in flags and "related" not in flags) :
|
|
self.state.setCompleteStationLatestTimestamp(packet.stationId, packet.timestamp)
|
|
elif ("related" in flags and packet.packetTailTimestamp == packet.timestamp) :
|
|
self.state.setCompleteStationLatestTimestamp(packet.stationId, packet.timestamp)
|
|
|
|
if (mapSectorList and packet.mapSector is not None and packet.mapSector in mapSectorList) :
|
|
if "latest" not in flags:
|
|
self.state.setMapSectorLatestTimeStamp(packet.mapSector, packet.timestamp)
|
|
else :
|
|
self.state.setMapSectorLatestOverwriteTimeStamp(packet.mapSector, packet.timestamp)
|
|
|
|
|
|
def _setFlags(self, packetDict, flags) :
|
|
"""Set additional flags that will tell client a bit more about the packet
|
|
|
|
Args:
|
|
packetDict (dict): The packet to which we should modify
|
|
flags (array): An array with additional flags (like "realtime", "latest")
|
|
"""
|
|
if ("realtime" in flags) :
|
|
packetDict["db"] = 0
|
|
packetDict["realtime"] = 1
|
|
else :
|
|
packetDict["db"] = 1
|
|
packetDict["realtime"] = 0
|
|
|
|
def _addOverwriteStatus(self, packetDict) :
|
|
"""Set packet overwrite status
|
|
|
|
Args:
|
|
packetDict (dict): The packet to which we should modify
|
|
"""
|
|
packetDict['overwrite'] = 0
|
|
|
|
# We assume that this method is called after the "complete station on map"-state has been updated
|
|
if (not self.state.isStationHistoryOnMap(packetDict["station_id"])) :
|
|
packetDict['overwrite'] = 1
|
|
|
|
|
|
def _addPacketPhgRng(self, packetDict) :
|
|
"""Add previous reported phg and rng to the specified packet
|
|
|
|
Args:
|
|
packetDict (dict): The packet to which we should modify
|
|
"""
|
|
if ('phg' in packetDict and 'rng' in packetDict) :
|
|
if (packetDict['phg'] is None and packetDict['latest_phg_timestamp'] is not None and packetDict['latest_phg_timestamp'] < packetDict['timestamp']) :
|
|
relatedPacket = self.packetRepository.getObjectByStationIdAndTimestamp(packetDict['station_id'], packetDict['latest_phg_timestamp'])
|
|
if (relatedPacket.phg is not None and relatedPacket.markerId == packetDict['marker_id']) :
|
|
packetDict['phg'] = relatedPacket.phg
|
|
|
|
if (packetDict['rng'] is None and packetDict['latest_rng_timestamp'] is not None and packetDict['latest_rng_timestamp'] < packetDict['timestamp']) :
|
|
relatedPacket = self.packetRepository.getObjectByStationIdAndTimestamp(packetDict['station_id'], packetDict['latest_rng_timestamp'])
|
|
if (relatedPacket.rng is not None and relatedPacket.markerId == packetDict['marker_id']) :
|
|
packetDict['rng'] = relatedPacket.rng
|
|
|
|
|
|
def _addStationOgnData(self, packetDict) :
|
|
"""Add OGN data to packet
|
|
|
|
Args:
|
|
packetDict (dict): The packet to which we should add the related data
|
|
"""
|
|
if ('ogn' not in packetDict or packetDict['ogn'] is None) :
|
|
station = self.stationRepository.getObjectById(packetDict['station_id'])
|
|
ts = int(packetDict['timestamp']) - (24*60*60)
|
|
if (station.latestOgnPacketTimestamp is not None
|
|
and station.latestOgnPacketTimestamp > ts) :
|
|
packetDict['latest_ogn_packet_timestamp'] = station.latestOgnPacketTimestamp
|
|
|
|
relatedPacketDict = None
|
|
if (station.latestOgnPacketId == packetDict['id']) :
|
|
relatedPacketDict = packetDict
|
|
else :
|
|
relatedPacket = self.packetRepository.getObjectByIdAndTimestamp(station.latestOgnPacketId, station.latestOgnPacketTimestamp)
|
|
if (relatedPacket.isExistingObject()) :
|
|
relatedPacketDict = relatedPacket.getDict()
|
|
|
|
if (relatedPacketDict is not None) :
|
|
if (relatedPacketDict['marker_id'] is not None and relatedPacketDict['marker_id'] == packetDict['marker_id']) :
|
|
packetOgn = self.packetOgnRepository.getObjectByPacketIdAndTimestamp(station.latestOgnPacketId, station.latestOgnPacketTimestamp)
|
|
if (packetOgn.isExistingObject()) :
|
|
packetDict['ogn'] = packetOgn.getDict()
|
|
|
|
|
|
def _addStationOgnDeviceData(self, packetDict) :
|
|
"""Add OGN device data to packet
|
|
|
|
Args:
|
|
packetDict (dict): The packet to which we should add the related data
|
|
"""
|
|
station = self.stationRepository.getObjectById(packetDict['station_id'])
|
|
if (station.latestOgnSenderAddress is not None) :
|
|
ognDevice = self.ognDeviceRepository.getObjectByDeviceId(station.latestOgnSenderAddress)
|
|
if (ognDevice.isExistingObject()) :
|
|
packetDict['ogn_device'] = ognDevice.getDict()
|
|
|
|
|
|
def _addStationWeatherData(self, packetDict) :
|
|
"""Add weather data to packet
|
|
|
|
Args:
|
|
packetDict (dict): The packet to which we should add the related data
|
|
"""
|
|
if ('weather' not in packetDict or packetDict['weather'] is None) :
|
|
station = self.stationRepository.getObjectById(packetDict['station_id'])
|
|
ts = int(packetDict['timestamp']) - (24*60*60)
|
|
if (station.latestWeatherPacketTimestamp is not None
|
|
and station.latestWeatherPacketTimestamp > ts) :
|
|
packetDict['latest_weather_packet_timestamp'] = station.latestWeatherPacketTimestamp
|
|
|
|
relatedPacketDict = None
|
|
if (station.latestWeatherPacketId == packetDict['id']) :
|
|
relatedPacketDict = packetDict
|
|
else :
|
|
relatedPacket = self.packetRepository.getObjectByIdAndTimestamp(station.latestWeatherPacketId, station.latestWeatherPacketTimestamp)
|
|
if (relatedPacket.isExistingObject()) :
|
|
relatedPacketDict = relatedPacket.getDict()
|
|
|
|
if (relatedPacketDict is not None) :
|
|
if (relatedPacketDict['marker_id'] is not None and relatedPacketDict['marker_id'] == packetDict['marker_id']) :
|
|
packetWeather = self.packetWeatherRepository.getObjectByPacketIdAndTimestamp(station.latestWeatherPacketId, station.latestWeatherPacketTimestamp)
|
|
if (packetWeather.isExistingObject()) :
|
|
packetDict['weather'] = packetWeather.getDict()
|
|
|
|
|
|
def _addStationTelemetryData(self, packetDict) :
|
|
"""Add telemetry data to packet
|
|
|
|
Args:
|
|
packetDict (dict): The packet to which we should add the related data
|
|
"""
|
|
if ('telemetry' not in packetDict or packetDict['telemetry'] is None) :
|
|
station = self.stationRepository.getObjectById(packetDict['station_id'])
|
|
ts = int(packetDict['timestamp']) - (24*60*60)
|
|
if (station.latestTelemetryPacketTimestamp is not None
|
|
and station.latestTelemetryPacketTimestamp > ts) :
|
|
packetDict['latest_telemetry_packet_timestamp'] = station.latestTelemetryPacketTimestamp
|
|
|
|
|
|
def _addStationIdPath(self, packetDict) :
|
|
"""Add the station id path to the specified packet
|
|
|
|
Args:
|
|
packetDict (dict): The packet to which we should add the related station id path
|
|
"""
|
|
stationIdPath = []
|
|
stationNamePath = []
|
|
stationLocationPath = []
|
|
|
|
if (packetDict['raw_path'] is not None and "TCPIP*" not in packetDict['raw_path'] and "TCPXX*" not in packetDict['raw_path']) :
|
|
packetDate = datetime.datetime.utcfromtimestamp(int(packetDict['timestamp'])).strftime('%Y%m%d')
|
|
datePacketTable = 'packet' + packetDate
|
|
datePacketPathTable = datePacketTable + '_path'
|
|
|
|
if (self.dbObjectFinder.checkTableExists(datePacketPathTable)) :
|
|
selectCursor = self.db.cursor()
|
|
sql = """select station_id, station.name station_name, latitude, longitude from """ + datePacketPathTable + """, station where station.id = station_id and packet_id = %s order by number""" % (packetDict['id'])
|
|
selectCursor.execute(sql)
|
|
|
|
for record in selectCursor :
|
|
stationIdPath.append(record[0])
|
|
stationNamePath.append(record[1])
|
|
stationLocationPath.append([record[2], record[3]])
|
|
selectCursor.close()
|
|
packetDict['station_id_path'] = stationIdPath
|
|
packetDict['station_name_path'] = stationNamePath
|
|
packetDict['station_location_path'] = stationLocationPath
|
|
|
|
|
|
def getDictListFromPacketList(self, packets) :
|
|
"""Returns a packet dict list from a packet list
|
|
|
|
Args:
|
|
packets (array): Array of Packet instances
|
|
|
|
Returns:
|
|
A array och packet dicts
|
|
"""
|
|
packetDicts = []
|
|
for packet in packets :
|
|
packetDicts.append(packet.getDict())
|
|
return packetDicts
|
|
|
|
|
|
def _extendResponseWithMorePackets(self, packetDicts, flags, iterationCounter) :
|
|
"""Extend the specified array with related packets
|
|
|
|
Args:
|
|
packetDicts (array): An array of the packet response dicts
|
|
flags (array): An array with additional flags (like "realtime", "latest")
|
|
iterationCounter (int): This functionality will call itself to find related packets, this argument is used to remember the number of iterations
|
|
|
|
Returns:
|
|
The modified packet array
|
|
"""
|
|
allPacketDicts = []
|
|
hasSeveralSendersForOneStation = False
|
|
|
|
# Add related packets (stations that the original packets depend on)
|
|
if (packetDicts) :
|
|
relatedStationIds = {}
|
|
for index, packetDict in enumerate(packetDicts) :
|
|
if (packetDict['is_moving'] == 1 or packetDict['packet_order_id'] == 1) :
|
|
# Only fetch related stations for the last stationary packet (in some cases query will return older packets)
|
|
if (packetDict['station_id_path']) :
|
|
# Also add latest packets from stations that has been involved in sending any packets in array "packets"
|
|
for stationId in packetDict['station_id_path'] :
|
|
if (stationId not in self.state.stationsOnMapDict) :
|
|
relatedStationIds[stationId] = True
|
|
|
|
for index, packetDict in enumerate(packetDicts) :
|
|
if (packetDict['station_id'] in relatedStationIds) :
|
|
del relatedStationIds[packetDict['station_id']]
|
|
|
|
if (relatedStationIds) :
|
|
relatedStationPackets = self._getRelatedStationPacketsByStationIds(list(relatedStationIds.keys()))
|
|
|
|
# To avoid infinit loop we mark all related stations as added to map even we we failed doing it
|
|
for relatedStationId in list(relatedStationIds.keys()):
|
|
if (relatedStationId not in self.state.stationsOnMapDict) :
|
|
self.state.stationsOnMapDict[relatedStationId] = True
|
|
|
|
if (relatedStationPackets) :
|
|
if ("latest" in flags) :
|
|
relatedStationPacketDicts = self.getResponseData(relatedStationPackets, None, ["latest", "related"], iterationCounter + 1)
|
|
else :
|
|
relatedStationPacketDicts = self.getResponseData(relatedStationPackets, None, ["related"], iterationCounter + 1)
|
|
allPacketDicts.extend(relatedStationPacketDicts)
|
|
|
|
# Add original packets
|
|
allPacketDicts.extend(packetDicts)
|
|
|
|
#return allPacketDicts.sort(key=lambda x: x['id'], reverse=False)
|
|
return allPacketDicts
|
|
|
|
def _getRelatedStationPacketsByStationIds(self, relatedStationIdList) :
|
|
"""Returns a list of the latest packet for the specified stations, this method should be used to find packets for a packet's related stations
|
|
|
|
Args:
|
|
relatedStationIdList (array): Array of related station id's
|
|
|
|
Returns:
|
|
An array of the latest packet for the specified stations
|
|
"""
|
|
if (relatedStationIdList) :
|
|
query = MostRecentPacketsQuery(self.state, self.db)
|
|
query.enableSimulateEmptyStation()
|
|
return query.getPackets(relatedStationIdList)
|
|
return []
|
|
|