trackdirect2/server/trackdirect/TrackDirectWebsocketServer.py

309 lines
13 KiB
Python

import logging
from twisted.internet import threads, reactor, task
from twisted.internet.error import AlreadyCancelled, AlreadyCalled
from autobahn.twisted.websocket import WebSocketServerProtocol
import json
import time
import psycopg2
import psycopg2.extras
import os
from server.trackdirect.TrackDirectConfig import TrackDirectConfig
from server.trackdirect.database.DatabaseConnection import DatabaseConnection
from server.trackdirect.websocket.WebsocketResponseCreator import WebsocketResponseCreator
from server.trackdirect.websocket.WebsocketConnectionState import WebsocketConnectionState
from server.trackdirect.websocket.aprsis.AprsISReader import AprsISReader
from server.trackdirect.websocket.aprsis.AprsISPayloadCreator import AprsISPayloadCreator
class TrackDirectWebsocketServer(WebSocketServerProtocol):
"""The TrackDirectWebsocketServer class handles the incoming requests."""
def __init__(self):
"""Initialize the TrackDirectWebsocketServer."""
super().__init__()
self.logger = logging.getLogger('trackdirect')
self.max_queued_realtime_packets = None
self.max_client_idle_time = None
db_connection = DatabaseConnection()
db = db_connection.get_connection(True)
self.connection_state = WebsocketConnectionState()
self.response_creator = WebsocketResponseCreator(self.connection_state, db)
self.aprs_is_reader = AprsISReader(self.connection_state, db)
self.aprs_is_payload_creator = AprsISPayloadCreator(self.connection_state, db)
self.number_of_real_time_packet_threads = 0
self.timestamp_sender_call = None
self.real_time_listener_call = None
self.on_inactive_call = None
self.is_unknown_client = False
def onConnect(self, request):
"""Executed on connect."""
try:
config = TrackDirectConfig()
config.populate(self.factory.config_file)
self.max_client_idle_time = int(config.max_client_idle_time) * 60
self.max_queued_realtime_packets = int(config.max_queued_realtime_packets)
if 'x-forwarded-for' in request.headers:
self.logger.warning(
f"Client connecting from origin: {request.origin}, x-forwarded-for: {request.headers['x-forwarded-for']} (server pid {os.getpid()})"
)
else:
self.logger.warning(
f"Client connecting from origin: {request.origin} (server pid {os.getpid()})"
)
except Exception as e:
self.logger.error(e, exc_info=True)
raise
def onOpen(self):
"""Executed on open."""
try:
self.logger.info("WebSocket connection open.")
self._send_response_by_type(42) # Inform client that we are active
self._start_timestamp_sender()
self._re_schedule_inactive_event()
except Exception as e:
self.logger.error(e, exc_info=True)
def onMessage(self, payload, is_binary):
"""Executed on incoming message."""
try:
request = json.loads(payload)
if self.is_unknown_client:
self.logger.warning(f"Incoming message from unknown client: {request}")
if "payload_request_type" not in request:
self.logger.warning("Incoming request has no type")
self.logger.warning(payload)
return
self._on_request(request)
except ValueError as exp:
self.logger.warning(f"Incoming request could not be parsed ({exp})")
self.logger.warning(payload)
except psycopg2.InterfaceError as e:
self.logger.error(e, exc_info=True)
raise
except Exception as e:
self.logger.error(e, exc_info=True)
def onClose(self, was_clean, code, reason):
"""Executed on close."""
try:
self.logger.info(f"WebSocket connection closed: {reason}")
self.connection_state.disconnected = True
self._stop_timestamp_sender()
self._stop_real_time_listener(True)
except Exception as e:
self.logger.error(e, exc_info=True)
def _on_request(self, request, request_id=None):
"""Executed on incoming request."""
if request["payload_request_type"] != 11:
self._re_schedule_inactive_event()
if request["payload_request_type"] in [5, 7, 9]:
deferred = threads.deferToThread(self._process_request, request, None)
deferred.addErrback(self._on_error)
else:
if request_id is None:
request_id = self.connection_state.latest_requestId + 1
self.connection_state.latest_request_type = request["payload_request_type"]
self.connection_state.latest_requestId = request_id
self.connection_state.latest_request_timestamp = int(time.time())
self._stop_real_time_listener(False)
if self.connection_state.latest_handled_request_id < request_id - 1:
reactor.callLater(0.1, self._on_request, request, request_id)
else:
self._update_state(request)
deferred = threads.deferToThread(self._process_request, request, request_id)
deferred.addErrback(self._on_error)
deferred.addCallback(self._on_request_done)
def _process_request(self, request, request_id):
"""Send a response to websocket client based on request."""
try:
for response in self.response_creator.get_responses(request, request_id):
if self.connection_state.disconnected:
break
reactor.callFromThread(self._send_dict_response, response)
return request_id
except psycopg2.InterfaceError as e:
self.logger.error(e, exc_info=True)
raise
except Exception as e:
self.logger.error(e, exc_info=True)
def _on_request_done(self, request_id):
"""Executed when request is processed."""
try:
if self.connection_state.latest_handled_request_id < request_id:
self.connection_state.latest_handled_request_id = request_id
if self.connection_state.latest_requestId == request_id:
self._send_response_by_type(35)
if (self.connection_state.latest_time_travel_request is None
and not self.connection_state.no_real_time
and self.connection_state.is_valid_latest_position()):
self._start_real_time_listener(request_id)
elif (int(time.time()) - self.connection_state.latest_request_timestamp) <= self.max_client_idle_time:
self._send_response_by_type(33)
except psycopg2.InterfaceError as e:
raise
except Exception as e:
self.logger.error(e, exc_info=True)
def _on_error(self, error):
"""Executed when a deferToThread failed."""
if reactor.running:
reactor.stop()
raise error
def _start_real_time_listener(self, related_request_id):
"""Start real time APRS-IS listener."""
def read_real_time_packet():
if (self.connection_state.latest_requestId == related_request_id
and not self.connection_state.disconnected):
self.aprs_is_reader.read(on_real_time_packet_found)
def on_real_time_packet_complete():
self.number_of_real_time_packet_threads -= 1
if self.number_of_real_time_packet_threads <= 0:
read_real_time_packet()
def on_real_time_packet_found(raw, source_id):
if raw is None and source_id is None:
self._on_inactive()
else:
if self.number_of_real_time_packet_threads > self.max_queued_realtime_packets:
self.aprs_is_reader.clear(5)
else:
self.number_of_real_time_packet_threads += 1
deferred = threads.deferToThread(self._process_real_time_packet, raw, source_id)
deferred.addCallback(lambda _: on_real_time_packet_complete())
deferred.addErrback(self._on_error)
self._send_response_by_type(34)
self.aprs_is_reader.start()
self._send_response_by_type(31)
self.real_time_listener_call = task.LoopingCall(read_real_time_packet)
self.real_time_listener_call.start(0.2, False)
def _stop_real_time_listener(self, disconnect=False):
"""Stop real time APRS-IS listener."""
if self.real_time_listener_call is not None:
try:
self.real_time_listener_call.stop()
except (AlreadyCalled, AssertionError):
pass
if disconnect:
self.aprs_is_reader.stop()
else:
self.aprs_is_reader.pause()
def _process_real_time_packet(self, raw, source_id):
"""Executed when we have a new real time packet to send."""
try:
for response in self.aprs_is_payload_creator.get_payloads(raw, source_id):
reactor.callFromThread(self._send_dict_response, response)
except psycopg2.InterfaceError as e:
self.logger.error(e, exc_info=True)
raise
except Exception as e:
self.logger.error(e, exc_info=True)
def _start_timestamp_sender(self):
"""Schedule call to _sendTimestampResponse to keep connection up."""
self.timestamp_sender_call = task.LoopingCall(self._send_timestamp_response)
self.timestamp_sender_call.start(1.0)
def _stop_timestamp_sender(self):
"""Stop looping call to _sendTimestampResponse."""
if self.timestamp_sender_call is not None:
try:
self.timestamp_sender_call.stop()
except AssertionError:
pass
def _re_schedule_inactive_event(self):
"""Schedule call to _onInactive when client has been idle too long."""
if self.on_inactive_call is not None:
try:
self.on_inactive_call.cancel()
except (AlreadyCalled, AlreadyCancelled):
pass
self.on_inactive_call = reactor.callLater(self.max_client_idle_time, self._on_inactive)
def _on_inactive(self):
"""Executed when client has been inactive too long."""
try:
self._send_response_by_type(36)
self._stop_timestamp_sender()
self._stop_real_time_listener(True)
self.connection_state.total_reset()
except psycopg2.InterfaceError as e:
self.logger.error(e, exc_info=True)
raise
except Exception as e:
self.logger.warning('Exception in _on_inactive')
self.logger.warning(e, exc_info=False)
def _send_timestamp_response(self):
"""Send server timestamp to synchronize server and client."""
try:
if self.connection_state.latest_handled_request_id < self.connection_state.latest_requestId:
return
data = {"timestamp": int(time.time())}
self._send_response_by_type(41, data)
except psycopg2.InterfaceError as e:
self.logger.error(e, exc_info=True)
raise
except Exception as e:
self.logger.warning('Exception in _send_timestamp_response')
self.logger.warning(e, exc_info=False)
def _send_response_by_type(self, payload_response_type, data=None):
"""Send specified response to client."""
payload = {'payload_response_type': payload_response_type}
if data is not None:
payload['data'] = data
self._send_dict_response(payload)
def _send_dict_response(self, payload):
"""Send message dict payload to client."""
try:
json_payload = json.dumps(payload, ensure_ascii=True).encode('utf8')
if json_payload is not None:
self.sendMessage(json_payload)
except psycopg2.InterfaceError as e:
self.logger.error(e, exc_info=True)
raise
except Exception as e:
self.logger.warning('Exception in _send_dict_response')
self.logger.warning(e, exc_info=False)
def _update_state(self, request):
"""Update the connection state based on request."""
if all(key in request for key in ["neLat", "neLng", "swLat", "swLng", "minutes"]):
self.connection_state.set_latest_map_bounds(
request["neLat"], request["neLng"], request["swLat"], request["swLng"]
)
if "onlyLatestPacket" in request:
self.connection_state.set_only_latest_packet_requested(request["onlyLatestPacket"] == 1)
if "minutes" in request:
time = request.get("time")
self.connection_state.set_latest_minutes(request["minutes"], time)
if "noRealTime" in request:
self.connection_state.disable_real_time()