removed _ prefix from most parse functions

This commit is contained in:
Rossen Georgiev 2016-01-27 07:47:53 +00:00
parent a27c6f3ffe
commit 68190860b7
13 changed files with 123 additions and 142 deletions

View File

@ -1,5 +1,5 @@
from aprslib import parse from aprslib import parse
from aprslib.parsing.common import _parse_header from aprslib.parsing.common import parse_header
class APRSPacket(object): class APRSPacket(object):
format = 'raw' format = 'raw'
@ -42,7 +42,7 @@ class APRSPacket(object):
if not isinstance(obj, dict): if not isinstance(obj, dict):
if self.format == 'raw': if self.format == 'raw':
header, self.body = obj.split(":", 1) header, self.body = obj.split(":", 1)
obj = _parse_header(header) obj = parse_header(header)
else: else:
obj = parse(obj) obj = parse(obj)

View File

@ -98,7 +98,7 @@ def parse(packet):
# parse head # parse head
try: try:
parsed.update(_parse_header(head)) parsed.update(parse_header(head))
except ParseError as msg: except ParseError as msg:
raise ParseError(str(msg), packet) raise ParseError(str(msg), packet)
@ -111,7 +111,7 @@ def parse(packet):
# attempt to parse the body # attempt to parse the body
try: try:
_try_to_parse_body(packet_type, body, parsed) _try_toparse_body(packet_type, body, parsed)
# capture ParseErrors and attach the packet # capture ParseErrors and attach the packet
except (UnknownFormat, ParseError) as exp: except (UnknownFormat, ParseError) as exp:
@ -135,7 +135,7 @@ def parse(packet):
return parsed return parsed
def _try_to_parse_body(packet_type, body, parsed): def _try_toparse_body(packet_type, body, parsed):
result = {} result = {}
# NOT SUPPORTED FORMATS # NOT SUPPORTED FORMATS
@ -165,43 +165,43 @@ def _try_to_parse_body(packet_type, body, parsed):
elif packet_type == ',': elif packet_type == ',':
logger.debug("Packet is invalid format") logger.debug("Packet is invalid format")
body, result = _parse_invalid(body) body, result = parse_invalid(body)
# user defined # user defined
elif packet_type == '{': elif packet_type == '{':
logger.debug("Packet is user-defined") logger.debug("Packet is user-defined")
body, result = _parse_user_defined(body) body, result = parse_user_defined(body)
# Status report # Status report
elif packet_type == '>': elif packet_type == '>':
logger.debug("Packet is just a status message") logger.debug("Packet is just a status message")
body, result = _parse_status(packet_type, body) body, result = parse_status(packet_type, body)
# Mic-encoded packet # Mic-encoded packet
elif packet_type in "`'": elif packet_type in "`'":
logger.debug("Attempting to parse as mic-e packet") logger.debug("Attempting to parse as mic-e packet")
body, result = _parse_mice(parsed['to'], body) body, result = parse_mice(parsed['to'], body)
# Message packet # Message packet
elif packet_type == ':': elif packet_type == ':':
logger.debug("Attempting to parse as message packet") logger.debug("Attempting to parse as message packet")
body, result = _parse_message(body) body, result = parse_message(body)
# Positionless weather report # Positionless weather report
elif packet_type == '_': elif packet_type == '_':
logger.debug("Attempting to parse as positionless weather report") logger.debug("Attempting to parse as positionless weather report")
body, result = _parse_weather(body) body, result = parse_weather(body)
# postion report (regular or compressed) # postion report (regular or compressed)
elif (packet_type in '!=/@;' or elif (packet_type in '!=/@;' or
0 <= body.find('!') < 40): # page 28 of spec (PDF) 0 <= body.find('!') < 40): # page 28 of spec (PDF)
body, result = _parse_position(packet_type, body) body, result = parse_position(packet_type, body)
# we are done # we are done
parsed.update(result) parsed.update(result)

View File

@ -4,16 +4,16 @@ from datetime import datetime
from aprslib import base91 from aprslib import base91
from aprslib.exceptions import ParseError from aprslib.exceptions import ParseError
from aprslib.parsing import logger from aprslib.parsing import logger
from aprslib.parsing.telemetry import _parse_comment_telemetry from aprslib.parsing.telemetry import parse_comment_telemetry
__all__ = [ __all__ = [
'_validate_callsign', 'validate_callsign',
'_parse_header', 'parse_header',
'_parse_timestamp', 'parse_timestamp',
'_parse_comment', 'parse_comment',
] ]
def _validate_callsign(callsign, prefix=""): def validate_callsign(callsign, prefix=""):
prefix = '%s: ' % prefix if bool(prefix) else '' prefix = '%s: ' % prefix if bool(prefix) else ''
match = re.findall(r"^([A-Z0-9]{1,6})(-(\d{1,2}))?$", callsign) match = re.findall(r"^([A-Z0-9]{1,6})(-(\d{1,2}))?$", callsign)
@ -27,27 +27,16 @@ def _validate_callsign(callsign, prefix=""):
raise ParseError("%sssid not in 0-15 range" % prefix) raise ParseError("%sssid not in 0-15 range" % prefix)
def _parse_header(head): def parse_header(head):
""" """
Parses the header part of packet Parses the header part of packet
Returns a dict Returns a dict
""" """
# CALL1>CALL2,CALL3,CALL4,CALL5:
# |from-|--to-|------path-------|
#
try: try:
(fromcall, path) = head.split('>', 1) (fromcall, path) = head.split('>', 1)
except: except:
raise ParseError("invalid packet header") raise ParseError("invalid packet header")
# looking at aprs.fi, the rules for from/src callsign
# are a lot looser, causing a lot of packets to fail
# this check.
#
# if len(fromcall) == 0:
# raise ParseError("no fromcallsign in header")
# _validate_callsign(fromcall, "fromcallsign")
if (not 1 <= len(fromcall) <= 9 or if (not 1 <= len(fromcall) <= 9 or
not re.findall(r"^[a-z0-9]{0,9}(\-[a-z0-9]{1,8})?$", fromcall, re.I)): not re.findall(r"^[a-z0-9]{0,9}(\-[a-z0-9]{1,8})?$", fromcall, re.I)):
@ -61,7 +50,7 @@ def _parse_header(head):
tocall = path[0] tocall = path[0]
path = path[1:] path = path[1:]
_validate_callsign(tocall, "tocallsign") validate_callsign(tocall, "tocallsign")
for digi in path: for digi in path:
if not re.findall(r"^[A-Z0-9\-]{1,9}\*?$", digi, re.I): if not re.findall(r"^[A-Z0-9\-]{1,9}\*?$", digi, re.I):
@ -73,12 +62,6 @@ def _parse_header(head):
'path': path, 'path': path,
} }
# viacall is the callsign that gated the packet to the net
# it's located behind the q-contructed
#
# CALL1>CALL2,CALL3,qAR,CALL5:
# .....................|-via-|
#
viacall = "" viacall = ""
if len(path) >= 2 and re.match(r"^q..$", path[-2]): if len(path) >= 2 and re.match(r"^q..$", path[-2]):
viacall = path[-1] viacall = path[-1]
@ -88,7 +71,7 @@ def _parse_header(head):
return parsed return parsed
def _parse_timestamp(body, packet_type=''): def parse_timestamp(body, packet_type=''):
parsed = {} parsed = {}
match = re.findall(r"^((\d{6})(.))$", body[0:7]) match = re.findall(r"^((\d{6})(.))$", body[0:7])
@ -127,9 +110,7 @@ def _parse_timestamp(body, packet_type=''):
return (body, parsed) return (body, parsed)
def _parse_comment(body, parsed): def parse_comment(body, parsed):
# attempt to parse remaining part of the packet (comment field)
# try CRS/SPD
match = re.findall(r"^([0-9]{3})/([0-9]{3})", body) match = re.findall(r"^([0-9]{3})/([0-9]{3})", body)
if match: if match:
cse, spd = match[0] cse, spd = match[0]
@ -167,11 +148,11 @@ def _parse_comment(body, parsed):
parsed.update({'altitude': int(altitude)*0.3048}) parsed.update({'altitude': int(altitude)*0.3048})
body, telemetry = _parse_comment_telemetry(body) body, telemetry = parse_comment_telemetry(body)
parsed.update(telemetry) parsed.update(telemetry)
# parse DAO extention # parse DAO extention
body = _parse_dao(body, parsed) body = parse_dao(body, parsed)
if len(body) > 0 and body[0] == "/": if len(body) > 0 and body[0] == "/":
body = body[1:] body = body[1:]
@ -179,7 +160,7 @@ def _parse_comment(body, parsed):
parsed.update({'comment': body.strip(' ')}) parsed.update({'comment': body.strip(' ')})
def _parse_dao(body, parsed): def parse_dao(body, parsed):
match = re.findall("^(.*)\!([\x21-\x7b][\x20-\x7b]{2})\!(.*?)$", body) match = re.findall("^(.*)\!([\x21-\x7b][\x20-\x7b]{2})\!(.*?)$", body)
if match: if match:
body, dao, rest = match[0] body, dao, rest = match[0]

View File

@ -1,9 +1,9 @@
import re import re
from aprslib.parsing import logger from aprslib.parsing import logger
from aprslib.parsing.telemetry import _parse_telemetry_config from aprslib.parsing.telemetry import parse_telemetry_config
__all__ = [ __all__ = [
'_parse_message', 'parse_message',
] ]
# MESSAGE PACKET # MESSAGE PACKET
@ -18,7 +18,7 @@ __all__ = [
# :N3MIM:UNIT.Volts,deg.F,deg.F,Mbar,Kfeet,Clik,OPEN!,on,on,high # :N3MIM:UNIT.Volts,deg.F,deg.F,Mbar,Kfeet,Clik,OPEN!,on,on,high
# :N3MIM:EQNS.0,2.6,0,0,.53,-32,3,4.39,49,-32,3,18,1,2,3 # :N3MIM:EQNS.0,2.6,0,0,.53,-32,3,4.39,49,-32,3,18,1,2,3
# :N3MIM:BITS.10110101,PROJECT TITLE... # :N3MIM:BITS.10110101,PROJECT TITLE...
def _parse_message(body): def parse_message(body):
parsed = {} parsed = {}
# the while loop is used to easily break out once a match is found # the while loop is used to easily break out once a match is found
@ -63,7 +63,7 @@ def _parse_message(body):
parsed.update({'addresse': addresse.rstrip(' ')}) parsed.update({'addresse': addresse.rstrip(' ')})
# check if it's a telemetry configuration message # check if it's a telemetry configuration message
body, result = _parse_telemetry_config(body) body, result = parse_telemetry_config(body)
if result: if result:
parsed.update(result) parsed.update(result)
break break

View File

@ -2,11 +2,11 @@ import re
import math import math
from aprslib import base91 from aprslib import base91
from aprslib.exceptions import ParseError from aprslib.exceptions import ParseError
from aprslib.parsing.common import _parse_dao from aprslib.parsing.common import parse_dao
from aprslib.parsing.telemetry import _parse_comment_telemetry from aprslib.parsing.telemetry import parse_comment_telemetry
__all__ = [ __all__ = [
'_parse_mice', 'parse_mice',
] ]
# Mic-e message type table # Mic-e message type table
@ -37,7 +37,7 @@ MTYPE_TABLE_CUSTOM = {
# 'lllc/s$/......... Mic-E no message capability # 'lllc/s$/......... Mic-E no message capability
# 'lllc/s$/>........ Mic-E message capability # 'lllc/s$/>........ Mic-E message capability
# `lllc/s$/>........ Mic-E old posit # `lllc/s$/>........ Mic-E old posit
def _parse_mice(dstcall, body): def parse_mice(dstcall, body):
parsed = {'format': 'mic-e'} parsed = {'format': 'mic-e'}
dstcall = dstcall.split('-')[0] dstcall = dstcall.split('-')[0]
@ -208,11 +208,11 @@ def _parse_mice(dstcall, body):
body = body + extra body = body + extra
# attempt to parse comment telemetry # attempt to parse comment telemetry
body, telemetry = _parse_comment_telemetry(body) body, telemetry = parse_comment_telemetry(body)
parsed.update(telemetry) parsed.update(telemetry)
# parse DAO extention # parse DAO extention
body = _parse_dao(body, parsed) body = parse_dao(body, parsed)
# rest is a comment # rest is a comment
parsed.update({'comment': body.strip(' ')}) parsed.update({'comment': body.strip(' ')})

View File

@ -1,11 +1,11 @@
import re import re
from aprslib.exceptions import ParseError from aprslib.exceptions import ParseError
from aprslib.parsing.common import _parse_timestamp from aprslib.parsing.common import parse_timestamp
__all__ = [ __all__ = [
'_parse_status', 'parse_status',
'_parse_invalid', 'parse_invalid',
'_parse_user_defined', 'parse_user_defined',
] ]
@ -13,8 +13,8 @@ __all__ = [
# #
# >DDHHMMzComments # >DDHHMMzComments
# >Comments # >Comments
def _parse_status(packet_type, body): def parse_status(packet_type, body):
body, result = _parse_timestamp(body, packet_type) body, result = parse_timestamp(body, packet_type)
result.update({ result.update({
'format': 'status', 'format': 'status',
@ -27,7 +27,7 @@ def _parse_status(packet_type, body):
# INVALID # INVALID
# #
# ,......................... # ,.........................
def _parse_invalid(body): def parse_invalid(body):
return ('', { return ('', {
'format': 'invalid', 'format': 'invalid',
'body': body 'body': body
@ -38,7 +38,7 @@ def _parse_invalid(body):
# #
# {A1................ # {A1................
# {{................. # {{.................
def _parse_user_defined(body): def parse_user_defined(body):
return ('', { return ('', {
'format': 'user-defined', 'format': 'user-defined',
'id': body[0], 'id': body[0],

View File

@ -3,16 +3,16 @@ import re
from aprslib import base91 from aprslib import base91
from aprslib.exceptions import ParseError from aprslib.exceptions import ParseError
from aprslib.parsing import logger from aprslib.parsing import logger
from aprslib.parsing.common import _parse_timestamp, _parse_comment from aprslib.parsing.common import parse_timestamp, parse_comment
from aprslib.parsing.weather import _parse_weather_data from aprslib.parsing.weather import parse_weather_data
__all__ = [ __all__ = [
'_parse_position', 'parse_position',
'_parse_compressed', 'parse_compressed',
'_parse_normal', 'parse_normal',
] ]
def _parse_position(packet_type, body): def parse_position(packet_type, body):
parsed = {} parsed = {}
if packet_type not in '!=/@;': if packet_type not in '!=/@;':
@ -37,20 +37,20 @@ def _parse_position(packet_type, body):
# decode timestamp # decode timestamp
if packet_type in "/@;": if packet_type in "/@;":
body, result = _parse_timestamp(body, packet_type) body, result = parse_timestamp(body, packet_type)
parsed.update(result) parsed.update(result)
if len(body) == 0 and 'timestamp' in parsed: if len(body) == 0 and 'timestamp' in parsed:
raise ParseError("invalid position report format", packet) raise ParseError("invalid position report format", packet)
# decode body # decode body
body, result = _parse_compressed(body) body, result = parse_compressed(body)
parsed.update(result) parsed.update(result)
if len(result) > 0: if len(result) > 0:
logger.debug("Parsed as compressed position report") logger.debug("Parsed as compressed position report")
else: else:
body, result = _parse_normal(body) body, result = parse_normal(body)
parsed.update(result) parsed.update(result)
if len(result) > 0: if len(result) > 0:
@ -61,14 +61,14 @@ def _parse_position(packet_type, body):
# Page 62 of the spec # Page 62 of the spec
if parsed['symbol'] == '_': if parsed['symbol'] == '_':
logger.debug("Attempting to parse weather report from comment") logger.debug("Attempting to parse weather report from comment")
body, result = _parse_weather_data(body) body, result = parse_weather_data(body)
parsed.update({ parsed.update({
'comment': body.strip(' '), 'comment': body.strip(' '),
'weather': result, 'weather': result,
}) })
else: else:
# decode comment # decode comment
_parse_comment(body, parsed) parse_comment(body, parsed)
if packet_type == ';': if packet_type == ';':
parsed.update({ parsed.update({
@ -78,7 +78,7 @@ def _parse_position(packet_type, body):
return ('', parsed) return ('', parsed)
def _parse_compressed(body): def parse_compressed(body):
parsed = {} parsed = {}
if re.match(r"^[\/\\A-Za-j][!-|]{8}[!-{}][ -|]{3}", body): if re.match(r"^[\/\\A-Za-j][!-|]{8}[!-{}][ -|]{3}", body):
@ -129,7 +129,7 @@ def _parse_compressed(body):
return (body, parsed) return (body, parsed)
def _parse_normal(body): def parse_normal(body):
parsed = {} parsed = {}
match = re.findall(r"^(\d{2})([0-9 ]{2}\.[0-9 ]{2})([NnSs])([\/\\0-9A-Z])" match = re.findall(r"^(\d{2})([0-9 ]{2}\.[0-9 ]{2})([NnSs])([\/\\0-9A-Z])"

View File

@ -4,12 +4,12 @@ from aprslib.exceptions import ParseError
from aprslib.parsing import logger from aprslib.parsing import logger
__all__ = [ __all__ = [
'_parse_comment_telemetry', 'parse_comment_telemetry',
'_parse_telemetry_config', 'parse_telemetry_config',
] ]
def _parse_comment_telemetry(text): def parse_comment_telemetry(text):
""" """
Looks for base91 telemetry found in comment field Looks for base91 telemetry found in comment field
Returns [remaining_text, telemetry] Returns [remaining_text, telemetry]
@ -40,7 +40,7 @@ def _parse_comment_telemetry(text):
return (text, parsed) return (text, parsed)
def _parse_telemetry_config(body): def parse_telemetry_config(body):
parsed = {} parsed = {}
match = re.findall(r"^(PARM|UNIT|EQNS|BITS)\.(.*)$", body) match = re.findall(r"^(PARM|UNIT|EQNS|BITS)\.(.*)$", body)

View File

@ -2,8 +2,8 @@ import re
from aprslib.exceptions import ParseError from aprslib.exceptions import ParseError
__all__ = [ __all__ = [
'_parse_weather', 'parse_weather',
'_parse_weather_data', 'parse_weather_data',
] ]
# constants # constants
@ -41,7 +41,7 @@ val_map = {
'#': lambda x: int(x), '#': lambda x: int(x),
} }
def _parse_weather_data(body): def parse_weather_data(body):
parsed = {} parsed = {}
# parse weather data # parse weather data
@ -58,12 +58,12 @@ def _parse_weather_data(body):
return (body, parsed) return (body, parsed)
def _parse_weather(body): def parse_weather(body):
match = re.match("^(\d{8})c[\. \d]{3}s[\. \d]{3}g[\. \d]{3}t[\. \d]{3}", body) match = re.match("^(\d{8})c[\. \d]{3}s[\. \d]{3}g[\. \d]{3}t[\. \d]{3}", body)
if not match: if not match:
raise ParseError("invalid positionless weather report format") raise ParseError("invalid positionless weather report format")
comment, weather = _parse_weather_data(body[8:]) comment, weather = parse_weather_data(body[8:])
parsed = { parsed = {
'format': 'wx', 'format': 'wx',

View File

@ -53,7 +53,7 @@ class ParseTestCase(unittest.TestCase):
def test_empty_body(self): def test_empty_body(self):
self.assertRaises(ParseError, parse, "A>B:") self.assertRaises(ParseError, parse, "A>B:")
def test_parse_header_exception(self): def testparse_header_exception(self):
self.assertRaises(ParseError, parse, "A:asd") self.assertRaises(ParseError, parse, "A:asd")
def test_empty_body_of_format_that_is_not_status(self): def test_empty_body_of_format_that_is_not_status(self):
@ -104,9 +104,9 @@ class ParseBranchesTestCase(unittest.TestCase):
self.assertEqual(result, expected) self.assertEqual(result, expected)
def test_mice_format_branch(self): def test_mice_format_branch(self):
self.m.StubOutWithMock(parsing, "_parse_mice") self.m.StubOutWithMock(parsing, "parse_mice")
parsing._parse_mice("B", "test").AndReturn(('', {'format': ''})) parsing.parse_mice("B", "test").AndReturn(('', {'format': ''}))
parsing._parse_mice("D", "test").AndReturn(('', {'format': ''})) parsing.parse_mice("D", "test").AndReturn(('', {'format': ''}))
self.m.ReplayAll() self.m.ReplayAll()
parse("A>B:`test") parse("A>B:`test")
@ -115,8 +115,8 @@ class ParseBranchesTestCase(unittest.TestCase):
self.m.VerifyAll() self.m.VerifyAll()
def test_message_format_branch(self): def test_message_format_branch(self):
self.m.StubOutWithMock(parsing, "_parse_message") self.m.StubOutWithMock(parsing, "parse_message")
parsing._parse_message("test").AndReturn(('', {'format': ''})) parsing.parse_message("test").AndReturn(('', {'format': ''}))
self.m.ReplayAll() self.m.ReplayAll()
parse("A>B::test") parse("A>B::test")

View File

@ -1,6 +1,6 @@
import unittest2 as unittest import unittest2 as unittest
from aprslib.parsing import _parse_comment_telemetry from aprslib.parsing import parse_comment_telemetry
from aprslib import base91 from aprslib import base91
from random import randint from random import randint
@ -52,7 +52,7 @@ class ParseCommentTelemetry(unittest.TestCase):
testData = self.genTelem(i, vals, bits) testData = self.genTelem(i, vals, bits)
extra, resData = _parse_comment_telemetry(self.telemString(testData)) extra, resData = parse_comment_telemetry(self.telemString(testData))
resData = resData['telemetry'] resData = resData['telemetry']
# clean up extra data, so we can compare # clean up extra data, so we can compare
@ -79,7 +79,7 @@ class ParseCommentTelemetry(unittest.TestCase):
] ]
for datum in testData: for datum in testData:
extra, telem = _parse_comment_telemetry("".join(datum)) extra, telem = parse_comment_telemetry("".join(datum))
self.assertEqual(datum[0]+datum[2], extra) self.assertEqual(datum[0]+datum[2], extra)
@ -107,12 +107,12 @@ class ParseCommentTelemetry(unittest.TestCase):
] ]
for datum in testData: for datum in testData:
extra, telem = _parse_comment_telemetry(datum) extra, telem = parse_comment_telemetry(datum)
self.assertEqual(datum, extra) self.assertEqual(datum, extra)
def test_output_format(self): def test_output_format(self):
parsedOutput = _parse_comment_telemetry("|aabb|") parsedOutput = parse_comment_telemetry("|aabb|")
self.assertTrue(isinstance(parsedOutput, tuple)) self.assertTrue(isinstance(parsedOutput, tuple))
self.assertTrue(len(parsedOutput) == 2) self.assertTrue(len(parsedOutput) == 2)

View File

@ -2,17 +2,17 @@ import unittest2 as unittest
import string import string
from random import randint, randrange, sample from random import randint, randrange, sample
from aprslib.parsing import _parse_header from aprslib.parsing import parse_header
from aprslib.parsing import _validate_callsign from aprslib.parsing import validate_callsign
from aprslib.exceptions import ParseError from aprslib.exceptions import ParseError
class ValidateCallsign(unittest.TestCase): class ValidateCallsign(unittest.TestCase):
def test_valid_input(self): def testvalid_input(self):
chars = string.ascii_letters.upper() + string.digits chars = string.ascii_letters.upper() + string.digits
def random_valid_callsigns(): def randomvalid_callsigns():
for x in range(0, 500): for x in range(0, 500):
call = "".join(sample(chars, randrange(1, 6))) call = "".join(sample(chars, randrange(1, 6)))
@ -21,13 +21,13 @@ class ValidateCallsign(unittest.TestCase):
yield call yield call
for call in random_valid_callsigns(): for call in randomvalid_callsigns():
try: try:
_validate_callsign(call) validate_callsign(call)
except ParseError: except ParseError:
self.fail( self.fail(
"%s('%s') raised ParseError" % "%s('%s') raised ParseError" %
(_validate_callsign.__name__, call) (validate_callsign.__name__, call)
) )
def test_invalid_input(self): def test_invalid_input(self):
@ -43,12 +43,12 @@ class ValidateCallsign(unittest.TestCase):
] ]
for call in testData: for call in testData:
self.assertRaises(ParseError, _validate_callsign, call) self.assertRaises(ParseError, validate_callsign, call)
class ParseHeader(unittest.TestCase): class ParseHeader(unittest.TestCase):
def test_valid_input_and_format(self): def testvalid_input_and_format(self):
# empty path header # empty path header
expected = { expected = {
"from": "A", "from": "A",
@ -56,7 +56,7 @@ class ParseHeader(unittest.TestCase):
"via": "", "via": "",
"path": [] "path": []
} }
result = _parse_header("A>B") result = parse_header("A>B")
self.assertEqual(expected, result) self.assertEqual(expected, result)
@ -67,7 +67,7 @@ class ParseHeader(unittest.TestCase):
"via": "", "via": "",
"path": list('CDE') "path": list('CDE')
} }
result2 = _parse_header("A>B,C,D,E") result2 = parse_header("A>B,C,D,E")
self.assertEqual(expected2, result2) self.assertEqual(expected2, result2)
@ -78,7 +78,7 @@ class ParseHeader(unittest.TestCase):
"via": "E", "via": "E",
"path": ['C', 'D', 'qAR', 'E'] "path": ['C', 'D', 'qAR', 'E']
} }
result3 = _parse_header("A>B,C,D,qAR,E") result3 = parse_header("A>B,C,D,qAR,E")
self.assertEqual(expected3, result3) self.assertEqual(expected3, result3)
@ -89,11 +89,11 @@ class ParseHeader(unittest.TestCase):
"via": "C", "via": "C",
"path": [qCon, 'C'] "path": [qCon, 'C']
} }
result4 = _parse_header("A>B,%s,C" % qCon) result4 = parse_header("A>B,%s,C" % qCon)
self.assertEqual(expected4, result4) self.assertEqual(expected4, result4)
def test_valid_fromcallsigns(self): def testvalid_fromcallsigns(self):
testData = [ testData = [
"A>CALL", "A>CALL",
"4>CALL", "4>CALL",
@ -117,10 +117,10 @@ class ParseHeader(unittest.TestCase):
for head in testData: for head in testData:
try: try:
_parse_header(head) parse_header(head)
except ParseError as msg: except ParseError as msg:
self.fail("{0}('{1}') PraseError, {2}" self.fail("{0}('{1}') PraseError, {2}"
.format(_parse_header.__name__, head, msg)) .format(parse_header.__name__, head, msg))
def test_invalid_format(self): def test_invalid_format(self):
testData = [ testData = [
@ -144,9 +144,9 @@ class ParseHeader(unittest.TestCase):
for head in testData: for head in testData:
try: try:
_parse_header(head) parse_header(head)
self.fail("{0} didn't raise exception for: {1}" self.fail("{0} didn't raise exception for: {1}"
.format(_parse_header.__name__, head)) .format(parse_header.__name__, head))
except ParseError: except ParseError:
continue continue

View File

@ -1,6 +1,6 @@
import unittest2 as unittest import unittest2 as unittest
from aprslib.parsing import _parse_weather_data from aprslib.parsing import parse_weather_data
from aprslib.parsing import parse from aprslib.parsing import parse
wind_multiplier = 0.44704 wind_multiplier = 0.44704
@ -16,7 +16,7 @@ class ParseCommentWeather(unittest.TestCase):
"wind_speed": (9 * wind_multiplier), "wind_speed": (9 * wind_multiplier),
"wind_direction": 9 "wind_direction": 9
} }
result = _parse_weather_data("009/009") result = parse_weather_data("009/009")
self.assertEqual(expected, result) self.assertEqual(expected, result)
# Daft result but possible # Daft result but possible
@ -24,7 +24,7 @@ class ParseCommentWeather(unittest.TestCase):
"wind_speed": (999 * wind_multiplier), "wind_speed": (999 * wind_multiplier),
"wind_direction": 999 "wind_direction": 999
} }
result = _parse_weather_data("999/999") result = parse_weather_data("999/999")
self.assertEqual(expected, result) self.assertEqual(expected, result)
#Positionless packet #Positionless packet
@ -32,7 +32,7 @@ class ParseCommentWeather(unittest.TestCase):
"wind_speed": float(9 * wind_multiplier), "wind_speed": float(9 * wind_multiplier),
"wind_direction": 9 "wind_direction": 9
} }
result = _parse_weather_data("c009s009") result = parse_weather_data("c009s009")
self.assertEqual(expected, result) self.assertEqual(expected, result)
# Daft result but possible # Daft result but possible
@ -40,7 +40,7 @@ class ParseCommentWeather(unittest.TestCase):
"wind_speed": (999 * wind_multiplier), "wind_speed": (999 * wind_multiplier),
"wind_direction": 999 "wind_direction": 999
} }
result = _parse_weather_data("c999s999") result = parse_weather_data("c999s999")
self.assertEqual(expected, result) self.assertEqual(expected, result)
def test_temp(self): def test_temp(self):
@ -48,150 +48,150 @@ class ParseCommentWeather(unittest.TestCase):
expected = "", { expected = "", {
"temperature": float((-99.0 - 32) / 1.8) "temperature": float((-99.0 - 32) / 1.8)
} }
result = _parse_weather_data("t-99") result = parse_weather_data("t-99")
self.assertEqual(expected, result) self.assertEqual(expected, result)
# Misc # Misc
expected = "", { expected = "", {
"temperature": -40.0 "temperature": -40.0
} }
result = _parse_weather_data("t-40") result = parse_weather_data("t-40")
self.assertEqual(expected, result) self.assertEqual(expected, result)
# Zero F # Zero F
expected = "", { expected = "", {
"temperature": -17.77777777777778 "temperature": -17.77777777777778
} }
result = _parse_weather_data("t000") result = parse_weather_data("t000")
self.assertEqual(expected, result) self.assertEqual(expected, result)
# Daft, but possible # Daft, but possible
expected = "", { expected = "", {
"temperature": 537.2222222222222 "temperature": 537.2222222222222
} }
result = _parse_weather_data("t999") result = parse_weather_data("t999")
self.assertEqual(expected, result) self.assertEqual(expected, result)
def test_rain_1h(self): def test_rain_1h(self):
expected = "", { expected = "", {
"rain_1h": 0.0 "rain_1h": 0.0
} }
result = _parse_weather_data("r000") result = parse_weather_data("r000")
self.assertEqual(expected, result) self.assertEqual(expected, result)
expected = "", { expected = "", {
"rain_1h": float(999 * mm_multiplier) "rain_1h": float(999 * mm_multiplier)
} }
result = _parse_weather_data("r999") result = parse_weather_data("r999")
self.assertEqual(expected, result) self.assertEqual(expected, result)
def test_rain_24h(self): def test_rain_24h(self):
expected = "", { expected = "", {
"rain_24h": 0.0 "rain_24h": 0.0
} }
result = _parse_weather_data("p000") result = parse_weather_data("p000")
self.assertEqual(expected, result) self.assertEqual(expected, result)
expected = "", { expected = "", {
"rain_24h": float(999 * mm_multiplier) "rain_24h": float(999 * mm_multiplier)
} }
result = _parse_weather_data("p999") result = parse_weather_data("p999")
self.assertEqual(expected, result) self.assertEqual(expected, result)
def test_rain_since_midnight(self): def test_rain_since_midnight(self):
expected = "", { expected = "", {
"rain_since_midnight": 0.0 "rain_since_midnight": 0.0
} }
result = _parse_weather_data("P000") result = parse_weather_data("P000")
self.assertEqual(expected, result) self.assertEqual(expected, result)
expected = "", { expected = "", {
"rain_since_midnight": float(999 * mm_multiplier) "rain_since_midnight": float(999 * mm_multiplier)
} }
result = _parse_weather_data("P999") result = parse_weather_data("P999")
self.assertEqual(expected, result) self.assertEqual(expected, result)
def test_humidity(self): def test_humidity(self):
expected = "", { expected = "", {
"humidity": 0.0 "humidity": 0.0
} }
result = _parse_weather_data("h00") result = parse_weather_data("h00")
self.assertEqual(expected, result) self.assertEqual(expected, result)
expected = "", { expected = "", {
"humidity": 99 "humidity": 99
} }
result = _parse_weather_data("h99") result = parse_weather_data("h99")
self.assertEqual(expected, result) self.assertEqual(expected, result)
def test_pressure(self): def test_pressure(self):
expected = "", { expected = "", {
"pressure": 0.0 "pressure": 0.0
} }
result = _parse_weather_data("b00000") result = parse_weather_data("b00000")
self.assertEqual(expected, result) self.assertEqual(expected, result)
expected = "", { expected = "", {
"pressure": 9999.9 "pressure": 9999.9
} }
result = _parse_weather_data("b99999") result = parse_weather_data("b99999")
self.assertEqual(expected, result) self.assertEqual(expected, result)
def test_luminosity(self): def test_luminosity(self):
expected = "", { expected = "", {
"luminosity": 000 "luminosity": 000
} }
result = _parse_weather_data("L000") result = parse_weather_data("L000")
self.assertEqual(expected, result) self.assertEqual(expected, result)
expected = "", { expected = "", {
"luminosity": 999 "luminosity": 999
} }
result = _parse_weather_data("L999") result = parse_weather_data("L999")
self.assertEqual(expected, result) self.assertEqual(expected, result)
expected = "", { expected = "", {
"luminosity": 1123 "luminosity": 1123
} }
result = _parse_weather_data("l123") result = parse_weather_data("l123")
self.assertEqual(expected, result) self.assertEqual(expected, result)
expected = "", { expected = "", {
"luminosity": 1999 "luminosity": 1999
} }
result = _parse_weather_data("l999") result = parse_weather_data("l999")
self.assertEqual(expected, result) self.assertEqual(expected, result)
def test_snow(self): def test_snow(self):
expected = "", { expected = "", {
"snow": 000 "snow": 000
} }
result = _parse_weather_data("s...s000") result = parse_weather_data("s...s000")
self.assertEqual(expected, result) self.assertEqual(expected, result)
expected = "", { expected = "", {
"snow": float(5.5 * 25.4) "snow": float(5.5 * 25.4)
} }
result = _parse_weather_data("s...s5.5") result = parse_weather_data("s...s5.5")
self.assertEqual(expected, result) self.assertEqual(expected, result)
expected = "", { expected = "", {
"snow": float(999 * 25.4) "snow": float(999 * 25.4)
} }
result = _parse_weather_data("s...s999") result = parse_weather_data("s...s999")
self.assertEqual(expected, result) self.assertEqual(expected, result)
def test_rain_raw(self): def test_rain_raw(self):
expected = "", { expected = "", {
"rain_raw": 000 "rain_raw": 000
} }
result = _parse_weather_data("#000") result = parse_weather_data("#000")
self.assertEqual(expected, result) self.assertEqual(expected, result)
expected = "", { expected = "", {
"rain_raw": 999 "rain_raw": 999
} }
result = _parse_weather_data("#999") result = parse_weather_data("#999")
self.assertEqual(expected, result) self.assertEqual(expected, result)
# Not possible in real world (rain and snow measurements) # Not possible in real world (rain and snow measurements)
@ -253,7 +253,7 @@ class ParseCommentWeather(unittest.TestCase):
"wind_speed": 1 * wind_multiplier "wind_speed": 1 * wind_multiplier
} }
result = _parse_weather_data("319/001g004t048r...p P000h19b10294eCumulusWMR100") result = parse_weather_data("319/001g004t048r...p P000h19b10294eCumulusWMR100")
self.assertEqual(expected, result) self.assertEqual(expected, result)
if __name__ == '__main__': if __name__ == '__main__':