From c4f414869bcd652225956c71d46354c944834352 Mon Sep 17 00:00:00 2001 From: Batareiken Date: Wed, 28 Feb 2024 23:32:33 +0000 Subject: [PATCH] =?UTF-8?q?=D0=97=D0=B0=D0=B3=D1=80=D1=83=D0=B7=D0=B8?= =?UTF-8?q?=D0=BB=20=D1=84=D0=B0=D0=B9=D0=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/trackdirect/TrackDirectConfig.py | 192 +++++++ .../trackdirect/TrackDirectDataCollector.py | 527 ++++++++++++++++++ .../trackdirect/TrackDirectHeatMapCreator.py | 363 ++++++++++++ .../trackdirect/TrackDirectWebsocketServer.py | 439 +++++++++++++++ server/trackdirect/__init__.py | 7 + 5 files changed, 1528 insertions(+) create mode 100644 server/trackdirect/TrackDirectConfig.py create mode 100644 server/trackdirect/TrackDirectDataCollector.py create mode 100644 server/trackdirect/TrackDirectHeatMapCreator.py create mode 100644 server/trackdirect/TrackDirectWebsocketServer.py create mode 100644 server/trackdirect/__init__.py diff --git a/server/trackdirect/TrackDirectConfig.py b/server/trackdirect/TrackDirectConfig.py new file mode 100644 index 0000000..0ab9c78 --- /dev/null +++ b/server/trackdirect/TrackDirectConfig.py @@ -0,0 +1,192 @@ +import sys +import os +import os.path +import ConfigParser +from ConfigParser import SafeConfigParser + +from trackdirect.common.Singleton import Singleton + + +class TrackDirectConfig(Singleton): + """Track Direct Config class + """ + + def __init__(self): + """The __init__ method. + """ + self.collector = {} + + def populate(self, configFile): + """The __init__ method. + + Args: + configFile (string): Config file name + """ + configParser = SafeConfigParser() + if (configFile.startswith('/')): + configParser.read(os.path.expanduser(configFile)) + else: + configParser.read(os.path.expanduser( + '~/trackdirect/config/' + configFile)) + + # Database + self.dbHostname = configParser.get('database', 'host').strip('"') + self.dbName = configParser.get('database', 'database').strip('"') + try: + self.dbUsername = configParser.get( + 'database', 'username').strip('"') + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + self.dbUsername = os.getlogin() + self.dbPassword = configParser.get('database', 'password').strip('"') + self.dbPort = int(configParser.get('database', 'port').strip('"')) + self.daysToSavePositionData = int(configParser.get( + 'database', 'days_to_save_position_data').strip('"')) + self.daysToSaveStationData = int(configParser.get( + 'database', 'days_to_save_station_data').strip('"')) + self.daysToSaveWeatherData = int(configParser.get( + 'database', 'days_to_save_weather_data').strip('"')) + self.daysToSaveTelemetryData = int(configParser.get( + 'database', 'days_to_save_telemetry_data').strip('"')) + + self.saveOgnStationsWithMissingIdentity = False + try: + saveOgnStationsWithMissingIdentity = configParser.get( + 'database', 'save_ogn_stations_with_missing_identity').strip('"') + if (saveOgnStationsWithMissingIdentity == "1"): + self.saveOgnStationsWithMissingIdentity = True + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + pass + + # Websocket server + self.websocketHostname = configParser.get( + 'websocket_server', 'host').strip('"') + self.websocketPort = int(configParser.get( + 'websocket_server', 'port').strip('"')) + + self.websocketExternalPort = self.websocketPort + try : + self.websocketExternalPort = int(configParser.get( + 'websocket_server', 'external_port').strip('"')) + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + pass + + self.errorLog = configParser.get( + 'websocket_server', 'error_log').strip('"') + self.websocketFrequencyLimit = configParser.get( + 'websocket_server', 'frequency_limit').strip('"') + + self.maxDefaultTime = configParser.get( + 'websocket_server', 'max_default_time').strip('"') + self.maxFilterTime = configParser.get( + 'websocket_server', 'max_filter_time').strip('"') + self.maxClientIdleTime = configParser.get( + 'websocket_server', 'max_client_idle_time').strip('"') + self.maxQueuedRealtimePackets = configParser.get( + 'websocket_server', 'max_queued_realtime_packets').strip('"') + + allowTimeTravel = configParser.get( + 'websocket_server', 'allow_time_travel').strip('"') + self.allowTimeTravel = False + if (allowTimeTravel == "1"): + self.allowTimeTravel = True + + # Websocket server APRS connection (we support 2 different sources, more can be added...) + try: + self.websocketAprsHost1 = configParser.get( + 'websocket_server', 'aprs_host1').strip('"') + self.websocketAprsPort1 = configParser.get( + 'websocket_server', 'aprs_port1').strip('"') + self.websocketAprsSourceId1 = int(configParser.get( + 'websocket_server', 'aprs_source_id1').strip('"')) + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + self.websocketAprsSourceId1 = None + self.websocketAprsHost1 = None + self.websocketAprsPort1 = None + + try: + self.websocketAprsHost2 = configParser.get( + 'websocket_server', 'aprs_host2').strip('"') + self.websocketAprsPort2 = configParser.get( + 'websocket_server', 'aprs_port2').strip('"') + self.websocketAprsSourceId2 = int(configParser.get( + 'websocket_server', 'aprs_source_id2').strip('"')) + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + self.websocketAprsSourceId2 = None + self.websocketAprsHost2 = None + self.websocketAprsPort2 = None + + if (self.websocketAprsSourceId1 == 5 or self.websocketAprsSourceId2 == 5) : + # At least one source is of type OGN, disable display of older data + self.allowTimeTravel = False + if (self.maxDefaultTime > 1440) : + self.maxDefaultTime = 1440 + if (self.maxFilterTime > 1440) : + self.maxDefaultTime = 1440 + + # Collectors + for collectorNumber in range(0, 5): + self.collector[collectorNumber] = {} + try: + self.collector[collectorNumber]['source_id'] = int(configParser.get( + 'collector' + str(collectorNumber), 'source_id').strip('"')) + self.collector[collectorNumber]['host'] = configParser.get( + 'collector' + str(collectorNumber), 'host').strip('"') + self.collector[collectorNumber]['port_full'] = int(configParser.get( + 'collector' + str(collectorNumber), 'port_full').strip('"')) + self.collector[collectorNumber]['port_filtered'] = int(configParser.get( + 'collector' + str(collectorNumber), 'port_filtered').strip('"')) + + self.collector[collectorNumber]['callsign'] = configParser.get( + 'collector' + str(collectorNumber), 'callsign').strip('"') + self.collector[collectorNumber]['passcode'] = configParser.get( + 'collector' + str(collectorNumber), 'passcode').strip('"') + + self.collector[collectorNumber]['numbers_in_batch'] = configParser.get( + 'collector' + str(collectorNumber), 'numbers_in_batch').strip('"') + try: + self.collector[collectorNumber]['frequency_limit'] = configParser.get( + 'collector' + str(collectorNumber), 'frequency_limit').strip('"') + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + self.collector[collectorNumber]['frequency_limit'] = "0" + + try: + saveFastPackets = configParser.get( + 'collector' + str(collectorNumber), 'save_fast_packets').strip('"') + self.collector[collectorNumber]['save_fast_packets'] = bool( + int(saveFastPackets)) + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + self.collector[collectorNumber]['save_fast_packets'] = False + + try: + detectDuplicates = configParser.get( + 'collector' + str(collectorNumber), 'detect_duplicates').strip('"') + self.collector[collectorNumber]['detect_duplicates'] = bool( + int(detectDuplicates)) + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + self.collector[collectorNumber]['detect_duplicates'] = False + + self.collector[collectorNumber]['error_log'] = configParser.get( + 'collector' + str(collectorNumber), 'error_log').strip('"') + + if (self.websocketAprsSourceId1 == 5 or self.websocketAprsSourceId2 == 5) : + # source is of type OGN, make sure we do not save to many packets (will cause to high load on db) + if (self.collector[collectorNumber]['frequency_limit'] < 10) : + self.collector[collectorNumber]['frequency_limit'] = 10 + self.collector[collectorNumber]['save_fast_packets'] = False + + + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + self.collector[collectorNumber]['source_id'] = None + self.collector[collectorNumber]['host'] = None + self.collector[collectorNumber]['port_full'] = None + self.collector[collectorNumber]['port_filtered'] = None + + self.collector[collectorNumber]['callsign'] = None + self.collector[collectorNumber]['passcode'] = None + + self.collector[collectorNumber]['numbers_in_batch'] = "20" + self.collector[collectorNumber]['frequency_limit'] = "0" + self.collector[collectorNumber]['save_fast_packets'] = True + self.collector[collectorNumber]['detect_duplicates'] = False + + self.collector[collectorNumber]['error_log'] = None diff --git a/server/trackdirect/TrackDirectDataCollector.py b/server/trackdirect/TrackDirectDataCollector.py new file mode 100644 index 0000000..fa5d01f --- /dev/null +++ b/server/trackdirect/TrackDirectDataCollector.py @@ -0,0 +1,527 @@ +import logging +from twisted.python import log +import psycopg2 +import psycopg2.extras +from collections import deque +import json +import re +import aprslib +import datetime +import time +from twisted.internet import reactor, threads + +from trackdirect.parser.AprsPacketParser import AprsPacketParser +from trackdirect.parser.AprsISConnection import AprsISConnection +from trackdirect.parser.policies.PacketDuplicatePolicy import PacketDuplicatePolicy +from trackdirect.collector.PacketBatchInserter import PacketBatchInserter +from trackdirect.exceptions.TrackDirectParseError import TrackDirectParseError +from trackdirect.database.DatabaseConnection import DatabaseConnection +from trackdirect.repositories.StationRepository import StationRepository +from trackdirect.objects.Packet import Packet + +#from pympler.tracker import SummaryTracker + +class TrackDirectDataCollector(): + """An TrackDirectDataCollector instance connects to the data source and saves all received packets to the database + + Note: + The collector class is built to handle ONE connection to a data source server (may be a APRS-IS server), if two is wanted run two processes. + This is useful if you want one connection to the regular APRS-IS network and one connection to the CWOP network. + """ + + def __init__(self, collectorOptions, saveOgnStationsWithMissingIdentity): + """The __init__ method. + + Args: + collectorOptions (dict): Contains data like host, port, callsign, passcode, source id + saveOgnStationsWithMissingIdentity (boolean): True if we should not ignore stationss with a missing identity + """ + self.saveOgnStationsWithMissingIdentity = saveOgnStationsWithMissingIdentity + self.sourceHostname = collectorOptions['host'] + self.sourcePort = collectorOptions['port_full'] + self.numbersInBatch = collectorOptions['numbers_in_batch'] + self.saveFastPackets = collectorOptions['save_fast_packets'] + self.frequencyLimit = collectorOptions['frequency_limit'] + self.detectDuplicates = collectorOptions['detect_duplicates'] + self.hardFrequencyLimit = None + if (not self.saveFastPackets and self.frequencyLimit is not None and int(self.frequencyLimit) > 0): + # Only respect hard frequency limit if we are not saving "fast packets" + self.hardFrequencyLimit = self.frequencyLimit + self.sourceId = collectorOptions['source_id'] + self.callsign = collectorOptions['callsign'] + self.passcode = collectorOptions['passcode'] + + dbConnection = DatabaseConnection() + self.db = dbConnection.getConnection(True) + self.dbNoAutoCommit = dbConnection.getConnection(False) + + self.stationRepository = StationRepository(self.db) + self.logger = logging.getLogger(__name__) + + self.latestPacketTimestamp = None + self.firstPacketTimestamp = None + self.latestBatchInsertTimestamp = int(time.time()) + + self.packets = [] + self.stationIdsWithVisiblePacket = [] + self.movingStationIdsWithVisiblePacket = [] + self.movingMarkerIdsWithVisiblePacket = [] + self.delay = 0 + + def run(self): + """Start the collector + """ + threads.deferToThread(self.consume) + # reactor.suggestThreadPoolSize(20) + reactor.run() + + def consume(self): + """Start consuming packets + """ + + #tracker = SummaryTracker() + + connection = AprsISConnection( + self.callsign, self.passcode, self.sourceHostname, self.sourcePort) + connection.setFrequencyLimit(self.hardFrequencyLimit) + connection.setSourceId(self.sourceId) + + def onPacketRead(line): + if (not reactor.running): + raise StopIteration('Stopped') + + timestamp = int(time.time()) + deferred = threads.deferToThread(self._parse, line, timestamp) + deferred.addCallback(onParseComplete) + deferred.addErrback(onParseError) + + def onParseComplete(packet): + reactor.callFromThread(self._addPacket, packet) + + def onParseError(error): + # Parse will more or less only cast exception if db connection is lost + + # Force restart of collector (we assume that server will be autostarted if stopped) + if reactor.running: + reactor.stop() + raise error + + try: + connection.connect() + connection.filteredConsumer(onPacketRead, True, True) + + #tracker.print_diff() + + except (aprslib.ConnectionDrop) as exp: + # Just reconnect... + self.logger.warning('Lost connection') + self.logger.warning(exp) + self.consume() + + except Exception as e: + self.logger.error(e) + + # Force restart of collector + if reactor.running: + reactor.stop() + + def _parse(self, line, timestamp): + """Parse raw packet + + Args: + line (string): APRS raw packet string + timestamp (int): Receive time of packet + + Returns: + Returns a Packet + """ + try: + self.delay = int(time.time())-timestamp + if (self.delay > 60): + self.logger.error( + 'Collector has a delay on %s seconds, ignoring packets until solved', self.delay) + return None + elif (self.delay > 15): + self.logger.warning( + 'Collector has a delay on %s seconds', self.delay) + + packetDict = aprslib.parse(line) + parser = AprsPacketParser(self.db, self.saveOgnStationsWithMissingIdentity) + parser.setSourceId(self.sourceId) + packet = parser.getPacket(packetDict, timestamp) + + if (packet.mapId == 15 or packet.mapId == 16): + return None + + if (self.detectDuplicates): + self._checkIfDuplicate(packet) + + return self._cleanPacket(packet) + + except (aprslib.ParseError, aprslib.UnknownFormat, TrackDirectParseError) as exp: + return self._parseUnsupportedPacket(line, timestamp) + except psycopg2.InterfaceError as e: + # Connection to database is lost, better just exit + raise e + except (UnicodeDecodeError) as exp: + # just forget about this packet + pass + except Exception as e: + self.logger.error(e, exc_info=1) + return None + + def _parseUnsupportedPacket(self, line, timestamp): + """Try to parse raw packet that aprs-lib could not handle + + Args: + line (string): APRS raw packet string + timestamp (int): Receive time of packet + + Returns: + Returns a Packet + """ + try: + line = line.decode('utf-8', 'ignore') + packetDict = self.basicParse(line) + parser = AprsPacketParser(self.db, self.saveOgnStationsWithMissingIdentity) + parser.setSourceId(self.sourceId) + packet = parser.getPacket(packetDict, timestamp, True) + packet.markerId = 1 + + if (packet.packetTypeId == 6): # Telemetry packet + packet.packetTypeId = 10 # Has no position + else: + packet.mapId = 11 # Unsupported packet + + return packet + except Exception as e: + self.logger.debug(e) + self.logger.debug(line) + return None + + def _addPacket(self, packet): + """Adds packet to database + + Args: + packet (Packet): The packet + """ + if (packet is None): + return + + # Soft frequency limit check + if (self._isStationSendingToFast(packet)): + if (not self.saveFastPackets): + return + + packet.markerId = 1 + packet.mapId = 8 + + # Reset all mapId related values + packet.replacePacketId = None + packet.abnormalPacketId = None + packet.confirmPacketId = None + packet.replacePacketTimestamp = None + packet.abnormalPacketTimestamp = None + packet.confirmPacketTimestamp = None + + if (packet.mapId == 6): + # Packet received in wrong order + if (not self.saveFastPackets): + return + + if (not self._isPacketValidInCurrentBatch(packet)): + self._insertBatch() + + if (self._shouldPacketBeAdded(packet)): + self._addPacketToBatch(packet) + + if (self._isBatchFull()): + self._insertBatch() + + if (self._isBatchOld()): + self._insertBatch() + + def _isStationSendingToFast(self, packet): + """Returns true if this packet has been sent to close to previous packet from the same station (we need to save previous packet first) + + Args: + packet (Packet) : The packet that may have been sent to fast + + Returns: + Boolean + """ + if (packet.mapId in [1, 5, 7, 9] and packet.isMoving == 1 and self.frequencyLimit is not None): + frequencyLimitToApply = int(self.frequencyLimit) + + if (frequencyLimitToApply == 0): + return False + + if (packet.ogn is not None and packet.ogn.ognTurnRate is not None): + turnRate = abs(float(packet.ogn.ognTurnRate)) + if (turnRate > 0) : + frequencyLimitToApply = int(frequencyLimitToApply / (1+turnRate)) + + if ((packet.timestamp - frequencyLimitToApply) < packet.markerPrevPacketTimestamp): + # This station is sending faster than config limit + return True + + if (packet.stationId in self.movingStationIdsWithVisiblePacket): + # This station is sending way to fast (we havn't even added the previous packet to database yet) + return True + + if (packet.markerId in self.movingMarkerIdsWithVisiblePacket): + # The senders of this object is sending way to fast (we havn't even added the previous packet to database yet) + return True + return False + + def _isPacketValidInCurrentBatch(self, packet): + """Returns true if this packet can be added to current batch + + Args: + packet (Packet) : The packet that e want to add to current batch + + Returns: + Boolean + """ + if (self.latestPacketTimestamp is not None): + # If previous packet belongs to another date we can not add packet to current batch + currentPacketDate = datetime.datetime.utcfromtimestamp( + int(packet.timestamp)).strftime('%Y%m%d') + latestPacketDate = datetime.datetime.utcfromtimestamp( + self.latestPacketTimestamp).strftime('%Y%m%d') + + if (currentPacketDate != latestPacketDate and len(self.packets) > 0): + return False + + if (packet.stationId in self.stationIdsWithVisiblePacket): + # We only want to handle one packet per station per batch + return False + + return True + + def _shouldPacketBeAdded(self, packet): + """Returns true if this packet should be added to database + + Args: + packet (Packet) : The packet that we want to add to batch + + Returns: + Boolean + """ + if (packet.sourceId != 3 or packet.stationIdPath): + # We only add pure duplicates to batch if they have a path, otherwise we are not interested + return True + return False + + def _isBatchFull(self): + """Returns true if batch is considered full + + Returns: + Boolean + """ + # If we do insert when we have specified amount of packets (or if more than 5s has passed) + if (int(len(self.packets)) > int(self.numbersInBatch)): + return True + elif (len(self.packets) > 0 and self.latestBatchInsertTimestamp < int(time.time()) - 5): + return True + return False + + def _isBatchOld(self): + """Returns true if batch is considered old + + Returns: + Boolean + """ + if (self.latestPacketTimestamp is not None + and self.firstPacketTimestamp is not None + and self.latestPacketTimestamp - self.firstPacketTimestamp > 1): + return True + return False + + def _addPacketToBatch(self, packet): + """Add instance of ParsedPacket to batch + + Args: + packet (Packet): Packet that we want to add to batch + """ + self.latestPacketTimestamp = int(packet.timestamp) + if (self.firstPacketTimestamp is None): + self.firstPacketTimestamp = int(packet.timestamp) + self.packets.append(packet) + if (packet.mapId in [1, 5, 7, 9]): + self.stationIdsWithVisiblePacket.append(packet.stationId) + if (packet.isMoving == 1): + self.movingStationIdsWithVisiblePacket.append(packet.stationId) + self.movingMarkerIdsWithVisiblePacket.append(packet.markerId) + + def _insertBatch(self): + """Perform insert on the current batch + """ + if (len(self.packets) > 0): + self.latestBatchInsertTimestamp = int(time.time()) + + # Make sure packets is inserted in the order that they where received + self.packets.reverse() + + # Do batch insert + packetBatchInserter = PacketBatchInserter( + self.db, self.dbNoAutoCommit) + packetBatchInserter.insert(self.packets[:], self.sourceId) + + self._reset() + + def _reset(self): + """Reset all collector variables + """ + self.packets = [] + self.stationIdsWithVisiblePacket = [] + self.movingStationIdsWithVisiblePacket = [] + self.movingMarkerIdsWithVisiblePacket = [] + self.latestPacketTimestamp = None + self.firstPacketTimestamp = None + + def _cleanPacket(self, packet): + """Method used to clean a Packet from unused columns + + Args: + packet (Packet): Object of class Packet + + Returns: + Returns a packet (cleaned) + """ + if (packet.mapId not in [1, 5, 7, 9]): + # This packet will never be shown on map, remove information that won't be used (just to save some space in database) + packet.markerId = None + packet.markerCounter = None + packet.packetTailTimestamp = None + packet.positionTimestamp = None + packet.posambiguity = None + packet.symbol = None + packet.symbolTable = None + packet.mapSector = None + packet.relatedMapSectors = None + packet.speed = None + packet.course = None + packet.altitude = None + packet.isMoving = 1 + return packet + + def _checkIfDuplicate(self, packet): + """Method used to check if this packet is a duplicate + + Note: + If packet is a duplicate the object attribute mapId will be updated, and some related attributes. + + Args: + packet (Packet): Object of class Packet + """ + packetDuplicatePolicy = PacketDuplicatePolicy(self.stationRepository) + if (packetDuplicatePolicy.isDuplicate(packet)): + # It is a duplicate (or at least we treat it as one just to be safe) + packet.mapId = 3 + packet.markerId = 1 + packet.replacePacketId = None # No older packet should be replaced!!! + packet.replacePacketTimestamp = None + packet.abnormalPacketId = None # Do not mark previous as abnormal yet + packet.abnormalPacketTimestamp = None + packet.confirmPacketId = None # Do not confirm previous position + packet.confirmPacketTimestamp = None + + def basicParse(self, line): + """Performes a basic packet parse and returnes result as a dict + + Args: + line (string): Packet raw string + + Returns: + Returns packet dict + """ + # Divide into body and head + try: + (head, body) = line.split(':', 1) + except: + raise TrackDirectParseError("no body", {}) + + if len(body) == 0: + raise TrackDirectParseError("body is empty", {}) + + packetType = body[0] + body = body[1:] + + # Find sender, destination and path in header + try: + (fromcall, path) = head.split('>', 1) + except: + raise TrackDirectParseError("no header", {}) + + if (not 1 <= len(fromcall) <= 9): + raise TrackDirectParseError("fromcallsign has invalid length", {}) + + path = path.split(',') + tocall = path[0] + + if len(tocall) == 0: + tocall = None + + path = path[1:] + + for station in path: + if not re.findall(r"^[A-Z0-9\-]{1,9}\*?$", station, re.I): + path = None + break + + objectName = '' + if packetType == ';': + match = re.findall(r"^([ -~]{9})(\*|_)", body) + if match: + name, flag = match[0] + objectName = name + body = body[10:] + + if packetType == ')': + match = re.findall(r"^([ -~!]{3,9})(\!|_)", body) + if match: + name, flag = match[0] + objectName = name + body = body[len(name)+1:] + + comment = None + telemetry = None + if packetType == 'T': + telemetry = {} + lst = body.split(',') + if len(lst) >= 7 : + seq = body.split(',')[0] + vals = body.split(',')[1:6] + bits = body.split(',')[6][:8] + comment = body.split(',')[6][8:] + + if seq.startswith('T'): + seq = seq[1:] + if seq.startswith('#'): + seq = seq[1:] + + for i in range(5): + try: + vals[i] = float(vals[i]) if vals[i] != '' else None + except ValueError: + vals[i] = None + + telemetry = { + 'seq': seq, + 'vals': vals, + 'bits': bits + } + + # Create result + packet = { + 'from': fromcall, + 'to': tocall, + 'path': path, + 'raw': line, + 'object_name': objectName, + 'packet_type': packetType, + 'telemetry': telemetry, + 'comment': comment + } + return packet diff --git a/server/trackdirect/TrackDirectHeatMapCreator.py b/server/trackdirect/TrackDirectHeatMapCreator.py new file mode 100644 index 0000000..b94e0cc --- /dev/null +++ b/server/trackdirect/TrackDirectHeatMapCreator.py @@ -0,0 +1,363 @@ +import logging +import heatmap +import random +import psycopg2 +import psycopg2.extras +from math import floor, ceil, sqrt, degrees, asinh, tan, radians, sin, asin, atan, exp +from math import log as math_log +from math import pi as math_pi +import datetime +import time +import image_slicer +from PIL import ImageDraw, ImageFont +import os +import shutil +from twisted.python import log + +from trackdirect.database.DatabaseConnection import DatabaseConnection + + +class TrackDirectHeatMapCreator(): + """The TrackDirectHeatMapCreator class is built to generate heatmaps based on the track direct database data. + + Note: + The heatmaps are divided into several tiles to make them smaller to download and to make the heatmap generator less memory intensive + """ + + def __init__(self, destinationDir): + """The __init__ method. + + Args: + destinationDir (String): Absolute path to the destination directory + """ + self.destinationDir = destinationDir + + dbConnection = DatabaseConnection() + self.db = dbConnection.getConnection(True) + self.db.set_isolation_level(0) + + # We can not create one image for large zoom, heatmap lib has a limit and RAM-usage will be huge + self.minZoomForOneImage = 3 + + # Recommended tile size is 256 (or 512 if main usage is mobile phones with retina display) + self.imageTileSize = 256 + + self.logger = logging.getLogger(__name__) + + def run(self): + """Start creating heatmap images and deploy them when done. + """ + self._createHeatMapImages() + + # Deploy new images + self._renameHeatMapImagesToGoogleMapStandard() + + def _createHeatMapImages(self): + """Start creating heatmap images + """ + # Zoom 0: = 1 + # Zoom 1: 2x2 = 4 + # Zoom 2: 4x4 = 16 + # Zoom 3: 8x8 = 64 (This is the largest that we create as one single image) + # Zoom 4: 16x16 = 256 + # Zoom 5: 32x32 = 1024 + # Zoom 6: 64x64 + # Zoom 7: 128x128 + for zoom in range(0, 8): + if (zoom <= self.minZoomForOneImage): + partsLength = 1 + parts = range(0, partsLength) + imageSize = pow(2, zoom) * self.imageTileSize + numberOfTilesPerImage = pow(2, zoom) * pow(2, zoom) + else: + partsLength = pow(2, zoom-self.minZoomForOneImage) * \ + pow(2, zoom-self.minZoomForOneImage) + parts = range(0, partsLength) + numberOfTilesPerImage = pow( + 2, self.minZoomForOneImage) * pow(2, self.minZoomForOneImage) + imageSize = pow(2, self.minZoomForOneImage) * \ + self.imageTileSize + + # For zoom 0-3 we only have one part + # zoom 4: 4 parts (images) + # zoom 5: 16 parts (images) + # zoom 6: 64 parts (images) + # zoom 7: 256 parts (images) + # all images will alter be splitted into tile of size 256x256 + for part in parts: + # We are not in any hurry so we sleep to lower the effect on other processes (we are on a shared server) + time.sleep(1) + + totalMinLat = float(-85.05115) + totalMaxLat = float(85.05115) + totalMinLng = float(-180) + totalMaxLng = float(180) + + totalMinLatPixel = float( + self._getLatPixelCoordinate(totalMinLat, zoom)) + totalMaxLatPixel = float( + self._getLatPixelCoordinate(totalMaxLat, zoom)) + totalMinLngPixel = float( + self._getLngPixelCoordinate(totalMinLng, zoom)) + totalMaxLngPixel = float( + self._getLngPixelCoordinate(totalMaxLng, zoom)) + + if (zoom <= self.minZoomForOneImage): + minLat = totalMinLat + maxLat = totalMaxLat + minLng = totalMinLng + maxLng = totalMaxLng + + minLatPixel = totalMinLatPixel + maxLatPixel = totalMaxLatPixel + minLngPixel = totalMinLngPixel + maxLngPixel = totalMaxLngPixel + else: + latPartPixelLength = (totalMinLatPixel) / sqrt(partsLength) + lngPartPixelLength = (totalMaxLngPixel) / sqrt(partsLength) + + partRow = floor(part / sqrt(partsLength)) # Starts on 0 + partColumn = part - \ + sqrt(partsLength) * partRow # Starts on 0 + + minLatPixel = (partRow * latPartPixelLength) + \ + latPartPixelLength + maxLatPixel = (partRow * latPartPixelLength) + minLngPixel = totalMinLngPixel + partColumn * lngPartPixelLength + maxLngPixel = totalMinLngPixel + \ + ((partColumn * lngPartPixelLength) + lngPartPixelLength) + + minLat = self._getLatFromLatPixelCoordinate( + minLatPixel, zoom) + maxLat = self._getLatFromLatPixelCoordinate( + maxLatPixel, zoom) + minLng = self._getLngFromLngPixelCoordinate( + minLngPixel, zoom) + maxLng = self._getLngFromLngPixelCoordinate( + maxLngPixel, zoom) + + file = self.destinationDir + "/latest-heatmap." + \ + str(zoom)+"."+str(part)+".png" + + #pts = [(random.uniform(minLat, maxLat), random.uniform(minLng, maxLng)) for i in range(1000)] + pts = self._getPoints(minLat, maxLat, minLng, maxLng, zoom) + + if (len(pts) == 0): + self.logger.info("Skipping file:" + file) + else: + # Create the heatmap! + hm = heatmap.Heatmap() + dotSize = 2*(zoom+1) + opacity = 230 + size = (imageSize, imageSize) + schema = "fire" + + #area = ((0, minLatProj), (maxLngProj*2, maxLatProj)) + area = ((minLngPixel, minLatPixel), + (maxLngPixel, maxLatPixel)) + hm.heatmap(pts, dotSize, opacity, size, + schema, area).save(file) + self.logger.info("Created file:" + file) + + # Split heatmap into tiles + if (numberOfTilesPerImage > 1): + # The tile image filename will be : + # latest-heatmap.zoom.part_row_column.png + # Example for zoom level 4: + # latest-heatmap.4.0_01_01.png -> will be renamed to latest-heatmap.4.0.0.png + # latest-heatmap.4.0_01_02.png + # latest-heatmap.4.0_02_01.png + # latest-heatmap.4.0_02_02.png + # latest-heatmap.4.1_01_01.png + # latest-heatmap.4.1_01_02.png + # latest-heatmap.4.1_02_01.png + # latest-heatmap.4.1_02_02.png + # latest-heatmap.4.2_01_01.png + # latest-heatmap.4.2_01_02.png + # latest-heatmap.4.2_02_01.png + # latest-heatmap.4.2_02_02.png + # latest-heatmap.4.3_01_01.png -> will be renamed to latest-heatmap.4.3.0.png + # latest-heatmap.4.3_01_02.png + # latest-heatmap.4.3_02_01.png + # latest-heatmap.4.3_02_02.png + + # Use this for production + image_slicer.slice(file, numberOfTilesPerImage) + + # Use following code to print something on each tile (to make sure tile is placed correct on map) + #tiles = image_slicer.slice(file, numberOfTilesPerImage, save= False) + # for tile in tiles: + # overlay = ImageDraw.Draw(tile.image) + # overlay.text((5, 5), str(zoom) + ':' +str(part) + ':'+ str(tile.number), (255, 255, 255), ImageFont.load_default()) + #image_slicer.save_tiles(tiles, directory='htdocs/public/heatmaps/', prefix="latest-heatmap."+str(zoom)+"."+str(part)) + + def _renameHeatMapImagesToGoogleMapStandard(self): + """Deploy the new heatmap images (this is done by renaming the created files) + """ + for zoom in range(0, 8): + if (zoom <= self.minZoomForOneImage): + partsLength = 1 + parts = range(0, partsLength) + imageSize = pow(2, zoom) * self.imageTileSize + numberOfTilesPerImage = pow(2, zoom) * pow(2, zoom) + else: + partsLength = pow(2, zoom-self.minZoomForOneImage) * \ + pow(2, zoom-self.minZoomForOneImage) + parts = range(0, partsLength) + numberOfTilesPerImage = pow( + 2, self.minZoomForOneImage) * pow(2, self.minZoomForOneImage) + imageSize = pow(2, self.minZoomForOneImage) * \ + self.imageTileSize + + for part in parts: + if (numberOfTilesPerImage == 1): + # This is only effects the zoom 0 file + os.rename(self.destinationDir + "/latest-heatmap.0.0.png", + self.destinationDir + "/latest-heatmap.0.0.0.png") + + else: + # Lets rename the files to google map standard (or something like it) + for row in range(1, int(sqrt(numberOfTilesPerImage)) + 1): + for column in range(1, int(sqrt(numberOfTilesPerImage)) + 1): + if (row < 10): + oldRowStr = '0'+str(row) + else: + oldRowStr = str(row) + + if (column < 10): + oldColumStr = '0'+str(column) + else: + oldColumStr = str(column) + + # Starts on 0 + partRow = floor(part / sqrt(partsLength)) + partColumn = part - \ + sqrt(partsLength) * partRow # Starts on 0 + newRowStr = str( + int((row - 1) + (partRow * sqrt(numberOfTilesPerImage)))) + newColumStr = str( + int((column - 1) + (partColumn * sqrt(numberOfTilesPerImage)))) # 1-1 + 1*8 + + oldFile = self.destinationDir + "/latest-heatmap." + \ + str(zoom)+"."+str(part)+"_" + \ + str(oldRowStr)+"_"+str(oldColumStr)+".png" + newFile = self.destinationDir + "/latest-heatmap." + \ + str(zoom)+"."+str(newRowStr) + \ + "."+str(newColumStr)+".png" + + if (os.path.exists(oldFile)): + self.logger.info( + "Renaming file : " + oldFile + " -> " + newFile) + os.rename(oldFile, newFile) + else: + oldFile = self.destinationDir + '/transparent.png' + self.logger.info( + "Copy file : " + oldFile + " -> " + newFile) + shutil.copyfile(oldFile, newFile) + + def _getPoints(self, minLat, maxLat, minLng, maxLng, zoom): + """Get latitude, longitude point in the specified map bounds + + Args: + maxLat (float): The max latitude + maxLng (float): The max longitude + minLat (float): The min latitude + minLng (float): The min longitude + + Returns: + Array of points (a point is a latitude, longitude tuple) + """ + result = [] + timestampLimit = int(time.time()) - (60*60*3) + selectCursor = self.db.cursor() + + selectCursor.execute(""" + select latest_confirmed_latitude latitude, latest_confirmed_longitude longitude + from station + where latest_confirmed_packet_timestamp > %s + and latest_confirmed_latitude between %s and %s + and latest_confirmed_longitude between %s and %s""", (timestampLimit, minLat, maxLat, minLng, maxLng,)) + + for record in selectCursor: + if (record != None): + lngProjection = self._getLngPixelCoordinate( + record["longitude"], zoom) + latProjection = self._getLatPixelCoordinate( + record["latitude"], zoom) + result.append((lngProjection, latProjection)) + selectCursor.close() + return result + + def _getLatPixelCoordinate(self, lat, zoom): + """Translate a latitude to a pixel coordinate value for a specified zoom + + Args: + lat (float): The latitude + zoom (int): The zoom + + Returns: + Returns a pixel coordinate value as an int + """ + pixelGlobeSize = self.imageTileSize * pow(2, zoom) + yPixelsToRadiansRatio = pixelGlobeSize / (2 * math_pi) + halfPixelGlobeSize = float(pixelGlobeSize / 2) + pixelGlobeCenterY = halfPixelGlobeSize + degreesToRadiansRatio = 180 / math_pi + siny = sin(lat * math_pi / 180) + + # Truncating to 0.9999 effectively limits latitude to 89.189. This is + # about a third of a tile past the edge of the world tile. + if (siny < -0.9999): + siny = -0.9999 + if (siny > 0.9999): + siny = 0.9999 + latY = round(pixelGlobeCenterY + .5 * + math_log((1 + siny) / (1 - siny)) * -yPixelsToRadiansRatio) + return latY + + def _getLngPixelCoordinate(self, lng, zoom): + """Translate a longitude to a pixel coordinate value for a specified zoom + + Args: + lng (float): The longitude + zoom (int): The zoom + + Returns: + Returns a pixel coordinate value as an int + """ + scale = 1 << zoom + lngX = floor(self.imageTileSize * (0.5 + lng / 360) * scale) + return lngX + + def _getLatFromLatPixelCoordinate(self, latPixelCoord, zoom): + """Translate a pixel coordinate value to a latitude for a specified zoom + + Args: + latPixelCoord (int): The pixel coordinate value + zoom (int): The zoom + + Returns: + Returns a latitude as a float + """ + pixelGlobeSize = self.imageTileSize * pow(2, zoom) + yPixelsToRadiansRatio = pixelGlobeSize / (2 * math_pi) + halfPixelGlobeSize = float(pixelGlobeSize / 2) + pixelGlobeCenterY = halfPixelGlobeSize + degreesToRadiansRatio = 180 / math_pi + lat = (2 * atan(exp((latPixelCoord - pixelGlobeCenterY) / - + yPixelsToRadiansRatio)) - math_pi / 2) * degreesToRadiansRatio + return lat + + def _getLngFromLngPixelCoordinate(self, lngPixelCoord, zoom): + """Translate a pixel coordinate value to a longitude for a specified zoom + + Args: + lngPixelCoord (int): The pixel coordinate value + zoom (int): The zoom + + Returns: + Returns a longitude as a float + """ + scale = 1 << zoom + lng = (((lngPixelCoord / scale) / self.imageTileSize) - 0.5) * 360 + return lng diff --git a/server/trackdirect/TrackDirectWebsocketServer.py b/server/trackdirect/TrackDirectWebsocketServer.py new file mode 100644 index 0000000..9633f7a --- /dev/null +++ b/server/trackdirect/TrackDirectWebsocketServer.py @@ -0,0 +1,439 @@ +import logging + +from twisted.python import log +from twisted.internet import threads, reactor, task +from twisted.internet.error import AlreadyCancelled, AlreadyCalled + +from autobahn.twisted.websocket import WebSocketServerProtocol + +import json +import datetime +import time +import psycopg2 +import psycopg2.extras +import os +import re + +from trackdirect.TrackDirectConfig import TrackDirectConfig + +from trackdirect.database.DatabaseConnection import DatabaseConnection + +from trackdirect.websocket.WebsocketResponseCreator import WebsocketResponseCreator +from trackdirect.websocket.WebsocketConnectionState import WebsocketConnectionState + +from trackdirect.websocket.aprsis.AprsISReader import AprsISReader +from trackdirect.websocket.aprsis.AprsISPayloadCreator import AprsISPayloadCreator + + +class TrackDirectWebsocketServer(WebSocketServerProtocol): + """The TrackDirectWebsocketServer class handles the incoming requests + """ + + def __init__(self): + """The __init__ method. + """ + WebSocketServerProtocol.__init__(self) + self.logger = logging.getLogger('trackdirect') + + self.config = TrackDirectConfig() + self.maxClientIdleTime = int(self.config.maxClientIdleTime) * 60 + self.maxQueuedRealtimePackets = int( + self.config.maxQueuedRealtimePackets) + + dbConnection = DatabaseConnection() + db = dbConnection.getConnection(True) + + self.connectionState = WebsocketConnectionState() + self.responseCreator = WebsocketResponseCreator( + self.connectionState, db) + self.aprsISReader = AprsISReader(self.connectionState, db) + self.aprsISPayloadCreator = AprsISPayloadCreator( + self.connectionState, db) + + self.numberOfRealTimePacketThreads = 0 + self.timestampSenderCall = None + self.realTimeListenerCall = None + self.onInactiveCall = None + self.isUnknownClient = False + + def onConnect(self, request): + """Method that is executed on connect + + Args: + request (object): The connection request + """ + try: + if ('x-forwarded-for' in request.headers): + self.logger.warning("Client connecting from origin: {0}, x-forwarded-for: {1} (server pid {2})".format( + request.origin, request.headers['x-forwarded-for'], str(os.getpid()))) + else: + self.logger.warning( + "Client connecting from origin: {0} (server pid {1})".format(request.origin, str(os.getpid()))) + + except Exception as e: + self.logger.error(e, exc_info=1) + raise e + + def onOpen(self): + """Method that is executed on open + """ + try: + self.logger.info("WebSocket connection open.") + + self._sendResponseByType(42) # Inform client that we are active + self._startTimestampSender() + self._reScheduleInactiveEvent() + except Exception as e: + self.logger.error(e, exc_info=1) + + def onMessage(self, payload, isBinary): + """Method that is executed on incoming message + + Args: + request (object): The connection request + isBinary (boolean): True if binary otherwise false + """ + try: + request = json.loads(payload) + if (self.isUnknownClient): + self.logger.warning( + "Incoming message from unknown client: {0}".format(str(request))) + + if ("payload_request_type" not in request): + self.logger.warning( + "Incoming request has no type (%s)" % (exp)) + self.logger.warning(payload) + return + self._onRequest(request) + except (ValueError) as exp: + self.logger.warning( + "Incoming request could not be parsed (%s)" % (exp)) + self.logger.warning(payload) + except psycopg2.InterfaceError as e: + # Connection to database is lost, better just terminate connection to make user reconnect with new db connection + self.logger.error(e, exc_info=1) + raise e + except Exception as e: + # Log error to make us aware of unknow problem + self.logger.error(e, exc_info=1) + + def onClose(self, wasClean, code, reason): + """Method that is executed on close + + Args: + wasClean (boolean): True if clean close otherwise false + code (int): Close code + reason (object): Reason for close + """ + try: + self.logger.info("WebSocket connection closed: {0}".format(reason)) + self.connectionState.disconnected = True + self._stopTimestampSender() + self._stopRealTimeListener(True) + except Exception as e: + # Log error to make us aware of unknow problem + self.logger.error(e, exc_info=1) + + def _onRequest(self, request, requestId=None): + """Method that is executed on incoming request + + Args: + request (object): The connection request + requestId (int): Id of the request + """ + if (request["payload_request_type"] != 11): + self._reScheduleInactiveEvent() + + if (request["payload_request_type"] in [5, 7, 9]): + # Request that not affects the current map status (to much) + deferred = threads.deferToThread( + self._processRequest, request, None) + deferred.addErrback(self._onError) + + else: + # Request that affects map and current state + if (requestId is None): + requestId = self.connectionState.latestRequestId + 1 + self.connectionState.latestRequestType = request["payload_request_type"] + self.connectionState.latestRequestId = requestId + self.connectionState.latestRequestTimestamp = int(time.time()) + self._stopRealTimeListener(False) + + if (self.connectionState.latestHandledRequestId < requestId - 1): + reactor.callLater(0.1, self._onRequest, request, requestId) + else: + self._updateState(request) + + deferred = threads.deferToThread( + self._processRequest, request, requestId) + deferred.addErrback(self._onError) + deferred.addCallback(self._onRequestDone) + + def _processRequest(self, request, requestId): + """Method that sends a response to websocket client based on request + + Args: + request (Dict): Request from websocket client + requestId (int): Request id of processed request + """ + try: + for response in self.responseCreator.getResponses(request, requestId): + if self.connectionState.disconnected: + break + reactor.callFromThread(self._sendDictResponse, response) + return requestId + except psycopg2.InterfaceError as e: + # Connection to database is lost, better just terminate connection to make user reconnect with new db connection + self.logger.error(e, exc_info=1) + raise e + except Exception as e: + # Log error to make us aware of unknow problem + self.logger.error(e, exc_info=1) + + def _onRequestDone(self, requestId): + """Method that is executed when request is processed + + Args: + requestId (int): Request id of processed request + """ + try: + if (self.connectionState.latestHandledRequestId < requestId): + self.connectionState.latestHandledRequestId = requestId + if (self.connectionState.latestRequestId == requestId): + # We have no newer requests + # Tell client response is complete + self._sendResponseByType(35) + + if (self.connectionState.latestTimeTravelRequest is None + and self.connectionState.noRealTime is False + and self.connectionState.isValidLatestPosition()): + self._startRealTimeListener(requestId) + + elif ((int(time.time()) - self.connectionState.latestRequestTimestamp) <= self.maxClientIdleTime): + self._sendResponseByType(33) # Tell client we are idle + except psycopg2.InterfaceError as e: + # Connection to database is lost, better just terminate connection to make user reconnect with new db connection + raise e + except Exception as e: + # Log error to make us aware of unknow problem + self.logger.error(e, exc_info=1) + + def _onError(self, error): + """Method that is executed when a deferToThread failed + + Args: + error (Exception): The Exception + """ + # Exception should only end up here if db connection is lost + # Force restart of wsserver + if reactor.running: + reactor.stop() + raise error + + def _startRealTimeListener(self, relatedRequestId): + """Start real time APRS-IS listener, onRealTimePacketFound will be executed when a packet is received + + Args: + relatedRequestId (int): Request id of related request + """ + def readRealTimePacket(): + if (self.connectionState.latestRequestId == relatedRequestId and not self.connectionState.disconnected): + self.aprsISReader.read(onRealTimePacketFound) + + def onRealTimePacketComplete(): + self.numberOfRealTimePacketThreads -= 1 + if (self.numberOfRealTimePacketThreads <= 0): + # If we have no packets on the way we should see if we have another waiting + readRealTimePacket() + + def onRealTimePacketFound(raw, sourceId): + if (raw is None and sourceId is None): + # Something went wrong, stop everything + self._onInactive() + else: + if (self.numberOfRealTimePacketThreads > self.maxQueuedRealtimePackets): + # To many packets, several previous LoopingCall's is not done yet. + # We need to discard some packets, otherwise server will be overloaded and we will only send old packets. + # Client is required to request total update now and then, so the discarded packets should be send to client later. + + counter = self.aprsISReader.clear(5) + #self.logger.warning('Discarding ' + str(counter) + ' packets') + else: + self.numberOfRealTimePacketThreads += 1 + deferred = threads.deferToThread( + self._processRealTimePacket, raw, sourceId) + deferred.addCallback(lambda _: onRealTimePacketComplete()) + deferred.addErrback(self._onError) + + # Tell client we are connecting to real time feed + self._sendResponseByType(34) + self.aprsISReader.start() # Will start if needed and change filter if needed + # Tell client we are listening on real time feed + self._sendResponseByType(31) + + self.realTimeListenerCall = task.LoopingCall(readRealTimePacket) + self.realTimeListenerCall.start(0.2) + + def _stopRealTimeListener(self, disconnect=False): + """Stop real time APRS-IS listener, onRealTimePacketFound will be executed when a packet is received + + Args: + disconnect (Boolean): Set to true to also disconnect from APRS-IS servers + """ + if (self.realTimeListenerCall is not None): + try: + self.realTimeListenerCall.stop() + except (AlreadyCalled, AssertionError) as e: + pass + + if (disconnect): + self.aprsISReader.stop() + else: + self.aprsISReader.pause() + + def _processRealTimePacket(self, raw, sourceId): + """Method that is executed when we have a new real time packet to send + + Args: + raw (string): Raw packet from APRS-IS + sourceId (int): The id of the source (1 for APRS and 2 for CWOP ...) + """ + try: + for response in self.aprsISPayloadCreator.getPayloads(raw, sourceId): + reactor.callFromThread(self._sendDictResponse, response) + except psycopg2.InterfaceError as e: + # Connection to database is lost, better just terminate connection to make user reconnect with new db connection + self.logger.error(e, exc_info=1) + raise e + except Exception as e: + # Log error to make us aware of unknow problem + self.logger.error(e, exc_info=1) + + def _startTimestampSender(self): + """Method schedules call to _sendTimestampResponse to keep connection up + """ + self.timestampSenderCall = task.LoopingCall( + self._sendTimestampResponse) + self.timestampSenderCall.start(1.0) + + def _stopTimestampSender(self): + """Stop looping call to _sendTimestampResponse + """ + if (self.timestampSenderCall is not None): + try: + self.timestampSenderCall.stop() + except AssertionError as e: + pass + + def _reScheduleInactiveEvent(self): + """Method schedules call to _onInactive when client has been idle too long + + Note: + When _reScheduleInactiveEvent is called any previous schedules will be cancelled and countdown will be reset + """ + if (self.onInactiveCall is not None): + try: + self.onInactiveCall.cancel() + except (AlreadyCalled, AlreadyCancelled) as e: + pass + self.onInactiveCall = reactor.callLater( + self.maxClientIdleTime, self._onInactive) + + def _onInactive(self): + """Method that is executed when client has been inactive too long + """ + try: + # Client is inactive, pause (to save bandwidth, cpu and memory) + self._sendResponseByType(36) + self._stopTimestampSender() + self._stopRealTimeListener(True) + self.connectionState.totalReset() + except psycopg2.InterfaceError as e: + # Connection to database is lost, better just terminate connection to make user reconnect with new db connection + self.logger.error(e, exc_info=1) + raise e + except Exception as e: + # Log error to make us aware of unknow problem + self.logger.error(e, exc_info=1) + + def _sendTimestampResponse(self): + """Send server timestamp to syncronize server and client + + Notes: + This is also used to tell the client that we are still here + Most browser will disconnect if they do not hear anything in 300sec + """ + try: + if (self.connectionState.latestHandledRequestId < self.connectionState.latestRequestId): + # server is busy with request, no point in doing this now + return + data = {} + data["timestamp"] = int(time.time()) + self._sendResponseByType(41, data) + except psycopg2.InterfaceError as e: + # Connection to database is lost, better just terminate connection to make user reconnect with new db connection + self.logger.error(e, exc_info=1) + raise e + except Exception as e: + # Log error to make us aware of unknow problem + self.logger.error(e, exc_info=1) + + def _sendResponseByType(self, payloadResponseType, data=None): + """Send specified response to client + + Args: + payloadResponseType (int): A number that specifies what type of response we are sending + data (dict): The response data as a dict + """ + if (data is not None): + payload = { + 'payload_response_type': payloadResponseType, 'data': data} + else: + payload = {'payload_response_type': payloadResponseType} + self._sendDictResponse(payload) + + def _sendDictResponse(self, payload): + """Send message dict payload to client + + Args: + payload (Dict): Response payload + """ + try: + jsonPayload = json.dumps(payload, ensure_ascii=True).encode('utf8') + if (jsonPayload is not None): + self.sendMessage(jsonPayload) + except psycopg2.InterfaceError as e: + # Connection to database is lost, better just terminate connection to make user reconnect with new db connection + self.logger.error(e, exc_info=1) + raise e + except Exception as e: + # Log error to make us aware of unknow problem + self.logger.warning(e, exc_info=1) + + def _updateState(self, request): + """Update the connection state based on request + + Args: + request (Dict): Request form client + """ + if ("neLat" in request + and "neLng" in request + and "swLat" in request + and "swLng" in request + and "minutes" in request): + self.connectionState.setLatestMapBounds( + request["neLat"], request["neLng"], request["swLat"], request["swLng"]) + + if ("onlyLatestPacket" in request): + self.connectionState.setOnlyLatestPacketRequested( + (request["onlyLatestPacket"] == 1)) + + if ("minutes" in request): + if ("time" in request): + self.connectionState.setLatestMinutes( + request["minutes"], request["time"]) + else: + self.connectionState.setLatestMinutes(request["minutes"], None) + + if ("noRealTime" in request): + self.connectionState.disableRealTime() diff --git a/server/trackdirect/__init__.py b/server/trackdirect/__init__.py new file mode 100644 index 0000000..b2521e0 --- /dev/null +++ b/server/trackdirect/__init__.py @@ -0,0 +1,7 @@ +__version__ = "1.0" +__author__ = "Per Qvarforth" + +from TrackDirectDataCollector import * +from TrackDirectWebsocketServer import * +from TrackDirectHeatMapCreator import * +from TrackDirectConfig import *