trackdirect2/server/trackdirect/objects/Packet.py

309 lines
12 KiB
Python

import logging
import json
from math import sin, cos, sqrt, atan2, radians
from server.trackdirect.common.Model import Model
from server.trackdirect.repositories.StationRepository import StationRepository
from server.trackdirect.repositories.SenderRepository import SenderRepository
from server.trackdirect.exceptions.TrackDirectMissingSenderError import TrackDirectMissingSenderError
from server.trackdirect.exceptions.TrackDirectMissingStationError import TrackDirectMissingStationError
class Packet(Model):
"""Packet represents an APRS packet, AIS packet or any other supported packet
Note:
Packet corresponds to a row in the packetYYYYMMDD table
"""
def __init__(self, db):
"""The __init__ method.
Args:
db (psycopg2.Connection): Database connection
"""
super().__init__(db)
self.logger = logging.getLogger('trackdirect')
self.id = None
self.station_id = None
self.sender_id = None
self.packet_type_id = None
self.timestamp = None
self.reported_timestamp = None
self.position_timestamp = None # Inherited from prev packet if position was equal
self.latitude = None
self.longitude = None
self.symbol = None
self.symbol_table = None
self.marker_id = None
self.marker_counter = None
self.marker_prev_packet_timestamp = None
self.map_id = None
self.source_id = None
self.map_sector = None
self.related_map_sectors = []
self.speed = None
self.course = None
self.altitude = None
self.rng = None
self.phg = None
self.latest_rng_timestamp = None
self.latest_phg_timestamp = None
self.comment = None
self.raw_path = None
self.raw = None
# packet tail timestamp indicates how long time ago we had a tail
self.packet_tail_timestamp = None
# If packet reports a new position for a moving symbol is_moving will be 1 otherwise 0
# Sometimes is_moving will be 0 for a moving symbol, but as fast we realize it is moving related packets will have is_moving set to 1
self.is_moving = 1
self.posambiguity = None
# Following attributes will not always be loaded from database (comes from related tables)
self.station_id_path = []
self.station_name_path = []
self.station_location_path = []
# Will only be used when packet is not inserted to database yet
self.replace_packet_id = None
self.replace_packet_timestamp = None
self.abnormal_packet_id = None
self.abnormal_packet_timestamp = None
self.confirm_packet_id = None
self.confirm_packet_timestamp = None
# Will only be used when packet is not inserted to database yet
self.ogn = None
self.weather = None
self.telemetry = None
self.station_telemetry_bits = None
self.station_telemetry_eqns = None
self.station_telemetry_param = None
self.station_telemetry_unit = None
self.senderName = None
self.stationName = None
def validate(self) -> bool:
"""Returns true on success (when object content is valid), otherwise false
Returns:
True on success otherwise False
"""
return True
def insert(self) -> bool:
"""Method to call when we want to save a new object to database
Since packet will be inserted in batch we never use this method.
Returns:
True on success otherwise False
"""
return False
def update(self) -> bool:
"""Method to call when we want to save changes to database
Since packet will be updated in batch we never use this method.
Returns:
True on success otherwise False
"""
return False
def get_distance(self, p2_lat: float, p2_lng: float) -> float | None:
"""Get distance in meters between current position and specified position
Args:
p2_lat (float): Position 2 latitude
p2_lng (float): Position 2 longitude
Returns:
Distance in meters between the two specified positions (as float)
"""
if self.latitude is not None and self.longitude is not None:
p1_lat = self.latitude
p1_lng = self.longitude
R = 6378137 # Earth's mean radius in meters
d_lat = radians(p2_lat - p1_lat)
d_long = radians(p2_lng - p1_lng)
a = sin(d_lat / 2) ** 2 + cos(radians(p1_lat)) * cos(radians(p2_lat)) * sin(d_long / 2) ** 2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
d = R * c
return d # returns the distance in meters
return None
def get_calculated_speed(self, prev_packet: 'Packet') -> float | None:
"""Get speed compared to previous packet position and timestamp
Args:
prev_packet (Packet): Previous related packet for the same station
Returns:
Speed in km/h compared to previous packet position and timestamp (as float)
"""
if self.latitude is not None and self.longitude is not None:
distance = self.get_distance(prev_packet.latitude, prev_packet.longitude)
time = abs(prev_packet.timestamp - self.timestamp)
if (self.reported_timestamp is not None
and prev_packet.reported_timestamp is not None
and self.reported_timestamp != 0
and prev_packet.reported_timestamp != 0
and (self.reported_timestamp % 60 != 0 or prev_packet.reported_timestamp % 60 != 0)
and prev_packet.reported_timestamp != self.reported_timestamp):
time = abs(prev_packet.reported_timestamp - self.reported_timestamp)
if time == 0:
return 0
return distance / time # meters per second
return None
def is_symbol_equal(self, compare_packet: 'Packet') -> bool:
"""Returns true if current symbol is equal to symbol in specified packet
Args:
compare_packet (Packet): Packet to compare current symbol with
Returns:
True if current symbol is equal to symbol in specified packet
"""
return (self.symbol is not None
and self.symbol_table is not None
and compare_packet.symbol is not None
and compare_packet.symbol_table is not None
and self.symbol == compare_packet.symbol
and self.symbol_table == compare_packet.symbol_table)
def is_position_equal(self, compare_packet: 'Packet') -> bool:
"""Returns true if current position is equal to position in specified packet
Args:
compare_packet (Packet): Packet to compare current position with
Returns:
True if current position is equal to position in specified packet
"""
return (compare_packet.latitude is not None
and compare_packet.longitude is not None
and self.longitude is not None
and self.latitude is not None
and round(self.latitude, 5) == round(compare_packet.latitude, 5)
and round(self.longitude, 5) == round(compare_packet.longitude, 5))
def get_transmit_distance(self) -> float | None:
"""Calculate the transmit distance
Notes:
require that stationLocationPath is set
Returns:
Distance in meters for this transmission
"""
if not self.station_location_path:
return None
location = self.station_location_path[0]
if location[0] is None or location[1] is None:
return None
if self.latitude is not None and self.longitude is not None:
# Current packet contains position, use that
return self.get_distance(location[0], location[1])
else:
# Current packet is missing position, use latest station position
stationRepository = StationRepository(self.db)
station = stationRepository.get_object_by_id(self.station_id)
if not station.is_existing_object():
return None
if station.latest_confirmed_latitude is not None and station.latest_confirmed_longitude is not None:
curStationLatestLocationPacket = Packet(self.db)
curStationLatestLocationPacket.latitude = station.latest_confirmed_latitude
curStationLatestLocationPacket.longitude = station.latest_confirmed_longitude
return curStationLatestLocationPacket.get_distance(location[0], location[1])
return None
def get_dict(self, include_station_name: bool = False) -> dict:
"""Returns a dict representation of the object
Args:
include_station_name (Boolean): Include station name and sender name in dict
Returns:
Dict representation of the object
"""
data = {
'id': self.id,
'station_id': int(self.station_id) if self.station_id is not None else None,
'sender_id': int(self.sender_id) if self.sender_id is not None else None,
'packet_type_id': self.packet_type_id,
'timestamp': self.timestamp,
'reported_timestamp': self.reported_timestamp,
'position_timestamp': self.position_timestamp,
'latitude': float(self.latitude) if self.latitude is not None else None,
'longitude': float(self.longitude) if self.longitude is not None else None,
'symbol': self.symbol,
'symbol_table': self.symbol_table,
'marker_id': self.marker_id,
'marker_counter': self.marker_counter,
'map_id': self.map_id,
'source_id': self.source_id,
'map_sector': self.map_sector,
'related_map_sectors': self.related_map_sectors,
'speed': self.speed,
'course': self.course,
'altitude': self.altitude,
'rng': self.rng,
'phg': self.phg,
'latest_phg_timestamp': self.latest_phg_timestamp,
'latest_rng_timestamp': self.latest_rng_timestamp,
'comment': self.comment,
'raw_path': self.raw_path,
'raw': self.raw,
'packet_tail_timestamp': self.packet_tail_timestamp,
'is_moving': self.is_moving,
'posambiguity': self.posambiguity,
'db': 1,
'station_id_path': self.station_id_path,
'station_name_path': self.station_name_path,
'station_location_path': self.station_location_path,
'telemetry': self.telemetry.get_dict() if self.telemetry is not None else None,
'weather': self.weather.get_dict() if self.weather is not None else None,
'ogn': self.ogn.get_dict() if self.ogn is not None else None,
}
if include_station_name:
try:
stationRepository = StationRepository(self.db)
station = stationRepository.get_cached_object_by_id(data['station_id'])
data['station_name'] = station.name
except TrackDirectMissingStationError:
data['station_name'] = ''
try:
senderRepository = SenderRepository(self.db)
sender = senderRepository.get_cached_object_by_id(data['sender_id'])
data['sender_name'] = sender.name
except TrackDirectMissingSenderError:
data['sender_name'] = ''
return data
def get_json(self) -> str | None:
"""Returns a json representation of the object
Returns:
Json representation of the object (returns None on failure)
"""
data = self.get_dict()
try:
return json.dumps(data, ensure_ascii=False).encode('utf8')
except ValueError as e:
self.logger.error(e, exc_info=1)
return None