initial commit of MQTT for external applications/networks

This commit is contained in:
KF7EEL 2022-03-07 09:11:35 -08:00
parent 3998456d37
commit c8d8641383
4 changed files with 140 additions and 9 deletions

View File

@ -117,11 +117,8 @@ URL: http://localhost:8080/svr
SHARED_SECRET: test
[DATA_CONFIG]
USE_DASHBOARD: True
DATA_DMR_ID: 9099
CALL_TYPE: both
##UNIT_SMS_TS: 2
USER_APRS_SSID: 5
USER_APRS_COMMENT: HBNet APRS Gateway
APRS_SERVER: hbl.ink
@ -130,7 +127,6 @@ APRS_LOGIN_CALL: N0CALL
APRS_LOGIN_PASSCODE: 12345
APRS_FILTER: r/47/-120/500 t/m
# User settings file, MUST configure using absolute path.
USER_SETTINGS_FILE: /tmp/user_settings.txt
@ -143,6 +139,17 @@ IGATE_BEACON_ICON = /I
IGATE_LATITUDE = 4730. N
IGATE_LONGITUDE = 11930. W
# MQTT Configuration
# MQTT is used to accces external applications and other networks
GATEWAY_CALLSIGN:
MQTT_SERVER:
MQTT_PORT:
MQTT_SERVER2:
MQTT_SPORT2:
############################ NOT IMPLEMENTED YET ##############################
# The following settings are for the static positions only, for hotspots or repeaters connected to MASTER stanzas.
# Implementation by IU7IGU

View File

@ -105,6 +105,8 @@ import subprocess
from scapy.all import IP, UDP, raw
import ipaddress
import paho.mqtt.client as mqtt
import string
#################################
@ -126,6 +128,86 @@ btf = -1
ssid = ''
UNIT_MAP = {}
PACKET_MATCH = {}
mqtt_services = {}
##mqtt_shortcut_gen = ''.join(random.choices(string.ascii_uppercase, k=4))
def mqtt_main(broker_url = 'localhost', broker_port = 1883):
global mqtt_client
## print(broker_port)
# On connect, send announcement
def on_connect(client, userdata, flags, rc):
# Annouyncement happens with ten_loop_func
logger.debug('Connected to MQTT server: ' + broker_url)
def on_disconnect(client, userdata, flags, rc):
logger.debug('Disconnected from MQTT server')
# Process received msg here
def on_message(client, userdata, message):
topic_list = str(message.topic).split('/')
dict_payload = json.loads(message.payload.decode())
logger.debug(dict_payload)
if len(topic_list) == 1:
# Add service/network to dict of known services
if topic_list[0] == 'ANNOUNCE' and dict_payload[list(dict_payload.keys())[0]] == "LOST_CONNECTION":
try:
logger.debug('Removed MQTT service')
del mqtt_services[list(dict_payload.keys())[0]]
except Exception as e:
logger.error('Error with MQTT service removal')
logger.error(e)
elif topic_list[0] == 'ANNOUNCE':
logger.info('Service discovered: ' + dict_payload['shortcut'])
logger.debug((dict_payload))
mqtt_services[dict_payload['shortcut']] = dict_payload
logger.debug('Known services: ')
logger.debug(mqtt_services)
elif len(topic_list) > 1:
# Incoming MSG
if topic_list[0] == 'MSG' and list(dict_payload.keys())[1] == 'network' and topic_list[1] == mqtt_shortcut_gen:
try:
if mqtt_services[dict_payload['network']]['type'] == 'network':
send_sms(False, int(topic_list[2]), data_id[0], data_id[0], 'unit', dict_payload['network'] + '/' + list(dict_payload.keys())[0] + ': ' + dict_payload[list(dict_payload.keys())[0]])
if mqtt_services[dict_payload['network']]['type'] == 'app':
send_sms(False, int(topic_list[2]), data_id[0], data_id[0], 'unit', dict_payload['network'] + ': ' + dict_payload[list(dict_payload.keys())[0]])
except Exception as e:
logger.error('Unable to determine if message from network or app, defaulting to network...')
logger.error(e)
send_sms(False, int(topic_list[2]), data_id[0], data_id[0], 'unit', dict_payload['network'] + '/' + list(dict_payload.keys())[0] + ': ' + dict_payload[list(dict_payload.keys())[0]])
mqtt_client = mqtt.Client()
# Last will and testament
mqtt_client.will_set("ANNOUNCE", json.dumps({mqtt_shortcut_gen:"LOST_CONNECTION"}), 0, False)
mqtt_client.on_message = on_message
mqtt_client.on_connect = on_connect
mqtt_client.on_disconnect = on_disconnect
mqtt_client.connect(broker_url, broker_port)
# Subscribe to:
# Incoming messages
mqtt_client.subscribe('MSG/' + mqtt_shortcut_gen + '/#', qos=0)
# Announcements for service/network discovery
mqtt_client.subscribe("ANNOUNCE", qos=0)
mqtt_client.loop_start()
def mqtt_send_msg(network_shortcut, rcv_dmr_id, snd_dmr_id, message):
msg_dict = json.dumps({str(snd_dmr_id):message, 'network':mqtt_shortcut_gen}, indent = 4)
mqtt_client.publish(topic='MSG/' + network_shortcut + '/' + str(rcv_dmr_id), payload=msg_dict, qos=0, retain=False)
logger.info('Sent message to another network via MQTT: ' + network_shortcut)
def mqtt_send_app(network_shortcut, snd_dmr_id, message):
msg_dict = json.dumps({'dmr_id': str(snd_dmr_id), 'message':message, 'network':mqtt_shortcut_gen}, indent = 4)
mqtt_client.publish(topic='APP/' + network_shortcut, payload=msg_dict, qos=0, retain=False)
logger.info('Sent message to external application via MQTT: ' + network_shortcut)
def mqtt_announce():
mqtt_client.publish(topic="ANNOUNCE", payload=json.dumps({'shortcut':mqtt_shortcut_gen, 'type': 'network', 'url':'http://localhost'}, indent = 4), qos=0, retain=False)
def download_aprs_settings(_CONFIG):
user_man_url = _CONFIG['WEB_SERVICE']['URL']
@ -456,9 +538,21 @@ def download_config(CONFIG_FILE, cli_file):
other_split = corrected_config['OTHER']['OTHER_OPTIONS'].split(';')
for i in other_split:
if 'MQTT:' in i:
mqtt_options = i[5:].split(':')
for o in mqtt_options:
final_options = o.split('=')
print(final_options)
if final_options[0] == 'gateway_callsign':
mqtt_server = corrected_config['DATA_CONFIG']['GATEWAY_CALLSIGN'] = final_options[1].upper()
if final_options[0] == 'server':
mqtt_server = corrected_config['DATA_CONFIG']['MQTT_SERVER'] = final_options[1]
if final_options[0] == 'port':
mqtt_port = corrected_config['DATA_CONFIG']['MQTT_PORT'] = final_options[1]
if 'DATA_GATEWAY:' in i:
## print(i)
gateway_options = i[13:].split(':')
## print(gateway_options)
## print(gateway_options)
for o in gateway_options:
## print(o)
@ -766,6 +860,20 @@ def process_sms(_rf_src, sms, call_type, system_name):
logger.error(str(traceback.extract_tb(error_exception.__traceback__)))
packet_assembly = ''
elif '#' == parse_sms[0][0:1]:
## print(mqtt_services.keys())
if parse_sms[0][1:] in mqtt_services.keys():
mqtt_send_msg(str(parse_sms[0])[1:], parse_sms[1], int_id(_rf_src), ' '.join(parse_sms[2:]))
else:
# Add error message
pass
elif '!' == parse_sms[0][0:1]:
## print(mqtt_services.keys())
if parse_sms[0][1:] in mqtt_services.keys():
mqtt_send_app(str(parse_sms[0])[1:], str(int_id(_rf_src)), ' '.join(parse_sms[1:]))
else:
# Add error message
pass
elif '@' in parse_sms[0][0:1] and ' ' in sms: #'M-' not in parse_sms[1][0:2] or '@' not in parse_sms[0][1:]:
#Example SMS text: @ARMDS This is a test.
@ -1121,7 +1229,9 @@ def format_sms(msg, to_id, from_id, call_type, use_header = True):
unk_count = int((len(msg) + 4) * 2).to_bytes(1, 'big')
hdr_seq_num = (ahex(int((int(call_seq_num, 16) + 128)).to_bytes(1, 'big')))
print('-----------')
print(int(call_seq_num, 16) + 128)
print('-------------')
sms_header = '00' + str(ahex(unk_count))[2:-1] + 'a000' + str(hdr_seq_num)[2:-1] + '040d000a'
@ -1740,9 +1850,11 @@ def ten_loop_func():
with open(user_settings_file, 'w') as f:
f.write(str(download_aprs_settings(CONFIG)))
logger.info('Downloading and writing APRS settings file.')
# Announce ID(s) over OBP
for my_id in data_id:
svrd_send_all(b'UNIT' + bytes_4(my_id))
# Announce via MQTT
mqtt_announce()
@ -1928,6 +2040,9 @@ if __name__ == '__main__':
## # User APRS settings
user_settings_file = CONFIG['DATA_CONFIG']['USER_SETTINGS_FILE']
mqtt_shortcut_gen = CONFIG['DATA_CONFIG']['GATEWAY_CALLSIGN'].upper()
#### use_api = CONFIG['DATA_CONFIG']['USE_API']
@ -1992,6 +2107,12 @@ if __name__ == '__main__':
rule_timer = rule_timer_task.start(10)
rule_timer.addErrback(loopingErrHandle)
# Experimental MQTT for external applications, etc.
print(CONFIG['DATA_CONFIG']['MQTT_SERVER'])
mqtt_thread = threading.Thread(target=mqtt_main, args=(CONFIG['DATA_CONFIG']['MQTT_SERVER'],int(CONFIG['DATA_CONFIG']['MQTT_PORT']),))
mqtt_thread.daemon = True
mqtt_thread.start()
# Used for misc timing events
ten_loop_task = task.LoopingCall(ten_loop_func)
ten_loop = ten_loop_task.start(600)

View File

@ -167,11 +167,9 @@ def build_config(_config_file):
elif section == 'DATA_CONFIG':
CONFIG['DATA_CONFIG'].update({
## 'USE_DASHBOARD': config.getboolean(section, 'USE_DASHBOARD'),
'DATA_DMR_ID': config.get(section, 'DATA_DMR_ID'),
'USER_APRS_SSID': config.get(section, 'USER_APRS_SSID'),
'CALL_TYPE': config.get(section, 'CALL_TYPE'),
## 'UNIT_SMS_TS': config.get(section, 'UNIT_SMS_TS'),
'USER_APRS_COMMENT': config.get(section, 'USER_APRS_COMMENT'),
'APRS_LOGIN_CALL': config.get(section, 'APRS_LOGIN_CALL'),
'APRS_LOGIN_PASSCODE': config.get(section, 'APRS_LOGIN_PASSCODE'),
@ -203,6 +201,10 @@ def build_config(_config_file):
## 'DASHBOARD_URL': config.get(section, 'DASHBOARD_URL'),
## 'SERVER_NAME': config.get(section, 'SERVER_NAME'),
## 'RULES_PATH': config.get(section, 'RULES_PATH'),
'GATEWAY_CALLSIGN': config.get(section, 'GATEWAY_CALLSIGN'),
'MQTT_SERVER': config.get(section, 'MQTT_SERVER'),
'MQTT_PORT': config.get(section, 'MQTT_PORT'),
})

View File

@ -12,4 +12,5 @@ resettabletimer
cryptography
setproctitle
scapy
paho.mqtt