diff --git a/server/trackdirect/websocket/responses/FilterHistoryResponseCreator.py b/server/trackdirect/websocket/responses/FilterHistoryResponseCreator.py new file mode 100644 index 0000000..19fed55 --- /dev/null +++ b/server/trackdirect/websocket/responses/FilterHistoryResponseCreator.py @@ -0,0 +1,84 @@ +import logging +from twisted.python import log + +from math import floor, ceil +import datetime, time +import psycopg2, psycopg2.extras + +from trackdirect.repositories.PacketRepository import PacketRepository + +from trackdirect.websocket.queries.MissingPacketsQuery import MissingPacketsQuery +from trackdirect.websocket.responses.ResponseDataConverter import ResponseDataConverter + +class FilterHistoryResponseCreator(): + """The FilterHistoryResponseCreator class creates history responses for stations that we are filtering on + """ + + + def __init__(self, state, db): + """The __init__ method. + + Args: + state (WebsocketConnectionState): WebsocketConnectionState instance that contains current state + db (psycopg2.Connection): Database connection (with autocommit) + """ + self.state = state + self.logger = logging.getLogger('trackdirect') + + self.db = db + self.responseDataConverter = ResponseDataConverter(state, db) + self.packetRepository = PacketRepository(db) + + + def getResponse(self) : + """Returns a filter history response + + Returns: + Dict + """ + filterStationIds = self.state.filterStationIdDict.keys() + + # When filtering we send everything in the same packet + # We need to do this since we do not send related objects, + # if user is filtering on two related OBJECTS they need to be sent together + packets = self._getPackets(filterStationIds) + + # If map is empty we need to make sure that all specified stations is included + if (self.state.isMapEmpty()) : + query = MissingPacketsQuery(self.state, self.db) + query.enableSimulateEmptyStation() + sortedFoundMissingPackets = query.getMissingPackets(filterStationIds, packets) + sortedFoundMissingPackets.extend(packets) + packets = sortedFoundMissingPackets + + if (packets) : + data = self.responseDataConverter.getResponseData(packets, []) + payload = {'payload_response_type': 2, 'data': data} + return payload + + + def _getPackets(self, stationIds) : + """Returns packets to be used in a filter history response + + Args: + stationIds (array): The station id's that we want history data for + + Returns: + array + """ + minTimestamp = None + if (len(stationIds) > 0) : + minTimestamp = self.state.getStationLatestTimestampOnMap(stationIds[0]) + if (minTimestamp is None) : + minTimestamp = self.state.getMapSectorTimestamp(None) # None as argument is useful even when not dealing with map-sectors + if (len(stationIds) > 1) : + for stationId in stationIds : + timestamp = self.state.getStationLatestTimestampOnMap(stationId) + if (timestamp is not None and timestamp > minTimestamp) : + minTimestamp = timestamp + + if (self.state.latestTimeTravelRequest is not None) : + if (not self.state.isStationsOnMap(stationIds)) : + return self.packetRepository.getObjectListByStationIdListAndTimeInterval(stationIds, minTimestamp, self.state.latestTimeTravelRequest) + else : + return self.packetRepository.getObjectListByStationIdList(stationIds, minTimestamp) diff --git a/server/trackdirect/websocket/responses/FilterResponseCreator.py b/server/trackdirect/websocket/responses/FilterResponseCreator.py new file mode 100644 index 0000000..a26740d --- /dev/null +++ b/server/trackdirect/websocket/responses/FilterResponseCreator.py @@ -0,0 +1,146 @@ +import logging +from twisted.python import log + +from math import floor, ceil +import datetime, time + +import psycopg2, psycopg2.extras + +from trackdirect.repositories.PacketRepository import PacketRepository +from trackdirect.repositories.StationRepository import StationRepository + +from trackdirect.websocket.queries.MostRecentPacketsQuery import MostRecentPacketsQuery +from trackdirect.websocket.responses.ResponseDataConverter import ResponseDataConverter + +from trackdirect.TrackDirectConfig import TrackDirectConfig + +class FilterResponseCreator(): + """The FilterResponseCreator is used to create filter responses, a response sent to client when client wants to filter on a station + """ + + + def __init__(self, state, db): + """The __init__ method. + + Args: + state (WebsocketConnectionState): WebsocketConnectionState instance that contains current state + db (psycopg2.Connection): Database connection (with autocommit) + """ + self.state = state + self.db = db + self.logger = logging.getLogger('trackdirect') + self.responseDataConverter = ResponseDataConverter(state, db) + self.packetRepository = PacketRepository(db) + self.stationRepository = StationRepository(db) + self.config = TrackDirectConfig() + + + def getResponses(self, request) : + """Creates responses related to a filter request + + Args: + request (dict): The request to process + + Returns: + generator + """ + self._updateState(request) + if (self.state.isReset()) : + yield self._getResetResponse() + yield self._getFilterResponse() + + + def _updateState(self, request) : + """Update connection state based on filter request + + Args: + request (dict): The request to process + """ + if (request["payload_request_type"] == 4 and "list" in request) : + if (len(request["list"]) > 0) : + for stationId in request["list"]: + # Only send data about specified objects + self.state.filterStationIdDict[int(stationId)] = True + else : + # User wants to see everything again + self.state.filterStationIdDict = {} + self.state.setLatestMapBounds(0, 0, 0, 0) + self.state.setLatestMinutes(60, None) + self.state.reset() + + elif (request["payload_request_type"] == 6 and "station_id" in request) : + self.state.filterStationIdDict.pop(int(request["station_id"]), None) + self.state.reset() + + elif (request["payload_request_type"] == 8 and "namelist" in request) : + if (len(request["namelist"]) > 0) : + minTimestamp = int(time.time()) - (10*365*24*60*60) + if (not self.config.allowTimeTravel) : + minTimestamp = int(time.time()) - (24*60*60) + + for stationName in request["namelist"]: + # Only send data about specified objects + stations = self.stationRepository.getObjectListByName(stationName, None, None, minTimestamp) + for station in stations: + self.state.filterStationIdDict[int(station.id)] = True + else : + # User wants to see everything again + self.state.filterStationIdDict = {} + self.state.setLatestMapBounds(0, 0, 0, 0) + self.state.setLatestMinutes(60, None) + self.state.reset() + + def _getResetResponse(self) : + """This method creates a reset response + + Returns: + Dict + """ + payload = {'payload_response_type': 40} + return payload + + + def _getFilterResponse(self) : + """This method creates a filter response + + Returns: + Dict + """ + data = [] + if (self.state.filterStationIdDict) : + filterStationIds = list(self.state.filterStationIdDict.keys()) + data = self._getFilterResponseData(filterStationIds) + + payload = {'payload_response_type': 5, 'data': data} + return payload + + + def _getFilterResponseData(self, filterStationIds) : + """Creates data to be included in a filter response + + Args: + filterStationIds (array): An array of all stations we should filter on + """ + if (self.state.latestTimeTravelRequest is not None) : + timestamp = int(self.state.latestTimeTravelRequest) - (int(self.state.latestMinutesRequest)*60) + else : + timestamp = int(time.time()) - (int(self.state.latestMinutesRequest)*60) + + query = MostRecentPacketsQuery(self.state, self.db) + query.enableSimulateEmptyStation() + packets = query.getPackets(filterStationIds) + data = self.responseDataConverter.getResponseData(packets, []) + self.state.reset() # Reset to make sure client will get the same packet on history request + result = [] + for packetData in data : + if (self.config.allowTimeTravel or packetData['timestamp'] > int(time.time()) - (24*60*60)) : + packetData['overwrite'] = 1 + packetData['realtime'] = 0 + packetData['packet_order_id'] = 1 # Last packet for this station in this response + packetData['requested_timestamp'] = timestamp + if packetData['station_id'] in filterStationIds: + packetData['related'] = 0 + else : + packetData['related'] = 1 + result.append(packetData) + return result diff --git a/server/trackdirect/websocket/responses/HistoryResponseCreator.py b/server/trackdirect/websocket/responses/HistoryResponseCreator.py new file mode 100644 index 0000000..71c521b --- /dev/null +++ b/server/trackdirect/websocket/responses/HistoryResponseCreator.py @@ -0,0 +1,235 @@ +import logging +from twisted.python import log + +from math import floor, ceil +import datetime, time +import psycopg2, psycopg2.extras + +from trackdirect.repositories.PacketRepository import PacketRepository + +from trackdirect.websocket.queries.StationIdByMapSectorQuery import StationIdByMapSectorQuery +from trackdirect.websocket.responses.ResponseDataConverter import ResponseDataConverter + + +class HistoryResponseCreator(): + """The HistoryResponseCreator class creates websocket history responses for the latest websocket request + """ + + + def __init__(self, state, db): + """The __init__ method. + + Args: + state (WebsocketConnectionState): WebsocketConnectionState instance that contains current state + db (psycopg2.Connection): Database connection (with autocommit) + """ + self.state = state + self.logger = logging.getLogger('trackdirect') + + self.db = db + self.packetRepository = PacketRepository(db) + self.responseDataConverter = ResponseDataConverter(state, db) + + + def getResponses(self, request, requestId) : + """Create all history responses for the current request + + Args: + request (dict): The request to process + requestId (int): Request id of processed request + + Returns: + generator + """ + if (request["payload_request_type"] == 1 or request["payload_request_type"] == 11) : + if (not self.state.isValidLatestPosition()) : + return + + if (self.state.latestNeLat >= 90 + and self.state.latestNeLng >= 180 + and self.state.latestSwLat <= -90 + and self.state.latestSwLng <= -180) : + # request is requesting to much + return + + for response in self._getMapSectorHistoryResponses(requestId) : + yield response + + elif (request["payload_request_type"] == 7 and "station_id" in request) : + for response in self._getStationHistoryResponses([request["station_id"]], None, True) : + yield response + + else : + self.logger.error('Request is not supported') + self.logger.error(request) + + + def _getMapSectorHistoryResponses(self, requestId) : + """Creates all needed history responses for the currently visible map sectors + + Args: + requestId (int): Request id of processed request + + Returns: + generator + """ + mapSectorArray = self.state.getVisibleMapSectors() + if (len(mapSectorArray) > 20000) : + # Our client will never send a request like this + self.logger.error("To many map sectors requested!") + return + + handledStationIdDict = {} + for mapSector in mapSectorArray : + try: + # Handle one map sector at the time + if (requestId is not None and self.state.latestRequestId > requestId) : + # If new request is recived we want to skip this one (this request is not that important) + # As long as we handle a complete map sector everything is ok + return + + foundStationIds = self._getStationIdsByMapSector(mapSector) + stationIds = [] + for stationId in foundStationIds : + if (stationId not in handledStationIdDict) : + stationIds.append(stationId) + handledStationIdDict[stationId] = True + + if (stationIds) : + for response in self._getStationHistoryResponses(stationIds, mapSector, False) : + yield response + except psycopg2.InterfaceError as e: + # Connection to database is lost, better just exit + raise e + except Exception as e: + self.logger.error(e, exc_info=1) + + + def _getStationHistoryResponses(self, stationIds, mapSector, includeCompleteHistory = False) : + """Creates one history response per station + + Args: + stationIds (array): An array of the stations that we want history data for + mapSector (int): The map sector that we want history data for + includeCompleteHistory (boolean): Include all previous packets (even if we currently only request the latest packets) + + Returns: + generator + """ + # Important to fetch map sector timestamp before loop (may be updated in loop) + minTimestamp = self.state.getMapSectorTimestamp(mapSector) + for stationId in stationIds: + try: + if (self.state.latestTimeTravelRequest is not None) : + response = self._getPastHistoryResponse(stationId, mapSector, minTimestamp, includeCompleteHistory) + else : + response = self._getRecentHistoryResponse(stationId, mapSector, minTimestamp, includeCompleteHistory) + if (response is not None) : + yield response + except psycopg2.InterfaceError as e: + # Connection to database is lost, better just exit + raise e + except Exception as e: + self.logger.error(e, exc_info=1) + + + def _getRecentHistoryResponse(self, stationId, mapSector, minTimestamp, includeCompleteHistory = False) : + """Creates a history response for the specified station, includes all packets from minTimestamp until now + + Args: + stationId (int): The station id that we want history data for + mapSector (int): The map sector that we want history data for + minTimestamp (int): The map sector min timestamp to use in query + includeCompleteHistory (boolean): Include all previous packets (even if we currently only request the latest packets) + + Returns: + Dict + """ + packets = [] + onlyLatestPacketFetched = False + currentStationIds = [stationId] + + currentMinTimestamp = self.state.getStationLatestTimestampOnMap(stationId) + if (currentMinTimestamp is None) : + currentMinTimestamp = minTimestamp + else : + # Since station already exists on map we should continue adding all packets + includeCompleteHistory = True + + if (not self.state.onlyLatestPacketRequested or includeCompleteHistory) : + packets = self.packetRepository.getObjectListByStationIdList(currentStationIds, currentMinTimestamp) + else : + # We could call getMostRecentConfirmedObjectListByStationIdList, would take longer time but would show all positions for a station + packets = self.packetRepository.getLatestConfirmedObjectListByStationIdList(currentStationIds, currentMinTimestamp) + if (packets) : + packets = [packets[-1]] # we only need the latest + onlyLatestPacketFetched = True + + if (packets) : + flags = [] + if (onlyLatestPacketFetched) : + flags = ["latest"] + data = self.responseDataConverter.getResponseData(packets, [mapSector], flags) + payload = {'payload_response_type': 2, 'data': data} + return payload + + + def _getPastHistoryResponse(self, stationId, mapSector, minTimestamp, includeCompleteHistory = False) : + """Creates a history response for the specified station, includes all packets between minTimestamp and the current latestTimeTravelRequest timestamp + + Args: + stationId (int): The station id that we want history data for + mapSector (int): The map sector that we want history data for + minTimestamp (int): The map sector min timestamp to use in query + includeCompleteHistory (boolean): Include all previous packets (even if we currently only request the latest packets) + + Returns: + Dict + """ + packets = [] + onlyLatestPacketFetched = False + currentStationIds = [stationId] + + currentMinTimestamp = self.state.getStationLatestTimestampOnMap(stationId) + if (currentMinTimestamp is None) : + currentMinTimestamp = minTimestamp + + if (self.state.onlyLatestPacketRequested and not includeCompleteHistory) : + if (stationId not in self.state.stationsOnMapDict) : + # we only need to fetch latest packet for a time-travel request if station is not on map + onlyLatestPacketFetched = True + packets = self.packetRepository.getLatestObjectListByStationIdListAndTimeInterval(currentStationIds, currentMinTimestamp, self.state.latestTimeTravelRequest) + else : + if (not self.state.isStationHistoryOnMap(stationId)) : + packets = self.packetRepository.getObjectListByStationIdListAndTimeInterval(currentStationIds, currentMinTimestamp, self.state.latestTimeTravelRequest) + + if (packets) : + flags = [] + if (onlyLatestPacketFetched) : + flags = ["latest"] + data = self.responseDataConverter.getResponseData(packets, [mapSector], flags) + payload = {'payload_response_type': 2, 'data': data} + return payload + + + def _getStationIdsByMapSector(self, mapSector) : + """Returns the station Id's in specified map sector + + Args: + mapSector (int): The map sector that we are interested in + + Returns: + array of ints + """ + query = StationIdByMapSectorQuery(self.db) + if (self.state.latestTimeTravelRequest is not None) : + if (self.state.isMapSectorKnown(mapSector)) : + # This map sector is under control + return [] + else : + startTimestamp = self.state.latestTimeTravelRequest - (int(self.state.latestMinutesRequest)*60) + endTimestamp = self.state.latestTimeTravelRequest; + return query.getStationIdListByMapSector(mapSector, startTimestamp, endTimestamp) + else : + timestamp = self.state.getMapSectorTimestamp(mapSector) + return query.getStationIdListByMapSector(mapSector, timestamp, None) diff --git a/server/trackdirect/websocket/responses/ResponseDataConverter.py b/server/trackdirect/websocket/responses/ResponseDataConverter.py new file mode 100644 index 0000000..9ad3c1a --- /dev/null +++ b/server/trackdirect/websocket/responses/ResponseDataConverter.py @@ -0,0 +1,377 @@ +import logging +from twisted.python import log + +from math import floor, ceil +import datetime, time + +import psycopg2, psycopg2.extras + +from trackdirect.repositories.PacketRepository import PacketRepository +from trackdirect.repositories.StationRepository import StationRepository +from trackdirect.repositories.PacketWeatherRepository import PacketWeatherRepository +from trackdirect.repositories.PacketOgnRepository import PacketOgnRepository +from trackdirect.repositories.OgnDeviceRepository import OgnDeviceRepository + +from trackdirect.database.DatabaseObjectFinder import DatabaseObjectFinder +from trackdirect.database.DatabaseConnection import DatabaseConnection + +from trackdirect.websocket.queries.MostRecentPacketsQuery import MostRecentPacketsQuery +from trackdirect.websocket.queries.MissingPacketsQuery import MissingPacketsQuery + + +class ResponseDataConverter(): + """An ResponseDataConverter instance is used to create response content data based on packet objects + """ + + + def __init__(self, state, db): + """The __init__ method. + + Args: + state (WebsocketConnectionState): + db (psycopg2.Connection): Database connection (with autocommit) + """ + self.state = state + self.logger = logging.getLogger('trackdirect') + + self.db = db + self.packetRepository = PacketRepository(db) + self.stationRepository = StationRepository(db) + self.packetWeatherRepository = PacketWeatherRepository(db) + self.dbObjectFinder = DatabaseObjectFinder(db) + self.packetOgnRepository = PacketOgnRepository(db) + self.ognDeviceRepository = OgnDeviceRepository(db) + + + def getResponseData(self, packets, mapSectorList = None, flags = [], iterationCounter = 0) : + """Create response data based on specified packets + + Args: + packets (array): An array of the Packet's that should be converted to packet dict responses + mapSectorList (array): An array of the current handled map sectors + flags (array): An array with additional flags (like "realtime", "latest") + iterationCounter (int) This functionality will call itself to find related packets, this argument is used to remember the number of iterations + + Returns: + An array of packet dicts + """ + responseData = [] + for index, packet in enumerate(packets) : + packetDict = packet.getDict(True) + packetDict['packet_order_id'] = self._getPacketOrderId(packets, index, flags) + + self._updateState(packet, mapSectorList, flags) + self._addOverwriteStatus(packetDict) + if (("latest" not in flags and "realtime" not in flags) or self.state.isStationHistoryOnMap(packet.stationId)) : + if (packetDict['packet_order_id'] == 1) : + self._addStationWeatherData(packetDict) + self._addStationTelemetryData(packetDict) + if (packet.sourceId == 5) : + self._addStationOgnData(packetDict) + if (packet.sourceId == 5) : + self._addStationOgnDeviceData(packetDict) + self._addPacketPhgRng(packetDict) + + if ("realtime" not in flags) : + self._addStationIdPath(packetDict) + + self._setFlags(packetDict, flags) + responseData.append(packetDict) + return self._extendResponseWithMorePackets(responseData, flags, iterationCounter) + + + def _getPacketOrderId(self, packets, index, flags) : + """Returns the order id of the packet at specified index + + Args: + packets (array): An array of the Packet's that should be converted to packet dict responses + index (int): Index of the packet that we want an order id for + flags (array): An array with additional flags (like "realtime", "latest") + + Returns: + int + """ + if ("realtime" in flags) : + return 1 + elif (len(packets) -1 == index): + # This is the last packet of all + return 1 # Last packet in response for this marker + elif (packets[index].markerId != packets[index + 1].markerId) : + # This is the last packet for this marker + return 1 # Last packet in response for this marker + elif (index == 0 or packets[index].markerId != packets[index - 1].markerId) : + # This is the first packet for this marker + return 3 # First packet in response for this marker + else : + return 2 # Middle packet in response for this marker + + + def _updateState(self, packet, mapSectorList, flags) : + """Update connection state based on packet on the way to client + + Args: + packet (Packet): The packet that is on the way to client + mapSectorList (array): An array of the current handled map sectors + flags (array): An array with additional flags (like "realtime", "latest") + """ + self.state.setStationLatestTimestamp(packet.stationId, packet.timestamp) + + if (packet.stationId not in self.state.stationsOnMapDict) : + # Station should be added to stationsOnMapDict even if only latest packet is added + self.state.stationsOnMapDict[packet.stationId] = True + + # self.state.setCompleteStationLatestTimestamp + # Note that we depend on that the real-time aprs-is sender make sure to send previous missing packets when a new is sent + if (self.state.isStationHistoryOnMap(packet.stationId)) : + self.state.setCompleteStationLatestTimestamp(packet.stationId, packet.timestamp) + elif ("latest" not in flags and "realtime" not in flags and "related" not in flags) : + self.state.setCompleteStationLatestTimestamp(packet.stationId, packet.timestamp) + elif ("related" in flags and packet.packetTailTimestamp == packet.timestamp) : + self.state.setCompleteStationLatestTimestamp(packet.stationId, packet.timestamp) + + if (mapSectorList and packet.mapSector is not None and packet.mapSector in mapSectorList) : + if "latest" not in flags: + self.state.setMapSectorLatestTimeStamp(packet.mapSector, packet.timestamp) + else : + self.state.setMapSectorLatestOverwriteTimeStamp(packet.mapSector, packet.timestamp) + + + def _setFlags(self, packetDict, flags) : + """Set additional flags that will tell client a bit more about the packet + + Args: + packetDict (dict): The packet to which we should modify + flags (array): An array with additional flags (like "realtime", "latest") + """ + if ("realtime" in flags) : + packetDict["db"] = 0 + packetDict["realtime"] = 1 + else : + packetDict["db"] = 1 + packetDict["realtime"] = 0 + + def _addOverwriteStatus(self, packetDict) : + """Set packet overwrite status + + Args: + packetDict (dict): The packet to which we should modify + """ + packetDict['overwrite'] = 0 + + # We assume that this method is called after the "complete station on map"-state has been updated + if (not self.state.isStationHistoryOnMap(packetDict["station_id"])) : + packetDict['overwrite'] = 1 + + + def _addPacketPhgRng(self, packetDict) : + """Add previous reported phg and rng to the specified packet + + Args: + packetDict (dict): The packet to which we should modify + """ + if ('phg' in packetDict and 'rng' in packetDict) : + if (packetDict['phg'] is None and packetDict['latest_phg_timestamp'] is not None and packetDict['latest_phg_timestamp'] < packetDict['timestamp']) : + relatedPacket = self.packetRepository.getObjectByStationIdAndTimestamp(packetDict['station_id'], packetDict['latest_phg_timestamp']) + if (relatedPacket.phg is not None and relatedPacket.markerId == packetDict['marker_id']) : + packetDict['phg'] = relatedPacket.phg + + if (packetDict['rng'] is None and packetDict['latest_rng_timestamp'] is not None and packetDict['latest_rng_timestamp'] < packetDict['timestamp']) : + relatedPacket = self.packetRepository.getObjectByStationIdAndTimestamp(packetDict['station_id'], packetDict['latest_rng_timestamp']) + if (relatedPacket.rng is not None and relatedPacket.markerId == packetDict['marker_id']) : + packetDict['rng'] = relatedPacket.rng + + + def _addStationOgnData(self, packetDict) : + """Add OGN data to packet + + Args: + packetDict (dict): The packet to which we should add the related data + """ + if ('ogn' not in packetDict or packetDict['ogn'] is None) : + station = self.stationRepository.getObjectById(packetDict['station_id']) + ts = int(packetDict['timestamp']) - (24*60*60) + if (station.latestOgnPacketTimestamp is not None + and station.latestOgnPacketTimestamp > ts) : + packetDict['latest_ogn_packet_timestamp'] = station.latestOgnPacketTimestamp + + relatedPacketDict = None + if (station.latestOgnPacketId == packetDict['id']) : + relatedPacketDict = packetDict + else : + relatedPacket = self.packetRepository.getObjectByIdAndTimestamp(station.latestOgnPacketId, station.latestOgnPacketTimestamp) + if (relatedPacket.isExistingObject()) : + relatedPacketDict = relatedPacket.getDict() + + if (relatedPacketDict is not None) : + if (relatedPacketDict['marker_id'] is not None and relatedPacketDict['marker_id'] == packetDict['marker_id']) : + packetOgn = self.packetOgnRepository.getObjectByPacketIdAndTimestamp(station.latestOgnPacketId, station.latestOgnPacketTimestamp) + if (packetOgn.isExistingObject()) : + packetDict['ogn'] = packetOgn.getDict() + + + def _addStationOgnDeviceData(self, packetDict) : + """Add OGN device data to packet + + Args: + packetDict (dict): The packet to which we should add the related data + """ + station = self.stationRepository.getObjectById(packetDict['station_id']) + if (station.latestOgnSenderAddress is not None) : + ognDevice = self.ognDeviceRepository.getObjectByDeviceId(station.latestOgnSenderAddress) + if (ognDevice.isExistingObject()) : + packetDict['ogn_device'] = ognDevice.getDict() + + + def _addStationWeatherData(self, packetDict) : + """Add weather data to packet + + Args: + packetDict (dict): The packet to which we should add the related data + """ + if ('weather' not in packetDict or packetDict['weather'] is None) : + station = self.stationRepository.getObjectById(packetDict['station_id']) + ts = int(packetDict['timestamp']) - (24*60*60) + if (station.latestWeatherPacketTimestamp is not None + and station.latestWeatherPacketTimestamp > ts) : + packetDict['latest_weather_packet_timestamp'] = station.latestWeatherPacketTimestamp + + relatedPacketDict = None + if (station.latestWeatherPacketId == packetDict['id']) : + relatedPacketDict = packetDict + else : + relatedPacket = self.packetRepository.getObjectByIdAndTimestamp(station.latestWeatherPacketId, station.latestWeatherPacketTimestamp) + if (relatedPacket.isExistingObject()) : + relatedPacketDict = relatedPacket.getDict() + + if (relatedPacketDict is not None) : + if (relatedPacketDict['marker_id'] is not None and relatedPacketDict['marker_id'] == packetDict['marker_id']) : + packetWeather = self.packetWeatherRepository.getObjectByPacketIdAndTimestamp(station.latestWeatherPacketId, station.latestWeatherPacketTimestamp) + if (packetWeather.isExistingObject()) : + packetDict['weather'] = packetWeather.getDict() + + + def _addStationTelemetryData(self, packetDict) : + """Add telemetry data to packet + + Args: + packetDict (dict): The packet to which we should add the related data + """ + if ('telemetry' not in packetDict or packetDict['telemetry'] is None) : + station = self.stationRepository.getObjectById(packetDict['station_id']) + ts = int(packetDict['timestamp']) - (24*60*60) + if (station.latestTelemetryPacketTimestamp is not None + and station.latestTelemetryPacketTimestamp > ts) : + packetDict['latest_telemetry_packet_timestamp'] = station.latestTelemetryPacketTimestamp + + + def _addStationIdPath(self, packetDict) : + """Add the station id path to the specified packet + + Args: + packetDict (dict): The packet to which we should add the related station id path + """ + stationIdPath = [] + stationNamePath = [] + stationLocationPath = [] + + if (packetDict['raw_path'] is not None and "TCPIP*" not in packetDict['raw_path'] and "TCPXX*" not in packetDict['raw_path']) : + packetDate = datetime.datetime.utcfromtimestamp(int(packetDict['timestamp'])).strftime('%Y%m%d') + datePacketTable = 'packet' + packetDate + datePacketPathTable = datePacketTable + '_path' + + if (self.dbObjectFinder.checkTableExists(datePacketPathTable)) : + selectCursor = self.db.cursor() + sql = """select station_id, station.name station_name, latitude, longitude from """ + datePacketPathTable + """, station where station.id = station_id and packet_id = %s order by number""" % (packetDict['id']) + selectCursor.execute(sql) + + for record in selectCursor : + stationIdPath.append(record[0]) + stationNamePath.append(record[1]) + stationLocationPath.append([record[2], record[3]]) + selectCursor.close() + packetDict['station_id_path'] = stationIdPath + packetDict['station_name_path'] = stationNamePath + packetDict['station_location_path'] = stationLocationPath + + + def getDictListFromPacketList(self, packets) : + """Returns a packet dict list from a packet list + + Args: + packets (array): Array of Packet instances + + Returns: + A array och packet dicts + """ + packetDicts = [] + for packet in packets : + packetDicts.append(packet.getDict()) + return packetDicts + + + def _extendResponseWithMorePackets(self, packetDicts, flags, iterationCounter) : + """Extend the specified array with related packets + + Args: + packetDicts (array): An array of the packet response dicts + flags (array): An array with additional flags (like "realtime", "latest") + iterationCounter (int): This functionality will call itself to find related packets, this argument is used to remember the number of iterations + + Returns: + The modified packet array + """ + allPacketDicts = [] + hasSeveralSendersForOneStation = False + + # Add related packets (stations that the original packets depend on) + if (packetDicts) : + relatedStationIds = {} + for index, packetDict in enumerate(packetDicts) : + if (packetDict['is_moving'] == 1 or packetDict['packet_order_id'] == 1) : + # Only fetch related stations for the last stationary packet (in some cases query will return older packets) + if (packetDict['station_id_path']) : + # Also add latest packets from stations that has been involved in sending any packets in array "packets" + for stationId in packetDict['station_id_path'] : + if (stationId not in self.state.stationsOnMapDict) : + relatedStationIds[stationId] = True + + for index, packetDict in enumerate(packetDicts) : + if (packetDict['station_id'] in relatedStationIds) : + del relatedStationIds[packetDict['station_id']] + + if (relatedStationIds) : + relatedStationPackets = self._getRelatedStationPacketsByStationIds(list(relatedStationIds.keys())) + + # To avoid infinit loop we mark all related stations as added to map even we we failed doing it + for relatedStationId in list(relatedStationIds.keys()): + if (relatedStationId not in self.state.stationsOnMapDict) : + self.state.stationsOnMapDict[relatedStationId] = True + + if (relatedStationPackets) : + if ("latest" in flags) : + relatedStationPacketDicts = self.getResponseData(relatedStationPackets, None, ["latest", "related"], iterationCounter + 1) + else : + relatedStationPacketDicts = self.getResponseData(relatedStationPackets, None, ["related"], iterationCounter + 1) + allPacketDicts.extend(relatedStationPacketDicts) + + # Add original packets + allPacketDicts.extend(packetDicts) + + #return allPacketDicts.sort(key=lambda x: x['id'], reverse=False) + return allPacketDicts + + def _getRelatedStationPacketsByStationIds(self, relatedStationIdList) : + """Returns a list of the latest packet for the specified stations, this method should be used to find packets for a packet's related stations + + Args: + relatedStationIdList (array): Array of related station id's + + Returns: + An array of the latest packet for the specified stations + """ + if (relatedStationIdList) : + query = MostRecentPacketsQuery(self.state, self.db) + query.enableSimulateEmptyStation() + return query.getPackets(relatedStationIdList) + return [] + diff --git a/server/trackdirect/websocket/responses/__init__.py b/server/trackdirect/websocket/responses/__init__.py new file mode 100644 index 0000000..984c177 --- /dev/null +++ b/server/trackdirect/websocket/responses/__init__.py @@ -0,0 +1,2 @@ +__version__ = "1.0" +__author__ = "Per Qvarforth"