openwebrxplus/csdr/chain/multimon.py

71 lines
2.4 KiB
Python

from csdr.chain.demodulator import ServiceDemodulator, DialFrequencyReceiver
from csdr.module.multimon import MultimonModule
from pycsdr.modules import FmDemod, AudioResampler, Convert, Squelch
from pycsdr.types import Format
from owrx.multimon import MultimonParser, PageParser, SelCallParser
class MultimonDemodulator(ServiceDemodulator, DialFrequencyReceiver):
def __init__(self, decoders: list[str], parser, withSquelch: bool = False):
self.sampleRate = 24000
self.squelch = None
self.parser = parser
workers = [
FmDemod(),
AudioResampler(self.sampleRate, 22050),
Convert(Format.FLOAT, Format.SHORT),
MultimonModule(decoders),
self.parser,
]
# If using squelch, insert Squelch() at the start
if withSquelch:
# s-meter readings are available every 1024 samples
# the reporting interval is measured in those 1024-sample blocks
self.readingsPerSec = 4
self.squelch = Squelch(5, int(self.sampleRate / (self.readingsPerSec * 1024)))
workers.insert(0, self.squelch)
# Connect all the workers
super().__init__(workers)
def getFixedAudioRate(self) -> int:
return self.sampleRate
def supportsSquelch(self) -> bool:
return self.squelch != None
def setDialFrequency(self, frequency: int) -> None:
self.parser.setDialFrequency(frequency)
def _convertToLinear(self, db: float) -> float:
return float(math.pow(10, db / 10))
def setSquelchLevel(self, level: float) -> None:
if self.squelch:
self.squelch.setSquelchLevel(self._convertToLinear(level))
class PageDemodulator(MultimonDemodulator):
def __init__(self, service: bool = False):
super().__init__(
["FLEX", "POCSAG512", "POCSAG1200", "POCSAG2400"],
PageParser(service=service)
)
class EasDemodulator(MultimonDemodulator):
def __init__(self, service: bool = False):
super().__init__(["EAS"], MultimonParser(service=service))
class SelCallDemodulator(MultimonDemodulator):
def __init__(self, service: bool = False):
super().__init__(
# These aappear to be rarely used and very similar, so they trigger at once
# "ZVEI1", "ZVEI2", "ZVEI3", "DZVEI", "PZVEI",
["DTMF", "EEA", "EIA", "CCIR"],
SelCallParser(service=service),
withSquelch = True
)