trackdirect2/server/trackdirect/parser/AprsISConnection.py

135 lines
4.1 KiB
Python

import logging
import aprslib
import collections
import time
import re
class AprsISConnection(aprslib.IS):
"""Handles communication with the APRS-IS server."""
def __init__(self, callsign, passwd="-1", host="rotate.aprs.net", port=10152):
"""
Initialize the APRS-IS connection.
Args:
callsign (str): APRS-IS callsign.
passwd (str): APRS-IS password.
host (str): APRS-IS Server hostname.
port (int): APRS-IS Server port.
"""
super().__init__(callsign, passwd, host, port)
self.logger = logging.getLogger("aprslib.IS")
self.frequency_limit = None
self.station_hash_timestamps = collections.OrderedDict()
self.source_id = 1
def set_frequency_limit(self, frequency_limit):
"""
Set frequency limit.
Args:
frequency_limit (int): Hard frequency limit (in seconds).
"""
self.frequency_limit = frequency_limit
def get_frequency_limit(self):
"""
Get frequency limit.
Returns:
int: The frequency limit.
"""
return self.frequency_limit
def set_source_id(self, source_id):
"""
Set the source ID for the packet (APRS, CWOP, etc.).
Args:
source_id (int): ID that corresponds to ID in the source table.
"""
self.source_id = source_id
def filtered_consumer(self, callback, blocking=True, raw=False):
"""
Consume packets with filtering.
Args:
callback (callable): Method to call with the result.
blocking (bool): Set to True if consume should be blocking.
raw (bool): Set to True if result should be raw.
"""
def filter_callback(line):
try:
line = line.decode().replace('\x00', '')
except UnicodeError:
return
if line.startswith('dup'):
line = line[4:].strip()
if self._is_sending_too_fast(line):
return
if raw:
callback(line)
else:
callback(self._parse(line))
self.consumer(filter_callback, blocking, False, True)
def _is_sending_too_fast(self, line):
"""
Check if sending frequency limit is too fast.
Args:
line (str): Packet string.
Returns:
bool: True if sending frequency limit is too fast, False otherwise.
"""
if self.frequency_limit is not None:
try:
name, other = line.split('>', 1)
head, body = other.split(':', 1)
except ValueError:
return False
if not body:
return False
packet_type = body[0]
body = body[1:]
frequency_limit_to_apply = int(self.frequency_limit)
if self.source_id == 5:
match = re.search(r"(\+|\-)(\d\.\d)rot ", line)
if match:
try:
turn_rate = abs(float(match.group(2)))
if turn_rate > 0:
frequency_limit_to_apply = int(frequency_limit_to_apply / (1 + turn_rate))
except ValueError:
pass
latest_timestamp_on_map = self.station_hash_timestamps.get(name + packet_type, 0)
current_time = int(time.time()) - 1
if (current_time - frequency_limit_to_apply) < latest_timestamp_on_map:
return True
self.station_hash_timestamps[name + packet_type] = current_time
self._cache_maintenance()
return False
def _cache_maintenance(self):
"""Ensure cache does not contain too many packets."""
if self.frequency_limit is not None:
max_packets = int(self.frequency_limit) * 1000
while len(self.station_hash_timestamps) > max_packets:
try:
self.station_hash_timestamps.popitem(last=False)
except KeyError:
break