From 12f78fdbe42ef137874ef7fd60affe33f8b5ff9c Mon Sep 17 00:00:00 2001 From: Rossen Georgiev Date: Sat, 6 Dec 2014 17:35:53 +0000 Subject: [PATCH] moved/seprated header parsing + added tests * moved header parsing code into seprate function * improved header parsing * added _validate_callsign() * added tests for _validate_callsign() * added tests for _parse_header() --- aprs/parse.py | 104 +++++++++++++++++++++++++++---------- aprs/version.py | 2 +- tests/test_parse_header.py | 104 +++++++++++++++++++++++++++++++++++++ 3 files changed, 182 insertions(+), 28 deletions(-) create mode 100644 tests/test_parse_header.py diff --git a/aprs/parse.py b/aprs/parse.py index 3df93fd..d892a73 100644 --- a/aprs/parse.py +++ b/aprs/parse.py @@ -57,44 +57,27 @@ def parse(raw_sentence): if len(raw_sentence) == 0: raise ParseError("packet is empty", raw_sentence) + # typical packet format + # + # CALL1>CALL2,CALL3,CALL4:>longtext...... + # |--------header--------|-----body-------| + # try: - (header, body) = raw_sentence.split(':', 1) + (head, body) = raw_sentence.split(':', 1) except: raise ParseError("packet has no body", raw_sentence) if len(body) == 0: raise ParseError("packet body is empty", raw_sentence) - if not re.match(r"^[ -~]+$", header): - raise ParseError("packet header contains non-ascii characters ", raw_sentence) - - try: - (fromcall, path) = header.split('>', 1) - except: - raise ParseError("invalid packet header", raw_sentence) - - # TODO: validate callsigns?? - - path = path.split(',') - - if len(path) < 1 or len(path[0]) == 0: - raise ParseError("no tocallsign", raw_sentence) - - tocall = path[0] - path = path[1:] - parsed = { 'raw': raw_sentence, - 'from': fromcall, - 'to': tocall, - 'path': path, } try: - viacall = path[-1] if re.match(r"^qA[CXUoOSrRRZI]$", path[-2]) else "" - parsed.update({'via': viacall}) - except: - pass + parsed.update(_parse_header(head)) + except ParseError, msg: + raise ParseError(str(msg), raw_sentence) packet_type = body[0] body = body[1:] @@ -157,7 +140,7 @@ def parse(raw_sentence): logger.debug("Attempting to parse as mic-e packet") parsed.update({'format': 'mic-e'}) - dstcall = tocall.split('-')[0] + dstcall = parsed['to'].split('-')[0] # verify mic-e format if len(dstcall) != 6: @@ -642,6 +625,73 @@ def parse(raw_sentence): return parsed +def _validate_callsign(callsign, prefix=""): + prefix = '%s: ' % prefix if bool(prefix) else '' + + match = re.findall(r"^([A-Z0-9]{1,6})(-(\d{1,2}))?$", callsign) + + if not match: + raise ParseError("%sinvalid callsign" % prefix) + + callsign, x, ssid = match[0] + + if bool(ssid) and int(ssid) > 15: + raise ParseError("%sssid not in 0-15 range" % prefix) + + +def _parse_header(head): + """ + Parses the header part of packet + Returns a dict + """ + # CALL1>CALL2,CALL3,CALL4,CALL5: + # |from-|--to-|------path-------| + # + try: + (fromcall, path) = head.split('>', 1) + except: + raise ParseError("invalid packet header") + + if len(fromcall) == 0: + raise ParseError("no fromcallsign in header") + + _validate_callsign(fromcall, "fromcallsign") + + path = path.split(',') + + if len(path) < 1 or len(path[0]) == 0: + raise ParseError("no tocallsign in header") + + tocall = path[0] + path = path[1:] + + _validate_callsign(tocall, "tocallsign") + + for digi in path: + if not re.findall(r"^[A-Z0-9\-]{1,9}\*?$", digi, re.I): + raise ParseError("invalid callsign in path") + + parsed = { + 'from': fromcall, + 'to': tocall, + 'path': path, + } + + # viacall is the callsign that gated the packet to the net + # it's located behind the q-contructed + # + # CALL1>CALL2,CALL3,qAR,CALL5: + # .....................|-via-| + # + viacall = "" + if len(path) > 2 and re.match(r"^q..$", path[-2]): + viacall = path[-1] + + parsed.update({'via': viacall}) + + return parsed + + def _parse_comment_telemetry(text): """ Looks for base91 telemetry found in comment field diff --git a/aprs/version.py b/aprs/version.py index 400a104..085bc85 100644 --- a/aprs/version.py +++ b/aprs/version.py @@ -1 +1 @@ -__version__ = '0.6.7' +__version__ = '0.6.8' diff --git a/tests/test_parse_header.py b/tests/test_parse_header.py new file mode 100644 index 0000000..5ecc93e --- /dev/null +++ b/tests/test_parse_header.py @@ -0,0 +1,104 @@ +import unittest +import string +from random import randint, randrange, sample + +from aprs.parse import _parse_header +from aprs.parse import _validate_callsign +from aprs.exceptions import ParseError + + +class ValidateCallsign(unittest.TestCase): + + def test_valid_input(self): + chars = string.letters.upper() + string.digits + + def random_valid_callsigns(): + for x in xrange(0, 500): + call = "".join(sample(chars, randrange(1, 6))) + + if bool(randint(0, 1)): + call += "-%d" % randint(0, 15) + + yield call + + for call in random_valid_callsigns(): + try: + _validate_callsign(call) + except ParseError: + self.fail( + "%s('%s') raised ParseError" % + (_validate_callsign.__name__, call) + ) + + def test_invalid_input(self): + testData = [ + "", + "-", + "-1", + "---1", + "1234567", + "CALL-", + "CALL-16", + "CALLCALL", + ] + + for call in testData: + self.assertRaises(ParseError, _validate_callsign, call) + + +class ParseHeader(unittest.TestCase): + + def test_valid_input_and_format(self): + # empty path header + expected = { + "to": "B", + "from": "A", + "via": "", + "path": [] + } + result = _parse_header("A>B") + + self.assertEqual(expected, result) + + # with path + expected2 = { + "to": "B", + "from": "A", + "via": "", + "path": list('CDE') + } + result2 = _parse_header("A>B,C,D,E") + + self.assertEqual(expected2, result2) + + # test all currently valid q-cosntructs + + for qCon in map(lambda x: "qA"+x, list("CXUoOSrRRZI")): + expected3 = { + "to": "B", + "from": "A", + "via": "D", + "path": ['C', qCon, 'D'] + } + result3 = _parse_header("A>B,C,%s,D" % qCon) + + self.assertEqual(expected3, result3) + + def test_invalid_format(self): + testData = [ + "", # empty header + ">", # empty fromcall + "A>", # empty tocall + "A>b", # invalid tocall + "aaaaaaaaaaa", # invalid fromcall + "A>aaaaaaaaaaa", # invalid tocall + "A>B,1234567890,C", # invalid call in path + "A>B,C,1234567890,D", # invalid call in path + "A>B,C,1234567890", # invalid call in path + ] + + for head in testData: + self.assertRaises(ParseError, _parse_header, head) + +if __name__ == '__main__': + unittest.main()