reduced memory usage and made it possible to disable duplicate detection

This commit is contained in:
Per Qvarforth 2022-04-18 15:11:54 +02:00
parent 301e4abf28
commit 0a41df1bd9
5 changed files with 65 additions and 25 deletions

View File

@ -132,11 +132,15 @@ passcode="-1"
numbers_in_batch="50"
;; Packets received more frequently than the configured frequency limit will not be shown on map (limit is specified in seconds)
;; When receiving data from the OGN network this needs to be 20s or more.
;; If setting save_fast_packets to "0", packets that is received to frequently will not be save (useful for OGN, but not for APRS-IS).
;; Set to "0" to disable the frequency limit (note that the limit must be 20s or more when receiving data from OGN network)).
frequency_limit="5"
;; If save_fast_packets is set to "0", packets that is received to frequently will not be saved (useful for OGN, but not recommended for APRS-IS).
save_fast_packets="1"
;; If detect_duplicates is set to "1" we will try to detect duplicates and ignore them.
detect_duplicates="1"
;; Collector error log
error_log="~/trackdirect/server/log/collector.log"
@ -155,5 +159,6 @@ error_log="~/trackdirect/server/log/collector.log"
;numbers_in_batch="30"
;frequency_limit="5"
;save_fast_packets="0"
;detect_duplicates="1"
;
;error_log="~/trackdirect/server/log/collector1.log"

View File

@ -143,13 +143,27 @@ class TrackDirectConfig(Singleton):
self.collector[collectorNumber]['numbers_in_batch'] = configParser.get(
'collector' + str(collectorNumber), 'numbers_in_batch').strip('"')
self.collector[collectorNumber]['frequency_limit'] = configParser.get(
'collector' + str(collectorNumber), 'frequency_limit').strip('"')
try:
self.collector[collectorNumber]['frequency_limit'] = configParser.get(
'collector' + str(collectorNumber), 'frequency_limit').strip('"')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
self.collector[collectorNumber]['frequency_limit'] = "0"
saveFastPackets = configParser.get(
'collector' + str(collectorNumber), 'save_fast_packets').strip('"')
self.collector[collectorNumber]['save_fast_packets'] = bool(
int(saveFastPackets))
try:
saveFastPackets = configParser.get(
'collector' + str(collectorNumber), 'save_fast_packets').strip('"')
self.collector[collectorNumber]['save_fast_packets'] = bool(
int(saveFastPackets))
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
self.collector[collectorNumber]['save_fast_packets'] = False
try:
detectDuplicates = configParser.get(
'collector' + str(collectorNumber), 'detect_duplicates').strip('"')
self.collector[collectorNumber]['detect_duplicates'] = bool(
int(detectDuplicates))
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
self.collector[collectorNumber]['detect_duplicates'] = False
self.collector[collectorNumber]['error_log'] = configParser.get(
'collector' + str(collectorNumber), 'error_log').strip('"')
@ -173,5 +187,6 @@ class TrackDirectConfig(Singleton):
self.collector[collectorNumber]['numbers_in_batch'] = "20"
self.collector[collectorNumber]['frequency_limit'] = "0"
self.collector[collectorNumber]['save_fast_packets'] = True
self.collector[collectorNumber]['detect_duplicates'] = False
self.collector[collectorNumber]['error_log'] = None

View File

@ -19,6 +19,8 @@ from trackdirect.database.DatabaseConnection import DatabaseConnection
from trackdirect.repositories.StationRepository import StationRepository
from trackdirect.objects.Packet import Packet
#from pympler.tracker import SummaryTracker
class TrackDirectDataCollector():
"""An TrackDirectDataCollector instance connects to the data source and saves all received packets to the database
@ -40,10 +42,11 @@ class TrackDirectDataCollector():
self.numbersInBatch = collectorOptions['numbers_in_batch']
self.saveFastPackets = collectorOptions['save_fast_packets']
self.frequencyLimit = collectorOptions['frequency_limit']
self.detectDuplicates = collectorOptions['detect_duplicates']
self.hardFrequencyLimit = None
if (not self.saveFastPackets):
if (not self.saveFastPackets and self.frequencyLimit is not None and int(self.frequencyLimit) > 0):
# Only respect hard frequency limit if we are not saving "fast packets"
self.hardFrequencyLimit = collectorOptions['frequency_limit']
self.hardFrequencyLimit = self.frequencyLimit
self.sourceId = collectorOptions['source_id']
self.callsign = collectorOptions['callsign']
self.passcode = collectorOptions['passcode']
@ -75,6 +78,9 @@ class TrackDirectDataCollector():
def consume(self):
"""Start consuming packets
"""
#tracker = SummaryTracker()
connection = AprsISConnection(
self.callsign, self.passcode, self.sourceHostname, self.sourcePort)
connection.setFrequencyLimit(self.hardFrequencyLimit)
@ -84,12 +90,6 @@ class TrackDirectDataCollector():
if (not reactor.running):
raise StopIteration('Stopped')
if (self.hardFrequencyLimit is not None):
if (self.delay > 3):
connection.setFrequencyLimit(self.hardFrequencyLimit * 2)
elif (self.delay <= 1 and connection.getFrequencyLimit() != self.hardFrequencyLimit):
connection.setFrequencyLimit(self.hardFrequencyLimit)
timestamp = int(time.time())
deferred = threads.deferToThread(self._parse, line, timestamp)
deferred.addCallback(onParseComplete)
@ -110,6 +110,8 @@ class TrackDirectDataCollector():
connection.connect()
connection.filteredConsumer(onPacketRead, True, True)
#tracker.print_diff()
except (aprslib.ConnectionDrop) as exp:
# Just reconnect...
self.logger.warning('Lost connection')
@ -151,7 +153,9 @@ class TrackDirectDataCollector():
if (packet.mapId == 15 or packet.mapId == 16):
return None
self._checkIfDuplicate(packet)
if (self.detectDuplicates):
self._checkIfDuplicate(packet)
return self._cleanPacket(packet)
except (aprslib.ParseError, aprslib.UnknownFormat, TrackDirectParseError) as exp:
@ -246,9 +250,12 @@ class TrackDirectDataCollector():
Returns:
Boolean
"""
if (packet.mapId in [1, 5, 7, 9] and packet.isMoving == 1):
if (packet.mapId in [1, 5, 7, 9] and packet.isMoving == 1 and self.frequencyLimit is not None):
frequencyLimitToApply = int(self.frequencyLimit)
if (frequencyLimitToApply == 0):
return False
if (packet.ogn is not None and packet.ogn.ognTurnRate is not None):
turnRate = abs(float(packet.ogn.ognTurnRate))
if (turnRate > 0) :

View File

@ -25,7 +25,7 @@ class AprsISConnection(aprslib.IS):
self.logger = logging.getLogger("aprslib.IS")
self.frequencyLimit = None
self.stationHashTimestamps = {}
self.stationHashTimestamps = collections.OrderedDict()
self.sourceId = 1
def setFrequencyLimit(self, frequencyLimit):
@ -115,4 +115,16 @@ class AprsISConnection(aprslib.IS):
# This sender is sending faster than config limit
return True
self.stationHashTimestamps[name] = int(time.time()) - 1
self._cacheMaintenance()
return False
def _cacheMaintenance(self):
"""Make sure cache does not contain to many packets
"""
frequencyLimitToApply = int(self.frequencyLimit)
maxNumberOfPackets = frequencyLimitToApply * 1000 # We assume that we never have more than 1000 packets per second
if (len(self.stationHashTimestamps) > maxNumberOfPackets):
try:
self.stationHashTimestamps.popitem(last=False)
except (KeyError, StopIteration) as e:
pass

View File

@ -1,3 +1,6 @@
import logging
from twisted.python import log
import collections
from trackdirect.exceptions.TrackDirectParseError import TrackDirectParseError
@ -12,9 +15,11 @@ class PacketDuplicatePolicy():
def __init__(self, stationRepository):
"""The __init__ method.
"""
self.minutesBackToLookForDuplicates = 50
self.minutesBackToLookForDuplicates = 30
self.stationRepository = stationRepository
self.logger = logging.getLogger('trackdirect')
def isDuplicate(self, packet):
"""Method used to check if this packet is a duplicate
@ -124,12 +129,8 @@ class PacketDuplicatePolicy():
def _cacheMaintenance(self):
"""Make sure cache does not contain to many packets
Note:
We want to keep messages for about 50min and calulate with 70packet/second
(I have seen duplicates that is 45 minutes old, some repeaters seems to store packet for a long time before they resend them)
"""
maxNumberOfPackets = self.minutesBackToLookForDuplicates * 60 * 70
maxNumberOfPackets = self.minutesBackToLookForDuplicates * 60 * 100 # We assume that we have an average of 100 packets per second
if (len(PacketDuplicatePolicy.latestPacketsHashOrderedDict) > maxNumberOfPackets):
try:
PacketDuplicatePolicy.latestPacketsHashOrderedDict.popitem(