trackdirect2/server/trackdirect/parser/policies/PacketMapIdPolicy.py

229 lines
8.8 KiB
Python

from server.trackdirect.parser.policies.PacketOrderPolicy import PacketOrderPolicy
from server.trackdirect.parser.policies.PacketMaxSpeedPolicy import PacketMaxSpeedPolicy
class PacketMapIdPolicy:
    """PacketMapIdPolicy tries to find the best mapId for the current packet.

    The policy compares the current packet with the previous related packet
    and decides which mapId and marker id the current packet should get, and
    whether it replaces, confirms or kills the previous packet.

    The decision is made lazily the first time any of the public getters is
    called and is then cached (``mapId`` is used as the "already decided"
    flag), so ``enable_having_kill_character`` must be called before the
    first getter to have any effect.
    """

    # mapId assigned to packets whose position is a known faulty GPS position.
    _FAULTY_GPS_MAP_ID = 5

    # Known faulty positions as (latitude, longitude), both multiplied by
    # 100000 and rounded to int so they can be compared exactly.
    _FAULTY_POSITIONS = frozenset({
        (0, 0),
        (1 * 100000, 1 * 100000),
        (36 * 100000, 136 * 100000),
        (-48 * 100000, 0),
    })

    def __init__(self, packet, prev_packet):
        """The __init__ method.

        Args:
            packet (Packet):      Packet that we want to analyze
            prev_packet (Packet): Previous related packet for the same station
        """
        self.packet = packet
        self.prev_packet = prev_packet
        self.has_kill_character = False
        # Number of packets on the same marker needed before an unconfirmed
        # marker (prev map_id != 1) is promoted to mapId 1.
        self.marker_counter_confirm_limit = 3

        # Results (populated by _find_map_id)
        self.mapId = None
        self.marker_id = None
        self.is_replacing_prev_packet = False
        self.is_confirming_prev_packet = False
        self.is_killing_prev_packet = False

    def enable_having_kill_character(self):
        """Treat this packet as having a kill character."""
        self.has_kill_character = True

    def get_map_id(self):
        """Returns the map id that corresponds to the found marker id.

        Returns:
            int
        """
        self._find_map_id()
        return self.mapId

    def get_marker_id(self):
        """Returns the found marker id.

        Returns:
            int (or None when no existing marker id could be reused)
        """
        self._find_map_id()
        return self.marker_id

    def is_replacing_previous_packet(self):
        """Returns true if packet replaces previous packet.

        Returns:
            boolean
        """
        self._find_map_id()
        return self.is_replacing_prev_packet

    def is_confirming_previous_packet(self):
        """Returns true if packet confirms previous packet.

        Returns:
            boolean
        """
        self._find_map_id()
        return self.is_confirming_prev_packet

    def is_killing_previous_packet(self):
        """Returns true if packet kills previous packet.

        Returns:
            boolean
        """
        self._find_map_id()
        return self.is_killing_prev_packet

    def _find_map_id(self):
        """Find a suitable marker id for the current packet and set corresponding attribute (and related attributes).

        Does nothing if a mapId has already been decided.
        """
        if self.mapId is not None:
            # Already decided by an earlier getter call.
            return

        if self.packet.source_id == 2 and self.packet.station_id_path:
            # NOTE(review): source_id 2 presumably identifies a specific
            # feed (CWOP?) -- confirm against the source table.
            self.mapId = 16
            self.marker_id = 1
        elif not self._is_packet_on_map():
            self.mapId = 10
            self.marker_id = 1
        elif PacketOrderPolicy().is_packet_in_wrong_order(self.packet, self.prev_packet):
            # The order policy is only constructed when the cheaper checks
            # above did not already decide the mapId.
            self.mapId = 6
            self.marker_id = 1
        elif self._is_faulty_gps_position():
            self._find_map_id_for_faulty_gps_position()
        elif self.packet.is_moving == 1:
            self._find_map_id_for_moving_station()
        else:
            self._find_map_id_for_stationary_station()

    def _is_packet_on_map(self):
        """Returns true if current packet will be on map.

        A packet is on the map when it has float latitude and longitude,
        its source_id is not 3 and its current map_id is 1, 5, 7 or 9.

        Returns:
            boolean
        """
        # isinstance(None, float) is False, so no separate None check is needed.
        return (
            isinstance(self.packet.latitude, float)
            and isinstance(self.packet.longitude, float)
            and self.packet.source_id != 3
            and self.packet.map_id in (1, 5, 7, 9)
        )

    def _find_map_id_for_stationary_station(self):
        """Find a suitable marker id for the current packet (assumes it is a stationary station) and set corresponding attribute (and related attributes)."""
        prev = self.prev_packet
        if (
            prev.is_existing_object()
            and self.packet.is_position_equal(prev)
            and self.packet.is_symbol_equal(prev)
            and self.packet.is_moving == prev.is_moving
            and prev.map_id in (1, 7)
        ):
            # Same position, symbol and movement state: reuse the marker and
            # replace the previous packet. With a kill character the packet
            # gets mapId 14 instead of 1.
            self.marker_id = prev.marker_id
            self.mapId = 14 if self.has_kill_character else 1
            self.is_replacing_prev_packet = True
        elif self.has_kill_character:
            self.marker_id = 1
            self.mapId = 4
        else:
            # Nothing to reuse: visible packet that needs a new marker id.
            self.mapId = 1
            self.marker_id = None

    def _find_map_id_for_moving_station(self):
        """Find a suitable marker id for the current packet (assumes it is a moving station) and set corresponding attribute (and related attributes)."""
        if self.has_kill_character:
            self.mapId = 4
            self.marker_id = 1
        elif (
            self.prev_packet.is_existing_object()
            and self.prev_packet.is_moving == 1
            and self.prev_packet.map_id in (1, 7)
        ):
            calculated_distance = self.packet.get_distance(self.prev_packet.latitude, self.prev_packet.longitude)
            calculated_speed = self.packet.get_calculated_speed(self.prev_packet)
            packet_max_speed_policy = PacketMaxSpeedPolicy()
            max_speed = packet_max_speed_policy.get_max_likely_speed(self.packet, self.prev_packet)

            # Hard limits that apply regardless of the per-packet max speed.
            # NOTE(review): units are whatever get_distance /
            # get_calculated_speed return -- confirm (meters and km/h?).
            absolute_max_distance = 50000
            absolute_max_speed = 2000

            if self.packet.is_position_equal(self.prev_packet):
                self._find_map_id_for_moving_station_with_same_position()
            elif calculated_distance < 5000 and calculated_speed <= absolute_max_speed:
                # Short hops are accepted without consulting max_speed.
                self._find_map_id_for_moving_station_with_likely_speed()
            elif calculated_speed <= max_speed and calculated_speed <= absolute_max_speed and calculated_distance <= absolute_max_distance:
                self._find_map_id_for_moving_station_with_likely_speed()
            elif calculated_speed <= max_speed and calculated_distance > absolute_max_distance:
                self._find_map_id_for_moving_station_with_to_long_distance()
            else:
                # Speed is not likely: mapId 7 with a new marker id. A later
                # packet may confirm it (see the likely-speed handler).
                self.mapId = 7
                self.marker_id = None
        else:
            if (
                self.prev_packet.is_existing_object()
                and self.prev_packet.is_moving == 0
                and self.packet.is_symbol_equal(self.prev_packet)
            ):
                # Previous packet was stationary with the same symbol; the
                # station has started to move, so the old marker is killed.
                self.is_killing_prev_packet = True
            self.mapId = 1
            self.marker_id = None

    def _find_map_id_for_moving_station_with_to_long_distance(self):
        """Sets a suitable marker id for the current packet based on prevPacket and assumes that distance is too long between them."""
        if self.prev_packet.marker_counter == 1:
            # The previous marker only ever held a single packet; kill it.
            self.is_killing_prev_packet = True
        self.mapId = 1
        self.marker_id = None

    def _find_map_id_for_moving_station_with_likely_speed(self):
        """Sets a suitable marker id for the current packet based on prevPacket and assumes that speed is likely."""
        self.marker_id = self.prev_packet.marker_id
        if self.prev_packet.map_id == 1 or (self.prev_packet.marker_counter + 1) >= self.marker_counter_confirm_limit:
            self.mapId = 1
        else:
            self.mapId = self.prev_packet.map_id
        if self.mapId == 1 and self.prev_packet.map_id == 7:
            # Enough packets on this marker: the previous mapId-7 packet is
            # now confirmed.
            self.is_confirming_prev_packet = True

    def _find_map_id_for_moving_station_with_same_position(self):
        """Sets a suitable marker id for the current packet based on prevPacket and assumes that position is equal."""
        self.marker_id = self.prev_packet.marker_id
        self.is_replacing_prev_packet = True
        if self.prev_packet.map_id == 1 or (self.prev_packet.marker_counter + 1) >= self.marker_counter_confirm_limit:
            self.mapId = 1
        else:
            self.mapId = self.prev_packet.map_id

    def _find_map_id_for_faulty_gps_position(self):
        """Sets a suitable marker id for the current packet based on prevPacket and assumes that gps position is faulty (mapId 5)."""
        if self.has_kill_character:
            self.mapId = 4
            self.marker_id = 1
        elif (
            # self.mapId is still None while the decision is being made, so
            # the previous packet must be compared with the faulty-GPS mapId
            # explicitly; comparing with self.mapId would never match.
            self.prev_packet.map_id == self._FAULTY_GPS_MAP_ID
            and self.packet.is_position_equal(self.prev_packet)
            and self.packet.is_symbol_equal(self.prev_packet)
        ):
            # Same faulty position and same symbol: reuse the marker.
            self.is_replacing_prev_packet = True
            self.mapId = self._FAULTY_GPS_MAP_ID
            self.marker_id = self.prev_packet.marker_id
        else:
            self.mapId = self._FAULTY_GPS_MAP_ID
            self.marker_id = None

    def _is_faulty_gps_position(self):
        """Returns true if the packet position is one of the known faulty GPS positions.

        Latitude and longitude are multiplied by 100000 and rounded to
        integers before the comparison. No attribute is modified.

        Returns:
            boolean
        """
        packetlat_cmp = int(round(self.packet.latitude * 100000))
        packetlng_cmp = int(round(self.packet.longitude * 100000))
        return (packetlat_cmp, packetlng_cmp) in self._FAULTY_POSITIONS