parse: rework message rejack parsing + update tests
This commit is contained in:
parent
8928424a9c
commit
303fe826bd
|
|
@ -69,108 +69,66 @@ def parse_message(body):
|
|||
break
|
||||
|
||||
# regular message
|
||||
else:
|
||||
logger.debug("Packet is just a regular message")
|
||||
parsed.update({'format': 'message'})
|
||||
has_matched = False
|
||||
# ---------------------------
|
||||
logger.debug("Packet is just a regular message")
|
||||
parsed.update({'format': 'message'})
|
||||
|
||||
# APRS supports two different message formats:
|
||||
# - the standard format which is described in 'aprs101.pdf':
|
||||
# http://www.aprs.org/doc/APRS101.PDF
|
||||
# - an addendum from 1999 which introduces a new format:
|
||||
# http://www.aprs.org/aprs11/replyacks.txt
|
||||
#
|
||||
# A message (ack/rej as well as a standard msg text body) can either have:
|
||||
# - no message number at all
|
||||
# - a message number in the old format (1..5 characters / digits)
|
||||
# - a message number in the new format (2 characters / digits) without trailing 'ack msg no'
|
||||
# - a message number in the new format with trailing 'free ack msg no' (2 characters / digits)
|
||||
# APRS supports two different message formats:
|
||||
# - the standard format which is described in 'aprs101.pdf':
|
||||
# http://www.aprs.org/doc/APRS101.PDF
|
||||
# - an addendum from 1999 which introduces a new format:
|
||||
# http://www.aprs.org/aprs11/replyacks.txt
|
||||
#
|
||||
# A message (ack/rej as well as a standard msg text body) can either have:
|
||||
# - no message number at all
|
||||
# - a message number in the old format (1..5 characters / digits)
|
||||
# - a message number in the new format (2 characters / digits) without trailing 'ack msg no'
|
||||
# - a message number in the new format with trailing 'free ack msg no' (2 characters / digits)
|
||||
|
||||
#
|
||||
# ack / rej parser
|
||||
#
|
||||
# search for: ack/rej new format with msgNo present and ackMsgNo present
|
||||
if not has_matched:
|
||||
match = re.search(r"^(ack|rej)([A-Za-z0-9]{2})}([A-Za-z0-9]{2})$", body)
|
||||
if match:
|
||||
parsed.update({
|
||||
'response': match.group(1),
|
||||
'msgNo': match.group(2),
|
||||
'ackMsgNo': match.group(3)
|
||||
})
|
||||
if "message_text" in parsed:
|
||||
parsed.pop("message_text")
|
||||
has_matched = True
|
||||
|
||||
# search for: ack/rej new format with msgNo present and ackMsgNo not present
|
||||
if not has_matched:
|
||||
match = re.search(r"^(ack|rej)([A-Za-z0-9]{2})}$", body)
|
||||
if match:
|
||||
parsed.update({
|
||||
'response': match.group(1),
|
||||
'msgNo': match.group(2),
|
||||
})
|
||||
if "message_text" in parsed:
|
||||
parsed.pop("message_text")
|
||||
has_matched = True
|
||||
|
||||
# search for: ack/rej standard format as per aprs101.pdf chapter 14
|
||||
match = re.search(r"^(ack|rej)([A-Za-z0-9]{1,5})$", body)
|
||||
if match:
|
||||
parsed.update({
|
||||
'response': match.group(1),
|
||||
'msgNo': match.group(2),
|
||||
})
|
||||
if "message_text" in parsed:
|
||||
parsed.pop("message_text")
|
||||
has_matched = True
|
||||
|
||||
# escape from the eternal 'while'
|
||||
if has_matched:
|
||||
break
|
||||
|
||||
#
|
||||
# regular message body parser
|
||||
#
|
||||
if not has_matched:
|
||||
# one-time truncate to 67 chars
|
||||
body = body[0:67].strip()
|
||||
|
||||
# new message format: http://www.aprs.org/aprs11/replyacks.txt
|
||||
# search for: msgText/msgNo/ackMsgNo all present
|
||||
match = re.search(r"{([A-Za-z0-9]{2})}([A-Za-z0-9]{2})$", body)
|
||||
if match:
|
||||
msgNo = match.group(1)
|
||||
ackMsgNo = match.group(2)
|
||||
body = body[:len(body) - 3 - len(msgNo) - len(ackMsgNo)]
|
||||
parsed.update({'msgNo': msgNo})
|
||||
parsed.update({'ackMsgNo': ackMsgNo})
|
||||
has_matched = True
|
||||
|
||||
# new message format: http://www.aprs.org/aprs11/replyacks.txt
|
||||
# search for: msgText/msgNo present and ackMsgNo not present
|
||||
if not has_matched:
|
||||
match = re.search(r"{([A-Za-z0-9]{2})}$", body)
|
||||
if match:
|
||||
msgNo = match.group(1)
|
||||
body = body[:len(body) - 2 - len(msgNo)]
|
||||
parsed.update({'msgNo': msgNo})
|
||||
has_matched = True
|
||||
|
||||
# old message format - see aprs101.pdf.
|
||||
# search for: msgNo present
|
||||
if not has_matched:
|
||||
match = re.search(r"{([A-Za-z0-9]{1,5})$", body)
|
||||
if match:
|
||||
msgNo = match.group(1)
|
||||
body = body[:len(body) - 1 - len(msgNo)]
|
||||
parsed.update({'msgNo': msgNo})
|
||||
has_matched = True
|
||||
|
||||
# update the potentially changed message body in our dictionary
|
||||
parsed.update({'message_text': body[0:67].strip()})
|
||||
|
||||
# break free from the eternal 'while'
|
||||
# ack / rej
|
||||
# ---------------------------
|
||||
# NEW REPLAY-ACK
|
||||
# format: :AAAABBBBC:ackMM}AA
|
||||
match = re.findall(r"^(ack|rej)([A-Za-z0-9]{2})}([A-Za-z0-9]{2})?$", body)
|
||||
if match:
|
||||
parsed['response'], parsed['msgNo'], ackMsgNo = match[0]
|
||||
if ackMsgNo:
|
||||
parsed['ackMsgNo'] = ackMsgNo
|
||||
break
|
||||
|
||||
# ack/rej standard format as per aprs101.pdf chapter 14
|
||||
# format: :AAAABBBBC:ack12345
|
||||
match = re.findall(r"^(ack|rej)([A-Za-z0-9]{1,5})$", body)
|
||||
if match:
|
||||
parsed['response'], parsed['msgNo'] = match[0]
|
||||
break
|
||||
|
||||
# regular message body parser
|
||||
# ---------------------------
|
||||
parsed['message_text'] = body.strip(' ')
|
||||
|
||||
# check for ACKs
|
||||
# new message format: http://www.aprs.org/aprs11/replyacks.txt
|
||||
# format: :AAAABBBBC:text.....{MM}AA
|
||||
match = re.findall(r"{([A-Za-z0-9]{2})}([A-Za-z0-9]{2})?$", body)
|
||||
if match:
|
||||
msgNo, ackMsgNo = match[0]
|
||||
parsed['message_text'] = body[:len(body) - 4 - len(ackMsgNo)].strip(' ')
|
||||
parsed['msgNo'] = msgNo
|
||||
if ackMsgNo:
|
||||
parsed['ackMsgNo'] = ackMsgNo
|
||||
break
|
||||
|
||||
# old message format - see aprs101.pdf.
|
||||
# search for: msgNo present
|
||||
match = re.findall(r"{([A-Za-z0-9]{1,5})$", body)
|
||||
if match:
|
||||
msgNo = match[0]
|
||||
parsed['message_text'] = body[:len(body) - 1 - len(msgNo)].strip(' ')
|
||||
parsed['msgNo'] = msgNo
|
||||
break
|
||||
|
||||
# break free from the eternal 'while'
|
||||
break
|
||||
|
||||
return ('', parsed)
|
||||
|
|
|
|||
|
|
@ -1,112 +0,0 @@
|
|||
import unittest
|
||||
from aprslib.parsing.message import parse_message
|
||||
|
||||
# ack/rej assertion tests
|
||||
class AckRejTests(unittest.TestCase):
|
||||
|
||||
# errorneus rej
|
||||
def test_errorneus_rej(self):
|
||||
with self.assertRaises(AssertionError):
|
||||
packet = parse_message("DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::WXBOT :red12345")
|
||||
assert ("msgNo" not in packet)
|
||||
assert ("ackMsgNo" not in packet)
|
||||
assert ("message_text" in packet)
|
||||
assert (packet["format"] == "message")
|
||||
assert (packet["message_text"] == "red12345")
|
||||
|
||||
# reject with "old" msgno
|
||||
def test_reject_old_msgno(self):
|
||||
with self.assertRaises(AssertionError):
|
||||
packet = parse_message("DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::WXBOT :rej123")
|
||||
assert ("msgNo" in packet)
|
||||
assert ("ackMsgNo" not in packet)
|
||||
assert ("response" in packet)
|
||||
assert (packet["format"] == "message")
|
||||
assert ("message_text" not in packet)
|
||||
assert (packet["response"] == "rej")
|
||||
assert (packet["msgNo"] == "123")
|
||||
|
||||
# ack with new msgNo but no ackMsgNo
|
||||
def test_ack_new_msgno_but_no_ack_msgno(self):
|
||||
with self.assertRaises(AssertionError):
|
||||
packet = parse_message("DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::WXBOT :ackAB}")
|
||||
assert ("msgNo" in packet)
|
||||
assert ("ackMsgNo" not in packet)
|
||||
assert ("response" in packet)
|
||||
assert (packet["format"] == "message")
|
||||
assert ("message_text" not in packet)
|
||||
assert (packet["response"] == "ack")
|
||||
assert (packet["msgNo"] == "AB")
|
||||
|
||||
# ack with new msgNo and ackMsgNo
|
||||
def test_ack_new_msgno_and_ackmsgno(self):
|
||||
with self.assertRaises(AssertionError):
|
||||
packet = parse_message("DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::WXBOT :ackAB}CD")
|
||||
assert ("msgNo" in packet)
|
||||
assert ("ackMsgNo" in packet)
|
||||
assert ("response" in packet)
|
||||
assert (packet["format"] == "message")
|
||||
assert ("message_text" not in packet)
|
||||
assert (packet["response"] == "ack")
|
||||
assert (packet["msgNo"] == "AB")
|
||||
assert (packet["ackMsgNo"] == "CD")
|
||||
|
||||
# message text body tests
|
||||
class MessageTestBodyTests(unittest.TestCase):
|
||||
|
||||
# message body without msg no
|
||||
def test_message_without_msgno(self):
|
||||
with self.assertRaises(AssertionError):
|
||||
packet = parse_message("DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::WXBOT :HelloWorld ")
|
||||
assert ("msgNo" not in packet)
|
||||
assert ("ackMsgNo" not in packet)
|
||||
assert ("message_text" in packet)
|
||||
assert (packet["format"] == "message")
|
||||
assert (packet["message_text"] == "HelloWorld")
|
||||
|
||||
# message body with msg no - old format
|
||||
def test_message_body_with_no_msgno_oldformat(self):
|
||||
with self.assertRaises(AssertionError):
|
||||
packet = parse_message("DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::WXBOT :HelloWorld {ABCDE")
|
||||
assert ("msgNo" in packet)
|
||||
assert ("ackMsgNo" not in packet)
|
||||
assert ("response" not in packet)
|
||||
assert (packet["format"] == "message")
|
||||
assert ("message_text" in packet)
|
||||
assert (packet["message_text"] == "HelloWorld")
|
||||
assert (packet["msgNo"] == "ABCDE")
|
||||
|
||||
# message body with msgNo (new format) and ackMsgNo missing
|
||||
def test_message_body_with_msgno_and_ackmsgno_missing_newformat(self):
|
||||
with self.assertRaises(AssertionError):
|
||||
packet = parse_message("DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::WXBOT :HelloWorld {AB}")
|
||||
assert ("msgNo" in packet)
|
||||
assert ("ackMsgNo" not in packet)
|
||||
assert ("response" not in packet)
|
||||
assert (packet["format"] == "message")
|
||||
assert ("message_text" in packet)
|
||||
assert (packet["message_text"] == "HelloWorld")
|
||||
assert (packet["msgNo"] == "AB")
|
||||
|
||||
# message body with msgNo and ackMsgNo (new format)
|
||||
def test_message_body_with_msgno_and_ackmsgno_newformat(self):
|
||||
with self.assertRaises(AssertionError):
|
||||
packet = parse_message("DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::WXBOT :HelloWorld {AB}CD")
|
||||
assert ("msgNo" in packet)
|
||||
assert ("ackMsgNo" in packet)
|
||||
assert ("response" not in packet)
|
||||
assert (packet["format"] == "message")
|
||||
assert ("message_text" in packet)
|
||||
assert (packet["message_text"] == "HelloWorld")
|
||||
assert (packet["msgNo"] == "AB")
|
||||
assert (packet["ackMsgNo"] == "CD")
|
||||
|
||||
# message body with really long message
|
||||
def test_message_body_with_long_message(self):
|
||||
with self.assertRaises(AssertionError):
|
||||
packet = parse_message("DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::WXBOT :00000000001111111111222222222233333333334444444444555555555566666666667777777777")
|
||||
assert ("msgNo" not in packet)
|
||||
assert ("ackMsgNo" not in packet)
|
||||
assert ("message_text" in packet)
|
||||
assert (packet["format"] == "message")
|
||||
assert (packet["message_text"] == "0000000000111111111122222222223333333333444444444455555555556666666")
|
||||
|
|
@ -0,0 +1,124 @@
|
|||
import unittest
|
||||
from aprslib.parsing.message import parse_message
|
||||
|
||||
# ack/rej assertion tests
|
||||
class AckRejTests(unittest.TestCase):
|
||||
|
||||
# errorneus rej
|
||||
def test_errorneus_rej(self):
|
||||
unparsed, result = parse_message("WXBOT :red12345")
|
||||
expected = {
|
||||
'format': 'message',
|
||||
'addresse': 'WXBOT',
|
||||
'message_text': 'red12345',
|
||||
}
|
||||
|
||||
self.assertEqual(unparsed, '')
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
# reject with "old" msgno
|
||||
def test_reject_old_msgno(self):
|
||||
unparsed, result = parse_message("WXBOT :rej123")
|
||||
expected = {
|
||||
'format': 'message',
|
||||
'addresse': 'WXBOT',
|
||||
'msgNo': '123',
|
||||
'response': 'rej'
|
||||
}
|
||||
|
||||
self.assertEqual(unparsed, '')
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
# ack with new msgNo but no ackMsgNo
|
||||
def test_ack_new_msgno_but_no_ack_msgno(self):
|
||||
unparsed, result = parse_message("WXBOT :ackAB}")
|
||||
expected = {
|
||||
'format': 'message',
|
||||
'addresse': 'WXBOT',
|
||||
'response': 'ack',
|
||||
'msgNo': 'AB',
|
||||
}
|
||||
|
||||
self.assertEqual(unparsed, '')
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
# ack with new msgNo and ackMsgNo
|
||||
def test_ack_new_msgno_and_ackmsgno(self):
|
||||
unparsed, result = parse_message("WXBOT :ackAB}CD")
|
||||
expected = {
|
||||
'format': 'message',
|
||||
'addresse': 'WXBOT',
|
||||
'response': 'ack',
|
||||
'msgNo': 'AB',
|
||||
'ackMsgNo': 'CD',
|
||||
}
|
||||
|
||||
self.assertEqual(unparsed, '')
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
# message text body tests
|
||||
class MessageTestBodyTests(unittest.TestCase):
|
||||
|
||||
# message body without msg no
|
||||
def test_message_without_msgno(self):
|
||||
unparsed, result = parse_message("WXBOT :HelloWorld ")
|
||||
expected = {
|
||||
'format': 'message',
|
||||
'addresse': 'WXBOT',
|
||||
'message_text': 'HelloWorld',
|
||||
}
|
||||
|
||||
self.assertEqual(unparsed, '')
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
# message body with msg no - old format
|
||||
def test_message_body_with_no_msgno_oldformat(self):
|
||||
unparsed, result = parse_message("WXBOT :HelloWorld {ABCDE")
|
||||
expected = {
|
||||
'format': 'message',
|
||||
'addresse': 'WXBOT',
|
||||
'message_text': 'HelloWorld',
|
||||
'msgNo': 'ABCDE',
|
||||
}
|
||||
|
||||
self.assertEqual(unparsed, '')
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
# message body with msgNo (new format) and ackMsgNo missing
|
||||
def test_message_body_with_msgno_and_ackmsgno_missing_newformat(self):
|
||||
unparsed, result = parse_message("WXBOT :HelloWorld {AB}")
|
||||
expected = {
|
||||
'format': 'message',
|
||||
'addresse': 'WXBOT',
|
||||
'message_text': 'HelloWorld',
|
||||
'msgNo': 'AB',
|
||||
}
|
||||
|
||||
self.assertEqual(unparsed, '')
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
# message body with msgNo and ackMsgNo (new format)
|
||||
def test_message_body_with_msgno_and_ackmsgno_newformat(self):
|
||||
unparsed, result = parse_message("WXBOT :HelloWorld {AB}CD")
|
||||
expected = {
|
||||
'format': 'message',
|
||||
'addresse': 'WXBOT',
|
||||
'message_text': 'HelloWorld',
|
||||
'msgNo': 'AB',
|
||||
'ackMsgNo': 'CD',
|
||||
}
|
||||
|
||||
self.assertEqual(unparsed, '')
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
# message body with really long message
|
||||
def test_message_body_with_long_message(self):
|
||||
unparsed, result = parse_message("WXBOT :00000000001111111111222222222233333333334444444444555555555566666666667777777777")
|
||||
expected = {
|
||||
'format': 'message',
|
||||
'addresse': 'WXBOT',
|
||||
'message_text': '00000000001111111111222222222233333333334444444444555555555566666666667777777777',
|
||||
}
|
||||
|
||||
self.assertEqual(unparsed, '')
|
||||
self.assertEqual(expected, result)
|
||||
Loading…
Reference in New Issue