diff --git a/config/trackdirect.ini b/config/trackdirect.ini index fa4ee97..0a38d5e 100644 --- a/config/trackdirect.ini +++ b/config/trackdirect.ini @@ -132,11 +132,15 @@ passcode="-1" numbers_in_batch="50" ;; Packets received more frequently than the configured frequency limit will not be shown on map (limit is specified in seconds) -;; When receiving data from the OGN network this needs to be 20s or more. -;; If setting save_fast_packets to "0", packets that is received to frequently will not be save (useful for OGN, but not for APRS-IS). +;; Set to "0" to disable the frequency limit (note that the limit must be 20s or more when receiving data from OGN network)). frequency_limit="5" + +;; If save_fast_packets is set to "0", packets that is received to frequently will not be saved (useful for OGN, but not recommended for APRS-IS). save_fast_packets="1" +;; If detect_duplicates is set to "1" we will try to detect duplicates and ignore them. +detect_duplicates="1" + ;; Collector error log error_log="~/trackdirect/server/log/collector.log" @@ -155,5 +159,6 @@ error_log="~/trackdirect/server/log/collector.log" ;numbers_in_batch="30" ;frequency_limit="5" ;save_fast_packets="0" +;detect_duplicates="1" ; ;error_log="~/trackdirect/server/log/collector1.log" diff --git a/server/trackdirect/TrackDirectConfig.py b/server/trackdirect/TrackDirectConfig.py index 035c7ee..0ab9c78 100644 --- a/server/trackdirect/TrackDirectConfig.py +++ b/server/trackdirect/TrackDirectConfig.py @@ -143,13 +143,27 @@ class TrackDirectConfig(Singleton): self.collector[collectorNumber]['numbers_in_batch'] = configParser.get( 'collector' + str(collectorNumber), 'numbers_in_batch').strip('"') - self.collector[collectorNumber]['frequency_limit'] = configParser.get( - 'collector' + str(collectorNumber), 'frequency_limit').strip('"') + try: + self.collector[collectorNumber]['frequency_limit'] = configParser.get( + 'collector' + str(collectorNumber), 'frequency_limit').strip('"') + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + self.collector[collectorNumber]['frequency_limit'] = "0" - saveFastPackets = configParser.get( - 'collector' + str(collectorNumber), 'save_fast_packets').strip('"') - self.collector[collectorNumber]['save_fast_packets'] = bool( - int(saveFastPackets)) + try: + saveFastPackets = configParser.get( + 'collector' + str(collectorNumber), 'save_fast_packets').strip('"') + self.collector[collectorNumber]['save_fast_packets'] = bool( + int(saveFastPackets)) + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + self.collector[collectorNumber]['save_fast_packets'] = False + + try: + detectDuplicates = configParser.get( + 'collector' + str(collectorNumber), 'detect_duplicates').strip('"') + self.collector[collectorNumber]['detect_duplicates'] = bool( + int(detectDuplicates)) + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError): + self.collector[collectorNumber]['detect_duplicates'] = False self.collector[collectorNumber]['error_log'] = configParser.get( 'collector' + str(collectorNumber), 'error_log').strip('"') @@ -173,5 +187,6 @@ class TrackDirectConfig(Singleton): self.collector[collectorNumber]['numbers_in_batch'] = "20" self.collector[collectorNumber]['frequency_limit'] = "0" self.collector[collectorNumber]['save_fast_packets'] = True + self.collector[collectorNumber]['detect_duplicates'] = False self.collector[collectorNumber]['error_log'] = None diff --git a/server/trackdirect/TrackDirectDataCollector.py b/server/trackdirect/TrackDirectDataCollector.py index cea4ebd..fa5d01f 100644 --- a/server/trackdirect/TrackDirectDataCollector.py +++ b/server/trackdirect/TrackDirectDataCollector.py @@ -19,6 +19,8 @@ from trackdirect.database.DatabaseConnection import DatabaseConnection from trackdirect.repositories.StationRepository import StationRepository from trackdirect.objects.Packet import Packet +#from pympler.tracker import SummaryTracker + class TrackDirectDataCollector(): """An TrackDirectDataCollector instance connects to the data source and saves all received packets to the database @@ -40,10 +42,11 @@ class TrackDirectDataCollector(): self.numbersInBatch = collectorOptions['numbers_in_batch'] self.saveFastPackets = collectorOptions['save_fast_packets'] self.frequencyLimit = collectorOptions['frequency_limit'] + self.detectDuplicates = collectorOptions['detect_duplicates'] self.hardFrequencyLimit = None - if (not self.saveFastPackets): + if (not self.saveFastPackets and self.frequencyLimit is not None and int(self.frequencyLimit) > 0): # Only respect hard frequency limit if we are not saving "fast packets" - self.hardFrequencyLimit = collectorOptions['frequency_limit'] + self.hardFrequencyLimit = self.frequencyLimit self.sourceId = collectorOptions['source_id'] self.callsign = collectorOptions['callsign'] self.passcode = collectorOptions['passcode'] @@ -75,6 +78,9 @@ class TrackDirectDataCollector(): def consume(self): """Start consuming packets """ + + #tracker = SummaryTracker() + connection = AprsISConnection( self.callsign, self.passcode, self.sourceHostname, self.sourcePort) connection.setFrequencyLimit(self.hardFrequencyLimit) @@ -84,12 +90,6 @@ class TrackDirectDataCollector(): if (not reactor.running): raise StopIteration('Stopped') - if (self.hardFrequencyLimit is not None): - if (self.delay > 3): - connection.setFrequencyLimit(self.hardFrequencyLimit * 2) - elif (self.delay <= 1 and connection.getFrequencyLimit() != self.hardFrequencyLimit): - connection.setFrequencyLimit(self.hardFrequencyLimit) - timestamp = int(time.time()) deferred = threads.deferToThread(self._parse, line, timestamp) deferred.addCallback(onParseComplete) @@ -110,6 +110,8 @@ class TrackDirectDataCollector(): connection.connect() connection.filteredConsumer(onPacketRead, True, True) + #tracker.print_diff() + except (aprslib.ConnectionDrop) as exp: # Just reconnect... self.logger.warning('Lost connection') @@ -151,7 +153,9 @@ class TrackDirectDataCollector(): if (packet.mapId == 15 or packet.mapId == 16): return None - self._checkIfDuplicate(packet) + if (self.detectDuplicates): + self._checkIfDuplicate(packet) + return self._cleanPacket(packet) except (aprslib.ParseError, aprslib.UnknownFormat, TrackDirectParseError) as exp: @@ -246,9 +250,12 @@ class TrackDirectDataCollector(): Returns: Boolean """ - if (packet.mapId in [1, 5, 7, 9] and packet.isMoving == 1): + if (packet.mapId in [1, 5, 7, 9] and packet.isMoving == 1 and self.frequencyLimit is not None): frequencyLimitToApply = int(self.frequencyLimit) + if (frequencyLimitToApply == 0): + return False + if (packet.ogn is not None and packet.ogn.ognTurnRate is not None): turnRate = abs(float(packet.ogn.ognTurnRate)) if (turnRate > 0) : diff --git a/server/trackdirect/parser/AprsISConnection.py b/server/trackdirect/parser/AprsISConnection.py index d4613f6..4f097ca 100644 --- a/server/trackdirect/parser/AprsISConnection.py +++ b/server/trackdirect/parser/AprsISConnection.py @@ -25,7 +25,7 @@ class AprsISConnection(aprslib.IS): self.logger = logging.getLogger("aprslib.IS") self.frequencyLimit = None - self.stationHashTimestamps = {} + self.stationHashTimestamps = collections.OrderedDict() self.sourceId = 1 def setFrequencyLimit(self, frequencyLimit): @@ -115,4 +115,16 @@ class AprsISConnection(aprslib.IS): # This sender is sending faster than config limit return True self.stationHashTimestamps[name] = int(time.time()) - 1 + self._cacheMaintenance() return False + + def _cacheMaintenance(self): + """Make sure cache does not contain to many packets + """ + frequencyLimitToApply = int(self.frequencyLimit) + maxNumberOfPackets = frequencyLimitToApply * 1000 # We assume that we never have more than 1000 packets per second + if (len(self.stationHashTimestamps) > maxNumberOfPackets): + try: + self.stationHashTimestamps.popitem(last=False) + except (KeyError, StopIteration) as e: + pass \ No newline at end of file diff --git a/server/trackdirect/parser/policies/PacketDuplicatePolicy.py b/server/trackdirect/parser/policies/PacketDuplicatePolicy.py index 7bf9cc0..4e3c084 100644 --- a/server/trackdirect/parser/policies/PacketDuplicatePolicy.py +++ b/server/trackdirect/parser/policies/PacketDuplicatePolicy.py @@ -1,3 +1,6 @@ +import logging +from twisted.python import log + import collections from trackdirect.exceptions.TrackDirectParseError import TrackDirectParseError @@ -12,9 +15,11 @@ class PacketDuplicatePolicy(): def __init__(self, stationRepository): """The __init__ method. """ - self.minutesBackToLookForDuplicates = 50 + self.minutesBackToLookForDuplicates = 30 self.stationRepository = stationRepository + self.logger = logging.getLogger('trackdirect') + def isDuplicate(self, packet): """Method used to check if this packet is a duplicate @@ -124,12 +129,8 @@ class PacketDuplicatePolicy(): def _cacheMaintenance(self): """Make sure cache does not contain to many packets - - Note: - We want to keep messages for about 50min and calulate with 70packet/second - (I have seen duplicates that is 45 minutes old, some repeaters seems to store packet for a long time before they resend them) """ - maxNumberOfPackets = self.minutesBackToLookForDuplicates * 60 * 70 + maxNumberOfPackets = self.minutesBackToLookForDuplicates * 60 * 100 # We assume that we have an average of 100 packets per second if (len(PacketDuplicatePolicy.latestPacketsHashOrderedDict) > maxNumberOfPackets): try: PacketDuplicatePolicy.latestPacketsHashOrderedDict.popitem(