Загрузил файл

This commit is contained in:
Batareiken 2024-02-28 23:31:17 +00:00
parent 1d62452523
commit 7a07ef7b75
5 changed files with 634 additions and 0 deletions

62
server/bin/collector.py Normal file
View File

@ -0,0 +1,62 @@
import sys
import os.path
import logging
import logging.handlers
import trackdirect
if __name__ == '__main__':
if (len(sys.argv) < 2):
print("\n" + sys.argv[0] + ' [config.ini] [collector number]')
sys.exit()
elif (sys.argv[1].startswith("/")):
if (not os.path.isfile(sys.argv[1])):
print("\n" + sys.argv[0] + ' [config.ini] [collector number]')
sys.exit()
elif (not os.path.isfile(os.path.expanduser('~/trackdirect/config/' + sys.argv[1]))):
print("\n" + sys.argv[0] + ' [config.ini] [collector number]')
sys.exit()
config = trackdirect.TrackDirectConfig()
config.populate(sys.argv[1])
if (len(sys.argv) < 3):
collectorNumber = 0
else:
collectorNumber = int(sys.argv[2])
collectorOptions = config.collector[collectorNumber]
saveOgnStationsWithMissingIdentity = False
if (config.saveOgnStationsWithMissingIdentity):
saveOgnStationsWithMissingIdentity = True
fh = logging.handlers.RotatingFileHandler(filename=os.path.expanduser(
collectorOptions['error_log']), mode='a', maxBytes=1000000, backupCount=10)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(formatter)
trackDirectLogger = logging.getLogger('trackdirect')
trackDirectLogger.addHandler(fh)
trackDirectLogger.addHandler(consoleHandler)
trackDirectLogger.setLevel(logging.INFO)
aprslibLogger = logging.getLogger('aprslib.IS')
aprslibLogger.addHandler(fh)
aprslibLogger.addHandler(consoleHandler)
aprslibLogger.setLevel(logging.INFO)
trackDirectLogger.warning("Starting (Collecting from " + collectorOptions['host'] + ":" + str(
collectorOptions['port_full']) + " using " + collectorOptions['callsign'] + " and " + str(collectorOptions['passcode']) + ")")
try:
trackDirectDataCollector = trackdirect.TrackDirectDataCollector(
collectorOptions,
saveOgnStationsWithMissingIdentity)
trackDirectDataCollector.run()
except Exception as e:
trackDirectLogger.error(e, exc_info=1)

View File

@ -0,0 +1,45 @@
import sys
import os.path
import logging
import logging.handlers
import trackdirect
if __name__ == '__main__':
if (len(sys.argv) < 3):
print "\n" + sys.argv[0] + ' [config.ini] [/output/directory]'
sys.exit()
elif (sys.argv[1].startswith("/")):
if (not os.path.isfile(sys.argv[1])):
print "\n" + sys.argv[0] + ' [config.ini] [/output/directory]'
sys.exit()
elif (not os.path.isfile(os.path.expanduser('~/trackdirect/config/' + sys.argv[1]))):
print "\n" + sys.argv[0] + ' [config.ini] [/output/directory]'
sys.exit()
config = trackdirect.TrackDirectConfig()
config.populate(sys.argv[1])
fh = logging.handlers.RotatingFileHandler(filename=os.path.expanduser(
'~/trackdirect/server/log/heatmap.log'), mode='a', maxBytes=1000000, backupCount=10)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(formatter)
trackDirectLogger = logging.getLogger('trackdirect')
trackDirectLogger.addHandler(fh)
trackDirectLogger.addHandler(consoleHandler)
trackDirectLogger.setLevel(logging.INFO)
trackDirectLogger.info("Starting (output directory: " + sys.argv[2] + ")")
try:
trackDirectHeatMapCreator = trackdirect.TrackDirectHeatMapCreator(
sys.argv[2])
trackDirectHeatMapCreator.run()
except Exception as e:
trackDirectLogger.error(e, exc_info=1)

249
server/bin/remover.py Normal file
View File

@ -0,0 +1,249 @@
import trackdirect
import sys
import os.path
import logging
import logging.handlers
import psycopg2
import psycopg2.extras
import datetime
import time
from trackdirect.database.DatabaseConnection import DatabaseConnection
from trackdirect.database.DatabaseObjectFinder import DatabaseObjectFinder
from trackdirect.TrackDirectConfig import TrackDirectConfig
from trackdirect.repositories.PacketRepository import PacketRepository
if __name__ == '__main__':
if (len(sys.argv) < 2):
print "\n" + sys.argv[0] + ' [config.ini]'
sys.exit()
elif (sys.argv[1].startswith("/")):
if (not os.path.isfile(sys.argv[1])):
print "\n" + sys.argv[0] + ' [config.ini]'
sys.exit()
elif (not os.path.isfile(os.path.expanduser('~/trackdirect/config/' + sys.argv[1]))):
print "\n" + sys.argv[0] + ' [config.ini]'
sys.exit()
config = TrackDirectConfig()
config.populate(sys.argv[1])
maxDaysToSavePositionData = int(config.daysToSavePositionData)
maxDaysToSaveStationData = int(config.daysToSaveStationData)
maxDaysToSaveWeatherData = int(config.daysToSaveWeatherData)
maxDaysToSaveTelemetryData = int(config.daysToSaveTelemetryData)
try:
fh = logging.handlers.RotatingFileHandler(filename=os.path.expanduser(
'~/trackdirect/server/log/remover_' + config.dbName + '.log'), mode='a', maxBytes=1000000, backupCount=10)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(formatter)
trackDirectLogger = logging.getLogger('trackdirect')
trackDirectLogger.addHandler(fh)
trackDirectLogger.addHandler(consoleHandler)
trackDirectLogger.setLevel(logging.INFO)
trackDirectLogger.info("Starting")
trackDirectLogger.info(
"Saving position data for %s days" % (maxDaysToSavePositionData))
trackDirectLogger.info(
"Saving station data for %s days" % (maxDaysToSaveStationData))
trackDirectLogger.info(
"Saving weather data for %s days" % (maxDaysToSaveWeatherData))
trackDirectLogger.info(
"Saving telemetry data for %s days" % (maxDaysToSaveTelemetryData))
trackDirectDb = DatabaseConnection()
dbNoAutoCommit = trackDirectDb.getConnection(False)
db = trackDirectDb.getConnection(True)
db.set_isolation_level(0)
cursor = db.cursor()
cursor.execute("SET statement_timeout = '240s'")
trackDirectDbObjectFinder = DatabaseObjectFinder(db)
packetRepository = PacketRepository(db)
#
# Loop over the latest days and delete packets that is not needed any more
#
for x in range(2, 16):
prevDay = datetime.date.today() - datetime.timedelta(x) # today minus x days
prevDayTimestamp = prevDay.strftime("%s")
prevDayFormat = datetime.datetime.utcfromtimestamp(
int(prevDayTimestamp)).strftime('%Y%m%d')
packetTable = "packet" + prevDayFormat
if (trackDirectDbObjectFinder.checkTableExists(packetTable)):
deletedRows = None
doFullVacuum = False
while (deletedRows is None or deletedRows >= 5000):
sql = """delete from """ + packetTable + \
""" where id in (select id from """ + packetTable + \
""" where map_id not in (1,12) limit 5000)"""
cursor.execute(sql)
deletedRows = cursor.rowcount
trackDirectLogger.info(
"Deleted %s %s" % (deletedRows, packetTable))
if (deletedRows > 0):
doFullVacuum = True
time.sleep(0.5)
if (doFullVacuum):
cursor.execute("""VACUUM FULL """ +
packetTable + """_path""")
cursor.execute("""REINDEX TABLE """ +
packetTable + """_path""")
cursor.execute("""VACUUM FULL """ + packetTable)
cursor.execute("""REINDEX TABLE """ + packetTable)
#
# Drop packet_weather
#
for x in range(maxDaysToSaveWeatherData, maxDaysToSaveWeatherData+100):
prevDay = datetime.date.today() - datetime.timedelta(x) # today minus x days
prevDayTimestamp = prevDay.strftime("%s")
prevDayFormat = datetime.datetime.utcfromtimestamp(
int(prevDayTimestamp)).strftime('%Y%m%d')
packetTable = "packet" + prevDayFormat
if (trackDirectDbObjectFinder.checkTableExists(packetTable + "_weather")):
cursor.execute("""drop table """ +
packetTable + """_weather""")
trackDirectLogger.info(
"Dropped table %s_weather" % (packetTable))
#
# Drop packet_telemetry
#
for x in range(maxDaysToSaveTelemetryData, maxDaysToSaveTelemetryData+100):
prevDay = datetime.date.today() - datetime.timedelta(x) # today minus x days
prevDayTimestamp = prevDay.strftime("%s")
prevDayFormat = datetime.datetime.utcfromtimestamp(
int(prevDayTimestamp)).strftime('%Y%m%d')
packetTable = "packet" + prevDayFormat
if (trackDirectDbObjectFinder.checkTableExists(packetTable + "_telemetry")):
cursor.execute("""drop table """ +
packetTable + """_telemetry""")
trackDirectLogger.info(
"Dropped table %s_telemetry" % (packetTable))
#
# Drop packets
#
for x in range(maxDaysToSavePositionData, maxDaysToSavePositionData+100):
prevDay = datetime.date.today() - datetime.timedelta(x) # today minus x days
prevDayTimestamp = prevDay.strftime("%s")
prevDayFormat = datetime.datetime.utcfromtimestamp(
int(prevDayTimestamp)).strftime('%Y%m%d')
packetTable = "packet" + prevDayFormat
# Drop packet_ogn table
if (trackDirectDbObjectFinder.checkTableExists(packetTable + "_ogn")):
cursor.execute("""drop table """ + packetTable + """_ogn""")
trackDirectLogger.info("Dropped table %s_ogn" % (packetTable))
#
# Drop packet_path table
#
if (trackDirectDbObjectFinder.checkTableExists(packetTable + "_path")):
cursor.execute("""drop table """ + packetTable + """_path""")
trackDirectLogger.info("Dropped table %s_path" % (packetTable))
#
# Drop packet table
#
if (trackDirectDbObjectFinder.checkTableExists(packetTable)):
cursor.execute("""drop table """ + packetTable)
trackDirectLogger.info("Dropped table %s" % (packetTable))
#
# Delete old stations
#
timestampLimit = int(time.time()) - (60*60*24*maxDaysToSaveStationData)
deletedRows = 0
sql = """select station.id, station.latest_sender_id, station.name
from station
where latest_packet_timestamp < %s
and (
exists (
select 1
from sender
where sender.id = station.latest_sender_id
and sender.name != station.name
)
or
not exists (
select 1
from station station2, sender
where sender.id = station.latest_sender_id
and station2.latest_sender_id = sender.id
and station2.name != sender.name
)
)
order by latest_packet_timestamp"""
selectStationCursor = db.cursor()
selectStationCursor.execute(sql, (timestampLimit,))
for record in selectStationCursor:
trackDirectLogger.info("Trying to delete station %s (%s)" % (
record["name"], record["id"]))
try:
deleteCursor = dbNoAutoCommit.cursor()
sql = """delete from station_telemetry_bits where station_id = %s"""
deleteCursor.execute(sql, (record["id"],))
sql = """delete from station_telemetry_eqns where station_id = %s"""
deleteCursor.execute(sql, (record["id"],))
sql = """delete from station_telemetry_param where station_id = %s"""
deleteCursor.execute(sql, (record["id"],))
sql = """delete from station_telemetry_unit where station_id = %s"""
deleteCursor.execute(sql, (record["id"],))
sql = """delete from station_city where station_id = %s"""
deleteCursor.execute(sql, (record["id"],))
sql = """delete from station where id = %s"""
deleteCursor.execute(sql, (record["id"],))
sql = """delete from sender where id = %s and not exists (select 1 from station where latest_sender_id = sender.id)"""
deleteCursor.execute(sql, (record["latest_sender_id"],))
dbNoAutoCommit.commit()
deleteCursor.close()
deletedRows += 1
time.sleep(0.5)
except Exception as e:
# Something went wrong
#trackDirectLogger.error(e, exc_info=1)
dbNoAutoCommit.rollback()
deleteCursor.close()
selectStationCursor.close()
if (deletedRows > 0):
trackDirectLogger.info("Deleted %s stations" % (deletedRows))
cursor.execute("""VACUUM ANALYZE station""")
cursor.execute("""REINDEX TABLE station""")
cursor.execute("""VACUUM ANALYZE sender""")
cursor.execute("""REINDEX TABLE sender""")
#
# Close DB connection
#
cursor.close()
db.close()
trackDirectLogger.info("Done!")
except Exception as e:
trackDirectLogger.error(e, exc_info=1)

View File

@ -0,0 +1,125 @@
import trackdirect
import sys
import os.path
import logging
import logging.handlers
import psycopg2
import psycopg2.extras
import datetime
import time
from trackdirect.database.DatabaseConnection import DatabaseConnection
from trackdirect.database.DatabaseObjectFinder import DatabaseObjectFinder
from trackdirect.TrackDirectConfig import TrackDirectConfig
if __name__ == '__main__':
if (len(sys.argv) < 3):
print "\n" + sys.argv[0] + ' [config.ini] [staion id]'
sys.exit()
elif (sys.argv[1].startswith("/")):
if (not os.path.isfile(sys.argv[1])):
print "\n" + sys.argv[0] + ' [config.ini] [staion id]'
sys.exit()
elif (not os.path.isfile(os.path.expanduser('~/trackdirect/config/' + sys.argv[1]))):
print "\n" + sys.argv[0] + ' [config.ini] [staion id]'
sys.exit()
stationId = sys.argv[2]
config = TrackDirectConfig()
config.populate(sys.argv[1])
try:
fh = logging.handlers.RotatingFileHandler(filename=os.path.expanduser(
'~/trackdirect/server/log/stationremover.log'), mode='a', maxBytes=1000000, backupCount=10)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(formatter)
trackDirectLogger = logging.getLogger('trackdirect')
trackDirectLogger.addHandler(fh)
trackDirectLogger.addHandler(consoleHandler)
trackDirectLogger.setLevel(logging.INFO)
trackDirectLogger.info("Starting")
trackDirectDb = DatabaseConnection()
db = trackDirectDb.getConnection(True)
db.set_isolation_level(0)
cursor = db.cursor()
cursor.execute("SET statement_timeout = '120s'")
trackDirectDbObjectFinder = DatabaseObjectFinder(db)
# If saving longer than 365 days, modify range
for x in range(0, 365):
prevDay = datetime.date.today() - datetime.timedelta(x) # today minus x days
prevDayTimestamp = prevDay.strftime("%s")
prevDayFormat = datetime.datetime.utcfromtimestamp(
int(prevDayTimestamp)).strftime('%Y%m%d')
packetTable = "packet" + prevDayFormat
packetPathTable = "packet" + prevDayFormat + "_path"
packetWeatherTable = "packet" + prevDayFormat + "_weather"
packetTelemetryTable = "packet" + prevDayFormat + "_telemetry"
if (trackDirectDbObjectFinder.checkTableExists(packetPathTable)):
# Delete paths for this station
sql = """delete from """ + packetPathTable + \
""" where packet_id in (select id from """ + \
packetTable + """ where station_id = %s)"""
cursor.execute(sql, (stationId,))
trackDirectLogger.info("Deleted %s rows in %s" % (
cursor.rowcount or 0, packetPathTable))
time.sleep(0.5)
# Delete paths related to this station
sql = """delete from """ + packetPathTable + """ where station_id = %s"""
cursor.execute(sql, (stationId,))
trackDirectLogger.info("Deleted %s related rows in %s" % (
cursor.rowcount or 0, packetPathTable))
time.sleep(0.5)
if (trackDirectDbObjectFinder.checkTableExists(packetTelemetryTable)):
# Delete telemetry for this station
sql = """delete from """ + packetTelemetryTable + """ where station_id = %s"""
cursor.execute(sql, (stationId,))
trackDirectLogger.info("Deleted %s rows in %s" % (
cursor.rowcount or 0, packetTelemetryTable))
time.sleep(0.5)
if (trackDirectDbObjectFinder.checkTableExists(packetWeatherTable)):
# Delete weather for this station
sql = """delete from """ + packetWeatherTable + """ where station_id = %s"""
cursor.execute(sql, (stationId,))
trackDirectLogger.info("Deleted %s rows in %s" % (
cursor.rowcount or 0, packetWeatherTable))
time.sleep(0.5)
if (trackDirectDbObjectFinder.checkTableExists(packetTable)):
# Delete packets for this station
sql = """delete from """ + packetTable + """ where station_id = %s"""
cursor.execute(sql, (stationId,))
trackDirectLogger.info("Deleted %s rows in %s" % (
cursor.rowcount or 0, packetTable))
time.sleep(0.5)
# Delete station
sql = "delete from station where id = %s"
cursor.execute(sql, (stationId,))
trackDirectLogger.info(
"Deleted %s rows from station" % (cursor.rowcount or 0))
time.sleep(0.5)
cursor.close()
db.close()
trackDirectLogger.info("Done!")
except Exception as e:
trackDirectLogger.error(e, exc_info=1)

153
server/bin/wsserver.py Normal file
View File

@ -0,0 +1,153 @@
from autobahn.twisted.websocket import WebSocketServerFactory
from autobahn.twisted.resource import WebSocketResource
from autobahn.websocket.compress import PerMessageDeflateOffer, PerMessageDeflateOfferAccept
import trackdirect
import argparse
import psutil
import sys
import os.path
import logging
import logging.handlers
from twisted.internet import reactor
from twisted.web.server import Site
from twisted.web.static import File
from socket import AF_INET
def master(options, trackDirectLogger):
"""
Start of the master process.
"""
config = trackdirect.TrackDirectConfig()
config.populate(options.config)
workerPid = os.getpid()
p = psutil.Process(workerPid)
p.cpu_affinity([0])
trackDirectLogger.warning("Starting master with PID " + str(workerPid) + " (on CPU id(s): " + ','.join(map(str, p.cpu_affinity())) + ")")
try:
factory = WebSocketServerFactory(
"ws://" + config.websocketHostname + ":" + str(config.websocketPort),
externalPort = config.websocketExternalPort)
factory.protocol = trackdirect.TrackDirectWebsocketServer
resource = WebSocketResource(factory)
root = File(".")
root.putChild(b"ws", resource)
site = Site(root)
port = reactor.listenTCP(config.websocketPort, site)
for i in range(1, options.workers):
args = [sys.executable, "-u", __file__]
args.extend(sys.argv[1:])
args.extend(["--fd", str(port.fileno()), "--cpuid", str(i)])
reactor.spawnProcess(
None, sys.executable, args,
childFDs={0: 0, 1: 1, 2: 2, port.fileno(): port.fileno()},
env=os.environ)
options.fd = port.fileno()
listen(options, trackDirectLogger)
except Exception as e:
trackDirectLogger.error(e, exc_info=1)
def worker(options, trackDirectLogger):
"""
Start background worker process.
"""
config = trackdirect.TrackDirectConfig()
config.populate(options.config)
try:
workerPid = os.getpid()
p = psutil.Process(workerPid)
p.cpu_affinity([options.cpuid])
trackDirectLogger.warning("Starting worker with PID " + str(workerPid) + " (on CPU id(s): " + ','.join(map(str, p.cpu_affinity())) + ")")
listen(options, trackDirectLogger)
except Exception as e:
trackDirectLogger.error(e, exc_info=1)
def listen(options, trackDirectLogger) :
"""
Start to listen on websocket requests.
"""
config = trackdirect.TrackDirectConfig()
config.populate(options.config)
factory = WebSocketServerFactory(
"ws://" + config.websocketHostname + ":" + str(config.websocketPort),
externalPort = config.websocketExternalPort)
factory.protocol = trackdirect.TrackDirectWebsocketServer
# Enable WebSocket extension "permessage-deflate".
# Function to accept offers from the client ..
def accept(offers):
for offer in offers:
if isinstance(offer, PerMessageDeflateOffer):
return PerMessageDeflateOfferAccept(offer)
factory.setProtocolOptions(perMessageCompressionAccept=accept)
reactor.suggestThreadPoolSize(25)
# Socket already created, just start listening and accepting
reactor.adoptStreamPort(options.fd, AF_INET, factory)
reactor.run()
if __name__ == '__main__':
DEFAULT_WORKERS = psutil.cpu_count()
parser = argparse.ArgumentParser(
description='Track Direct WebSocket Server')
parser.add_argument('--config', dest='config', type=str, default=None,
help='The Track Direct config file, e.g. trackdirect.ini')
parser.add_argument('--workers', dest='workers', type=int, default=DEFAULT_WORKERS,
help='Number of workers to spawn - should fit the number of (physical) CPU cores.')
parser.add_argument('--fd', dest='fd', type=int, default=None,
help='If given, this is a worker which will use provided FD and all other options are ignored.')
parser.add_argument('--cpuid', dest='cpuid', type=int, default=None,
help='If given, this is a worker which will use provided CPU core to set its affinity.')
options = parser.parse_args()
config = trackdirect.TrackDirectConfig()
config.populate(options.config)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh = logging.handlers.RotatingFileHandler(filename=os.path.expanduser(
config.errorLog), mode='a', maxBytes=1000000, backupCount=10)
fh.setFormatter(formatter)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(formatter)
trackDirectLogger = logging.getLogger('trackdirect')
trackDirectLogger.addHandler(fh)
trackDirectLogger.addHandler(consoleHandler)
trackDirectLogger.setLevel(logging.INFO)
fh2 = logging.handlers.RotatingFileHandler(filename=os.path.expanduser(
config.errorLog), mode='a', maxBytes=1000000, backupCount=10)
# aprslib is logging non important "socket error on ..." using ERROR-level
fh2.setFormatter(formatter)
aprslibLogger = logging.getLogger('aprslib.IS')
aprslibLogger.addHandler(fh2)
aprslibLogger.addHandler(consoleHandler)
aprslibLogger.setLevel(logging.INFO)
if options.fd is not None:
worker(options, trackDirectLogger)
else:
master(options, trackDirectLogger)