Загрузил файл
This commit is contained in:
parent
3643f45385
commit
c4f414869b
|
|
@ -0,0 +1,192 @@
|
|||
import sys
|
||||
import os
|
||||
import os.path
|
||||
import ConfigParser
|
||||
from ConfigParser import SafeConfigParser
|
||||
|
||||
from trackdirect.common.Singleton import Singleton
|
||||
|
||||
|
||||
class TrackDirectConfig(Singleton):
    """Track Direct Config class

    Reads an ini-style config file and exposes database, websocket server
    and collector settings as attributes.
    """

    def __init__(self):
        """The __init__ method."""
        # Per-collector settings, keyed on collector number (0-4),
        # populated by populate()
        self.collector = {}

    def populate(self, configFile):
        """Read the specified config file and populate all attributes.

        Args:
            configFile (string): Config file name (treated as an absolute
                path if it starts with "/", otherwise relative to
                ~/trackdirect/config/)
        """
        configParser = SafeConfigParser()
        if configFile.startswith('/'):
            configParser.read(os.path.expanduser(configFile))
        else:
            configParser.read(os.path.expanduser(
                '~/trackdirect/config/' + configFile))

        self._populateDatabaseConfig(configParser)
        self._populateWebsocketConfig(configParser)

        if (self.websocketAprsSourceId1 == 5 or self.websocketAprsSourceId2 == 5):
            # At least one source is of type OGN, disable display of older data
            self.allowTimeTravel = False
            # Compare as integers: the values read from the config file are
            # strings, and a string compared to an int is meaningless
            # (always True in Py2, TypeError in Py3).
            if int(self.maxDefaultTime) > 1440:
                self.maxDefaultTime = 1440
            if int(self.maxFilterTime) > 1440:
                # Bugfix: this branch previously assigned maxDefaultTime,
                # leaving maxFilterTime unclamped.
                self.maxFilterTime = 1440

        # Collectors (config sections "collector0" .. "collector4")
        for collectorNumber in range(0, 5):
            self._populateCollectorConfig(configParser, collectorNumber)

    def _populateDatabaseConfig(self, configParser):
        """Populate all database related attributes.

        Args:
            configParser (SafeConfigParser): Parser holding the loaded config file
        """
        self.dbHostname = configParser.get('database', 'host').strip('"')
        self.dbName = configParser.get('database', 'database').strip('"')
        try:
            self.dbUsername = configParser.get(
                'database', 'username').strip('"')
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            # Default to the user running the process
            self.dbUsername = os.getlogin()
        self.dbPassword = configParser.get('database', 'password').strip('"')
        self.dbPort = int(configParser.get('database', 'port').strip('"'))
        self.daysToSavePositionData = int(configParser.get(
            'database', 'days_to_save_position_data').strip('"'))
        self.daysToSaveStationData = int(configParser.get(
            'database', 'days_to_save_station_data').strip('"'))
        self.daysToSaveWeatherData = int(configParser.get(
            'database', 'days_to_save_weather_data').strip('"'))
        self.daysToSaveTelemetryData = int(configParser.get(
            'database', 'days_to_save_telemetry_data').strip('"'))

        # Optional flag; defaults to False when not present in config
        self.saveOgnStationsWithMissingIdentity = False
        try:
            saveOgnStationsWithMissingIdentity = configParser.get(
                'database', 'save_ogn_stations_with_missing_identity').strip('"')
            if saveOgnStationsWithMissingIdentity == "1":
                self.saveOgnStationsWithMissingIdentity = True
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            pass

    def _populateWebsocketConfig(self, configParser):
        """Populate all websocket server related attributes.

        Args:
            configParser (SafeConfigParser): Parser holding the loaded config file
        """
        self.websocketHostname = configParser.get(
            'websocket_server', 'host').strip('"')
        self.websocketPort = int(configParser.get(
            'websocket_server', 'port').strip('"'))

        # External port defaults to the internal port when not configured
        self.websocketExternalPort = self.websocketPort
        try:
            self.websocketExternalPort = int(configParser.get(
                'websocket_server', 'external_port').strip('"'))
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            pass

        self.errorLog = configParser.get(
            'websocket_server', 'error_log').strip('"')
        self.websocketFrequencyLimit = configParser.get(
            'websocket_server', 'frequency_limit').strip('"')

        # NOTE: these are kept as strings (as read from file) for backward
        # compatibility; convert with int() before numeric use.
        self.maxDefaultTime = configParser.get(
            'websocket_server', 'max_default_time').strip('"')
        self.maxFilterTime = configParser.get(
            'websocket_server', 'max_filter_time').strip('"')
        self.maxClientIdleTime = configParser.get(
            'websocket_server', 'max_client_idle_time').strip('"')
        self.maxQueuedRealtimePackets = configParser.get(
            'websocket_server', 'max_queued_realtime_packets').strip('"')

        allowTimeTravel = configParser.get(
            'websocket_server', 'allow_time_travel').strip('"')
        self.allowTimeTravel = (allowTimeTravel == "1")

        # Websocket server APRS connection (we support 2 different sources, more can be added...)
        try:
            self.websocketAprsHost1 = configParser.get(
                'websocket_server', 'aprs_host1').strip('"')
            self.websocketAprsPort1 = configParser.get(
                'websocket_server', 'aprs_port1').strip('"')
            self.websocketAprsSourceId1 = int(configParser.get(
                'websocket_server', 'aprs_source_id1').strip('"'))
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            self.websocketAprsSourceId1 = None
            self.websocketAprsHost1 = None
            self.websocketAprsPort1 = None

        try:
            self.websocketAprsHost2 = configParser.get(
                'websocket_server', 'aprs_host2').strip('"')
            self.websocketAprsPort2 = configParser.get(
                'websocket_server', 'aprs_port2').strip('"')
            self.websocketAprsSourceId2 = int(configParser.get(
                'websocket_server', 'aprs_source_id2').strip('"'))
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            self.websocketAprsSourceId2 = None
            self.websocketAprsHost2 = None
            self.websocketAprsPort2 = None

    def _populateCollectorConfig(self, configParser, collectorNumber):
        """Populate settings for the specified collector.

        Args:
            configParser (SafeConfigParser): Parser holding the loaded config file
            collectorNumber (int): Collector number (config section "collector<n>")
        """
        section = 'collector' + str(collectorNumber)
        self.collector[collectorNumber] = {}
        try:
            self.collector[collectorNumber]['source_id'] = int(configParser.get(
                section, 'source_id').strip('"'))
            self.collector[collectorNumber]['host'] = configParser.get(
                section, 'host').strip('"')
            self.collector[collectorNumber]['port_full'] = int(configParser.get(
                section, 'port_full').strip('"'))
            self.collector[collectorNumber]['port_filtered'] = int(configParser.get(
                section, 'port_filtered').strip('"'))

            self.collector[collectorNumber]['callsign'] = configParser.get(
                section, 'callsign').strip('"')
            self.collector[collectorNumber]['passcode'] = configParser.get(
                section, 'passcode').strip('"')

            self.collector[collectorNumber]['numbers_in_batch'] = configParser.get(
                section, 'numbers_in_batch').strip('"')
            try:
                self.collector[collectorNumber]['frequency_limit'] = configParser.get(
                    section, 'frequency_limit').strip('"')
            except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
                self.collector[collectorNumber]['frequency_limit'] = "0"

            try:
                saveFastPackets = configParser.get(
                    section, 'save_fast_packets').strip('"')
                self.collector[collectorNumber]['save_fast_packets'] = bool(
                    int(saveFastPackets))
            except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
                self.collector[collectorNumber]['save_fast_packets'] = False

            try:
                detectDuplicates = configParser.get(
                    section, 'detect_duplicates').strip('"')
                self.collector[collectorNumber]['detect_duplicates'] = bool(
                    int(detectDuplicates))
            except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
                self.collector[collectorNumber]['detect_duplicates'] = False

            self.collector[collectorNumber]['error_log'] = configParser.get(
                section, 'error_log').strip('"')

            if (self.websocketAprsSourceId1 == 5 or self.websocketAprsSourceId2 == 5):
                # Source is of type OGN, make sure we do not save too many
                # packets (would cause too high load on db).
                # Compare as int: frequency_limit is stored as a string.
                if int(self.collector[collectorNumber]['frequency_limit']) < 10:
                    self.collector[collectorNumber]['frequency_limit'] = 10
                self.collector[collectorNumber]['save_fast_packets'] = False

        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            # Collector section missing or incomplete; fall back to defaults
            self.collector[collectorNumber]['source_id'] = None
            self.collector[collectorNumber]['host'] = None
            self.collector[collectorNumber]['port_full'] = None
            self.collector[collectorNumber]['port_filtered'] = None

            self.collector[collectorNumber]['callsign'] = None
            self.collector[collectorNumber]['passcode'] = None

            self.collector[collectorNumber]['numbers_in_batch'] = "20"
            self.collector[collectorNumber]['frequency_limit'] = "0"
            self.collector[collectorNumber]['save_fast_packets'] = True
            self.collector[collectorNumber]['detect_duplicates'] = False

            self.collector[collectorNumber]['error_log'] = None
|
|
@ -0,0 +1,527 @@
|
|||
import logging
|
||||
from twisted.python import log
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
from collections import deque
|
||||
import json
|
||||
import re
|
||||
import aprslib
|
||||
import datetime
|
||||
import time
|
||||
from twisted.internet import reactor, threads
|
||||
|
||||
from trackdirect.parser.AprsPacketParser import AprsPacketParser
|
||||
from trackdirect.parser.AprsISConnection import AprsISConnection
|
||||
from trackdirect.parser.policies.PacketDuplicatePolicy import PacketDuplicatePolicy
|
||||
from trackdirect.collector.PacketBatchInserter import PacketBatchInserter
|
||||
from trackdirect.exceptions.TrackDirectParseError import TrackDirectParseError
|
||||
from trackdirect.database.DatabaseConnection import DatabaseConnection
|
||||
from trackdirect.repositories.StationRepository import StationRepository
|
||||
from trackdirect.objects.Packet import Packet
|
||||
|
||||
#from pympler.tracker import SummaryTracker
|
||||
|
||||
class TrackDirectDataCollector():
    """A TrackDirectDataCollector instance connects to the data source and saves all received packets to the database.

    Note:
        The collector class is built to handle ONE connection to a data source server (may be an APRS-IS server); if two are wanted, run two processes.
        This is useful if you want one connection to the regular APRS-IS network and one connection to the CWOP network.
    """

    def __init__(self, collectorOptions, saveOgnStationsWithMissingIdentity):
        """The __init__ method.

        Args:
            collectorOptions (dict): Contains data like host, port, callsign, passcode, source id
            saveOgnStationsWithMissingIdentity (boolean): True if we should not ignore stations with a missing identity
        """
        self.saveOgnStationsWithMissingIdentity = saveOgnStationsWithMissingIdentity
        self.sourceHostname = collectorOptions['host']
        self.sourcePort = collectorOptions['port_full']
        self.numbersInBatch = collectorOptions['numbers_in_batch']
        self.saveFastPackets = collectorOptions['save_fast_packets']
        self.frequencyLimit = collectorOptions['frequency_limit']
        self.detectDuplicates = collectorOptions['detect_duplicates']
        self.hardFrequencyLimit = None
        if (not self.saveFastPackets and self.frequencyLimit is not None and int(self.frequencyLimit) > 0):
            # Only respect hard frequency limit if we are not saving "fast packets"
            self.hardFrequencyLimit = self.frequencyLimit
        self.sourceId = collectorOptions['source_id']
        self.callsign = collectorOptions['callsign']
        self.passcode = collectorOptions['passcode']

        # Two connections: one with autocommit enabled and one without
        # (the non-autocommit one is used for the batch inserts)
        dbConnection = DatabaseConnection()
        self.db = dbConnection.getConnection(True)
        self.dbNoAutoCommit = dbConnection.getConnection(False)

        self.stationRepository = StationRepository(self.db)
        self.logger = logging.getLogger(__name__)

        # Timestamps used to decide when the current batch is full/old
        self.latestPacketTimestamp = None
        self.firstPacketTimestamp = None
        self.latestBatchInsertTimestamp = int(time.time())

        # Current batch state; reset by _reset() after every insert
        self.packets = []
        self.stationIdsWithVisiblePacket = []
        self.movingStationIdsWithVisiblePacket = []
        self.movingMarkerIdsWithVisiblePacket = []
        self.delay = 0

    def run(self):
        """Start the collector.

        Starts consuming packets in a worker thread and runs the twisted
        reactor in the current thread (blocks until the reactor stops).
        """
        threads.deferToThread(self.consume)
        # reactor.suggestThreadPoolSize(20)
        reactor.run()

    def consume(self):
        """Start consuming packets.

        Connects to the configured APRS-IS server and processes each
        received line. On connection drop it reconnects; on any other
        error the reactor is stopped so the collector is restarted by
        the surrounding service supervision.
        """

        #tracker = SummaryTracker()

        connection = AprsISConnection(
            self.callsign, self.passcode, self.sourceHostname, self.sourcePort)
        connection.setFrequencyLimit(self.hardFrequencyLimit)
        connection.setSourceId(self.sourceId)

        def onPacketRead(line):
            # Raising stops the consumer loop once the reactor is gone
            if (not reactor.running):
                raise StopIteration('Stopped')

            # Parse in a worker thread; receive time is captured here
            timestamp = int(time.time())
            deferred = threads.deferToThread(self._parse, line, timestamp)
            deferred.addCallback(onParseComplete)
            deferred.addErrback(onParseError)

        def onParseComplete(packet):
            # Hand the parsed packet back to the reactor thread for insertion
            reactor.callFromThread(self._addPacket, packet)

        def onParseError(error):
            # Parse will more or less only cast exception if db connection is lost

            # Force restart of collector (we assume that server will be autostarted if stopped)
            if reactor.running:
                reactor.stop()
            raise error

        try:
            connection.connect()
            connection.filteredConsumer(onPacketRead, True, True)

            #tracker.print_diff()

        except (aprslib.ConnectionDrop) as exp:
            # Just reconnect...
            self.logger.warning('Lost connection')
            self.logger.warning(exp)
            self.consume()

        except Exception as e:
            self.logger.error(e)

            # Force restart of collector
            if reactor.running:
                reactor.stop()

    def _parse(self, line, timestamp):
        """Parse raw packet.

        Args:
            line (string): APRS raw packet string
            timestamp (int): Receive time of packet

        Returns:
            Returns a Packet, or None when the packet should be ignored
        """
        try:
            # If we are lagging more than a minute behind, drop packets
            # until we have caught up
            self.delay = int(time.time())-timestamp
            if (self.delay > 60):
                self.logger.error(
                    'Collector has a delay on %s seconds, ignoring packets until solved', self.delay)
                return None
            elif (self.delay > 15):
                self.logger.warning(
                    'Collector has a delay on %s seconds', self.delay)

            packetDict = aprslib.parse(line)
            parser = AprsPacketParser(self.db, self.saveOgnStationsWithMissingIdentity)
            parser.setSourceId(self.sourceId)
            packet = parser.getPacket(packetDict, timestamp)

            # mapId 15/16: packets we do not store at all
            # (exact semantics defined by AprsPacketParser -- TODO confirm)
            if (packet.mapId == 15 or packet.mapId == 16):
                return None

            if (self.detectDuplicates):
                self._checkIfDuplicate(packet)

            return self._cleanPacket(packet)

        except (aprslib.ParseError, aprslib.UnknownFormat, TrackDirectParseError) as exp:
            # aprslib could not parse it; try our own minimal parser
            return self._parseUnsupportedPacket(line, timestamp)
        except psycopg2.InterfaceError as e:
            # Connection to database is lost, better just exit
            raise e
        except (UnicodeDecodeError) as exp:
            # just forget about this packet
            pass
        except Exception as e:
            self.logger.error(e, exc_info=1)
            return None

    def _parseUnsupportedPacket(self, line, timestamp):
        """Try to parse raw packet that aprs-lib could not handle.

        Args:
            line (string): APRS raw packet string
            timestamp (int): Receive time of packet

        Returns:
            Returns a Packet, or None when even the basic parse fails
        """
        try:
            line = line.decode('utf-8', 'ignore')
            packetDict = self.basicParse(line)
            parser = AprsPacketParser(self.db, self.saveOgnStationsWithMissingIdentity)
            parser.setSourceId(self.sourceId)
            packet = parser.getPacket(packetDict, timestamp, True)
            packet.markerId = 1

            if (packet.packetTypeId == 6):  # Telemetry packet
                packet.packetTypeId = 10  # Has no position
            else:
                packet.mapId = 11  # Unsupported packet

            return packet
        except Exception as e:
            # Packet could not be parsed at all; log at debug level and skip it
            self.logger.debug(e)
            self.logger.debug(line)
            return None

    def _addPacket(self, packet):
        """Adds packet to database (via the current batch).

        Args:
            packet (Packet): The packet
        """
        if (packet is None):
            return

        # Soft frequency limit check
        if (self._isStationSendingToFast(packet)):
            if (not self.saveFastPackets):
                return

            # Keep the packet but mark it as not visible on map
            packet.markerId = 1
            packet.mapId = 8

            # Reset all mapId related values
            packet.replacePacketId = None
            packet.abnormalPacketId = None
            packet.confirmPacketId = None
            packet.replacePacketTimestamp = None
            packet.abnormalPacketTimestamp = None
            packet.confirmPacketTimestamp = None

        if (packet.mapId == 6):
            # Packet received in wrong order
            if (not self.saveFastPackets):
                return

        # If packet can not join the current batch, flush the batch first
        if (not self._isPacketValidInCurrentBatch(packet)):
            self._insertBatch()

        if (self._shouldPacketBeAdded(packet)):
            self._addPacketToBatch(packet)

        if (self._isBatchFull()):
            self._insertBatch()

        if (self._isBatchOld()):
            self._insertBatch()

    def _isStationSendingToFast(self, packet):
        """Returns true if this packet has been sent too close to the previous packet from the same station (we need to save the previous packet first).

        Args:
            packet (Packet) : The packet that may have been sent too fast

        Returns:
            Boolean
        """
        if (packet.mapId in [1, 5, 7, 9] and packet.isMoving == 1 and self.frequencyLimit is not None):
            frequencyLimitToApply = int(self.frequencyLimit)

            if (frequencyLimitToApply == 0):
                return False

            if (packet.ogn is not None and packet.ogn.ognTurnRate is not None):
                # A turning aircraft is allowed to send more often
                turnRate = abs(float(packet.ogn.ognTurnRate))
                if (turnRate > 0) :
                    frequencyLimitToApply = int(frequencyLimitToApply / (1+turnRate))

            if ((packet.timestamp - frequencyLimitToApply) < packet.markerPrevPacketTimestamp):
                # This station is sending faster than config limit
                return True

            if (packet.stationId in self.movingStationIdsWithVisiblePacket):
                # This station is sending way too fast (we haven't even added the previous packet to database yet)
                return True

            if (packet.markerId in self.movingMarkerIdsWithVisiblePacket):
                # The senders of this object are sending way too fast (we haven't even added the previous packet to database yet)
                return True
        return False

    def _isPacketValidInCurrentBatch(self, packet):
        """Returns true if this packet can be added to current batch.

        Args:
            packet (Packet) : The packet that we want to add to current batch

        Returns:
            Boolean
        """
        if (self.latestPacketTimestamp is not None):
            # If previous packet belongs to another date we can not add packet to current batch
            currentPacketDate = datetime.datetime.utcfromtimestamp(
                int(packet.timestamp)).strftime('%Y%m%d')
            latestPacketDate = datetime.datetime.utcfromtimestamp(
                self.latestPacketTimestamp).strftime('%Y%m%d')

            if (currentPacketDate != latestPacketDate and len(self.packets) > 0):
                return False

        if (packet.stationId in self.stationIdsWithVisiblePacket):
            # We only want to handle one packet per station per batch
            return False

        return True

    def _shouldPacketBeAdded(self, packet):
        """Returns true if this packet should be added to database.

        Args:
            packet (Packet) : The packet that we want to add to batch

        Returns:
            Boolean
        """
        # sourceId 3: duplicates source -- presumably; verify against config
        if (packet.sourceId != 3 or packet.stationIdPath):
            # We only add pure duplicates to batch if they have a path, otherwise we are not interested
            return True
        return False

    def _isBatchFull(self):
        """Returns true if batch is considered full.

        Returns:
            Boolean
        """
        # We do insert when we have the specified amount of packets (or if more than 5s has passed)
        if (int(len(self.packets)) > int(self.numbersInBatch)):
            return True
        elif (len(self.packets) > 0 and self.latestBatchInsertTimestamp < int(time.time()) - 5):
            return True
        return False

    def _isBatchOld(self):
        """Returns true if batch is considered old (spans more than one second of receive time).

        Returns:
            Boolean
        """
        if (self.latestPacketTimestamp is not None
                and self.firstPacketTimestamp is not None
                and self.latestPacketTimestamp - self.firstPacketTimestamp > 1):
            return True
        return False

    def _addPacketToBatch(self, packet):
        """Add instance of ParsedPacket to batch.

        Args:
            packet (Packet): Packet that we want to add to batch
        """
        self.latestPacketTimestamp = int(packet.timestamp)
        if (self.firstPacketTimestamp is None):
            self.firstPacketTimestamp = int(packet.timestamp)
        self.packets.append(packet)
        if (packet.mapId in [1, 5, 7, 9]):
            # Packet will be visible on map; remember station/marker so we
            # only keep one visible packet per station per batch
            self.stationIdsWithVisiblePacket.append(packet.stationId)
            if (packet.isMoving == 1):
                self.movingStationIdsWithVisiblePacket.append(packet.stationId)
                self.movingMarkerIdsWithVisiblePacket.append(packet.markerId)

    def _insertBatch(self):
        """Perform insert on the current batch.
        """
        if (len(self.packets) > 0):
            self.latestBatchInsertTimestamp = int(time.time())

            # Make sure packets are inserted in the order that they were received
            self.packets.reverse()

            # Do batch insert
            packetBatchInserter = PacketBatchInserter(
                self.db, self.dbNoAutoCommit)
            packetBatchInserter.insert(self.packets[:], self.sourceId)

            self._reset()

    def _reset(self):
        """Reset all collector batch variables.
        """
        self.packets = []
        self.stationIdsWithVisiblePacket = []
        self.movingStationIdsWithVisiblePacket = []
        self.movingMarkerIdsWithVisiblePacket = []
        self.latestPacketTimestamp = None
        self.firstPacketTimestamp = None

    def _cleanPacket(self, packet):
        """Method used to clean a Packet from unused columns.

        Args:
            packet (Packet): Object of class Packet

        Returns:
            Returns a packet (cleaned)
        """
        if (packet.mapId not in [1, 5, 7, 9]):
            # This packet will never be shown on map, remove information that won't be used (just to save some space in database)
            packet.markerId = None
            packet.markerCounter = None
            packet.packetTailTimestamp = None
            packet.positionTimestamp = None
            packet.posambiguity = None
            packet.symbol = None
            packet.symbolTable = None
            packet.mapSector = None
            packet.relatedMapSectors = None
            packet.speed = None
            packet.course = None
            packet.altitude = None
            packet.isMoving = 1
        return packet

    def _checkIfDuplicate(self, packet):
        """Method used to check if this packet is a duplicate.

        Note:
            If packet is a duplicate the object attribute mapId will be updated, and some related attributes.

        Args:
            packet (Packet): Object of class Packet
        """
        packetDuplicatePolicy = PacketDuplicatePolicy(self.stationRepository)
        if (packetDuplicatePolicy.isDuplicate(packet)):
            # It is a duplicate (or at least we treat it as one just to be safe)
            packet.mapId = 3
            packet.markerId = 1
            packet.replacePacketId = None  # No older packet should be replaced!!!
            packet.replacePacketTimestamp = None
            packet.abnormalPacketId = None  # Do not mark previous as abnormal yet
            packet.abnormalPacketTimestamp = None
            packet.confirmPacketId = None  # Do not confirm previous position
            packet.confirmPacketTimestamp = None

    def basicParse(self, line):
        """Performs a basic packet parse and returns the result as a dict.

        Args:
            line (string): Packet raw string

        Returns:
            Returns packet dict
        """
        # Divide into body and head
        try:
            (head, body) = line.split(':', 1)
        except:
            raise TrackDirectParseError("no body", {})

        if len(body) == 0:
            raise TrackDirectParseError("body is empty", {})

        # First body character is the APRS data type identifier
        packetType = body[0]
        body = body[1:]

        # Find sender, destination and path in header
        try:
            (fromcall, path) = head.split('>', 1)
        except:
            raise TrackDirectParseError("no header", {})

        if (not 1 <= len(fromcall) <= 9):
            raise TrackDirectParseError("fromcallsign has invalid length", {})

        path = path.split(',')
        tocall = path[0]

        if len(tocall) == 0:
            tocall = None

        path = path[1:]

        # Drop the whole path if any hop has an invalid callsign format
        for station in path:
            if not re.findall(r"^[A-Z0-9\-]{1,9}\*?$", station, re.I):
                path = None
                break

        objectName = ''
        if packetType == ';':
            # Object report: fixed 9-character name followed by a flag
            match = re.findall(r"^([ -~]{9})(\*|_)", body)
            if match:
                name, flag = match[0]
                objectName = name
                body = body[10:]

        if packetType == ')':
            # Item report: 3-9 character name followed by a flag
            match = re.findall(r"^([ -~!]{3,9})(\!|_)", body)
            if match:
                name, flag = match[0]
                objectName = name
                body = body[len(name)+1:]

        comment = None
        telemetry = None
        if packetType == 'T':
            # Telemetry report: sequence, 5 analog values, 8 digital bits
            telemetry = {}
            lst = body.split(',')
            if len(lst) >= 7 :
                seq = body.split(',')[0]
                vals = body.split(',')[1:6]
                bits = body.split(',')[6][:8]
                comment = body.split(',')[6][8:]

                # Strip "T" / "#" prefixes from the sequence number
                if seq.startswith('T'):
                    seq = seq[1:]
                if seq.startswith('#'):
                    seq = seq[1:]

                for i in range(5):
                    try:
                        vals[i] = float(vals[i]) if vals[i] != '' else None
                    except ValueError:
                        vals[i] = None

                telemetry = {
                    'seq': seq,
                    'vals': vals,
                    'bits': bits
                }

        # Create result
        packet = {
            'from': fromcall,
            'to': tocall,
            'path': path,
            'raw': line,
            'object_name': objectName,
            'packet_type': packetType,
            'telemetry': telemetry,
            'comment': comment
        }
        return packet
|
@ -0,0 +1,363 @@
|
|||
import logging
|
||||
import heatmap
|
||||
import random
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
from math import floor, ceil, sqrt, degrees, asinh, tan, radians, sin, asin, atan, exp
|
||||
from math import log as math_log
|
||||
from math import pi as math_pi
|
||||
import datetime
|
||||
import time
|
||||
import image_slicer
|
||||
from PIL import ImageDraw, ImageFont
|
||||
import os
|
||||
import shutil
|
||||
from twisted.python import log
|
||||
|
||||
from trackdirect.database.DatabaseConnection import DatabaseConnection
|
||||
|
||||
|
||||
class TrackDirectHeatMapCreator():
|
||||
"""The TrackDirectHeatMapCreator class is built to generate heatmaps based on the track direct database data.
|
||||
|
||||
Note:
|
||||
The heatmaps are divided into several tiles to make them smaller to download and to make the heatmap generator less memory intensive
|
||||
"""
|
||||
|
||||
def __init__(self, destinationDir):
    """The __init__ method.

    Args:
        destinationDir (String): Absolute path to the destination directory
    """
    self.destinationDir = destinationDir
    self.logger = logging.getLogger(__name__)

    # Open a database connection and switch it to autocommit
    # (isolation level 0)
    connectionFactory = DatabaseConnection()
    self.db = connectionFactory.getConnection(True)
    self.db.set_isolation_level(0)

    # Above this zoom level a single heatmap image is no longer feasible:
    # the heatmap lib has a size limit and RAM usage would be huge
    self.minZoomForOneImage = 3

    # 256 is the recommended tile size (512 would suit retina displays)
    self.imageTileSize = 256
||||
def run(self):
    """Start creating heatmap images and deploy them when done.
    """
    # Generate all heatmap image files first...
    self._createHeatMapImages()
    # ...then deploy them by renaming to the Google Maps tile convention
    self._renameHeatMapImagesToGoogleMapStandard()
def _createHeatMapImages(self):
|
||||
"""Start creating heatmap images
|
||||
"""
|
||||
# Zoom 0: = 1
|
||||
# Zoom 1: 2x2 = 4
|
||||
# Zoom 2: 4x4 = 16
|
||||
# Zoom 3: 8x8 = 64 (This is the largest that we create as one single image)
|
||||
# Zoom 4: 16x16 = 256
|
||||
# Zoom 5: 32x32 = 1024
|
||||
# Zoom 6: 64x64
|
||||
# Zoom 7: 128x128
|
||||
for zoom in range(0, 8):
|
||||
if (zoom <= self.minZoomForOneImage):
|
||||
partsLength = 1
|
||||
parts = range(0, partsLength)
|
||||
imageSize = pow(2, zoom) * self.imageTileSize
|
||||
numberOfTilesPerImage = pow(2, zoom) * pow(2, zoom)
|
||||
else:
|
||||
partsLength = pow(2, zoom-self.minZoomForOneImage) * \
|
||||
pow(2, zoom-self.minZoomForOneImage)
|
||||
parts = range(0, partsLength)
|
||||
numberOfTilesPerImage = pow(
|
||||
2, self.minZoomForOneImage) * pow(2, self.minZoomForOneImage)
|
||||
imageSize = pow(2, self.minZoomForOneImage) * \
|
||||
self.imageTileSize
|
||||
|
||||
# For zoom 0-3 we only have one part
|
||||
# zoom 4: 4 parts (images)
|
||||
# zoom 5: 16 parts (images)
|
||||
# zoom 6: 64 parts (images)
|
||||
# zoom 7: 256 parts (images)
|
||||
# all images will alter be splitted into tile of size 256x256
|
||||
for part in parts:
|
||||
# We are not in any hurry so we sleep to lower the effect on other processes (we are on a shared server)
|
||||
time.sleep(1)
|
||||
|
||||
totalMinLat = float(-85.05115)
|
||||
totalMaxLat = float(85.05115)
|
||||
totalMinLng = float(-180)
|
||||
totalMaxLng = float(180)
|
||||
|
||||
totalMinLatPixel = float(
|
||||
self._getLatPixelCoordinate(totalMinLat, zoom))
|
||||
totalMaxLatPixel = float(
|
||||
self._getLatPixelCoordinate(totalMaxLat, zoom))
|
||||
totalMinLngPixel = float(
|
||||
self._getLngPixelCoordinate(totalMinLng, zoom))
|
||||
totalMaxLngPixel = float(
|
||||
self._getLngPixelCoordinate(totalMaxLng, zoom))
|
||||
|
||||
if (zoom <= self.minZoomForOneImage):
|
||||
minLat = totalMinLat
|
||||
maxLat = totalMaxLat
|
||||
minLng = totalMinLng
|
||||
maxLng = totalMaxLng
|
||||
|
||||
minLatPixel = totalMinLatPixel
|
||||
maxLatPixel = totalMaxLatPixel
|
||||
minLngPixel = totalMinLngPixel
|
||||
maxLngPixel = totalMaxLngPixel
|
||||
else:
|
||||
latPartPixelLength = (totalMinLatPixel) / sqrt(partsLength)
|
||||
lngPartPixelLength = (totalMaxLngPixel) / sqrt(partsLength)
|
||||
|
||||
partRow = floor(part / sqrt(partsLength)) # Starts on 0
|
||||
partColumn = part - \
|
||||
sqrt(partsLength) * partRow # Starts on 0
|
||||
|
||||
minLatPixel = (partRow * latPartPixelLength) + \
|
||||
latPartPixelLength
|
||||
maxLatPixel = (partRow * latPartPixelLength)
|
||||
minLngPixel = totalMinLngPixel + partColumn * lngPartPixelLength
|
||||
maxLngPixel = totalMinLngPixel + \
|
||||
((partColumn * lngPartPixelLength) + lngPartPixelLength)
|
||||
|
||||
minLat = self._getLatFromLatPixelCoordinate(
|
||||
minLatPixel, zoom)
|
||||
maxLat = self._getLatFromLatPixelCoordinate(
|
||||
maxLatPixel, zoom)
|
||||
minLng = self._getLngFromLngPixelCoordinate(
|
||||
minLngPixel, zoom)
|
||||
maxLng = self._getLngFromLngPixelCoordinate(
|
||||
maxLngPixel, zoom)
|
||||
|
||||
file = self.destinationDir + "/latest-heatmap." + \
|
||||
str(zoom)+"."+str(part)+".png"
|
||||
|
||||
#pts = [(random.uniform(minLat, maxLat), random.uniform(minLng, maxLng)) for i in range(1000)]
|
||||
pts = self._getPoints(minLat, maxLat, minLng, maxLng, zoom)
|
||||
|
||||
if (len(pts) == 0):
|
||||
self.logger.info("Skipping file:" + file)
|
||||
else:
|
||||
# Create the heatmap!
|
||||
hm = heatmap.Heatmap()
|
||||
dotSize = 2*(zoom+1)
|
||||
opacity = 230
|
||||
size = (imageSize, imageSize)
|
||||
schema = "fire"
|
||||
|
||||
#area = ((0, minLatProj), (maxLngProj*2, maxLatProj))
|
||||
area = ((minLngPixel, minLatPixel),
|
||||
(maxLngPixel, maxLatPixel))
|
||||
hm.heatmap(pts, dotSize, opacity, size,
|
||||
schema, area).save(file)
|
||||
self.logger.info("Created file:" + file)
|
||||
|
||||
# Split heatmap into tiles
|
||||
if (numberOfTilesPerImage > 1):
|
||||
# The tile image filename will be :
|
||||
# latest-heatmap.zoom.part_row_column.png
|
||||
# Example for zoom level 4:
|
||||
# latest-heatmap.4.0_01_01.png -> will be renamed to latest-heatmap.4.0.0.png
|
||||
# latest-heatmap.4.0_01_02.png
|
||||
# latest-heatmap.4.0_02_01.png
|
||||
# latest-heatmap.4.0_02_02.png
|
||||
# latest-heatmap.4.1_01_01.png
|
||||
# latest-heatmap.4.1_01_02.png
|
||||
# latest-heatmap.4.1_02_01.png
|
||||
# latest-heatmap.4.1_02_02.png
|
||||
# latest-heatmap.4.2_01_01.png
|
||||
# latest-heatmap.4.2_01_02.png
|
||||
# latest-heatmap.4.2_02_01.png
|
||||
# latest-heatmap.4.2_02_02.png
|
||||
# latest-heatmap.4.3_01_01.png -> will be renamed to latest-heatmap.4.3.0.png
|
||||
# latest-heatmap.4.3_01_02.png
|
||||
# latest-heatmap.4.3_02_01.png
|
||||
# latest-heatmap.4.3_02_02.png
|
||||
|
||||
# Use this for production
|
||||
image_slicer.slice(file, numberOfTilesPerImage)
|
||||
|
||||
# Use following code to print something on each tile (to make sure tile is placed correct on map)
|
||||
#tiles = image_slicer.slice(file, numberOfTilesPerImage, save= False)
|
||||
# for tile in tiles:
|
||||
# overlay = ImageDraw.Draw(tile.image)
|
||||
# overlay.text((5, 5), str(zoom) + ':' +str(part) + ':'+ str(tile.number), (255, 255, 255), ImageFont.load_default())
|
||||
#image_slicer.save_tiles(tiles, directory='htdocs/public/heatmaps/', prefix="latest-heatmap."+str(zoom)+"."+str(part))
|
||||
|
||||
def _renameHeatMapImagesToGoogleMapStandard(self):
    """Deploy the new heatmap images by renaming the created tile files
    to the Google Map tile naming standard:
    latest-heatmap.<zoom>.<row>.<column>.png

    Tiles that were never created (no points in that area) are replaced
    by a copy of transparent.png.
    """
    for zoom in range(0, 8):
        if (zoom <= self.minZoomForOneImage):
            # Low zoom levels are rendered as a single image part
            partsLength = 1
            numberOfTilesPerImage = pow(2, zoom) * pow(2, zoom)
        else:
            # Higher zoom levels are rendered as a grid of image parts
            partsLength = pow(2, zoom - self.minZoomForOneImage) * \
                pow(2, zoom - self.minZoomForOneImage)
            numberOfTilesPerImage = pow(
                2, self.minZoomForOneImage) * pow(2, self.minZoomForOneImage)

        for part in range(0, partsLength):
            if (numberOfTilesPerImage == 1):
                # This only affects the zoom 0 file
                os.rename(self.destinationDir + "/latest-heatmap.0.0.png",
                          self.destinationDir + "/latest-heatmap.0.0.0.png")
                continue

            # Row/column offset of this image part in the full map
            # (starts on 0); loop-invariant per part, so computed once
            # here instead of once per tile
            partRow = floor(part / sqrt(partsLength))
            partColumn = part - sqrt(partsLength) * partRow

            # Lets rename the files to google map standard (or something like it)
            tilesPerSide = int(sqrt(numberOfTilesPerImage))
            for row in range(1, tilesPerSide + 1):
                for column in range(1, tilesPerSide + 1):
                    # image_slicer names tiles with zero padded 1-based indexes
                    oldRowStr = str(row).zfill(2)
                    oldColumnStr = str(column).zfill(2)

                    newRowStr = str(
                        int((row - 1) + (partRow * sqrt(numberOfTilesPerImage))))
                    newColumnStr = str(
                        int((column - 1) + (partColumn * sqrt(numberOfTilesPerImage))))

                    oldFile = self.destinationDir + "/latest-heatmap." + \
                        str(zoom) + "." + str(part) + "_" + \
                        oldRowStr + "_" + oldColumnStr + ".png"
                    newFile = self.destinationDir + "/latest-heatmap." + \
                        str(zoom) + "." + newRowStr + \
                        "." + newColumnStr + ".png"

                    if (os.path.exists(oldFile)):
                        self.logger.info(
                            "Renaming file : " + oldFile + " -> " + newFile)
                        os.rename(oldFile, newFile)
                    else:
                        # No tile was created for this area; deploy a
                        # transparent placeholder instead
                        oldFile = self.destinationDir + '/transparent.png'
                        self.logger.info(
                            "Copy file : " + oldFile + " -> " + newFile)
                        shutil.copyfile(oldFile, newFile)
|
||||
|
||||
def _getPoints(self, minLat, maxLat, minLng, maxLng, zoom):
|
||||
"""Get latitude, longitude point in the specified map bounds
|
||||
|
||||
Args:
|
||||
maxLat (float): The max latitude
|
||||
maxLng (float): The max longitude
|
||||
minLat (float): The min latitude
|
||||
minLng (float): The min longitude
|
||||
|
||||
Returns:
|
||||
Array of points (a point is a latitude, longitude tuple)
|
||||
"""
|
||||
result = []
|
||||
timestampLimit = int(time.time()) - (60*60*3)
|
||||
selectCursor = self.db.cursor()
|
||||
|
||||
selectCursor.execute("""
|
||||
select latest_confirmed_latitude latitude, latest_confirmed_longitude longitude
|
||||
from station
|
||||
where latest_confirmed_packet_timestamp > %s
|
||||
and latest_confirmed_latitude between %s and %s
|
||||
and latest_confirmed_longitude between %s and %s""", (timestampLimit, minLat, maxLat, minLng, maxLng,))
|
||||
|
||||
for record in selectCursor:
|
||||
if (record != None):
|
||||
lngProjection = self._getLngPixelCoordinate(
|
||||
record["longitude"], zoom)
|
||||
latProjection = self._getLatPixelCoordinate(
|
||||
record["latitude"], zoom)
|
||||
result.append((lngProjection, latProjection))
|
||||
selectCursor.close()
|
||||
return result
|
||||
|
||||
def _getLatPixelCoordinate(self, lat, zoom):
|
||||
"""Translate a latitude to a pixel coordinate value for a specified zoom
|
||||
|
||||
Args:
|
||||
lat (float): The latitude
|
||||
zoom (int): The zoom
|
||||
|
||||
Returns:
|
||||
Returns a pixel coordinate value as an int
|
||||
"""
|
||||
pixelGlobeSize = self.imageTileSize * pow(2, zoom)
|
||||
yPixelsToRadiansRatio = pixelGlobeSize / (2 * math_pi)
|
||||
halfPixelGlobeSize = float(pixelGlobeSize / 2)
|
||||
pixelGlobeCenterY = halfPixelGlobeSize
|
||||
degreesToRadiansRatio = 180 / math_pi
|
||||
siny = sin(lat * math_pi / 180)
|
||||
|
||||
# Truncating to 0.9999 effectively limits latitude to 89.189. This is
|
||||
# about a third of a tile past the edge of the world tile.
|
||||
if (siny < -0.9999):
|
||||
siny = -0.9999
|
||||
if (siny > 0.9999):
|
||||
siny = 0.9999
|
||||
latY = round(pixelGlobeCenterY + .5 *
|
||||
math_log((1 + siny) / (1 - siny)) * -yPixelsToRadiansRatio)
|
||||
return latY
|
||||
|
||||
def _getLngPixelCoordinate(self, lng, zoom):
|
||||
"""Translate a longitude to a pixel coordinate value for a specified zoom
|
||||
|
||||
Args:
|
||||
lng (float): The longitude
|
||||
zoom (int): The zoom
|
||||
|
||||
Returns:
|
||||
Returns a pixel coordinate value as an int
|
||||
"""
|
||||
scale = 1 << zoom
|
||||
lngX = floor(self.imageTileSize * (0.5 + lng / 360) * scale)
|
||||
return lngX
|
||||
|
||||
def _getLatFromLatPixelCoordinate(self, latPixelCoord, zoom):
|
||||
"""Translate a pixel coordinate value to a latitude for a specified zoom
|
||||
|
||||
Args:
|
||||
latPixelCoord (int): The pixel coordinate value
|
||||
zoom (int): The zoom
|
||||
|
||||
Returns:
|
||||
Returns a latitude as a float
|
||||
"""
|
||||
pixelGlobeSize = self.imageTileSize * pow(2, zoom)
|
||||
yPixelsToRadiansRatio = pixelGlobeSize / (2 * math_pi)
|
||||
halfPixelGlobeSize = float(pixelGlobeSize / 2)
|
||||
pixelGlobeCenterY = halfPixelGlobeSize
|
||||
degreesToRadiansRatio = 180 / math_pi
|
||||
lat = (2 * atan(exp((latPixelCoord - pixelGlobeCenterY) / -
|
||||
yPixelsToRadiansRatio)) - math_pi / 2) * degreesToRadiansRatio
|
||||
return lat
|
||||
|
||||
def _getLngFromLngPixelCoordinate(self, lngPixelCoord, zoom):
|
||||
"""Translate a pixel coordinate value to a longitude for a specified zoom
|
||||
|
||||
Args:
|
||||
lngPixelCoord (int): The pixel coordinate value
|
||||
zoom (int): The zoom
|
||||
|
||||
Returns:
|
||||
Returns a longitude as a float
|
||||
"""
|
||||
scale = 1 << zoom
|
||||
lng = (((lngPixelCoord / scale) / self.imageTileSize) - 0.5) * 360
|
||||
return lng
|
||||
|
|
@ -0,0 +1,439 @@
|
|||
import logging
|
||||
|
||||
from twisted.python import log
|
||||
from twisted.internet import threads, reactor, task
|
||||
from twisted.internet.error import AlreadyCancelled, AlreadyCalled
|
||||
|
||||
from autobahn.twisted.websocket import WebSocketServerProtocol
|
||||
|
||||
import json
|
||||
import datetime
|
||||
import time
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
import os
|
||||
import re
|
||||
|
||||
from trackdirect.TrackDirectConfig import TrackDirectConfig
|
||||
|
||||
from trackdirect.database.DatabaseConnection import DatabaseConnection
|
||||
|
||||
from trackdirect.websocket.WebsocketResponseCreator import WebsocketResponseCreator
|
||||
from trackdirect.websocket.WebsocketConnectionState import WebsocketConnectionState
|
||||
|
||||
from trackdirect.websocket.aprsis.AprsISReader import AprsISReader
|
||||
from trackdirect.websocket.aprsis.AprsISPayloadCreator import AprsISPayloadCreator
|
||||
|
||||
|
||||
class TrackDirectWebsocketServer(WebSocketServerProtocol):
|
||||
"""The TrackDirectWebsocketServer class handles the incoming requests
|
||||
"""
|
||||
|
||||
def __init__(self):
    """Set up the protocol instance: configuration, database connection
    and the helper objects used to serve this websocket client.
    """
    WebSocketServerProtocol.__init__(self)
    self.logger = logging.getLogger('trackdirect')

    self.config = TrackDirectConfig()
    # Idle limit is configured in minutes but used in seconds
    self.maxClientIdleTime = int(self.config.maxClientIdleTime) * 60
    self.maxQueuedRealtimePackets = int(self.config.maxQueuedRealtimePackets)

    dbConnection = DatabaseConnection()
    db = dbConnection.getConnection(True)

    # Per-connection state plus the helpers that produce responses and
    # stream APRS-IS data; they all share the same connection state
    self.connectionState = WebsocketConnectionState()
    self.responseCreator = WebsocketResponseCreator(self.connectionState, db)
    self.aprsISReader = AprsISReader(self.connectionState, db)
    self.aprsISPayloadCreator = AprsISPayloadCreator(self.connectionState, db)

    self.numberOfRealTimePacketThreads = 0
    self.timestampSenderCall = None
    self.realTimeListenerCall = None
    self.onInactiveCall = None
    self.isUnknownClient = False
|
||||
|
||||
def onConnect(self, request):
    """Log information about the connecting client.

    Args:
        request (object): The connection request
    """
    try:
        if ('x-forwarded-for' in request.headers):
            # Behind a proxy: log the forwarded client address as well
            self.logger.warning("Client connecting from origin: {0}, x-forwarded-for: {1} (server pid {2})".format(
                request.origin, request.headers['x-forwarded-for'], str(os.getpid())))
        else:
            self.logger.warning(
                "Client connecting from origin: {0} (server pid {1})".format(request.origin, str(os.getpid())))
    except Exception as e:
        self.logger.error(e, exc_info=1)
        raise e
|
||||
|
||||
def onOpen(self):
    """Start the periodic jobs once the websocket connection is open."""
    try:
        self.logger.info("WebSocket connection open.")

        # Inform client that we are active, then start the keep-alive
        # timestamp sender and the inactivity countdown
        self._sendResponseByType(42)
        self._startTimestampSender()
        self._reScheduleInactiveEvent()
    except Exception as e:
        self.logger.error(e, exc_info=1)
|
||||
|
||||
def onMessage(self, payload, isBinary):
    """Handle an incoming websocket message.

    Args:
        payload (str): The message payload, expected to be a JSON object
        isBinary (boolean): True if binary otherwise false (unused; JSON
            is expected either way)
    """
    try:
        request = json.loads(payload)
        if (self.isUnknownClient):
            self.logger.warning(
                "Incoming message from unknown client: {0}".format(str(request)))

        if ("payload_request_type" not in request):
            # Bugfix: this message used to be built with an undefined
            # variable ("exp"), raising a NameError that was silently
            # swallowed by the broad Exception handler below
            self.logger.warning("Incoming request has no type")
            self.logger.warning(payload)
            return
        self._onRequest(request)
    except (ValueError) as exp:
        self.logger.warning(
            "Incoming request could not be parsed (%s)" % (exp))
        self.logger.warning(payload)
    except psycopg2.InterfaceError as e:
        # Connection to database is lost, better just terminate connection
        # to make user reconnect with new db connection
        self.logger.error(e, exc_info=1)
        raise e
    except Exception as e:
        # Log error to make us aware of unknown problems
        self.logger.error(e, exc_info=1)
|
||||
|
||||
def onClose(self, wasClean, code, reason):
    """Tear down the periodic jobs when the websocket connection closes.

    Args:
        wasClean (boolean): True if clean close otherwise false
        code (int): Close code
        reason (object): Reason for close
    """
    try:
        self.logger.info("WebSocket connection closed: {0}".format(reason))
        # Mark the state as disconnected so worker threads stop streaming
        self.connectionState.disconnected = True
        self._stopTimestampSender()
        self._stopRealTimeListener(True)
    except Exception as e:
        # Log error to make us aware of unknown problems
        self.logger.error(e, exc_info=1)
|
||||
|
||||
def _onRequest(self, request, requestId=None):
    """Method that is executed on an incoming request.

    State-affecting requests are serialized: each gets an increasing id,
    and a request is re-scheduled (0.1s later) until every request with
    a lower id has been handled.

    Args:
        request (object): The connection request
        requestId (int): Id of the request (None for a brand new request;
            set when this method re-schedules itself)
    """
    # NOTE(review): request type 11 appears to be exempt from resetting
    # the client inactivity countdown -- confirm its semantics
    if (request["payload_request_type"] != 11):
        self._reScheduleInactiveEvent()

    if (request["payload_request_type"] in [5, 7, 9]):
        # Request that does not affect the current map status (too much);
        # process it directly on a worker thread, outside the id ordering
        deferred = threads.deferToThread(
            self._processRequest, request, None)
        deferred.addErrback(self._onError)

    else:
        # Request that affects map and current state
        if (requestId is None):
            # First time we see this request: allocate the next id and
            # record it as the latest request
            requestId = self.connectionState.latestRequestId + 1
            self.connectionState.latestRequestType = request["payload_request_type"]
            self.connectionState.latestRequestId = requestId
            self.connectionState.latestRequestTimestamp = int(time.time())
            self._stopRealTimeListener(False)

        if (self.connectionState.latestHandledRequestId < requestId - 1):
            # An earlier request is still being processed; retry shortly
            reactor.callLater(0.1, self._onRequest, request, requestId)
        else:
            self._updateState(request)

            deferred = threads.deferToThread(
                self._processRequest, request, requestId)
            deferred.addErrback(self._onError)
            deferred.addCallback(self._onRequestDone)
|
||||
|
||||
def _processRequest(self, request, requestId):
|
||||
"""Method that sends a response to websocket client based on request
|
||||
|
||||
Args:
|
||||
request (Dict): Request from websocket client
|
||||
requestId (int): Request id of processed request
|
||||
"""
|
||||
try:
|
||||
for response in self.responseCreator.getResponses(request, requestId):
|
||||
if self.connectionState.disconnected:
|
||||
break
|
||||
reactor.callFromThread(self._sendDictResponse, response)
|
||||
return requestId
|
||||
except psycopg2.InterfaceError as e:
|
||||
# Connection to database is lost, better just terminate connection to make user reconnect with new db connection
|
||||
self.logger.error(e, exc_info=1)
|
||||
raise e
|
||||
except Exception as e:
|
||||
# Log error to make us aware of unknow problem
|
||||
self.logger.error(e, exc_info=1)
|
||||
|
||||
def _onRequestDone(self, requestId):
    """Method that is executed when a request has been processed.

    Args:
        requestId (int): Request id of the processed request
    """
    try:
        if (self.connectionState.latestHandledRequestId < requestId):
            self.connectionState.latestHandledRequestId = requestId
        if (self.connectionState.latestRequestId == requestId):
            # We have no newer requests
            # Tell client response is complete
            self._sendResponseByType(35)

            if (self.connectionState.latestTimeTravelRequest is None
                    and self.connectionState.noRealTime is False
                    and self.connectionState.isValidLatestPosition()):
                # Live mode with a usable map position: start streaming
                # real time packets to the client
                self._startRealTimeListener(requestId)

            elif ((int(time.time()) - self.connectionState.latestRequestTimestamp) <= self.maxClientIdleTime):
                self._sendResponseByType(33)  # Tell client we are idle
    except psycopg2.InterfaceError as e:
        # Connection to database is lost, better just terminate connection
        # to make user reconnect with new db connection
        raise e
    except Exception as e:
        # Log error to make us aware of unknown problems
        self.logger.error(e, exc_info=1)
|
||||
|
||||
def _onError(self, error):
    """Errback that is executed when a deferToThread call failed.

    Args:
        error (Exception): The Exception
    """
    # An exception should only end up here when the db connection is
    # lost; force a restart of the websocket server
    if reactor.running:
        reactor.stop()
    raise error
|
||||
|
||||
def _startRealTimeListener(self, relatedRequestId):
    """Start the real time APRS-IS listener; onRealTimePacketFound is
    executed for every packet that is received.

    Args:
        relatedRequestId (int): Request id of the related request
    """
    def readRealTimePacket():
        # Only keep reading while this is still the latest request and
        # the client is still connected
        if (self.connectionState.latestRequestId == relatedRequestId and not self.connectionState.disconnected):
            self.aprsISReader.read(onRealTimePacketFound)

    def onRealTimePacketComplete():
        self.numberOfRealTimePacketThreads -= 1
        if (self.numberOfRealTimePacketThreads <= 0):
            # If we have no packets on the way we should see if we have another waiting
            readRealTimePacket()

    def onRealTimePacketFound(raw, sourceId):
        if (raw is None and sourceId is None):
            # Something went wrong, stop everything
            self._onInactive()
        else:
            if (self.numberOfRealTimePacketThreads > self.maxQueuedRealtimePackets):
                # Too many packets, several previous LoopingCalls are not done yet.
                # We need to discard some packets, otherwise the server will be
                # overloaded and we will only send old packets.
                # The client is required to request a total update now and then,
                # so the discarded packets should be sent to the client later.
                counter = self.aprsISReader.clear(5)
                #self.logger.warning('Discarding ' + str(counter) + ' packets')
            else:
                self.numberOfRealTimePacketThreads += 1
                deferred = threads.deferToThread(
                    self._processRealTimePacket, raw, sourceId)
                deferred.addCallback(lambda _: onRealTimePacketComplete())
                deferred.addErrback(self._onError)

    # Tell client we are connecting to real time feed
    self._sendResponseByType(34)
    self.aprsISReader.start()  # Will start if needed and change filter if needed
    # Tell client we are listening on real time feed
    self._sendResponseByType(31)

    # Poll APRS-IS for new packets five times per second
    self.realTimeListenerCall = task.LoopingCall(readRealTimePacket)
    self.realTimeListenerCall.start(0.2)
|
||||
|
||||
def _stopRealTimeListener(self, disconnect=False):
|
||||
"""Stop real time APRS-IS listener, onRealTimePacketFound will be executed when a packet is received
|
||||
|
||||
Args:
|
||||
disconnect (Boolean): Set to true to also disconnect from APRS-IS servers
|
||||
"""
|
||||
if (self.realTimeListenerCall is not None):
|
||||
try:
|
||||
self.realTimeListenerCall.stop()
|
||||
except (AlreadyCalled, AssertionError) as e:
|
||||
pass
|
||||
|
||||
if (disconnect):
|
||||
self.aprsISReader.stop()
|
||||
else:
|
||||
self.aprsISReader.pause()
|
||||
|
||||
def _processRealTimePacket(self, raw, sourceId):
|
||||
"""Method that is executed when we have a new real time packet to send
|
||||
|
||||
Args:
|
||||
raw (string): Raw packet from APRS-IS
|
||||
sourceId (int): The id of the source (1 for APRS and 2 for CWOP ...)
|
||||
"""
|
||||
try:
|
||||
for response in self.aprsISPayloadCreator.getPayloads(raw, sourceId):
|
||||
reactor.callFromThread(self._sendDictResponse, response)
|
||||
except psycopg2.InterfaceError as e:
|
||||
# Connection to database is lost, better just terminate connection to make user reconnect with new db connection
|
||||
self.logger.error(e, exc_info=1)
|
||||
raise e
|
||||
except Exception as e:
|
||||
# Log error to make us aware of unknow problem
|
||||
self.logger.error(e, exc_info=1)
|
||||
|
||||
def _startTimestampSender(self):
    """Schedule a once-per-second looping call to _sendTimestampResponse
    to keep the connection alive.
    """
    self.timestampSenderCall = task.LoopingCall(self._sendTimestampResponse)
    self.timestampSenderCall.start(1.0)
|
||||
|
||||
def _stopTimestampSender(self):
|
||||
"""Stop looping call to _sendTimestampResponse
|
||||
"""
|
||||
if (self.timestampSenderCall is not None):
|
||||
try:
|
||||
self.timestampSenderCall.stop()
|
||||
except AssertionError as e:
|
||||
pass
|
||||
|
||||
def _reScheduleInactiveEvent(self):
    """Schedule a call to _onInactive for when the client has been idle
    too long.

    Note:
        Any previously scheduled call is cancelled first, so calling this
        method resets the countdown.
    """
    if self.onInactiveCall is not None:
        try:
            self.onInactiveCall.cancel()
        except (AlreadyCalled, AlreadyCancelled):
            pass
    self.onInactiveCall = reactor.callLater(
        self.maxClientIdleTime, self._onInactive)
|
||||
|
||||
def _onInactive(self):
|
||||
"""Method that is executed when client has been inactive too long
|
||||
"""
|
||||
try:
|
||||
# Client is inactive, pause (to save bandwidth, cpu and memory)
|
||||
self._sendResponseByType(36)
|
||||
self._stopTimestampSender()
|
||||
self._stopRealTimeListener(True)
|
||||
self.connectionState.totalReset()
|
||||
except psycopg2.InterfaceError as e:
|
||||
# Connection to database is lost, better just terminate connection to make user reconnect with new db connection
|
||||
self.logger.error(e, exc_info=1)
|
||||
raise e
|
||||
except Exception as e:
|
||||
# Log error to make us aware of unknow problem
|
||||
self.logger.error(e, exc_info=1)
|
||||
|
||||
def _sendTimestampResponse(self):
|
||||
"""Send server timestamp to syncronize server and client
|
||||
|
||||
Notes:
|
||||
This is also used to tell the client that we are still here
|
||||
Most browser will disconnect if they do not hear anything in 300sec
|
||||
"""
|
||||
try:
|
||||
if (self.connectionState.latestHandledRequestId < self.connectionState.latestRequestId):
|
||||
# server is busy with request, no point in doing this now
|
||||
return
|
||||
data = {}
|
||||
data["timestamp"] = int(time.time())
|
||||
self._sendResponseByType(41, data)
|
||||
except psycopg2.InterfaceError as e:
|
||||
# Connection to database is lost, better just terminate connection to make user reconnect with new db connection
|
||||
self.logger.error(e, exc_info=1)
|
||||
raise e
|
||||
except Exception as e:
|
||||
# Log error to make us aware of unknow problem
|
||||
self.logger.error(e, exc_info=1)
|
||||
|
||||
def _sendResponseByType(self, payloadResponseType, data=None):
|
||||
"""Send specified response to client
|
||||
|
||||
Args:
|
||||
payloadResponseType (int): A number that specifies what type of response we are sending
|
||||
data (dict): The response data as a dict
|
||||
"""
|
||||
if (data is not None):
|
||||
payload = {
|
||||
'payload_response_type': payloadResponseType, 'data': data}
|
||||
else:
|
||||
payload = {'payload_response_type': payloadResponseType}
|
||||
self._sendDictResponse(payload)
|
||||
|
||||
def _sendDictResponse(self, payload):
|
||||
"""Send message dict payload to client
|
||||
|
||||
Args:
|
||||
payload (Dict): Response payload
|
||||
"""
|
||||
try:
|
||||
jsonPayload = json.dumps(payload, ensure_ascii=True).encode('utf8')
|
||||
if (jsonPayload is not None):
|
||||
self.sendMessage(jsonPayload)
|
||||
except psycopg2.InterfaceError as e:
|
||||
# Connection to database is lost, better just terminate connection to make user reconnect with new db connection
|
||||
self.logger.error(e, exc_info=1)
|
||||
raise e
|
||||
except Exception as e:
|
||||
# Log error to make us aware of unknow problem
|
||||
self.logger.warning(e, exc_info=1)
|
||||
|
||||
def _updateState(self, request):
|
||||
"""Update the connection state based on request
|
||||
|
||||
Args:
|
||||
request (Dict): Request form client
|
||||
"""
|
||||
if ("neLat" in request
|
||||
and "neLng" in request
|
||||
and "swLat" in request
|
||||
and "swLng" in request
|
||||
and "minutes" in request):
|
||||
self.connectionState.setLatestMapBounds(
|
||||
request["neLat"], request["neLng"], request["swLat"], request["swLng"])
|
||||
|
||||
if ("onlyLatestPacket" in request):
|
||||
self.connectionState.setOnlyLatestPacketRequested(
|
||||
(request["onlyLatestPacket"] == 1))
|
||||
|
||||
if ("minutes" in request):
|
||||
if ("time" in request):
|
||||
self.connectionState.setLatestMinutes(
|
||||
request["minutes"], request["time"])
|
||||
else:
|
||||
self.connectionState.setLatestMinutes(request["minutes"], None)
|
||||
|
||||
if ("noRealTime" in request):
|
||||
self.connectionState.disableRealTime()
|
||||
|
|
@ -0,0 +1,7 @@
|
|||
# Package metadata for the trackdirect server package
__version__ = "1.0"
__author__ = "Per Qvarforth"

# Re-export the package's public classes at package level
# (Python 2 style implicit relative imports)
from TrackDirectDataCollector import *
from TrackDirectWebsocketServer import *
from TrackDirectHeatMapCreator import *
from TrackDirectConfig import *
|
||||
Loading…
Reference in New Issue