From c8d8641383ead0fc68250d50182346cbc736613d Mon Sep 17 00:00:00 2001 From: KF7EEL Date: Mon, 7 Mar 2022 09:11:35 -0800 Subject: [PATCH] initial commit of MQTT for external applications/networks --- data_gateway-SAMPLE.cfg | 15 +++-- data_gateway.py | 127 +++++++++++++++++++++++++++++++++++++++- data_gateway_config.py | 6 +- requirements.txt | 1 + 4 files changed, 140 insertions(+), 9 deletions(-) diff --git a/data_gateway-SAMPLE.cfg b/data_gateway-SAMPLE.cfg index 1bd123b..e30e4bf 100644 --- a/data_gateway-SAMPLE.cfg +++ b/data_gateway-SAMPLE.cfg @@ -117,11 +117,8 @@ URL: http://localhost:8080/svr SHARED_SECRET: test [DATA_CONFIG] -USE_DASHBOARD: True DATA_DMR_ID: 9099 CALL_TYPE: both -##UNIT_SMS_TS: 2 - USER_APRS_SSID: 5 USER_APRS_COMMENT: HBNet APRS Gateway APRS_SERVER: hbl.ink @@ -130,7 +127,6 @@ APRS_LOGIN_CALL: N0CALL APRS_LOGIN_PASSCODE: 12345 APRS_FILTER: r/47/-120/500 t/m - # User settings file, MUST configure using absolute path. USER_SETTINGS_FILE: /tmp/user_settings.txt @@ -143,6 +139,17 @@ IGATE_BEACON_ICON = /I IGATE_LATITUDE = 4730. N IGATE_LONGITUDE = 11930. W +# MQTT Configuration +# MQTT is used to accces external applications and other networks + +GATEWAY_CALLSIGN: + +MQTT_SERVER: +MQTT_PORT: + +MQTT_SERVER2: +MQTT_SPORT2: + ############################ NOT IMPLEMENTED YET ############################## # The following settings are for the static positions only, for hotspots or repeaters connected to MASTER stanzas. # Implementation by IU7IGU diff --git a/data_gateway.py b/data_gateway.py index e547917..f39f58a 100644 --- a/data_gateway.py +++ b/data_gateway.py @@ -105,6 +105,8 @@ import subprocess from scapy.all import IP, UDP, raw import ipaddress +import paho.mqtt.client as mqtt +import string ################################# @@ -126,6 +128,86 @@ btf = -1 ssid = '' UNIT_MAP = {} PACKET_MATCH = {} +mqtt_services = {} +##mqtt_shortcut_gen = ''.join(random.choices(string.ascii_uppercase, k=4)) + +def mqtt_main(broker_url = 'localhost', broker_port = 1883): + global mqtt_client +## print(broker_port) + # On connect, send announcement + def on_connect(client, userdata, flags, rc): + # Annouyncement happens with ten_loop_func + logger.debug('Connected to MQTT server: ' + broker_url) + def on_disconnect(client, userdata, flags, rc): + logger.debug('Disconnected from MQTT server') + + # Process received msg here + def on_message(client, userdata, message): + topic_list = str(message.topic).split('/') + dict_payload = json.loads(message.payload.decode()) + logger.debug(dict_payload) + if len(topic_list) == 1: + # Add service/network to dict of known services + if topic_list[0] == 'ANNOUNCE' and dict_payload[list(dict_payload.keys())[0]] == "LOST_CONNECTION": + try: + logger.debug('Removed MQTT service') + del mqtt_services[list(dict_payload.keys())[0]] + except Exception as e: + logger.error('Error with MQTT service removal') + logger.error(e) + elif topic_list[0] == 'ANNOUNCE': + logger.info('Service discovered: ' + dict_payload['shortcut']) + logger.debug((dict_payload)) + mqtt_services[dict_payload['shortcut']] = dict_payload + logger.debug('Known services: ') + logger.debug(mqtt_services) + + elif len(topic_list) > 1: + # Incoming MSG + if topic_list[0] == 'MSG' and list(dict_payload.keys())[1] == 'network' and topic_list[1] == mqtt_shortcut_gen: + try: + if mqtt_services[dict_payload['network']]['type'] == 'network': + send_sms(False, int(topic_list[2]), data_id[0], data_id[0], 'unit', dict_payload['network'] + '/' + list(dict_payload.keys())[0] + ': ' + dict_payload[list(dict_payload.keys())[0]]) + if mqtt_services[dict_payload['network']]['type'] == 'app': + send_sms(False, int(topic_list[2]), data_id[0], data_id[0], 'unit', dict_payload['network'] + ': ' + dict_payload[list(dict_payload.keys())[0]]) + except Exception as e: + logger.error('Unable to determine if message from network or app, defaulting to network...') + logger.error(e) + send_sms(False, int(topic_list[2]), data_id[0], data_id[0], 'unit', dict_payload['network'] + '/' + list(dict_payload.keys())[0] + ': ' + dict_payload[list(dict_payload.keys())[0]]) + + + mqtt_client = mqtt.Client() + # Last will and testament + mqtt_client.will_set("ANNOUNCE", json.dumps({mqtt_shortcut_gen:"LOST_CONNECTION"}), 0, False) + mqtt_client.on_message = on_message + mqtt_client.on_connect = on_connect + mqtt_client.on_disconnect = on_disconnect + mqtt_client.connect(broker_url, broker_port) + + # Subscribe to: + # Incoming messages + mqtt_client.subscribe('MSG/' + mqtt_shortcut_gen + '/#', qos=0) + # Announcements for service/network discovery + mqtt_client.subscribe("ANNOUNCE", qos=0) + + mqtt_client.loop_start() + +def mqtt_send_msg(network_shortcut, rcv_dmr_id, snd_dmr_id, message): + msg_dict = json.dumps({str(snd_dmr_id):message, 'network':mqtt_shortcut_gen}, indent = 4) + mqtt_client.publish(topic='MSG/' + network_shortcut + '/' + str(rcv_dmr_id), payload=msg_dict, qos=0, retain=False) + logger.info('Sent message to another network via MQTT: ' + network_shortcut) + +def mqtt_send_app(network_shortcut, snd_dmr_id, message): + msg_dict = json.dumps({'dmr_id': str(snd_dmr_id), 'message':message, 'network':mqtt_shortcut_gen}, indent = 4) + mqtt_client.publish(topic='APP/' + network_shortcut, payload=msg_dict, qos=0, retain=False) + logger.info('Sent message to external application via MQTT: ' + network_shortcut) + +def mqtt_announce(): + mqtt_client.publish(topic="ANNOUNCE", payload=json.dumps({'shortcut':mqtt_shortcut_gen, 'type': 'network', 'url':'http://localhost'}, indent = 4), qos=0, retain=False) + + + + def download_aprs_settings(_CONFIG): user_man_url = _CONFIG['WEB_SERVICE']['URL'] @@ -456,9 +538,21 @@ def download_config(CONFIG_FILE, cli_file): other_split = corrected_config['OTHER']['OTHER_OPTIONS'].split(';') for i in other_split: + if 'MQTT:' in i: + mqtt_options = i[5:].split(':') + for o in mqtt_options: + final_options = o.split('=') + print(final_options) + if final_options[0] == 'gateway_callsign': + mqtt_server = corrected_config['DATA_CONFIG']['GATEWAY_CALLSIGN'] = final_options[1].upper() + if final_options[0] == 'server': + mqtt_server = corrected_config['DATA_CONFIG']['MQTT_SERVER'] = final_options[1] + if final_options[0] == 'port': + mqtt_port = corrected_config['DATA_CONFIG']['MQTT_PORT'] = final_options[1] if 'DATA_GATEWAY:' in i: ## print(i) gateway_options = i[13:].split(':') +## print(gateway_options) ## print(gateway_options) for o in gateway_options: ## print(o) @@ -766,7 +860,21 @@ def process_sms(_rf_src, sms, call_type, system_name): logger.error(str(traceback.extract_tb(error_exception.__traceback__))) packet_assembly = '' - + elif '#' == parse_sms[0][0:1]: +## print(mqtt_services.keys()) + if parse_sms[0][1:] in mqtt_services.keys(): + mqtt_send_msg(str(parse_sms[0])[1:], parse_sms[1], int_id(_rf_src), ' '.join(parse_sms[2:])) + else: + # Add error message + pass + elif '!' == parse_sms[0][0:1]: +## print(mqtt_services.keys()) + if parse_sms[0][1:] in mqtt_services.keys(): + mqtt_send_app(str(parse_sms[0])[1:], str(int_id(_rf_src)), ' '.join(parse_sms[1:])) + else: + # Add error message + pass + elif '@' in parse_sms[0][0:1] and ' ' in sms: #'M-' not in parse_sms[1][0:2] or '@' not in parse_sms[0][1:]: #Example SMS text: @ARMDS This is a test. s = ' ' @@ -1121,7 +1229,9 @@ def format_sms(msg, to_id, from_id, call_type, use_header = True): unk_count = int((len(msg) + 4) * 2).to_bytes(1, 'big') hdr_seq_num = (ahex(int((int(call_seq_num, 16) + 128)).to_bytes(1, 'big'))) - + print('-----------') + print(int(call_seq_num, 16) + 128) + print('-------------') sms_header = '00' + str(ahex(unk_count))[2:-1] + 'a000' + str(hdr_seq_num)[2:-1] + '040d000a' @@ -1740,9 +1850,11 @@ def ten_loop_func(): with open(user_settings_file, 'w') as f: f.write(str(download_aprs_settings(CONFIG))) logger.info('Downloading and writing APRS settings file.') - + # Announce ID(s) over OBP for my_id in data_id: svrd_send_all(b'UNIT' + bytes_4(my_id)) + # Announce via MQTT + mqtt_announce() @@ -1927,6 +2039,9 @@ if __name__ == '__main__': ## # User APRS settings user_settings_file = CONFIG['DATA_CONFIG']['USER_SETTINGS_FILE'] + + mqtt_shortcut_gen = CONFIG['DATA_CONFIG']['GATEWAY_CALLSIGN'].upper() + #### use_api = CONFIG['DATA_CONFIG']['USE_API'] @@ -1992,6 +2107,12 @@ if __name__ == '__main__': rule_timer = rule_timer_task.start(10) rule_timer.addErrback(loopingErrHandle) + # Experimental MQTT for external applications, etc. + print(CONFIG['DATA_CONFIG']['MQTT_SERVER']) + mqtt_thread = threading.Thread(target=mqtt_main, args=(CONFIG['DATA_CONFIG']['MQTT_SERVER'],int(CONFIG['DATA_CONFIG']['MQTT_PORT']),)) + mqtt_thread.daemon = True + mqtt_thread.start() + # Used for misc timing events ten_loop_task = task.LoopingCall(ten_loop_func) ten_loop = ten_loop_task.start(600) diff --git a/data_gateway_config.py b/data_gateway_config.py index 6e908c9..8a767d2 100644 --- a/data_gateway_config.py +++ b/data_gateway_config.py @@ -167,11 +167,9 @@ def build_config(_config_file): elif section == 'DATA_CONFIG': CONFIG['DATA_CONFIG'].update({ -## 'USE_DASHBOARD': config.getboolean(section, 'USE_DASHBOARD'), 'DATA_DMR_ID': config.get(section, 'DATA_DMR_ID'), 'USER_APRS_SSID': config.get(section, 'USER_APRS_SSID'), 'CALL_TYPE': config.get(section, 'CALL_TYPE'), -## 'UNIT_SMS_TS': config.get(section, 'UNIT_SMS_TS'), 'USER_APRS_COMMENT': config.get(section, 'USER_APRS_COMMENT'), 'APRS_LOGIN_CALL': config.get(section, 'APRS_LOGIN_CALL'), 'APRS_LOGIN_PASSCODE': config.get(section, 'APRS_LOGIN_PASSCODE'), @@ -203,6 +201,10 @@ def build_config(_config_file): ## 'DASHBOARD_URL': config.get(section, 'DASHBOARD_URL'), ## 'SERVER_NAME': config.get(section, 'SERVER_NAME'), ## 'RULES_PATH': config.get(section, 'RULES_PATH'), + 'GATEWAY_CALLSIGN': config.get(section, 'GATEWAY_CALLSIGN'), + 'MQTT_SERVER': config.get(section, 'MQTT_SERVER'), + 'MQTT_PORT': config.get(section, 'MQTT_PORT'), + }) diff --git a/requirements.txt b/requirements.txt index 504714a..b71f6eb 100755 --- a/requirements.txt +++ b/requirements.txt @@ -12,4 +12,5 @@ resettabletimer cryptography setproctitle scapy +paho.mqtt