trackdirect2/server/trackdirect/websocket/aprsis/AprsISReader.py

145 lines
6.4 KiB
Python

import logging
from server.trackdirect.TrackDirectConfig import TrackDirectConfig
from server.trackdirect.parser.AprsISConnection import AprsISConnection
from server.trackdirect.repositories.SenderRepository import SenderRepository
class AprsISReader:
"""The AprsISReader class will connect to an APRS-IS server and listen for APRS packets."""
def __init__(self, state, db):
"""Initialize the AprsISReader.
Args:
state (ConnectionState): Instance of ConnectionState which contains the current state of the connection.
db (psycopg2.Connection): Database connection (with autocommit).
"""
self.state = state
self.latest_real_time_filter = None
self.sender_repository = SenderRepository(db)
self.aprs_is_connection1 = None
self.aprs_is_connection2 = None
self.logger = logging.getLogger('trackdirect')
self.config = TrackDirectConfig()
def start(self):
"""Connect to APRS-IS servers based on current state."""
self._connect()
self._modify_filter()
def read(self, callback):
"""Read data from the APRS-IS servers, specified callback will be called once for every waiting packet.
Args:
callback (method): Method to call when we read a packet from the APRS IS connection.
Note:
Callback function is expecting to take 2 arguments, the "packet raw" as a string and a "source id" as an integer.
"""
try:
if self.aprs_is_connection1 or self.aprs_is_connection2:
def aprs_is_callback1(line):
callback(line, self.config.websocket_aprs_source_id1)
def aprs_is_callback2(line):
callback(line, self.config.websocket_aprs_source_id2)
if self.aprs_is_connection2:
self.aprs_is_connection2.filtered_consumer(aprs_is_callback2, False, True)
if self.aprs_is_connection1:
self.aprs_is_connection1.filtered_consumer(aprs_is_callback1, False, True)
except IOError:
callback(None, None)
except Exception as e:
self.logger.error(f"Error reading from APRS-IS: {e}", exc_info=True)
callback(None, None)
def pause(self):
"""Pause without closing the AprsISConnection, we just set filter to nothing, that will result in no received packets."""
self.latest_real_time_filter = None
if self.aprs_is_connection1:
self.aprs_is_connection1.set_filter('')
if self.aprs_is_connection2:
self.aprs_is_connection2.set_filter('')
def stop(self):
"""Kill the connections to the APRS-IS servers."""
if self.aprs_is_connection2:
self.aprs_is_connection2.close()
self.aprs_is_connection2 = None
if self.aprs_is_connection1:
self.aprs_is_connection1.close()
self.aprs_is_connection1 = None
def clear(self, limit=None):
"""Clear the socket from all waiting lines.
Args:
limit (int): Max number of packets to clear (per connection).
"""
counter1 = 0
counter2 = 0
if self.aprs_is_connection2:
for _ in self.aprs_is_connection2._socket_readlines(False):
counter1 += 1
if limit is not None and counter1 >= limit:
break
if self.aprs_is_connection1:
for _ in self.aprs_is_connection1._socket_readlines(False):
counter2 += 1
if limit is not None and counter2 >= limit:
break
return counter1 + counter2
def _connect(self):
"""Connect to APRS-IS servers (if not already connected)."""
if self.aprs_is_connection1 is None :
self.latest_real_time_filter = None
# Avoid using a verified user since server will not accept two verified users with same name
if self.config.websocket_aprs_host1 is not None :
try:
self.aprs_is_connection1 = AprsISConnection("NOCALL", "-1", self.config.websocket_aprs_host1, self.config.websocket_aprs_port1)
if self.config.websocket_frequency_limit != 0 :
self.aprs_is_connection1.set_frequency_limit(self.config.websocket_frequency_limit)
self.aprs_is_connection1.connect()
except Exception as e:
self.logger.error(e, exc_info=1)
if self.aprs_is_connection2 is None :
self.latest_real_time_filter = None
# Avoid using a verified user since server will not accept two verified users with same name
if self.config.websocket_aprs_host2 is not None :
try:
self.aprs_is_connection2 = AprsISConnection("NOCALL", "-1", self.config.websocket_aprs_host2, self.config.websocket_aprs_port2)
if self.config.websocket_frequency_limit != 0 :
self.aprs_is_connection2.set_frequency_limit(self.config.websocket_frequency_limit)
self.aprs_is_connection2.connect()
except Exception as e:
self.logger.error(e, exc_info=1)
def _modify_filter(self):
"""Set a new filter for the APRS-IS connections according to the latest requested map bounds."""
new_filter = self._get_new_filter()
if self.latest_real_time_filter is not None and new_filter == self.latest_real_time_filter:
return
self.clear()
self.latest_real_time_filter = new_filter
if self.aprs_is_connection1:
self.aprs_is_connection1.set_filter(new_filter)
if self.aprs_is_connection2:
self.aprs_is_connection2.set_filter(new_filter)
def _get_new_filter(self):
"""Create new APRS-IS filter based on the latest requested map bounds."""
if self.state.filter_station_id_dict:
filter_str = "b"
for station_id in self.state.filter_station_id_dict:
sender = self.sender_repository.get_object_by_station_id(station_id)
if sender.name:
filter_str += f"/{sender.name}"
else:
filter_str = f"a/{self.state.latest_ne_lat + 0.1}/{self.state.latest_sw_lng - 0.1}/" \
f"{self.state.latest_sw_lat - 0.1}/{self.state.latest_ne_lng + 0.1}"
return filter_str