Merge pull request #66 from rossengeorgiev/joergschultzelutter-master

Updated: Support for the 'more recent' reply/ack msg format from 1999 #61
This commit is contained in:
Rossen 2021-11-27 19:33:42 +00:00 committed by GitHub
commit e17efe2a29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 179 additions and 17 deletions

View File

@ -69,28 +69,66 @@ def parse_message(body):
break
# regular message
else:
logger.debug("Packet is just a regular message")
parsed.update({'format': 'message'})
# ---------------------------
logger.debug("Packet is just a regular message")
parsed.update({'format': 'message'})
match = re.search(r"^(ack|rej)([A-Za-z0-9]{1,5})$", body)
if match:
parsed.update({
'response': match.group(1),
'msgNo': match.group(2),
})
else:
body = body[0:70]
# APRS supports two different message formats:
# - the standard format which is described in 'aprs101.pdf':
# http://www.aprs.org/doc/APRS101.PDF
# - an addendum from 1999 which introduces a new format:
# http://www.aprs.org/aprs11/replyacks.txt
#
# A message (ack/rej as well as a standard msg text body) can either have:
# - no message number at all
# - a message number in the old format (1..5 characters / digits)
# - a message number in the new format (2 characters / digits) without trailing 'ack msg no'
# - a message number in the new format with trailing 'free ack msg no' (2 characters / digits)
match = re.search(r"{([A-Za-z0-9]{1,5})$", body)
if match:
msgNo = match.group(1)
body = body[:len(body) - 1 - len(msgNo)]
# ack / rej
# ---------------------------
# NEW REPLAY-ACK
# format: :AAAABBBBC:ackMM}AA
match = re.findall(r"^(ack|rej)([A-Za-z0-9]{2})}([A-Za-z0-9]{2})?$", body)
if match:
parsed['response'], parsed['msgNo'], ackMsgNo = match[0]
if ackMsgNo:
parsed['ackMsgNo'] = ackMsgNo
break
parsed.update({'msgNo': msgNo})
# ack/rej standard format as per aprs101.pdf chapter 14
# format: :AAAABBBBC:ack12345
match = re.findall(r"^(ack|rej)([A-Za-z0-9]{1,5})$", body)
if match:
parsed['response'], parsed['msgNo'] = match[0]
break
parsed.update({'message_text': body.strip(' ')})
# regular message body parser
# ---------------------------
parsed['message_text'] = body.strip(' ')
# check for ACKs
# new message format: http://www.aprs.org/aprs11/replyacks.txt
# format: :AAAABBBBC:text.....{MM}AA
match = re.findall(r"{([A-Za-z0-9]{2})}([A-Za-z0-9]{2})?$", body)
if match:
msgNo, ackMsgNo = match[0]
parsed['message_text'] = body[:len(body) - 4 - len(ackMsgNo)].strip(' ')
parsed['msgNo'] = msgNo
if ackMsgNo:
parsed['ackMsgNo'] = ackMsgNo
break
# old message format - see aprs101.pdf.
# search for: msgNo present
match = re.findall(r"{([A-Za-z0-9]{1,5})$", body)
if match:
msgNo = match[0]
parsed['message_text'] = body[:len(body) - 1 - len(msgNo)].strip(' ')
parsed['msgNo'] = msgNo
break
# break free from the eternal 'while'
break
return ('', parsed)

View File

@ -0,0 +1,124 @@
import unittest
from aprslib.parsing.message import parse_message
# ack/rej assertion tests
class AckRejTests(unittest.TestCase):
# errorneus rej
def test_errorneus_rej(self):
unparsed, result = parse_message("WXBOT :red12345")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'red12345',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# reject with "old" msgno
def test_reject_old_msgno(self):
unparsed, result = parse_message("WXBOT :rej123")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'msgNo': '123',
'response': 'rej'
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# ack with new msgNo but no ackMsgNo
def test_ack_new_msgno_but_no_ack_msgno(self):
unparsed, result = parse_message("WXBOT :ackAB}")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'response': 'ack',
'msgNo': 'AB',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# ack with new msgNo and ackMsgNo
def test_ack_new_msgno_and_ackmsgno(self):
unparsed, result = parse_message("WXBOT :ackAB}CD")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'response': 'ack',
'msgNo': 'AB',
'ackMsgNo': 'CD',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message text body tests
class MessageTestBodyTests(unittest.TestCase):
# message body without msg no
def test_message_without_msgno(self):
unparsed, result = parse_message("WXBOT :HelloWorld ")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'HelloWorld',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message body with msg no - old format
def test_message_body_with_no_msgno_oldformat(self):
unparsed, result = parse_message("WXBOT :HelloWorld {ABCDE")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'HelloWorld',
'msgNo': 'ABCDE',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message body with msgNo (new format) and ackMsgNo missing
def test_message_body_with_msgno_and_ackmsgno_missing_newformat(self):
unparsed, result = parse_message("WXBOT :HelloWorld {AB}")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'HelloWorld',
'msgNo': 'AB',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message body with msgNo and ackMsgNo (new format)
def test_message_body_with_msgno_and_ackmsgno_newformat(self):
unparsed, result = parse_message("WXBOT :HelloWorld {AB}CD")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'HelloWorld',
'msgNo': 'AB',
'ackMsgNo': 'CD',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message body with really long message
def test_message_body_with_long_message(self):
unparsed, result = parse_message("WXBOT :00000000001111111111222222222233333333334444444444555555555566666666667777777777")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': '00000000001111111111222222222233333333334444444444555555555566666666667777777777',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)