openwebrxplus/owrx/map.py

186 lines
6.1 KiB
Python

from datetime import datetime, timedelta, timezone
from owrx.config import Config
from owrx.bands import Band
from abc import abstractmethod, ABC, ABCMeta
import threading
import time
import sys
import logging
logger = logging.getLogger(__name__)
class Location(object):
def getTTL(self) -> timedelta:
pm = Config.get()
return timedelta(seconds=pm["map_position_retention_time"])
def __dict__(self):
return {
"ttl": self.getTTL().total_seconds() * 1000
}
class Map(object):
sharedInstance = None
creationLock = threading.Lock()
@staticmethod
def getSharedInstance():
with Map.creationLock:
if Map.sharedInstance is None:
Map.sharedInstance = Map()
return Map.sharedInstance
def __init__(self):
self.clients = []
self.positions = {}
self.positionsLock = threading.Lock()
def removeLoop():
loops = 0
while True:
try:
self.removeOldPositions()
except Exception:
logger.exception("error while removing old map positions")
loops += 1
# rebuild the positions dictionary every once in a while, it consumes lots of memory otherwise
if loops == 60:
try:
self.rebuildPositions()
except Exception:
logger.exception("error while rebuilding positions")
loops = 0
time.sleep(60)
threading.Thread(target=removeLoop, daemon=True, name="map_removeloop").start()
super().__init__()
def broadcast(self, update):
for c in self.clients:
c.write_update(update)
def addClient(self, client):
self.clients.append(client)
with self.positionsLock:
positions = [
{
"callsign": callsign,
"location": record["location"].__dict__(),
"lastseen": record["updated"].timestamp() * 1000,
"mode": record["mode"],
"band": record["band"].getName() if record["band"] is not None else None,
"hops": record["hops"],
}
for (callsign, record) in self.positions.items()
]
client.write_update(positions)
def removeClient(self, client):
try:
self.clients.remove(client)
except ValueError:
pass
def updateLocation(self, key, loc: Location, mode: str, band: Band = None, hops: list[str] = [], timestamp: datetime = None):
if timestamp is None:
timestamp = datetime.now(timezone.utc)
else:
# if we get an external timestamp, make sure it's not already expired
if datetime.now(timezone.utc) - loc.getTTL() > timestamp:
return
pm = Config.get()
ignoreIndirect = pm["map_ignore_indirect_reports"]
preferRecent = pm["map_prefer_recent_reports"]
needBroadcast = False
with self.positionsLock:
# ignore indirect reports if ignoreIndirect set
if not ignoreIndirect or len(hops)==0:
# prefer messages with shorter hop count unless preferRecent set
if preferRecent or key not in self.positions or len(hops) <= len(self.positions[key]["hops"]):
if isinstance(loc, IncrementalUpdate) and key in self.positions:
loc.update(self.positions[key]["location"])
self.positions[key] = {"location": loc, "updated": timestamp, "mode": mode, "band": band, "hops": hops }
needBroadcast = True
if needBroadcast:
self.broadcast(
[
{
"callsign": key,
"location": loc.__dict__(),
"lastseen": timestamp.timestamp() * 1000,
"mode": mode,
"band": band.getName() if band is not None else None,
"hops": hops,
}
]
)
def touchLocation(self, key):
# not implemented on the client side yet, so do not use!
ts = datetime.now(timezone.utc)
with self.positionsLock:
if key in self.positions:
self.positions[key]["updated"] = ts
self.broadcast([{"callsign": key, "lastseen": ts.timestamp() * 1000}])
def removeLocation(self, key):
with self.positionsLock:
if key in self.positions:
del self.positions[key]
# TODO broadcast removal to clients
def removeOldPositions(self):
now = datetime.now(timezone.utc)
with self.positionsLock:
to_be_removed = [
key for (key, pos) in self.positions.items() if now - pos["location"].getTTL() > pos["updated"]
]
for key in to_be_removed:
self.removeLocation(key)
def rebuildPositions(self):
logger.debug("rebuilding map storage; size before: %i", sys.getsizeof(self.positions))
with self.positionsLock:
p = {key: value for key, value in self.positions.items()}
self.positions = p
logger.debug("rebuild complete; size after: %i", sys.getsizeof(self.positions))
class LatLngLocation(Location):
def __init__(self, lat: float, lon: float):
self.lat = lat
self.lon = lon
def __dict__(self):
res = super().__dict__()
res.update(
{"type": "latlon", "lat": self.lat, "lon": self.lon}
)
return res
class LocatorLocation(Location):
def __init__(self, locator: str):
self.locator = locator
def __dict__(self):
res = super().__dict__()
res.update(
{"type": "locator", "locator": self.locator}
)
return res
class IncrementalUpdate(Location, metaclass=ABCMeta):
@abstractmethod
def update(self, previousLocation: Location):
pass