Compare commits

..

1 Commits

Author SHA1 Message Date
Rossen 0f16217fd8
just a test 2021-11-24 23:00:35 +00:00
20 changed files with 107 additions and 657 deletions

2
.gitignore vendored
View File

@ -3,5 +3,3 @@
*.pyc
dist
*.egg-info
env3
env2

10
.scrutinizer.yml Normal file
View File

@ -0,0 +1,10 @@
checks:
python:
code_rating: true
filter:
excluded_paths:
- 'tests/*'
tools:
external_code_coverage:
timeout: 300
runs: 9

View File

@ -1,4 +0,0 @@
sonar.sources=aprslib
sonar.tests=tests
sonar.sourceEncoding=UTF-8

View File

@ -41,4 +41,5 @@ dist: clean
python setup.py bdist_wheel --universal
upload: dist
python setup.py register -r pypi
twine upload -r pypi dist/*

View File

@ -41,8 +41,8 @@ from datetime import date as _date
__date__ = str(_date.today())
del _date
__version__ = "0.7.2"
version_info = (0, 7, 2)
__version__ = "0.6.47"
version_info = (0, 6, 47)
__author__ = "Rossen Georgiev"
__all__ = ['IS', 'parse', 'passcode']

View File

@ -333,13 +333,11 @@ class IS(object):
self.logger.error("socket.recv(): returned empty")
raise ConnectionDrop("connection dropped")
except socket.error as e:
# ignore error when blocking=false, and we attempt to read empty socket
if ("Resource temporarily unavailable" in str(e)
and not blocking
and len(self.buf) == 0):
break
else:
self.logger.error("socket error on recv(): %s" % str(e))
self.logger.error("socket error on recv(): %s" % str(e))
if "Resource temporarily unavailable" in str(e):
if not blocking:
if len(self.buf) == 0:
break
self.buf += short_buf

View File

@ -41,28 +41,8 @@ from aprslib.parsing.position import *
from aprslib.parsing.mice import *
from aprslib.parsing.message import *
from aprslib.parsing.telemetry import *
from aprslib.parsing.thirdparty import *
from aprslib.parsing.weather import *
unsupported_formats = {
'#':'raw weather report',
'$':'raw gps',
'%':'agrelo',
'&':'reserved',
'(':'unused',
')':'item report',
'*':'complete weather report',
'+':'reserved',
'-':'unused',
'.':'reserved',
'<':'station capabilities',
'?':'general query format',
'T':'telemetry report',
'[':'maidenhead locator beacon',
'\\':'unused',
']':'unused',
'^':'unused',
}
def _unicode_packet(packet):
# attempt utf-8
@ -158,13 +138,28 @@ def parse(packet):
def _try_toparse_body(packet_type, body, parsed):
result = {}
if packet_type in unsupported_formats:
raise UnknownFormat("Format is not supported: '{}' {}".format(packet_type, unsupported_formats[packet_type]))
# 3rd party traffic
elif packet_type == '}':
logger.debug("Packet is third-party")
body, result = parse_thirdparty(body)
# NOT SUPPORTED FORMATS
#
# # - raw weather report
# $ - raw gps
# % - agrelo
# & - reserved
# ( - unused
# ) - item report
# * - complete weather report
# + - reserved
# - - unused
# . - reserved
# < - station capabilities
# ? - general query format
# T - telemetry report
# [ - maidenhead locator beacon
# \ - unused
# ] - unused
# ^ - unused
# } - 3rd party traffic
if packet_type in '#$%)*<?T[}':
raise UnknownFormat("format is not supported")
# user defined
elif packet_type == ',':

View File

@ -1,5 +1,4 @@
import re
from math import sqrt
from datetime import datetime
from aprslib import base91
from aprslib.exceptions import ParseError
@ -134,26 +133,17 @@ def parse_comment(body, parsed):
def parse_data_extentions(body):
parsed = {}
match = re.findall(r"^([0-9 .]{3})/([0-9 .]{3})", body)
# course speed bearing nrq
# Page 27 of the spec
# format: 111/222/333/444text
match = re.findall(r"^([0-9 \.]{3})/([0-9 \.]{3})", body)
if match:
cse, spd = match[0]
body = body[7:]
if cse.isdigit() and cse != "000":
parsed.update({'course': int(cse) if 1 <= int(cse) <= 360 else 0})
if spd.isdigit() and spd != "000":
parsed.update({'course': int(cse) if cse.isdigit() and 1 <= int(cse) <= 360 else 0})
if spd.isdigit():
parsed.update({'speed': int(spd)*1.852})
# DF Report format
# Page 29 of teh spec
match = re.findall(r"^/([0-9 \.]{3})/([0-9 \.]{3})", body)
match = re.findall(r"^/([0-9 .]{3})/([0-9 .]{3})", body)
if match:
# cse=000 means stations is fixed, Page 29 of the spec
if cse == '000':
parsed.update({'course': 0})
brg, nrq = match[0]
body = body[8:]
if brg.isdigit():
@ -161,45 +151,17 @@ def parse_data_extentions(body):
if nrq.isdigit():
parsed.update({'nrq': int(nrq)})
else:
# PHG format: PHGabcd....
# RHGR format: RHGabcdr/....
match = re.findall(r"^(PHG(\d[\x30-\x7e]\d\d)([0-9A-Z]\/)?)", body)
match = re.findall(r"^(PHG(\d[\x30-\x7e]\d\d[0-9A-Z]?))", body)
if match:
ext, phg, phgr = match[0]
ext, phg = match[0]
body = body[len(ext):]
parsed.update({
'phg': phg,
'phg_power': int(phg[0]) ** 2, # watts
'phg_height': (10 * (2 ** (ord(phg[1]) - 0x30))) * 0.3048, # in meters
'phg_gain': 10 ** (int(phg[2]) / 10.0), # dB
})
phg_dir = int(phg[3])
if phg_dir == 0:
phg_dir = 'omni'
elif phg_dir == 9:
phg_dir = 'invalid'
else:
phg_dir = 45 * phg_dir
parsed['phg_dir'] = phg_dir
# range in km
parsed['phg_range'] = sqrt(2 * (parsed['phg_height'] / 0.3048)
* sqrt((parsed['phg_power'] / 10.0)
* (parsed['phg_gain'] / 2.0)
)
) * 1.60934
if phgr:
# PHG rate per hour
parsed['phg'] += phgr[0]
parsed.update({'phg_rate': int(phgr[0], 16)}) # as decimal
parsed.update({'phg': phg})
else:
match = re.findall(r"^RNG(\d{4})", body)
if match:
rng = match[0]
body = body[7:]
parsed.update({'rng': int(rng) * 1.609344}) # miles to km
match = re.findall(r"^RNG(\d{4})", body)
if match:
rng = match[0]
body = body[7:]
parsed.update({'rng': int(rng) * 1.609344}) # miles to km
return body, parsed

View File

@ -69,66 +69,28 @@ def parse_message(body):
break
# regular message
# ---------------------------
logger.debug("Packet is just a regular message")
parsed.update({'format': 'message'})
else:
logger.debug("Packet is just a regular message")
parsed.update({'format': 'message'})
# APRS supports two different message formats:
# - the standard format which is described in 'aprs101.pdf':
# http://www.aprs.org/doc/APRS101.PDF
# - an addendum from 1999 which introduces a new format:
# http://www.aprs.org/aprs11/replyacks.txt
#
# A message (ack/rej as well as a standard msg text body) can either have:
# - no message number at all
# - a message number in the old format (1..5 characters / digits)
# - a message number in the new format (2 characters / digits) without trailing 'ack msg no'
# - a message number in the new format with trailing 'free ack msg no' (2 characters / digits)
match = re.search(r"^(ack|rej)([A-Za-z0-9]{1,5})$", body)
if match:
parsed.update({
'response': match.group(1),
'msgNo': match.group(2),
})
else:
body = body[0:70]
# ack / rej
# ---------------------------
# NEW REPLAY-ACK
# format: :AAAABBBBC:ackMM}AA
match = re.findall(r"^(ack|rej)([A-Za-z0-9]{2})}([A-Za-z0-9]{2})?$", body)
if match:
parsed['response'], parsed['msgNo'], ackMsgNo = match[0]
if ackMsgNo:
parsed['ackMsgNo'] = ackMsgNo
break
match = re.search(r"{([A-Za-z0-9]{1,5})$", body)
if match:
msgNo = match.group(1)
body = body[:len(body) - 1 - len(msgNo)]
# ack/rej standard format as per aprs101.pdf chapter 14
# format: :AAAABBBBC:ack12345
match = re.findall(r"^(ack|rej)([A-Za-z0-9]{1,5})$", body)
if match:
parsed['response'], parsed['msgNo'] = match[0]
break
parsed.update({'msgNo': msgNo})
# regular message body parser
# ---------------------------
parsed['message_text'] = body.strip(' ')
parsed.update({'message_text': body.strip(' ')})
# check for ACKs
# new message format: http://www.aprs.org/aprs11/replyacks.txt
# format: :AAAABBBBC:text.....{MM}AA
match = re.findall(r"{([A-Za-z0-9]{2})}([A-Za-z0-9]{2})?$", body)
if match:
msgNo, ackMsgNo = match[0]
parsed['message_text'] = body[:len(body) - 4 - len(ackMsgNo)].strip(' ')
parsed['msgNo'] = msgNo
if ackMsgNo:
parsed['ackMsgNo'] = ackMsgNo
break
# old message format - see aprs101.pdf.
# search for: msgNo present
match = re.findall(r"{([A-Za-z0-9]{1,5})$", body)
if match:
msgNo = match[0]
parsed['message_text'] = body[:len(body) - 1 - len(msgNo)].strip(' ')
parsed['msgNo'] = msgNo
break
# break free from the eternal 'while'
break
return ('', parsed)

View File

@ -3,7 +3,7 @@ import re
from aprslib import base91
from aprslib.exceptions import ParseError
from aprslib.parsing import logger
from aprslib.parsing.common import parse_timestamp, parse_comment, parse_data_extentions
from aprslib.parsing.common import parse_timestamp, parse_comment
from aprslib.parsing.weather import parse_weather_data
__all__ = [
@ -60,11 +60,6 @@ def parse_position(packet_type, body):
# check comment for weather information
# Page 62 of the spec
if parsed['symbol'] == '_':
# attempt to parse winddir/speed
# Page 92 of the spec
body, result = parse_data_extentions(body)
parsed.update(result)
logger.debug("Attempting to parse weather report from comment")
body, result = parse_weather_data(body)
parsed.update({

View File

@ -1,21 +0,0 @@
import re
from aprslib.parsing.__init__ import parse
from aprslib.exceptions import UnknownFormat
from aprslib.exceptions import ParseError
__all__ = [
'parse_thirdparty',
]
def parse_thirdparty(body):
parsed = {'format':'thirdparty'}
# Parse sub-packet
try:
subpacket = parse(body)
except (UnknownFormat,ParseError) as ukf:
raise
parsed.update({'subpacket':subpacket})
return('',parsed)

View File

@ -14,7 +14,7 @@ key_map = {
'g': 'wind_gust',
'c': 'wind_direction',
't': 'temperature',
's': 'wind_speed',
'S': 'wind_speed',
'r': 'rain_1h',
'p': 'rain_24h',
'P': 'rain_since_midnight',
@ -22,22 +22,22 @@ key_map = {
'b': 'pressure',
'l': 'luminosity',
'L': 'luminosity',
'S': 'snow',
's': 'snow',
'#': 'rain_raw',
}
val_map = {
'g': lambda x: int(x) * wind_multiplier,
'c': lambda x: int(x),
's': lambda x: int(x) * wind_multiplier,
'S': lambda x: int(x) * wind_multiplier,
't': lambda x: (float(x) - 32) / 1.8,
'r': lambda x: int(x) * rain_multiplier,
'p': lambda x: int(x) * rain_multiplier,
'P': lambda x: int(x) * rain_multiplier,
'h': lambda x: 100 if int(x) == 0 else int(x),
'h': lambda x: int(x),
'b': lambda x: float(x) / 10,
'l': lambda x: int(x) + 1000,
'L': lambda x: int(x),
'S': lambda x: float(x) * 25.4,
's': lambda x: float(x) * 25.4,
'#': lambda x: int(x),
}
@ -46,24 +46,20 @@ def parse_weather_data(body):
# parse weather data
body = re.sub(r"^([0-9]{3})/([0-9]{3})", "c\\1s\\2", body)
#body = body.replace('s', 'S', 1)
body = body.replace('s', 'S', 1)
# match as many parameters from the start, rest is comment
data = re.match(r"^([csgtrpPlLS#][0-9\-\. ]{3}|h[0-9\. ]{2}|b[0-9\. ]{5})+", body)
data = re.findall(r"([cSgtrpPlLs#]\d{3}|t-\d{2}|h\d{2}|b\d{5}|s\.\d{2}|s\d\.\d)", body)
data = map(lambda x: (key_map[x[0]] , val_map[x[0]](x[1:])), data)
if data:
data = data.group()
# split out data from comment
body = body[len(data):]
# parse all weather parameters
data = re.findall(r"([csgtrpPlLS#]\d{3}|t-\d{2}|h\d{2}|b\d{5}|s\.\d{2}|s\d\.\d)", data)
data = map(lambda x: (key_map[x[0]] , val_map[x[0]](x[1:])), data)
parsed.update(dict(data))
parsed.update(dict(data))
# strip weather data
body = re.sub(r"([cSgtrpPlLs#][0-9\-\. ]{3}|h[0-9\. ]{2}|b[0-9\. ]{5})", '', body)
return (body, parsed)
def parse_weather(body):
match = re.match(r"^(\d{8})c[\. \d]{3}s[\. \d]{3}g[\. \d]{3}t[\. \d]{3}", body)
match = re.match("^(\d{8})c[\. \d]{3}s[\. \d]{3}g[\. \d]{3}t[\. \d]{3}", body)
if not match:
raise ParseError("invalid positionless weather report format")

View File

@ -1,6 +1,4 @@
twine
wheel
mox3
coverage>=5.0; python_version == '2.7' or python_version >= '3.5'

View File

@ -0,0 +1 @@

View File

@ -65,8 +65,8 @@ class ParseTestCase(unittest.TestCase):
self.fail("empty status packet shouldn't raise exception")
def test_unsupported_formats_raising(self):
for packet_type in parsing.unsupported_formats:
with self.assertRaises(UnknownFormat):
with self.assertRaises(UnknownFormat):
for packet_type in '#$%)*,<?T[_{}':
packet = "A>B:%saaa" % packet_type
try:

View File

@ -256,47 +256,30 @@ class DataExtentionsTC(unittest.TestCase):
'speed': 100*1.852,
})
def test_course_speed_spaces(self):
def test_empty_course_speed(self):
body = " / /text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '/text')
self.assertEqual(parsed, {})
self.assertEqual(parsed, {
'course': 0,
})
def test_course_speed_dots(self):
body = ".../.../text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '/text')
self.assertEqual(parsed, {})
self.assertEqual(parsed, {
'course': 0,
})
def test_course_speed_zeros(self):
body = "000/000/text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '/text')
self.assertEqual(parsed, {})
def test_course_speed_valid_chars_but_invalid_values(self):
body = "22./33 /text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '/text')
self.assertEqual(parsed, {})
def test_course_speed_invalid_chars_spd(self):
body = "222/33a/text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '222/33a/text')
self.assertEqual(parsed, {})
def test_course_speed_invalid_chars_cse(self):
body = "22a/333/text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '22a/333/text')
self.assertEqual(parsed, {})
self.assertEqual(parsed, {
'course': 0,
})
def test_empty_bearing_nrq(self):
body = "111/100/ /...text"
@ -317,17 +300,6 @@ class DataExtentionsTC(unittest.TestCase):
'speed': 100*1.852,
})
def test_course_speed_bearing_nrq_empty_cse_speed(self):
body = "000/000/234/345text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'text')
self.assertEqual(parsed, {
'course': 0,
'bearing': 234,
'nrq': 345,
})
def test_course_speed_bearing_nrq(self):
body = "123/100/234/345text"
remaining, parsed = parse_data_extentions(body)
@ -340,78 +312,23 @@ class DataExtentionsTC(unittest.TestCase):
'nrq': 345,
})
def test_PHGR_1(self):
body = "PHG51325/text"
def test_PHG(self):
body = "PHG1234Atext"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'text')
self.assertEqual(parsed, {
'phg': '51325',
'phg_rate': 5,
'phg_power': 25,
'phg_height': 6.096,
'phg_gain': 1.9952623149688795,
'phg_dir': 90,
'phg_range': 12.791023731208883,
'phg': '1234A',
})
def test_PHGR_2(self):
body = "PHG5132F/text"
body = "PHG1234text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'text')
self.assertEqual(parsed, {
'phg': '5132F',
'phg_rate': 15,
'phg_power': 25,
'phg_height': 6.096,
'phg_gain': 1.9952623149688795,
'phg_dir': 90,
'phg_range': 12.791023731208883,
'phg': '1234',
})
def test_PHG_1(self):
body = "PHG5132Atext"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'Atext')
self.assertEqual(parsed, {
'phg': '5132',
'phg_power': 25,
'phg_height': 6.096,
'phg_gain': 1.9952623149688795,
'phg_dir': 90,
'phg_range': 12.791023731208883,
})
def test_PHG_2(self):
body = "PHG5132text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'text')
self.assertEqual(parsed, {
'phg': '5132',
'phg_power': 25,
'phg_height': 6.096,
'phg_gain': 1.9952623149688795,
'phg_dir': 90,
'phg_range': 12.791023731208883,
})
def test_PHG_dir_omni(self):
body = "PHG0000text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'text')
self.assertEqual(parsed['phg_dir'], 'omni')
def test_PHG_dir_invalid(self):
body = "PHG0009text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'text')
self.assertEqual(parsed['phg_dir'], 'invalid')
def test_range(self):
body = "RNG1000text"
remaining, parsed = parse_data_extentions(body)

View File

@ -1,124 +0,0 @@
import unittest
from aprslib.parsing.message import parse_message
# ack/rej assertion tests
class AckRejTests(unittest.TestCase):
# errorneus rej
def test_errorneus_rej(self):
unparsed, result = parse_message("WXBOT :red12345")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'red12345',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# reject with "old" msgno
def test_reject_old_msgno(self):
unparsed, result = parse_message("WXBOT :rej123")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'msgNo': '123',
'response': 'rej'
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# ack with new msgNo but no ackMsgNo
def test_ack_new_msgno_but_no_ack_msgno(self):
unparsed, result = parse_message("WXBOT :ackAB}")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'response': 'ack',
'msgNo': 'AB',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# ack with new msgNo and ackMsgNo
def test_ack_new_msgno_and_ackmsgno(self):
unparsed, result = parse_message("WXBOT :ackAB}CD")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'response': 'ack',
'msgNo': 'AB',
'ackMsgNo': 'CD',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message text body tests
class MessageTestBodyTests(unittest.TestCase):
# message body without msg no
def test_message_without_msgno(self):
unparsed, result = parse_message("WXBOT :HelloWorld ")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'HelloWorld',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message body with msg no - old format
def test_message_body_with_no_msgno_oldformat(self):
unparsed, result = parse_message("WXBOT :HelloWorld {ABCDE")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'HelloWorld',
'msgNo': 'ABCDE',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message body with msgNo (new format) and ackMsgNo missing
def test_message_body_with_msgno_and_ackmsgno_missing_newformat(self):
unparsed, result = parse_message("WXBOT :HelloWorld {AB}")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'HelloWorld',
'msgNo': 'AB',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message body with msgNo and ackMsgNo (new format)
def test_message_body_with_msgno_and_ackmsgno_newformat(self):
unparsed, result = parse_message("WXBOT :HelloWorld {AB}CD")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'HelloWorld',
'msgNo': 'AB',
'ackMsgNo': 'CD',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message body with really long message
def test_message_body_with_long_message(self):
unparsed, result = parse_message("WXBOT :00000000001111111111222222222233333333334444444444555555555566666666667777777777")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': '00000000001111111111222222222233333333334444444444555555555566666666667777777777',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)

View File

@ -1,170 +0,0 @@
import unittest
from datetime import datetime
from aprslib.parsing import parse_position
class ParsePositionDataExtAndWeather(unittest.TestCase):
def setUp(self):
self.maxDiff = None
def timestamp_from_partial(self, day, hour, minute):
now = datetime.now()
corrected = now.replace(day=day, hour=hour, minute=minute, second=0, microsecond=0)
return int((corrected - datetime(1970, 1, 1)).total_seconds())
def test_position_packet_only_weather_valid(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_data_ext_and_weather_valid(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_090/001g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'course': 90,
'speed': 1*1.852,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_optional_speed(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_090/...g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'course': 90,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_optional_course(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_ /001g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'speed': 1*1.852,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_optional_speed_and_course(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_.../...g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_optional_course(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_ /001g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'speed': 1*1.852,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
if __name__ == '__main__':
unittest.main()

View File

@ -113,17 +113,11 @@ class ParseCommentWeather(unittest.TestCase):
def test_humidity(self):
expected = "", {
"humidity": 100
"humidity": 0.0
}
result = parse_weather_data("h00")
self.assertEqual(expected, result)
expected = "", {
"humidity": 1
}
result = parse_weather_data("h01")
self.assertEqual(expected, result)
expected = "", {
"humidity": 99
}
@ -204,11 +198,11 @@ class ParseCommentWeather(unittest.TestCase):
def test_positionless_packet(self):
expected = {
'comment': '.ABS1.2CDF',
'comment': 'wRSW',
'format': 'wx',
'from': 'A',
'path': [],
'raw': 'A>B:_10090556c220s004g005t077r010p020P030h50b09900s5.5.ABS1.2CDF',
'raw': 'A>B:_10090556c220s004g005t077r010p020P030h50b09900s5.5wRSW',
'to': 'B',
'via': '',
'wx_raw_timestamp': '10090556',
@ -226,11 +220,11 @@ class ParseCommentWeather(unittest.TestCase):
}
}
packet = "A>B:_10090556c220s004g005t077r010p020P030h50b09900s5.5.ABS1.2CDF"
packet = "A>B:_10090556c220s004g005t077r010p020P030h50b09900s5.5wRSW"
self.assertEqual(expected, parse(packet))
packet2 = "A>B:_10090556c220s112g t r h b .ABS1.2CDF"
packet2 = "A>B:_10090556c220s112g t r h b wRSW"
expected['raw'] = packet2
expected['weather'] = {
"wind_direction": 220,
@ -239,7 +233,7 @@ class ParseCommentWeather(unittest.TestCase):
self.assertEqual(expected, parse(packet2))
packet3 = "A>B:_10090556c220s112g...t...r...p...P...b......ABS1.2CDF"
packet3 = "A>B:_10090556c220s112g...t...r...p...P...b.....wRSW"
expected['raw'] = packet3
expected['weather'] = {
"wind_direction": 220,

View File

@ -1,58 +0,0 @@
import unittest
from aprslib import parse
from aprslib.exceptions import ParseError, UnknownFormat
class thirdpartyTC(unittest.TestCase):
def test_empty_subpacket(self):
self.assertRaises(ParseError, parse, "A>B:}")
def test_no_body(self):
self.assertRaises(ParseError, parse, "A>B:}C>D")
def test_empty_body(self):
self.assertRaises(ParseError, parse, "A>B:}C>D:")
def testparse_header_exception(self):
self.assertRaises(ParseError, parse, "A>B:}C:asd")
def test_empty_body_of_format_that_is_not_status(self):
self.assertRaises(ParseError, parse, "A>B:}C>D:!")
try:
parse("A>B:}C>D:>")
except:
self.fail("empty status packet shouldn't raise exception")
def test_unsupported_formats_raising(self):
with self.assertRaises(UnknownFormat):
for packet_type in '#$%)*,<?T[_{':
packet = "A>B:}C>D:%saaa" % packet_type
try:
parse(packet)
except UnknownFormat as exp:
self.assertEqual(exp.packet, packet)
raise
def test_valid_thirdparty_msg(self):
packet = "A-1>APRS,B-2,WIDE1*:}C>APU25N,TCPIP,A-1*::DEF :ack56"
result = parse(packet)
self.assertEqual(result['via'],'')
self.assertEqual(result['to'],'APRS')
self.assertEqual(result['from'],'A-1')
self.assertEqual(result['format'],'thirdparty')
self.assertEqual(result['raw'],packet)
self.assertEqual(result['path'],['B-2', 'WIDE1*'])
self.assertEqual(result['subpacket']['raw'],packet[21:])
self.assertEqual(result['subpacket']['via'],'')
self.assertEqual(result['subpacket']['msgNo'],'56')
self.assertEqual(result['subpacket']['from'],'C')
self.assertEqual(result['subpacket']['path'],['TCPIP', 'A-1*'])
self.assertEqual(result['subpacket']['response'],'ack')
self.assertEqual(result['subpacket']['format'],'message')
self.assertEqual(result['subpacket']['to'],'APU25N')
self.assertEqual(result['subpacket']['addresse'],'DEF')