implement telemetry report parsing
This commit is contained in:
parent
c2a0f18ce0
commit
a2e7d508ca
|
|
@ -5,3 +5,4 @@ dist
|
|||
*.egg-info
|
||||
env3
|
||||
env2
|
||||
.idea
|
||||
|
|
|
|||
|
|
@ -45,25 +45,25 @@ from aprslib.parsing.thirdparty import *
|
|||
from aprslib.parsing.weather import *
|
||||
|
||||
unsupported_formats = {
|
||||
'#':'raw weather report',
|
||||
'$':'raw gps',
|
||||
'%':'agrelo',
|
||||
'&':'reserved',
|
||||
'(':'unused',
|
||||
')':'item report',
|
||||
'*':'complete weather report',
|
||||
'+':'reserved',
|
||||
'-':'unused',
|
||||
'.':'reserved',
|
||||
'<':'station capabilities',
|
||||
'?':'general query format',
|
||||
'T':'telemetry report',
|
||||
'[':'maidenhead locator beacon',
|
||||
'\\':'unused',
|
||||
']':'unused',
|
||||
'^':'unused',
|
||||
'#': 'raw weather report',
|
||||
'$': 'raw gps',
|
||||
'%': 'agrelo',
|
||||
'&': 'reserved',
|
||||
'(': 'unused',
|
||||
')': 'item report',
|
||||
'*': 'complete weather report',
|
||||
'+': 'reserved',
|
||||
'-': 'unused',
|
||||
'.': 'reserved',
|
||||
'<': 'station capabilities',
|
||||
'?': 'general query format',
|
||||
'[': 'maidenhead locator beacon',
|
||||
'\\': 'unused',
|
||||
']': 'unused',
|
||||
'^': 'unused',
|
||||
}
|
||||
|
||||
|
||||
def _unicode_packet(packet):
|
||||
# attempt utf-8
|
||||
try:
|
||||
|
|
@ -114,7 +114,7 @@ def parse(packet):
|
|||
|
||||
parsed = {
|
||||
'raw': packet,
|
||||
}
|
||||
}
|
||||
|
||||
# parse head
|
||||
try:
|
||||
|
|
@ -149,7 +149,7 @@ def parse(packet):
|
|||
parsed.update({
|
||||
'format': 'beacon',
|
||||
'text': packet_type + body,
|
||||
})
|
||||
})
|
||||
|
||||
logger.debug("Parsed ok.")
|
||||
return parsed
|
||||
|
|
@ -208,6 +208,10 @@ def _try_toparse_body(packet_type, body, parsed):
|
|||
|
||||
body, result = parse_position(packet_type, body)
|
||||
|
||||
elif packet_type == "T":
|
||||
logger.debug("Attempting to parse as telemetry report")
|
||||
|
||||
body, result = parse_telemetry_report(body)
|
||||
|
||||
# we are done
|
||||
parsed.update(result)
|
||||
|
||||
|
|
|
|||
|
|
@ -4,9 +4,10 @@ from aprslib.exceptions import ParseError
|
|||
from aprslib.parsing import logger
|
||||
|
||||
__all__ = [
|
||||
'parse_comment_telemetry',
|
||||
'parse_telemetry_config',
|
||||
]
|
||||
'parse_comment_telemetry',
|
||||
'parse_telemetry_config',
|
||||
'parse_telemetry_report'
|
||||
]
|
||||
|
||||
|
||||
def parse_comment_telemetry(text):
|
||||
|
|
@ -23,19 +24,19 @@ def parse_comment_telemetry(text):
|
|||
|
||||
temp = [0] * 7
|
||||
for i in range(7):
|
||||
temp[i] = base91.to_decimal(telemetry[i*2:i*2+2])
|
||||
temp[i] = base91.to_decimal(telemetry[i * 2:i * 2 + 2])
|
||||
|
||||
parsed.update({
|
||||
'telemetry': {
|
||||
'seq': temp[0],
|
||||
'vals': temp[1:6]
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
if temp[6] != '':
|
||||
parsed['telemetry'].update({
|
||||
'bits': "{0:08b}".format(temp[6] & 0xFF)[::-1]
|
||||
})
|
||||
})
|
||||
|
||||
return (text, parsed)
|
||||
|
||||
|
|
@ -62,14 +63,14 @@ def parse_telemetry_config(body):
|
|||
|
||||
parsed.update({
|
||||
't%s' % form: defvals
|
||||
})
|
||||
})
|
||||
elif form == "EQNS":
|
||||
eqns = body.rstrip().split(',')[:15]
|
||||
teqns = [0, 1, 0] * 5
|
||||
|
||||
for idx, val in enumerate(eqns):
|
||||
if not re.match(r"^([-]?\d*\.?\d+|)$", val):
|
||||
raise ParseError("value at %d is not a number in %s" % (idx+1, form))
|
||||
raise ParseError("value at %d is not a number in %s" % (idx + 1, form))
|
||||
else:
|
||||
try:
|
||||
val = int(val)
|
||||
|
|
@ -79,11 +80,11 @@ def parse_telemetry_config(body):
|
|||
teqns[idx] = val
|
||||
|
||||
# group values in 5 list of 3
|
||||
teqns = [teqns[i*3:(i+1)*3] for i in range(5)]
|
||||
teqns = [teqns[i * 3:(i + 1) * 3] for i in range(5)]
|
||||
|
||||
parsed.update({
|
||||
't%s' % form: teqns
|
||||
})
|
||||
})
|
||||
elif form == "BITS":
|
||||
match = re.findall(r"^([01]{8}),(.{0,23})$", body.rstrip())
|
||||
if not match:
|
||||
|
|
@ -94,7 +95,26 @@ def parse_telemetry_config(body):
|
|||
parsed.update({
|
||||
't%s' % form: bits,
|
||||
'title': title.strip(' ')
|
||||
})
|
||||
})
|
||||
|
||||
return (body, parsed)
|
||||
|
||||
|
||||
def parse_telemetry_report(text):
|
||||
temp = text.split(",")
|
||||
parsed = {}
|
||||
|
||||
if len(temp) == 7:
|
||||
|
||||
seq = int(temp[0].replace('#', ''))
|
||||
values = list(map(float, temp[1:6]))
|
||||
|
||||
parsed.update({
|
||||
'telemetry': {
|
||||
'seq': seq,
|
||||
'vals': values,
|
||||
'bits': temp[6]
|
||||
}
|
||||
})
|
||||
|
||||
return '', parsed
|
||||
|
|
|
|||
|
|
@ -257,6 +257,32 @@ Regular
|
|||
'to': u'TOCALL',
|
||||
'via': ''}
|
||||
|
||||
|
||||
Telemetry report
|
||||
-----------------------
|
||||
|
||||
|
||||
.. code:: python
|
||||
|
||||
>>> aprslib.parse("FROMCALL>APDW16,WIDE1-1,qAR,TOCALL:T#165,13.21,0.39,5.10,14.94,36.12,11111100")
|
||||
|
||||
{
|
||||
'raw': 'FROMCALL>APDW16,WIDE1-1,qAR,TOCALL:T#165,13.21,0.39,5.10,14.94,36.12,11111100',
|
||||
'from': 'FROMCALL',
|
||||
'to': 'APDW16',
|
||||
'path': ['WIDE1-1', 'qAR', 'TOCALL'],
|
||||
'via': 'TOCALL',
|
||||
'telemetry':
|
||||
{
|
||||
'seq': 165,
|
||||
'vals': [13.21, 0.39, 5.1, 14.94, 36.12],
|
||||
'bits': '11111100'
|
||||
},
|
||||
'format': 'beacon',
|
||||
'text': 'T#165,13.21,0.39,5.10,14.94,36.12,11111100'
|
||||
}
|
||||
|
||||
|
||||
Telemetry configuration
|
||||
-----------------------
|
||||
|
||||
|
|
@ -296,3 +322,5 @@ Telemetry configuration
|
|||
'to': 'TOCALL',
|
||||
'via': ''}
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,22 @@
|
|||
import unittest
|
||||
|
||||
from aprslib.parsing import parse_telemetry_report
|
||||
|
||||
|
||||
class ParseTelemetryReport(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.maxDiff = None
|
||||
|
||||
def test_valid_telemetry_report(self):
|
||||
packet = "#111,13.64,0.37,5.10,16.96,33.38,11110000"
|
||||
expected = {'telemetry':
|
||||
{'bits': '11110000',
|
||||
'seq': 111,
|
||||
'vals': [13.64, 0.37, 5.1, 16.96, 33.38]}}
|
||||
|
||||
_, result = parse_telemetry_report(packet)
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Loading…
Reference in New Issue