trackdirect2/server/trackdirect/websocket/responses/HistoryResponseCreator.py

191 lines
8.7 KiB
Python

import logging
import psycopg2, psycopg2.extras
from server.trackdirect.repositories.PacketRepository import PacketRepository
from server.trackdirect.websocket.queries.StationIdByMapSectorQuery import StationIdByMapSectorQuery
from server.trackdirect.websocket.responses.ResponseDataConverter import ResponseDataConverter
class HistoryResponseCreator:
"""The HistoryResponseCreator class creates websocket history responses for the latest websocket request."""
def __init__(self, state, db):
"""The __init__ method.
Args:
state (WebsocketConnectionState): WebsocketConnectionState instance that contains current state
db (psycopg2.Connection): Database connection (with autocommit)
"""
self.state = state
self.logger = logging.getLogger('trackdirect')
self.db = db
self.packet_repository = PacketRepository(db)
self.response_data_converter = ResponseDataConverter(state, db)
def get_responses(self, request, request_id):
"""Create all history responses for the current request.
Args:
request (dict): The request to process
request_id (int): Request id of processed request
Returns:
generator
"""
payload_type = request.get("payload_request_type")
if payload_type in {1, 11}:
if not self.state.is_valid_latest_position():
return
if (self.state.latest_ne_lat >= 90 and self.state.latest_ne_lng >= 180 and
self.state.latest_sw_lat <= -90 and self.state.latest_sw_lng <= -180):
# Request is requesting too much
return
yield from self._get_map_sector_history_responses(request_id)
elif payload_type == 7 and "station_id" in request:
yield from self._get_station_history_responses([request["station_id"]], None, True)
else:
self.logger.error('Request is not supported: %s', request)
def _get_map_sector_history_responses(self, request_id):
"""Creates all needed history responses for the currently visible map sectors.
Args:
request_id (int): Request id of processed request
Returns:
generator
"""
map_sector_array = self.state.get_visible_map_sectors()
if len(map_sector_array) > 20000:
self.logger.error("Too many map sectors requested!")
return
handled_station_ids = set()
for map_sector in map_sector_array:
try:
if request_id is not None and self.state.latest_requestId > request_id:
return
found_station_ids = self._get_station_ids_by_map_sector(map_sector)
station_ids = [station_id for station_id in found_station_ids if station_id not in handled_station_ids]
handled_station_ids.update(station_ids)
if station_ids:
yield from self._get_station_history_responses(station_ids, map_sector, False)
except psycopg2.InterfaceError as e:
raise e
except Exception as e:
self.logger.error('Error processing map sector %s: %s', map_sector, e, exc_info=True)
def _get_station_history_responses(self, station_ids, map_sector, include_complete_history=False):
"""Creates one history response per station.
Args:
station_ids (array): An array of the stations that we want history data for
map_sector (int): The map sector that we want history data for
include_complete_history (boolean): Include all previous packets (even if we currently only request the latest packets)
Returns:
generator
"""
min_timestamp = self.state.get_map_sector_timestamp(map_sector)
for station_id in station_ids:
try:
if self.state.latest_time_travel_request is not None:
response = self._get_past_history_response(station_id, map_sector, min_timestamp, include_complete_history)
else:
response = self._get_recent_history_response(station_id, map_sector, min_timestamp, include_complete_history)
if response is not None:
yield response
except psycopg2.InterfaceError as e:
raise e
except Exception as e:
self.logger.error('Error processing station %s: %s', station_id, e, exc_info=True)
def _get_recent_history_response(self, station_id, map_sector, min_timestamp, include_complete_history=False):
"""Creates a history response for the specified station, includes all packets from minTimestamp until now.
Args:
station_id (int): The station id that we want history data for
map_sector (int): The map sector that we want history data for
min_timestamp (int): The map sector min timestamp to use in query
include_complete_history (boolean): Include all previous packets (even if we currently only request the latest packets)
Returns:
Dict
"""
packets = []
only_latest_packet_fetched = False
current_station_ids = [station_id]
current_min_timestamp = self.state.get_station_latest_timestamp_on_map(station_id) or min_timestamp
if not self.state.only_latest_packet_requested or include_complete_history:
packets = self.packet_repository.get_object_list_by_station_id_list(current_station_ids, current_min_timestamp)
else:
packets = self.packet_repository.get_latest_confirmed_object_list_by_station_id_list(current_station_ids, current_min_timestamp)
if packets:
packets = [packets[-1]]
only_latest_packet_fetched = True
if packets:
flags = ["latest"] if only_latest_packet_fetched else []
data = self.response_data_converter.get_response_data(packets, [map_sector], flags)
return {'payload_response_type': 2, 'data': data}
def _get_past_history_response(self, station_id, map_sector, min_timestamp, include_complete_history=False):
"""Creates a history response for the specified station, includes all packets between minTimestamp and the current latestTimeTravelRequest timestamp.
Args:
station_id (int): The station id that we want history data for
map_sector (int): The map sector that we want history data for
min_timestamp (int): The map sector min timestamp to use in query
include_complete_history (boolean): Include all previous packets (even if we currently only request the latest packets)
Returns:
Dict
"""
packets = []
only_latest_packet_fetched = False
current_station_ids = [station_id]
current_min_timestamp = self.state.get_station_latest_timestamp_on_map(station_id) or min_timestamp
if self.state.only_latest_packet_requested and not include_complete_history:
if station_id not in self.state.stations_on_map_dict:
only_latest_packet_fetched = True
packets = self.packet_repository.get_latest_object_list_by_station_id_list_and_time_interval(
current_station_ids, current_min_timestamp, self.state.latest_time_travel_request)
else:
if not self.state.is_station_history_on_map(station_id):
packets = self.packet_repository.get_object_list_by_station_id_list_and_time_interval(
current_station_ids, current_min_timestamp, self.state.latest_time_travel_request)
if packets:
flags = ["latest"] if only_latest_packet_fetched else []
data = self.response_data_converter.get_response_data(packets, [map_sector], flags)
return {'payload_response_type': 2, 'data': data}
def _get_station_ids_by_map_sector(self, map_sector):
"""Returns the station id's in specified map sector.
Args:
map_sector (int): The map sector that we are interested in
Returns:
array of ints
"""
query = StationIdByMapSectorQuery(self.db)
if self.state.latest_time_travel_request is not None:
if self.state.is_map_sector_known(map_sector):
return []
start_timestamp = self.state.latest_time_travel_request - (int(self.state.latest_minutes_request) * 60)
end_timestamp = self.state.latest_time_travel_request
return query.get_station_id_list_by_map_sector(map_sector, start_timestamp, end_timestamp)
else:
timestamp = self.state.get_map_sector_timestamp(map_sector)
return query.get_station_id_list_by_map_sector(map_sector, timestamp, None)