diff --git a/server/trackdirect/database/DatabaseConnection.py b/server/trackdirect/database/DatabaseConnection.py new file mode 100644 index 0000000..22495cc --- /dev/null +++ b/server/trackdirect/database/DatabaseConnection.py @@ -0,0 +1,66 @@ +import logging +from twisted.python import log +import psycopg2 +import psycopg2.extras + +from trackdirect.TrackDirectConfig import TrackDirectConfig + + +class DatabaseConnection(): + """The DatabaseConnection class handles the most basic communication with the database + """ + + db = None + dbNoAutoCommit = None + + def __init__(self): + """The __init__ method. + """ + config = TrackDirectConfig() + self.host = config.dbHostname + self.database = config.dbName + self.username = config.dbUsername + self.password = config.dbPassword + self.port = config.dbPort + + def getConnection(self, autocommit=True, createNewConnection=False): + """Returns a connection to the database + + Args: + autocommit (boolean): set to true if you want the connection to autocommit otherwise false + createNewConnection (boolean): set to true to force a new connection + Returns: + psycopg2.Connection + """ + if (createNewConnection): + db = self._createNewConnection() + if (autocommit): + # Active autocommit to avoid open transactions laying around + DatabaseConnection.db.autocommit = True + return db + + elif (autocommit): + if (DatabaseConnection.db is None): + DatabaseConnection.db = self._createNewConnection() + # Active autocommit to avoid open transactions laying around + DatabaseConnection.db.autocommit = True + return DatabaseConnection.db + + else: + if (DatabaseConnection.dbNoAutoCommit is None): + DatabaseConnection.dbNoAutoCommit = self._createNewConnection() + return DatabaseConnection.dbNoAutoCommit + + def _createNewConnection(self): + """Returns a connection to the database + + Returns: + psycopg2.Connection + """ + return psycopg2.connect(host=self.host, + database=self.database, + user=self.username, + password=self.password, + port=self.port, + sslmode='disable', + cursor_factory=psycopg2.extras.DictCursor) diff --git a/server/trackdirect/database/DatabaseObjectFinder.py b/server/trackdirect/database/DatabaseObjectFinder.py new file mode 100644 index 0000000..680f992 --- /dev/null +++ b/server/trackdirect/database/DatabaseObjectFinder.py @@ -0,0 +1,81 @@ +import logging +from twisted.python import log +import datetime +import time + + +class DatabaseObjectFinder(): + """The DatabaseObjectFinder class can be used to check if a database table exists or not + """ + + existingTables = {} + + def __init__(self, db): + """The __init__ method. + + Args: + db (psycopg2.Connection): Database connection + """ + self.db = db + + def setTableExists(self, tablename): + """Mark a table as existing + + Args: + tablename (str): table to be marked as existing + """ + DatabaseObjectFinder.existingTables[tablename] = True + + def checkTableExists(self, tablename): + """Returns true if specified table exists in database + + Args: + tablename (str): Table that we want's to know if it exists or not + + Returns: + Returns true if specified table exists in database otherwise false + """ + todayDateStr = datetime.datetime.utcfromtimestamp( + int(time.time())).strftime('%Y%m%d') + yesterdayDateStr = datetime.datetime.utcfromtimestamp( + int(time.time()) - 86400).strftime('%Y%m%d') + + if (todayDateStr in tablename or yesterdayDateStr in tablename): + # We only trust cache for the latest two days + if (tablename in DatabaseObjectFinder.existingTables): + # we know table exists + return True + + cur = self.db.cursor() + cur.execute(""" + SELECT COUNT(*) + FROM information_schema.tables + WHERE table_name = '{0}' + """.format(tablename.replace('\'', '\'\''))) + if cur.fetchone()[0] == 1: + DatabaseObjectFinder.existingTables[tablename] = True + cur.close() + return True + else: + cur.close() + return False + + def checkIndexExists(self, index): + """Returns true if specified index exists in database + + Args: + index (str): index that we want's to know if it exists or not + + Returns: + Returns true if specified index exists in database otherwise false + """ + cur = self.db.cursor() + cur.execute("""select to_regclass('{0}') \"name\"""".format( + index.replace('\'', '\'\''))) + record = cur.fetchone() + if record and record['name'] == index.replace('\'', '\'\''): + cur.close() + return True + else: + cur.close() + return False diff --git a/server/trackdirect/database/PacketOgnTableCreator.py b/server/trackdirect/database/PacketOgnTableCreator.py new file mode 100644 index 0000000..4636405 --- /dev/null +++ b/server/trackdirect/database/PacketOgnTableCreator.py @@ -0,0 +1,108 @@ +import logging +from twisted.python import log +import psycopg2 +import psycopg2.extras +import datetime +import time +import calendar + +from trackdirect.database.DatabaseObjectFinder import DatabaseObjectFinder +from trackdirect.exceptions.TrackDirectMissingTableError import TrackDirectMissingTableError + + +class PacketOgnTableCreator(): + """The PacketOgnTableCreator class handles packet OGN table name logic + + Note: + Packets are stored in different tables depending on what day they are received, + new packet tables are created by this class. + """ + + def __init__(self, db): + """The __init__ method. + + Args: + db (psycopg2.Connection): Database connection + """ + self.db = db + self.dbObjectFinder = DatabaseObjectFinder(db) + self.logger = logging.getLogger('trackdirect') + self.createIfMissing = True + + def disableCreateIfMissing(self): + """Disable feature that creates new tables if missing + """ + self.createIfMissing = False + + def getPacketOgnTable(self, packetTimestamp): + """Returns the name of the OGN packet table + + Args: + packetTimestamp (int): Unix timestamp that we need the table for + + Returns: + the name of the OGN packet table as a string + """ + date = datetime.datetime.utcfromtimestamp( + packetTimestamp).strftime('%Y%m%d') + packetOgnTable = 'packet' + date + '_ogn' + if (not self.dbObjectFinder.checkTableExists(packetOgnTable)): + if(self.createIfMissing): + minTimestamp = packetTimestamp // (24*60*60) * (24*60*60) + maxTimestamp = minTimestamp + (24*60*60) + self._createPacketOgnTable( + packetOgnTable, minTimestamp, maxTimestamp) + self.dbObjectFinder.setTableExists(packetOgnTable) + else: + raise TrackDirectMissingTableError( + 'Database table does not exists') + return packetOgnTable + + def _createPacketOgnTable(self, tablename, minTimestamp, maxTimestamp): + """Create a packet OGN table with the specified name + + Args: + tablename (str): Name of the packet OGN table to create + packetTablename (str): Name of the related packet table + minTimestamp (int): Min Unix timestamp for this table + maxTimestamp (int): Max Unix timestamp for this table + """ + try: + # Note that we have no reference constraint to the packet table (we will keep rows in this table longer than rows in packet table) + cur = self.db.cursor() + sql = """ + create table %s () inherits (packet_ogn)""" % (tablename) + cur.execute(sql) + + sql = """alter table %s add constraint timestamp_range_check check(timestamp >= %d and timestamp < %d)""" % (tablename, minTimestamp, maxTimestamp) + cur.execute(sql) + + sql = """create index %s_pkey on %s using btree (id)""" % ( + tablename, tablename) + cur.execute(sql) + + sql = """create index %s_packet_id_idx on %s(packet_id)""" % ( + tablename, tablename) + cur.execute(sql) + + sql = """create index %s_station_id_idx on %s(station_id, timestamp)""" % ( + tablename, tablename) + cur.execute(sql) + + cur.close() + + except (psycopg2.IntegrityError, psycopg2.ProgrammingError) as e: + # Probably the other collector created the table at the same time (might happen when you run multiple collectors), just go on... + if ('already exists' not in str(e)): + self.logger.error(e, exc_info=1) + + # Do some sleep and let the other process create all related tables (if other table failes we will do it after sleep) + time.sleep(10) + return + + except Exception as e: + self.logger.error(e, exc_info=1) + + # Do some sleep and let the other process create all related tables (if other table failes we will do it after sleep) + time.sleep(10) + return diff --git a/server/trackdirect/database/PacketPathTableCreator.py b/server/trackdirect/database/PacketPathTableCreator.py new file mode 100644 index 0000000..8eda761 --- /dev/null +++ b/server/trackdirect/database/PacketPathTableCreator.py @@ -0,0 +1,111 @@ +import logging +from twisted.python import log +import psycopg2 +import psycopg2.extras +import datetime +import time +import calendar + +from trackdirect.database.DatabaseObjectFinder import DatabaseObjectFinder +from trackdirect.exceptions.TrackDirectMissingTableError import TrackDirectMissingTableError + + +class PacketPathTableCreator(): + """The PacketPathTableCreator class handles packet table name logic + + Note: + Packets are stored in different tables depending on what day they are received, + new packet tables are created by this class. + """ + + def __init__(self, db): + """The __init__ method. + + Args: + db (psycopg2.Connection): Database connection + """ + self.db = db + self.dbObjectFinder = DatabaseObjectFinder(db) + self.logger = logging.getLogger('trackdirect') + self.createIfMissing = True + + def disableCreateIfMissing(self): + """Disable feature that creates new tables if missing + """ + self.createIfMissing = False + + def getPacketPathTable(self, packetTimestamp): + """Returns the name of the path packet table + + Args: + packetTimestamp (int): Unix timestamp that we need the table for + + Returns: + Returns the name of the path packet table as a string + """ + date = datetime.datetime.utcfromtimestamp( + packetTimestamp).strftime('%Y%m%d') + packetTable = 'packet' + date + packetPathTable = 'packet' + date + '_path' + if (not self.dbObjectFinder.checkTableExists(packetPathTable)): + if(self.createIfMissing): + minTimestamp = packetTimestamp // (24*60*60) * (24*60*60) + maxTimestamp = minTimestamp + (24*60*60) + self._createPacketPathTable( + packetPathTable, minTimestamp, maxTimestamp) + self.dbObjectFinder.setTableExists(packetPathTable) + else: + raise TrackDirectMissingTableError( + 'Database table does not exists') + return packetPathTable + + def _createPacketPathTable(self, tablename, minTimestamp, maxTimestamp): + """Create a packet path table with the specified name + + Args: + tablename (str): Name of the packet path table to create + minTimestamp (int): Min Unix timestamp for this table + maxTimestamp (int): Max Unix timestamp for this table + """ + try: + cur = self.db.cursor() + sql = """ + create table %s () inherits (packet_path)""" % (tablename) + cur.execute(sql) + + sql = """alter table %s add constraint timestamp_range_check check(timestamp >= %d and timestamp < %d)""" % (tablename, minTimestamp, maxTimestamp) + cur.execute(sql) + + sql = """create index %s_pkey on %s using btree (id)""" % ( + tablename, tablename) + cur.execute(sql) + + sql = """create index %s_packet_id_idx on %s(packet_id, number)""" % ( + tablename, tablename) + cur.execute(sql) + + sql = """create index %s_station_id_idx on %s(station_id, timestamp)""" % ( + tablename, tablename) + cur.execute(sql) + + sql = """create index %s_sending_station_id_idx on %s(sending_station_id, timestamp)""" % ( + tablename, tablename) + cur.execute(sql) + + cur.close() + + except (psycopg2.IntegrityError, psycopg2.ProgrammingError) as e: + # Probably the other collector created the table at the same time (might happen when you run multiple collectors), just go on... + if ('already exists' not in str(e)): + self.logger.error(e, exc_info=1) + + # Do some sleep and let the other process create all related tables (if other table failes we will do it after sleep) + time.sleep(10) + return + + except Exception as e: + self.logger.error(e, exc_info=1) + + # Do some sleep and let the other process create all related tables (if other table failes we will do it after sleep) + time.sleep(10) + return diff --git a/server/trackdirect/database/PacketTableCreator.py b/server/trackdirect/database/PacketTableCreator.py new file mode 100644 index 0000000..26077c3 --- /dev/null +++ b/server/trackdirect/database/PacketTableCreator.py @@ -0,0 +1,157 @@ +import logging +from twisted.python import log +import psycopg2 +import psycopg2.extras +import datetime +import time +import calendar + +from trackdirect.database.DatabaseObjectFinder import DatabaseObjectFinder +from trackdirect.exceptions.TrackDirectMissingTableError import TrackDirectMissingTableError + + +class PacketTableCreator(): + """The PacketTableCreator class handles packet table name logic + + Note: + Packets are stored in different tables depending on what day they are received, + new packet tables are created by this class. + """ + + def __init__(self, db): + """The __init__ method. + + Args: + db (psycopg2.Connection): Database connection + """ + self.db = db + self.dbObjectFinder = DatabaseObjectFinder(db) + self.logger = logging.getLogger('trackdirect') + self.createIfMissing = True + + def disableCreateIfMissing(self): + """Disable feature that creates new tables if missing + """ + self.createIfMissing = False + + def enableCreateIfMissing(self): + """Enable feature that creates new tables if missing + """ + self.createIfMissing = True + + def getPacketTable(self, packetTimestamp): + """Returns the name of the packet table + + Args: + packetTimestamp (int): Unix timestamp that we need the table for + + Returns: + Returns the name of the packet table as a string + """ + date = datetime.datetime.utcfromtimestamp( + packetTimestamp).strftime('%Y%m%d') + packetTable = 'packet' + date + if (not self.dbObjectFinder.checkTableExists(packetTable)): + if(self.createIfMissing): + minTimestamp = packetTimestamp // (24*60*60) * (24*60*60) + maxTimestamp = minTimestamp + (24*60*60) + self._createPacketTable( + packetTable, minTimestamp, maxTimestamp) + self.dbObjectFinder.setTableExists(packetTable) + else: + raise TrackDirectMissingTableError( + 'Database table ' + packetTable + ' does not exists') + return packetTable + + def getPacketTables(self, startTimestamp, endTimestamp=None): + """Returns an array of packet table names based on the specified timestamp range + + Note: + If table does not exist we will not include the packet table name in array + + Args: + startTimestamp (int): Start unix timestamp for requested packet tables + endTimestamp (int): End unix timestamp for requested packet tables + + Returns: + Array of packet table names + """ + if (endTimestamp is None): + endTimestamp = int(time.time()) + + # We allways want to include + endDateTime = datetime.datetime.utcfromtimestamp(int(endTimestamp)) + endDateTime = endDateTime.replace( + hour=0, minute=0, second=0, microsecond=0) + datetime.timedelta(days=1) + endTimestamp = calendar.timegm(endDateTime.timetuple()) + + result = [] + if (startTimestamp == 0): + # Go back 1 year + ts = int(time.time()) - (60*60*24*366) + else: + ts = startTimestamp + while (ts < endTimestamp): + date = datetime.datetime.utcfromtimestamp( + int(ts)).strftime('%Y%m%d') + datePacketTable = 'packet' + date + if (self.dbObjectFinder.checkTableExists(datePacketTable)): + result.append(datePacketTable) + + ts = ts + 86400 # 1 day in seconds + return result + + def _createPacketTable(self, tablename, minTimestamp, maxTimestamp): + """Create a packet table with the specified name + + Args: + tablename (str): Name of the packet table to create + minTimestamp (int): Min Unix timestamp for this table + maxTimestamp (int): Max Unix timestamp for this table + """ + try: + cur = self.db.cursor() + sql = """ + create table %s () inherits (packet)""" % (tablename) + cur.execute(sql) + + sql = """alter table %s add constraint timestamp_range_check check(timestamp >= %d and timestamp < %d)""" % (tablename, minTimestamp, maxTimestamp) + cur.execute(sql) + + # The regular primary key index + sql = """create index %s_pkey on %s using btree (id)""" % ( + tablename, tablename) + cur.execute(sql) + + # This index is magic for multiple methods in PacketRepository + sql = """create index %s_station_id_idx on %s(station_id, map_id, marker_id, timestamp)""" % ( + tablename, tablename) + cur.execute(sql) + + # This should be good when using the time-travel functionality + sql = """create index %s_map_sector_idx on %s(map_sector, timestamp, map_id)""" % ( + tablename, tablename) + cur.execute(sql) + + # Used by remover + sql = """create index %s_sender_id_idx on %s(sender_id)""" % ( + tablename, tablename) + cur.execute(sql) + + cur.close() + + except (psycopg2.IntegrityError, psycopg2.ProgrammingError) as e: + # Probably the other collector created the table at the same time (might happen when you run multiple collectors), just go on... + if ('already exists' not in str(e)): + self.logger.error(e, exc_info=1) + + # Do some sleep and let the other process create all related tables (if other table failes we will do it after sleep) + time.sleep(10) + return + + except Exception as e: + self.logger.error(e, exc_info=1) + + # Do some sleep and let the other process create all related tables (if other table failes we will do it after sleep) + time.sleep(10) + return diff --git a/server/trackdirect/database/PacketTelemetryTableCreator.py b/server/trackdirect/database/PacketTelemetryTableCreator.py new file mode 100644 index 0000000..54967a0 --- /dev/null +++ b/server/trackdirect/database/PacketTelemetryTableCreator.py @@ -0,0 +1,124 @@ +import logging +from twisted.python import log +import psycopg2 +import psycopg2.extras +import datetime +import time +import calendar + +from trackdirect.database.DatabaseObjectFinder import DatabaseObjectFinder +from trackdirect.exceptions.TrackDirectMissingTableError import TrackDirectMissingTableError + + +class PacketTelemetryTableCreator(): + """The PacketTelemetryTableCreator class handles packet telemetry table name logic + + Note: + Packets are stored in different tables depending on what day they are received, + new packet tables are created by this class. + """ + + def __init__(self, db): + """The __init__ method. + + Args: + db (psycopg2.Connection): Database connection + """ + self.db = db + self.dbObjectFinder = DatabaseObjectFinder(db) + self.logger = logging.getLogger('trackdirect') + self.createIfMissing = True + + def disableCreateIfMissing(self): + """Disable feature that creates new tables if missing + """ + self.createIfMissing = False + + def getPacketTelemetryTable(self, packetTimestamp): + """Returns the name of the telemetry packet table + + Args: + packetTimestamp (int): Unix timestamp that we need the table for + + Returns: + Returns the name of the telemetry packet table as a string + """ + date = datetime.datetime.utcfromtimestamp( + packetTimestamp).strftime('%Y%m%d') + packetTelemetryTable = 'packet' + date + '_telemetry' + if (not self.dbObjectFinder.checkTableExists(packetTelemetryTable)): + if(self.createIfMissing): + minTimestamp = packetTimestamp // (24*60*60) * (24*60*60) + maxTimestamp = minTimestamp + (24*60*60) + self._createPacketTelemetryTable( + packetTelemetryTable, minTimestamp, maxTimestamp) + self.dbObjectFinder.setTableExists(packetTelemetryTable) + else: + raise TrackDirectMissingTableError( + 'Database table does not exists') + return packetTelemetryTable + + def _createPacketTelemetryTable(self, tablename, minTimestamp, maxTimestamp): + """Create a packet telemetry table with the specified name + + Args: + tablename (str): Name of the packet telemetry table to create + packetTablename (str): Name of the related packet table + minTimestamp (int): Min Unix timestamp for this table + maxTimestamp (int): Max Unix timestamp for this table + """ + try: + # Note that we have no reference constraint to the packet table (we might keep rows in this table longer than rows in packet table) + cur = self.db.cursor() + sql = """ + create table %s () inherits (packet_telemetry)""" % (tablename) + cur.execute(sql) + + sql = """alter table %s add constraint timestamp_range_check check(timestamp >= %d and timestamp < %d)""" % (tablename, minTimestamp, maxTimestamp) + cur.execute(sql) + + sql = """create index %s_pkey on %s using btree (id)""" % ( + tablename, tablename) + cur.execute(sql) + + sql = """create index %s_packet_id_idx on %s(packet_id)""" % ( + tablename, tablename) + cur.execute(sql) + + sql = """create index %s_station_id_idx on %s(station_id, timestamp, seq)""" % ( + tablename, tablename) + cur.execute(sql) + + sql = """create index %s_telemetry_param_id on %s(station_telemetry_param_id)""" % ( + tablename, tablename) + cur.execute(sql) + + sql = """create index %s_telemetry_unit_id on %s(station_telemetry_unit_id)""" % ( + tablename, tablename) + cur.execute(sql) + + sql = """create index %s_telemetry_eqns_id on %s(station_telemetry_eqns_id)""" % ( + tablename, tablename) + cur.execute(sql) + + sql = """create index %s_telemetry_bits_id on %s(station_telemetry_bits_id)""" % ( + tablename, tablename) + cur.execute(sql) + + cur.close() + + except (psycopg2.IntegrityError, psycopg2.ProgrammingError) as e: + # Probably the other collector created the table at the same time (might happen when you run multiple collectors), just go on... + if ('already exists' not in str(e)): + self.logger.error(e, exc_info=1) + + # Do some sleep and let the other process create all related tables (if other table failes we will do it after sleep) + time.sleep(10) + return + + except Exception as e: + self.logger.error(e, exc_info=1) + + # Do some sleep and let the other process create all related tables (if other table failes we will do it after sleep) + time.sleep(10) + return diff --git a/server/trackdirect/database/PacketWeatherTableCreator.py b/server/trackdirect/database/PacketWeatherTableCreator.py new file mode 100644 index 0000000..f2ced64 --- /dev/null +++ b/server/trackdirect/database/PacketWeatherTableCreator.py @@ -0,0 +1,107 @@ +import logging +from twisted.python import log +import psycopg2 +import psycopg2.extras +import datetime +import time +import calendar + +from trackdirect.database.DatabaseObjectFinder import DatabaseObjectFinder +from trackdirect.exceptions.TrackDirectMissingTableError import TrackDirectMissingTableError + + +class PacketWeatherTableCreator(): + """The PacketWeatherTableCreator class handles packet weather table name logic + + Note: + Packets are stored in different tables depending on what day they are received, + new packet tables are created by this class. + """ + + def __init__(self, db): + """The __init__ method. + + Args: + db (psycopg2.Connection): Database connection + """ + self.db = db + self.dbObjectFinder = DatabaseObjectFinder(db) + self.logger = logging.getLogger('trackdirect') + self.createIfMissing = True + + def disableCreateIfMissing(self): + """Disable feature that creates new tables if missing + """ + self.createIfMissing = False + + def getPacketWeatherTable(self, packetTimestamp): + """Returns the name of the weather packet table + + Args: + packetTimestamp (int): Unix timestamp that we need the table for + + Returns: + the name of the weather packet table as a string + """ + date = datetime.datetime.utcfromtimestamp( + packetTimestamp).strftime('%Y%m%d') + packetWeatherTable = 'packet' + date + '_weather' + if (not self.dbObjectFinder.checkTableExists(packetWeatherTable)): + if(self.createIfMissing): + minTimestamp = packetTimestamp // (24*60*60) * (24*60*60) + maxTimestamp = minTimestamp + (24*60*60) + self._createPacketWeatherTable( + packetWeatherTable, minTimestamp, maxTimestamp) + self.dbObjectFinder.setTableExists(packetWeatherTable) + else: + raise TrackDirectMissingTableError( + 'Database table does not exists') + return packetWeatherTable + + def _createPacketWeatherTable(self, tablename, minTimestamp, maxTimestamp): + """Create a packet weather table with the specified name + + Args: + tablename (str): Name of the packet weather table to create + minTimestamp (int): Min Unix timestamp for this table + maxTimestamp (int): Max Unix timestamp for this table + """ + try: + # Note that we have no reference constraint to the packet table (we might keep rows in this table longer than rows in packet table) + cur = self.db.cursor() + sql = """ + create table %s () inherits (packet_weather)""" % (tablename) + cur.execute(sql) + + sql = """alter table %s add constraint timestamp_range_check check(timestamp >= %d and timestamp < %d)""" % (tablename, minTimestamp, maxTimestamp) + cur.execute(sql) + + sql = """create index %s_pkey on %s using btree (id)""" % ( + tablename, tablename) + cur.execute(sql) + + sql = """create index %s_packet_id_idx on %s(packet_id)""" % ( + tablename, tablename) + cur.execute(sql) + + sql = """create index %s_station_id_idx on %s(station_id, timestamp)""" % ( + tablename, tablename) + cur.execute(sql) + + cur.close() + + except (psycopg2.IntegrityError, psycopg2.ProgrammingError) as e: + # Probably the other collector created the table at the same time (might happen when you run multiple collectors), just go on... + if ('already exists' not in str(e)): + self.logger.error(e, exc_info=1) + + # Do some sleep and let the other process create all related tables (if other table failes we will do it after sleep) + time.sleep(10) + return + + except Exception as e: + self.logger.error(e, exc_info=1) + + # Do some sleep and let the other process create all related tables (if other table failes we will do it after sleep) + time.sleep(10) + return diff --git a/server/trackdirect/database/__init__.py b/server/trackdirect/database/__init__.py new file mode 100644 index 0000000..984c177 --- /dev/null +++ b/server/trackdirect/database/__init__.py @@ -0,0 +1,2 @@ +__version__ = "1.0" +__author__ = "Per Qvarforth"