added tests for parsing.common #15
This commit is contained in:
parent
10ce45030d
commit
44dd7da2ed
|
|
@ -11,6 +11,9 @@ __all__ = [
|
|||
'parse_header',
|
||||
'parse_timestamp',
|
||||
'parse_comment',
|
||||
'parse_data_extentions',
|
||||
'parse_comment_altitude',
|
||||
'parse_dao',
|
||||
]
|
||||
|
||||
def validate_callsign(callsign, prefix=""):
|
||||
|
|
@ -44,7 +47,7 @@ def parse_header(head):
|
|||
|
||||
path = path.split(',')
|
||||
|
||||
if len(path) < 1 or len(path[0]) == 0:
|
||||
if len(path[0]) == 0:
|
||||
raise ParseError("no tocallsign in header")
|
||||
|
||||
tocall = path[0]
|
||||
|
|
@ -99,59 +102,28 @@ def parse_timestamp(body, packet_type=''):
|
|||
|
||||
timestamp = utc.strptime(timestamp, "%Y%m%d%H%M%S")
|
||||
timestamp = time.mktime(timestamp.timetuple())
|
||||
|
||||
parsed.update({'raw_timestamp': rawts})
|
||||
except Exception as exp:
|
||||
timestamp = 0
|
||||
logger.debug(exp)
|
||||
|
||||
parsed.update({'timestamp': int(timestamp)})
|
||||
parsed.update({
|
||||
'raw_timestamp': rawts,
|
||||
'timestamp': int(timestamp),
|
||||
})
|
||||
|
||||
return (body, parsed)
|
||||
|
||||
|
||||
def parse_comment(body, parsed):
|
||||
match = re.findall(r"^([0-9]{3})/([0-9]{3})", body)
|
||||
if match:
|
||||
cse, spd = match[0]
|
||||
body = body[7:]
|
||||
parsed.update({
|
||||
'course': int(cse),
|
||||
'speed': int(spd)*1.852 # knots to kms
|
||||
})
|
||||
body, result = parse_data_extentions(body)
|
||||
parsed.update(result)
|
||||
|
||||
# try BRG/NRQ/
|
||||
match = re.findall(r"^([0-9]{3})/([0-9]{3})", body)
|
||||
if match:
|
||||
brg, nrq = match[0]
|
||||
body = body[7:]
|
||||
parsed.update({'bearing': int(brg), 'nrq': int(nrq)})
|
||||
else:
|
||||
match = re.findall(r"^(PHG(\d[\x30-\x7e]\d\d[0-9A-Z]?))\/", body)
|
||||
if match:
|
||||
ext, phg = match[0]
|
||||
body = body[len(ext):]
|
||||
parsed.update({'phg': phg})
|
||||
else:
|
||||
match = re.findall(r"^(RNG(\d{4}))\/", body)
|
||||
if match:
|
||||
ext, rng = match[0]
|
||||
body = body[len(ext):]
|
||||
parsed.update({'rng': int(rng) * 1.609344}) # miles to km
|
||||
body, result = parse_comment_altitude(body)
|
||||
parsed.update(result)
|
||||
|
||||
# try find altitude in comment /A=dddddd
|
||||
match = re.findall(r"^(.*?)/A=(\-\d{5}|\d{6})(.*)$", body)
|
||||
body, result = parse_comment_telemetry(body)
|
||||
parsed.update(result)
|
||||
|
||||
if match:
|
||||
body, altitude, rest = match[0]
|
||||
body += rest
|
||||
|
||||
parsed.update({'altitude': int(altitude)*0.3048})
|
||||
|
||||
body, telemetry = parse_comment_telemetry(body)
|
||||
parsed.update(telemetry)
|
||||
|
||||
# parse DAO extention
|
||||
body = parse_dao(body, parsed)
|
||||
|
||||
if len(body) > 0 and body[0] == "/":
|
||||
|
|
@ -160,22 +132,64 @@ def parse_comment(body, parsed):
|
|||
parsed.update({'comment': body.strip(' ')})
|
||||
|
||||
|
||||
def parse_dao(body, parsed):
|
||||
match = re.findall("^(.*)\!([\x21-\x7b][\x20-\x7b]{2})\!(.*?)$", body)
|
||||
def parse_data_extentions(body):
|
||||
parsed = {}
|
||||
match = re.findall(r"^([0-9 \.]{3})/([0-9 \.]{3})", body)
|
||||
|
||||
if match:
|
||||
body, dao, rest = match[0]
|
||||
cse, spd = match[0]
|
||||
body = body[7:]
|
||||
parsed.update({
|
||||
'course': int(cse) if cse.strip(' .') != '' else 0,
|
||||
'speed': int(spd)*1.852 if spd.strip(' .') != '' else 0,
|
||||
})
|
||||
|
||||
match = re.findall(r"^/([0-9]{3})/([0-9]{3})", body)
|
||||
if match:
|
||||
brg, nrq = match[0]
|
||||
body = body[8:]
|
||||
parsed.update({'bearing': int(brg), 'nrq': int(nrq)})
|
||||
else:
|
||||
match = re.findall(r"^(PHG(\d[\x30-\x7e]\d\d[0-9A-Z]?))", body)
|
||||
if match:
|
||||
ext, phg = match[0]
|
||||
body = body[len(ext):]
|
||||
parsed.update({'phg': phg})
|
||||
else:
|
||||
match = re.findall(r"^RNG(\d{4})", body)
|
||||
if match:
|
||||
rng = match[0]
|
||||
body = body[7:]
|
||||
parsed.update({'rng': int(rng) * 1.609344}) # miles to km
|
||||
|
||||
return body, parsed
|
||||
|
||||
def parse_comment_altitude(body):
|
||||
parsed = {}
|
||||
match = re.findall(r"^(.*?)/A=(\-\d{5}|\d{6})(.*)$", body)
|
||||
if match:
|
||||
body, altitude, rest = match[0]
|
||||
body += rest
|
||||
parsed.update({'altitude': int(altitude)*0.3048})
|
||||
|
||||
return body, parsed
|
||||
|
||||
|
||||
def parse_dao(body, parsed):
|
||||
match = re.findall("^(.*)\!([\x21-\x7b])([\x20-\x7b]{2})\!(.*?)$", body)
|
||||
if match:
|
||||
body, daobyte, dao, rest = match[0]
|
||||
body += rest
|
||||
|
||||
parsed.update({'daodatumbyte': dao[0].upper()})
|
||||
|
||||
parsed.update({'daodatumbyte': daobyte.upper()})
|
||||
lat_offset = lon_offset = 0
|
||||
|
||||
if re.match("^[A-Z]", dao):
|
||||
lat_offset = int(dao[1]) * 0.001 / 60
|
||||
lon_offset = int(dao[2]) * 0.001 / 60
|
||||
elif re.match("^[a-z]", dao):
|
||||
lat_offset = base91.to_decimal(dao[1]) / 91.0 * 0.01 / 60
|
||||
lon_offset = base91.to_decimal(dao[2]) / 91.0 * 0.01 / 60
|
||||
if daobyte == 'W' and dao.isdigit():
|
||||
lat_offset = int(dao[0]) * 0.001 / 60
|
||||
lon_offset = int(dao[1]) * 0.001 / 60
|
||||
elif daobyte == 'w' and ' ' not in dao:
|
||||
lat_offset = (base91.to_decimal(dao[0]) / 91.0) * 0.01 / 60
|
||||
lon_offset = (base91.to_decimal(dao[1]) / 91.0) * 0.01 / 60
|
||||
|
||||
parsed['latitude'] += lat_offset if parsed['latitude'] >= 0 else -lat_offset
|
||||
parsed['longitude'] += lon_offset if parsed['longitude'] >= 0 else -lon_offset
|
||||
|
|
|
|||
|
|
@ -1,9 +1,10 @@
|
|||
import unittest2 as unittest
|
||||
import string
|
||||
from random import randint, randrange, sample
|
||||
from datetime import datetime
|
||||
|
||||
from aprslib.parsing import parse_header
|
||||
from aprslib.parsing import validate_callsign
|
||||
from aprslib import base91
|
||||
from aprslib.parsing.common import *
|
||||
from aprslib.exceptions import ParseError
|
||||
|
||||
|
||||
|
|
@ -47,6 +48,10 @@ class ValidateCallsign(unittest.TestCase):
|
|||
|
||||
|
||||
class ParseHeader(unittest.TestCase):
|
||||
def test_no_tocall(self):
|
||||
with self.assertRaises(ParseError):
|
||||
parse_header("AAA>")
|
||||
parse_header("AAA>,")
|
||||
|
||||
def testvalid_input_and_format(self):
|
||||
# empty path header
|
||||
|
|
@ -151,5 +156,276 @@ class ParseHeader(unittest.TestCase):
|
|||
continue
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
class TimestampTC(unittest.TestCase):
|
||||
def test_timestamp_invalid(self):
|
||||
body = "000000ntext"
|
||||
remaining, parsed = parse_timestamp(body)
|
||||
|
||||
self.assertEqual(remaining, 'text')
|
||||
self.assertEqual(parsed, {
|
||||
'timestamp': 0,
|
||||
'raw_timestamp': '000000n',
|
||||
})
|
||||
|
||||
def test_status_timestamp_invalid(self):
|
||||
body = "000000htext"
|
||||
remaining, parsed = parse_timestamp(body, '>')
|
||||
|
||||
self.assertEqual(remaining, body)
|
||||
self.assertEqual(parsed, {
|
||||
'timestamp': 0,
|
||||
'raw_timestamp': '000000h',
|
||||
})
|
||||
|
||||
def test_timestamp_valid(self):
|
||||
timestamp = 1453891611
|
||||
date = datetime.utcfromtimestamp(timestamp)
|
||||
|
||||
# hhmmss format
|
||||
body = date.strftime("%H%M%Shtext")
|
||||
remaining, parsed = parse_timestamp(body)
|
||||
|
||||
self.assertEqual(remaining, 'text')
|
||||
self.assertEqual(parsed, {
|
||||
'timestamp': timestamp,
|
||||
'raw_timestamp': body[:7],
|
||||
})
|
||||
|
||||
# ddhhmm format
|
||||
body = date.strftime("%d%H%Mztext")
|
||||
remaining, parsed = parse_timestamp(body)
|
||||
|
||||
self.assertEqual(remaining, 'text')
|
||||
self.assertEqual(parsed, {
|
||||
'timestamp': timestamp - date.second,
|
||||
'raw_timestamp': body[:7],
|
||||
})
|
||||
|
||||
# ddhhmm format, local time, we parse as zulu
|
||||
body = date.strftime("%d%H%M/text")
|
||||
remaining, parsed = parse_timestamp(body)
|
||||
|
||||
self.assertEqual(remaining, 'text')
|
||||
self.assertEqual(parsed, {
|
||||
'timestamp': timestamp - date.second,
|
||||
'raw_timestamp': body[:7],
|
||||
})
|
||||
|
||||
def test_invalid_date(self):
|
||||
body = "999999ztext"
|
||||
|
||||
remaining, parsed = parse_timestamp(body)
|
||||
|
||||
self.assertEqual(remaining, 'text')
|
||||
self.assertEqual(parsed, {
|
||||
'timestamp': 0,
|
||||
'raw_timestamp': '999999z',
|
||||
})
|
||||
|
||||
|
||||
class CommentTC(unittest.TestCase):
|
||||
def test_comment(self):
|
||||
body = "test body"
|
||||
parsed = {}
|
||||
parse_comment(body, parsed)
|
||||
|
||||
self.assertEqual(parsed, {'comment': body})
|
||||
|
||||
body = "/test body"
|
||||
parsed = {}
|
||||
parse_comment(body, parsed)
|
||||
|
||||
self.assertEqual(parsed, {'comment': body[1:]})
|
||||
|
||||
body = " test body "
|
||||
parsed = {}
|
||||
parse_comment(body, parsed)
|
||||
|
||||
self.assertEqual(parsed, {'comment': body.strip(' ')})
|
||||
|
||||
|
||||
class DataExtentionsTC(unittest.TestCase):
|
||||
def test_course_speed(self):
|
||||
body = "123/100/text"
|
||||
remaining, parsed = parse_data_extentions(body)
|
||||
|
||||
self.assertEqual(remaining, '/text')
|
||||
self.assertEqual(parsed, {
|
||||
'course': 123,
|
||||
'speed': 100*1.852,
|
||||
})
|
||||
|
||||
def test_empty_course_speed(self):
|
||||
body = " / /text"
|
||||
remaining, parsed = parse_data_extentions(body)
|
||||
|
||||
self.assertEqual(remaining, '/text')
|
||||
self.assertEqual(parsed, {
|
||||
'course': 0,
|
||||
'speed': 0,
|
||||
})
|
||||
|
||||
body = ".../.../text"
|
||||
remaining, parsed = parse_data_extentions(body)
|
||||
|
||||
self.assertEqual(remaining, '/text')
|
||||
self.assertEqual(parsed, {
|
||||
'course': 0,
|
||||
'speed': 0,
|
||||
})
|
||||
|
||||
def test_course_speed_bearing_nrq(self):
|
||||
body = "123/100/234/345text"
|
||||
remaining, parsed = parse_data_extentions(body)
|
||||
|
||||
self.assertEqual(remaining, 'text')
|
||||
self.assertEqual(parsed, {
|
||||
'course': 123,
|
||||
'speed': 100*1.852,
|
||||
'bearing': 234,
|
||||
'nrq': 345,
|
||||
})
|
||||
|
||||
def test_PHG(self):
|
||||
body = "PHG1234Atext"
|
||||
remaining, parsed = parse_data_extentions(body)
|
||||
|
||||
self.assertEqual(remaining, 'text')
|
||||
self.assertEqual(parsed, {
|
||||
'phg': '1234A',
|
||||
})
|
||||
|
||||
body = "PHG1234text"
|
||||
remaining, parsed = parse_data_extentions(body)
|
||||
|
||||
self.assertEqual(remaining, 'text')
|
||||
self.assertEqual(parsed, {
|
||||
'phg': '1234',
|
||||
})
|
||||
|
||||
def test_range(self):
|
||||
body = "RNG1000text"
|
||||
remaining, parsed = parse_data_extentions(body)
|
||||
|
||||
self.assertEqual(remaining, 'text')
|
||||
self.assertEqual(parsed, {
|
||||
'rng': 1000*1.609344,
|
||||
})
|
||||
|
||||
|
||||
|
||||
class CommentAltitudeTC(unittest.TestCase):
|
||||
def test_valid_inputs(self):
|
||||
body = "asdasd/A=10000078zxc"
|
||||
remaining, parsed = parse_comment_altitude(body)
|
||||
|
||||
self.assertEqual(remaining, "asdasd78zxc")
|
||||
self.assertEqual(parsed, {'altitude': 30480})
|
||||
|
||||
body = "asdasd/A=-1000078zxc"
|
||||
remaining, parsed = parse_comment_altitude(body)
|
||||
|
||||
self.assertEqual(remaining, "asdasd78zxc")
|
||||
self.assertEqual(parsed, {'altitude': -3048})
|
||||
|
||||
def test_invalid(self):
|
||||
tests = [
|
||||
"",
|
||||
"aaaaaaaaaaaaaaaaaaaaa",
|
||||
"sdfsdfsdf/A=00000aaaa",
|
||||
"sdfsdfsdf/A=0000aaaaa",
|
||||
"sdfsdfsdf/A=000aaaaaa",
|
||||
"sdfsdfsdf/A=00aaaaaaa",
|
||||
"sdfsdfsdf/A=0aaaaaaaa",
|
||||
"sdfsdfsdf/A=aaaaaaaaa",
|
||||
"sdfsdfsdf/Aa000000aaa",
|
||||
"sdfsdfsdf/A=+00000aaa",
|
||||
"sdfsdfsdf/A=00a000aaa",
|
||||
]
|
||||
|
||||
for body in tests:
|
||||
remaining, parsed = parse_comment_altitude(body)
|
||||
self.assertEqual(remaining, body)
|
||||
self.assertEqual(parsed, {})
|
||||
|
||||
|
||||
class DAO_TC(unittest.TestCase):
|
||||
def test_wgs84_human_readable(self):
|
||||
body = "!W36!"
|
||||
parsed = {'latitude': 0, 'longitude': 0}
|
||||
remaining = parse_dao(body, parsed)
|
||||
|
||||
self.assertEqual(remaining, '')
|
||||
self.assertEqual(parsed, {
|
||||
'daodatumbyte': 'W',
|
||||
'latitude': 0.00005,
|
||||
'longitude': 0.0001,
|
||||
})
|
||||
|
||||
def test_wgs84_human_readable_nagarive(self):
|
||||
body = "!W36!"
|
||||
parsed = {'latitude': -1, 'longitude': -1}
|
||||
remaining = parse_dao(body, parsed)
|
||||
|
||||
self.assertEqual(remaining, '')
|
||||
self.assertEqual(parsed, {
|
||||
'daodatumbyte': 'W',
|
||||
'latitude': -1.00005,
|
||||
'longitude': -1.0001,
|
||||
})
|
||||
|
||||
def test_wgs84_human_readable_blank(self):
|
||||
body = "!W !"
|
||||
parsed = {'latitude': 1, 'longitude': 2}
|
||||
remaining = parse_dao(body, parsed)
|
||||
|
||||
self.assertEqual(remaining, '')
|
||||
self.assertEqual(parsed, {
|
||||
'daodatumbyte': 'W',
|
||||
'latitude': 1,
|
||||
'longitude': 2,
|
||||
})
|
||||
|
||||
def test_wgs84_base91(self):
|
||||
body = "!w??!"
|
||||
parsed = {'latitude': 0, 'longitude': 0}
|
||||
remaining = parse_dao(body, parsed)
|
||||
|
||||
self.assertEqual(remaining, '')
|
||||
self.assertEqual(parsed, {
|
||||
'daodatumbyte': 'W',
|
||||
'latitude': 5.4945054945054945e-05,
|
||||
'longitude': 5.4945054945054945e-05,
|
||||
})
|
||||
|
||||
def test_wgs84_base91_blank(self):
|
||||
body = "!w !"
|
||||
parsed = {'latitude': 1, 'longitude': 2}
|
||||
remaining = parse_dao(body, parsed)
|
||||
|
||||
self.assertEqual(remaining, '')
|
||||
self.assertEqual(parsed, {
|
||||
'daodatumbyte': 'W',
|
||||
'latitude': 1,
|
||||
'longitude': 2,
|
||||
})
|
||||
|
||||
def test_dao_matching(self):
|
||||
body = "aaa!W12!bbb!W23!ccc!W45!ddd"
|
||||
parsed = {'latitude': 0, 'longitude': 0}
|
||||
remaining = parse_dao(body, parsed)
|
||||
|
||||
self.assertEqual(remaining, 'aaa!W12!bbb!W23!cccddd')
|
||||
|
||||
def test_other_datum_bytes(self):
|
||||
for datum in [chr(x) for x in range(0x21,0x7c)]:
|
||||
body = "!" + datum + " !"
|
||||
parsed = {'latitude': 1, 'longitude': 2}
|
||||
remaining = parse_dao(body, parsed)
|
||||
|
||||
self.assertEqual(remaining, '')
|
||||
self.assertEqual(parsed, {
|
||||
'daodatumbyte': datum.upper(),
|
||||
'latitude': 1,
|
||||
'longitude': 2,
|
||||
})
|
||||
|
|
|
|||
Loading…
Reference in New Issue