Importing MQTT support and other changes from jketterl.

This commit is contained in:
Marat Fayzullin 2024-04-11 22:37:49 -04:00
parent fd78e35aa5
commit 7ba5a75318
13 changed files with 206 additions and 22 deletions

View File

@ -290,7 +290,6 @@ class OpenWebRxReceiverClient(OpenWebRxClient, SdrSourceEventClient):
self.setSdr()
def _sendProfiles(self, *args):
# profiles = [{"id": pid, "name": name} for pid, name in SdrService.getAvailableProfiles().items()]
profiles = [{"id": pid, "name": name} for pid, name in SdrService.getAvailableProfileNames().items()]
self.write_profiles(profiles)

View File

@ -20,6 +20,8 @@ class Controller(object):
headers = {}
if content_type is not None:
headers["Content-Type"] = content_type
if content_type.startswith("text/"):
headers["Content-Type"] += "; charset=utf-8"
if last_modified is not None:
headers["Last-Modified"] = last_modified.astimezone(tz=timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT")
if max_age is not None:

View File

@ -2,7 +2,8 @@ from owrx.controllers.settings import SettingsFormController, SettingsBreadcrumb
from owrx.form.section import Section
from owrx.form.input.converter import OptionalConverter
from owrx.form.input.aprs import AprsBeaconSymbols, AprsAntennaDirections
from owrx.form.input import TextInput, CheckboxInput, DropdownInput, NumberInput
from owrx.form.input import TextInput, CheckboxInput, DropdownInput, NumberInput, PasswordInput
from owrx.form.input.validator import AddressAndOptionalPortValidator
from owrx.breadcrumb import Breadcrumb, BreadcrumbItem
@ -28,7 +29,7 @@ class ReportingController(SettingsFormController):
infotext="This callsign will be used to send data to the APRS-IS network",
),
TextInput("aprs_igate_server", "APRS-IS server"),
TextInput("aprs_igate_password", "APRS-IS network password"),
PasswordInput("aprs_igate_password", "APRS-IS network password"),
CheckboxInput(
"aprs_igate_beacon",
"Send the receiver position to the APRS-IS network",
@ -90,4 +91,42 @@ class ReportingController(SettingsFormController):
infotext="This callsign will be used to send spots to wsprnet.org",
),
),
Section(
"MQTT settings",
CheckboxInput(
"mqtt_enabled",
"Enable publishing decodes to MQTT",
),
TextInput(
"mqtt_host",
"Broker address",
infotext="Addresss of the MQTT broker to send decodes to (address[:port])",
validator=AddressAndOptionalPortValidator(),
),
TextInput(
"mqtt_client_id",
"Client ID",
converter=OptionalConverter(),
),
TextInput(
"mqtt_user",
"Username",
converter=OptionalConverter(),
),
PasswordInput(
"mqtt_password",
"Password",
converter=OptionalConverter(),
),
CheckboxInput(
"mqtt_use_ssl",
"Use SSL",
),
TextInput(
"mqtt_topic",
"MQTT topic",
infotext="MQTT topic to publish decodes to (default: openwebrx/decodes)",
converter=OptionalConverter(),
),
)
]

View File

@ -92,10 +92,11 @@ class FeatureDetector(object):
"acars": ["acarsdec"],
"page": ["multimon"],
"selcall": ["multimon"],
"rds": ["redsea"],
"dab": ["csdreti", "dablin"],
"wxsat": ["satdump"],
"png": ["imagemagick"],
"rds": ["redsea"],
"dab": ["csdreti", "dablin"],
"mqtt": ["paho_mqtt"],
}
def feature_availability(self):
@ -690,6 +691,19 @@ class FeatureDetector(object):
"""
return self.command_is_runnable("dablin -h")
def has_paho_mqtt(self):
"""
OpenWebRX can pass decoded signal data to an MQTT broker for processing in third-party applications. To be able
to do this, the [paho-mqtt](https://pypi.org/project/paho-mqtt/) library is required.
If you are using Debian or Ubuntu, you can install the `python3-paho-mqtt` package.
"""
try:
from paho.mqtt import __version__
return True
except ImportError:
return False
def has_acarsdec(self):
"""
OpenWebRX supports decoding ACARS airplane communications by using the

View File

@ -73,12 +73,12 @@ class SpectrumThread(SdrSourceEventClient):
threading.Thread(target=self.dsp.pump(self.reader.read, self.sdrSource.writeSpectrumData)).start()
def stopDsp(self):
if self.dsp is None:
return
self.dsp.stop()
self.dsp = None
self.reader.stop()
self.reader = None
if self.dsp is not None:
self.dsp.stop()
self.dsp = None
if self.reader is not None:
self.reader.stop()
self.reader = None
def stop(self):
self.stopDsp()

View File

@ -112,6 +112,13 @@ class TextInput(Input):
return TextConverter()
class PasswordInput(TextInput):
def input_properties(self, value, errors):
props = super().input_properties(value, errors)
props["type"] = "password"
return props
class NumberInput(Input):
def __init__(self, id, label, infotext=None, append="", converter: Converter = None, validator: Validator = None):
super().__init__(id, label, infotext, converter=converter, validator=validator)

View File

@ -54,3 +54,18 @@ class RangeListValidator(Validator):
def _rangeStr(self):
return "[{}]".format(", ".join(str(r) for r in self.rangeList))
class AddressAndOptionalPortValidator(Validator):
def validate(self, key, value) -> None:
parts = value.split(":")
if len(parts) > 2:
raise ValidationError(key, "Value contains too many colons")
if len(parts) > 1:
try:
port = int(parts[1])
except ValueError:
raise ValidationError(key, "Port number must be numeric")
if not 0 <= port <= 65535:
raise ValidationError(key, "Port number out of range")

View File

@ -1,8 +1,9 @@
import threading
from owrx.config import Config
from owrx.reporting.reporter import Reporter
from owrx.reporting.reporter import Reporter, FilteredReporter
from owrx.reporting.pskreporter import PskReporter
from owrx.reporting.wsprnet import WsprnetReporter
from owrx.feature import FeatureDetector
import logging
logger = logging.getLogger(__name__)
@ -12,9 +13,12 @@ class ReportingEngine(object):
creationLock = threading.Lock()
sharedInstance = None
# concrete classes if they can be imported without the risk of optional dependencies
# tuples if the import needs to be detected by a feature check
reporterClasses = {
"pskreporter_enabled": PskReporter,
"wsprnet_enabled": WsprnetReporter,
"pskreporter": PskReporter,
"wsprnet": WsprnetReporter,
"mqtt": ("owrx.reporting.mqtt", "MqttReporter")
}
@staticmethod
@ -32,12 +36,22 @@ class ReportingEngine(object):
def __init__(self):
self.reporters = []
self.configSub = Config.get().filter(*ReportingEngine.reporterClasses.keys()).wire(self.setupReporters)
configKeys = ["{}_enabled".format(n) for n in self.reporterClasses.keys()]
self.configSub = Config.get().filter(*configKeys).wire(self.setupReporters)
self.setupReporters()
def setupReporters(self, *args):
config = Config.get()
for configKey, reporterClass in ReportingEngine.reporterClasses.items():
for typeStr, reporterClass in self.reporterClasses.items():
configKey = "{}_enabled".format(typeStr)
if isinstance(reporterClass, tuple):
# feature check
if FeatureDetector().is_available(typeStr):
package, className = reporterClass
module = __import__(package, fromlist=[className])
reporterClass = getattr(module, className)
else:
continue
if configKey in config and config[configKey]:
if not any(isinstance(r, reporterClass) for r in self.reporters):
self.reporters += [reporterClass()]
@ -53,5 +67,8 @@ class ReportingEngine(object):
def spot(self, spot):
for r in self.reporters:
if spot["mode"] in r.getSupportedModes():
r.spot(spot)
if not isinstance(r, FilteredReporter) or spot["mode"] in r.getSupportedModes():
try:
r.spot(spot)
except Exception:
logger.exception("error sending spot to reporter")

66
owrx/reporting/mqtt.py Normal file
View File

@ -0,0 +1,66 @@
from paho.mqtt.client import Client
from owrx.reporting.reporter import Reporter
from owrx.config import Config
from owrx.property import PropertyDeleted
import json
import threading
import time
import logging
logger = logging.getLogger(__name__)
class MqttReporter(Reporter):
DEFAULT_TOPIC = "openwebrx/decodes"
def __init__(self):
pm = Config.get()
self.topic = self.DEFAULT_TOPIC
self.client = self._getClient()
self.subscriptions = [
pm.wireProperty("mqtt_topic", self._setTopic),
pm.filter("mqtt_host", "mqtt_user", "mqtt_password", "mqtt_client_id", "mqtt_use_ssl").wire(self._reconnect)
]
def _getClient(self):
pm = Config.get()
clientId = pm["mqtt_client_id"] if "mqtt_client_id" in pm else ""
client = Client(clientId)
if "mqtt_user" in pm and "mqtt_password" in pm:
client.username_pw_set(pm["mqtt_user"], pm["mqtt_password"])
port = 1883
if pm["mqtt_use_ssl"]:
client.tls_set()
port = 8883
parts = pm["mqtt_host"].split(":")
host = parts[0]
if len(parts) > 1:
port = int(parts[1])
client.connect(host=host, port=port)
threading.Thread(target=client.loop_forever).start()
return client
def _setTopic(self, topic):
if topic is PropertyDeleted:
self.topic = self.DEFAULT_TOPIC
else:
self.topic = topic
def _reconnect(self, *args, **kwargs):
old = self.client
self.client = self._getClient()
old.disconnect()
def stop(self):
self.client.disconnect()
while self.subscriptions:
self.subscriptions.pop().cancel()
def spot(self, spot):
self.client.publish(self.topic, payload=json.dumps(spot))

View File

@ -9,12 +9,12 @@ from owrx.config import Config
from owrx.version import openwebrx_version
from owrx.locator import Locator
from owrx.metrics import Metrics, CounterMetric
from owrx.reporting.reporter import Reporter
from owrx.reporting.reporter import FilteredReporter
logger = logging.getLogger(__name__)
class PskReporter(Reporter):
class PskReporter(FilteredReporter):
"""
This class implements the reporting interface to send received signals to pskreporter.info.

View File

@ -10,6 +10,8 @@ class Reporter(ABC):
def spot(self, spot):
pass
class FilteredReporter(Reporter):
@abstractmethod
def getSupportedModes(self):
return []

View File

@ -1,4 +1,4 @@
from owrx.reporting.reporter import Reporter
from owrx.reporting.reporter import FilteredReporter
from owrx.version import openwebrx_version
from owrx.config import Config
from owrx.locator import Locator
@ -68,7 +68,7 @@ class Worker(threading.Thread):
request.urlopen("http://wsprnet.org/post/", data, timeout=60)
class WsprnetReporter(Reporter):
class WsprnetReporter(FilteredReporter):
def __init__(self):
# max 100 entries
self.queue = Queue(100)

View File

@ -1,4 +1,5 @@
from owrx.source.soapy import SoapyConnectorSource, SoapyConnectorDeviceDescription
from owrx.form.input import Input, TextInput
from owrx.form.input.validator import Range
from typing import List
@ -7,10 +8,32 @@ class PlutoSdrSource(SoapyConnectorSource):
def getDriver(self):
return "plutosdr"
def getEventNames(self):
return super().getEventNames() + ["hostname"]
def buildSoapyDeviceParameters(self, parsed, values):
params = super().buildSoapyDeviceParameters(parsed, values)
if "hostname" in values:
params = [p for p in params if "hostname" not in p]
params += [{"hostname": values["hostname"]}]
return params
class PlutoSdrDeviceDescription(SoapyConnectorDeviceDescription):
def getName(self):
return "PlutoSDR"
def getInputs(self) -> List[Input]:
return super().getInputs() + [
TextInput(
"hostname",
"Hostname",
infotext="Use this for PlutoSDR devices attached to the network"
)
]
def getDeviceOptionalKeys(self):
return super().getDeviceOptionalKeys() + ["hostname"]
def getSampleRateRanges(self) -> List[Range]:
return [Range(520833, 61440000)]