moved/seprated header parsing + added tests

* moved header parsing code into seprate function
* improved header parsing
* added _validate_callsign()
* added tests for _validate_callsign()
* added tests for _parse_header()
This commit is contained in:
Rossen Georgiev 2014-12-06 17:35:53 +00:00
parent 932e10d687
commit 12f78fdbe4
3 changed files with 182 additions and 28 deletions

View File

@ -57,44 +57,27 @@ def parse(raw_sentence):
if len(raw_sentence) == 0:
raise ParseError("packet is empty", raw_sentence)
# typical packet format
#
# CALL1>CALL2,CALL3,CALL4:>longtext......
# |--------header--------|-----body-------|
#
try:
(header, body) = raw_sentence.split(':', 1)
(head, body) = raw_sentence.split(':', 1)
except:
raise ParseError("packet has no body", raw_sentence)
if len(body) == 0:
raise ParseError("packet body is empty", raw_sentence)
if not re.match(r"^[ -~]+$", header):
raise ParseError("packet header contains non-ascii characters ", raw_sentence)
try:
(fromcall, path) = header.split('>', 1)
except:
raise ParseError("invalid packet header", raw_sentence)
# TODO: validate callsigns??
path = path.split(',')
if len(path) < 1 or len(path[0]) == 0:
raise ParseError("no tocallsign", raw_sentence)
tocall = path[0]
path = path[1:]
parsed = {
'raw': raw_sentence,
'from': fromcall,
'to': tocall,
'path': path,
}
try:
viacall = path[-1] if re.match(r"^qA[CXUoOSrRRZI]$", path[-2]) else ""
parsed.update({'via': viacall})
except:
pass
parsed.update(_parse_header(head))
except ParseError, msg:
raise ParseError(str(msg), raw_sentence)
packet_type = body[0]
body = body[1:]
@ -157,7 +140,7 @@ def parse(raw_sentence):
logger.debug("Attempting to parse as mic-e packet")
parsed.update({'format': 'mic-e'})
dstcall = tocall.split('-')[0]
dstcall = parsed['to'].split('-')[0]
# verify mic-e format
if len(dstcall) != 6:
@ -642,6 +625,73 @@ def parse(raw_sentence):
return parsed
def _validate_callsign(callsign, prefix=""):
prefix = '%s: ' % prefix if bool(prefix) else ''
match = re.findall(r"^([A-Z0-9]{1,6})(-(\d{1,2}))?$", callsign)
if not match:
raise ParseError("%sinvalid callsign" % prefix)
callsign, x, ssid = match[0]
if bool(ssid) and int(ssid) > 15:
raise ParseError("%sssid not in 0-15 range" % prefix)
def _parse_header(head):
"""
Parses the header part of packet
Returns a dict
"""
# CALL1>CALL2,CALL3,CALL4,CALL5:
# |from-|--to-|------path-------|
#
try:
(fromcall, path) = head.split('>', 1)
except:
raise ParseError("invalid packet header")
if len(fromcall) == 0:
raise ParseError("no fromcallsign in header")
_validate_callsign(fromcall, "fromcallsign")
path = path.split(',')
if len(path) < 1 or len(path[0]) == 0:
raise ParseError("no tocallsign in header")
tocall = path[0]
path = path[1:]
_validate_callsign(tocall, "tocallsign")
for digi in path:
if not re.findall(r"^[A-Z0-9\-]{1,9}\*?$", digi, re.I):
raise ParseError("invalid callsign in path")
parsed = {
'from': fromcall,
'to': tocall,
'path': path,
}
# viacall is the callsign that gated the packet to the net
# it's located behind the q-contructed
#
# CALL1>CALL2,CALL3,qAR,CALL5:
# .....................|-via-|
#
viacall = ""
if len(path) > 2 and re.match(r"^q..$", path[-2]):
viacall = path[-1]
parsed.update({'via': viacall})
return parsed
def _parse_comment_telemetry(text):
"""
Looks for base91 telemetry found in comment field

View File

@ -1 +1 @@
__version__ = '0.6.7'
__version__ = '0.6.8'

104
tests/test_parse_header.py Normal file
View File

@ -0,0 +1,104 @@
import unittest
import string
from random import randint, randrange, sample
from aprs.parse import _parse_header
from aprs.parse import _validate_callsign
from aprs.exceptions import ParseError
class ValidateCallsign(unittest.TestCase):
def test_valid_input(self):
chars = string.letters.upper() + string.digits
def random_valid_callsigns():
for x in xrange(0, 500):
call = "".join(sample(chars, randrange(1, 6)))
if bool(randint(0, 1)):
call += "-%d" % randint(0, 15)
yield call
for call in random_valid_callsigns():
try:
_validate_callsign(call)
except ParseError:
self.fail(
"%s('%s') raised ParseError" %
(_validate_callsign.__name__, call)
)
def test_invalid_input(self):
testData = [
"",
"-",
"-1",
"---1",
"1234567",
"CALL-",
"CALL-16",
"CALLCALL",
]
for call in testData:
self.assertRaises(ParseError, _validate_callsign, call)
class ParseHeader(unittest.TestCase):
def test_valid_input_and_format(self):
# empty path header
expected = {
"to": "B",
"from": "A",
"via": "",
"path": []
}
result = _parse_header("A>B")
self.assertEqual(expected, result)
# with path
expected2 = {
"to": "B",
"from": "A",
"via": "",
"path": list('CDE')
}
result2 = _parse_header("A>B,C,D,E")
self.assertEqual(expected2, result2)
# test all currently valid q-cosntructs
for qCon in map(lambda x: "qA"+x, list("CXUoOSrRRZI")):
expected3 = {
"to": "B",
"from": "A",
"via": "D",
"path": ['C', qCon, 'D']
}
result3 = _parse_header("A>B,C,%s,D" % qCon)
self.assertEqual(expected3, result3)
def test_invalid_format(self):
testData = [
"", # empty header
">", # empty fromcall
"A>", # empty tocall
"A>b", # invalid tocall
"aaaaaaaaaaa", # invalid fromcall
"A>aaaaaaaaaaa", # invalid tocall
"A>B,1234567890,C", # invalid call in path
"A>B,C,1234567890,D", # invalid call in path
"A>B,C,1234567890", # invalid call in path
]
for head in testData:
self.assertRaises(ParseError, _parse_header, head)
if __name__ == '__main__':
unittest.main()