added parsing of positionless wx reports + tests

This commit is contained in:
Rossen Georgiev 2016-01-09 14:53:31 +00:00
parent e06fadaecc
commit dc66940fb8
2 changed files with 86 additions and 52 deletions

View File

@ -162,10 +162,9 @@ def parse(packet):
# \ - unused
# ] - unused
# ^ - unused
# _ - positionless weather report
# { - user defined
# } - 3rd party traffic
if packet_type in '#$%)*,<?T[_{}':
if packet_type in '#$%)*,<?T[{}':
raise UnknownFormat("format is not supported", packet)
# STATUS PACKET
@ -197,6 +196,23 @@ def parse(packet):
body, result = _parse_message(body)
parsed.update(result)
# POSITIONLESS WEATHER REPORT
elif packet_type == '_':
logger.debug("Attempting to parse as positionless weather report")
match = re.match("^(\d{8})c[\. \d]{3}s[\. \d]{3}g[\. \d]{3}t[\. \d]{3}", body)
if not match:
raise ParseError("invalid positionless weather report format")
comment, weather = _parse_weather_data(body[8:])
parsed.update({
'format': 'wx',
'wx_raw_timestamp': match.group(1),
'comment': comment.strip(' '),
'weather': weather,
})
# postion report (regular or compressed)
elif (packet_type in '!=/@;' or
0 <= body.find('!') < 40): # page 28 of spec (PDF)
@ -247,7 +263,7 @@ def parse(packet):
# Page 62 of the spec
if parsed['symbol'] == '_':
logger.debug("Attempting to parse weather report from comment")
body, result = _parse_comment_weather(body)
body, result = _parse_weather_data(body)
parsed.update({
'comment': body.strip(' '),
'weather': result,
@ -936,7 +952,7 @@ def _parse_normal(body):
return (body, parsed)
def _parse_comment_weather(body):
def _parse_weather_data(body):
wind_multiplier = 0.44704
rain_multiplier = 0.254

View File

@ -1,11 +1,14 @@
import unittest2 as unittest
from aprslib.parsing import _parse_comment_weather
from aprslib.parsing import _parse_weather_data
from aprslib.parsing import parse
wind_multiplier = 0.44704
mm_multiplier = 0.254
class ParseCommentWeather(unittest.TestCase):
def setUp(self):
self.maxDiff = None
def test_wind(self):
# misc value
@ -13,7 +16,7 @@ class ParseCommentWeather(unittest.TestCase):
"wind_speed": (9 * wind_multiplier),
"wind_direction": 9
}
result = _parse_comment_weather("009/009")
result = _parse_weather_data("009/009")
self.assertEqual(expected, result)
# Daft result but possible
@ -21,7 +24,7 @@ class ParseCommentWeather(unittest.TestCase):
"wind_speed": (999 * wind_multiplier),
"wind_direction": 999
}
result = _parse_comment_weather("999/999")
result = _parse_weather_data("999/999")
self.assertEqual(expected, result)
#Positionless packet
@ -29,7 +32,7 @@ class ParseCommentWeather(unittest.TestCase):
"wind_speed": float(9 * wind_multiplier),
"wind_direction": 9
}
result = _parse_comment_weather("c009s009")
result = _parse_weather_data("c009s009")
self.assertEqual(expected, result)
# Daft result but possible
@ -37,7 +40,7 @@ class ParseCommentWeather(unittest.TestCase):
"wind_speed": (999 * wind_multiplier),
"wind_direction": 999
}
result = _parse_comment_weather("c999s999")
result = _parse_weather_data("c999s999")
self.assertEqual(expected, result)
def test_temp(self):
@ -45,184 +48,199 @@ class ParseCommentWeather(unittest.TestCase):
expected = "", {
"temperature": float((-99.0 - 32) / 1.8)
}
result = _parse_comment_weather("t-99")
result = _parse_weather_data("t-99")
self.assertEqual(expected, result)
# Misc
expected = "", {
"temperature": -40.0
}
result = _parse_comment_weather("t-40")
result = _parse_weather_data("t-40")
self.assertEqual(expected, result)
# Zero F
expected = "", {
"temperature": -17.77777777777778
}
result = _parse_comment_weather("t000")
result = _parse_weather_data("t000")
self.assertEqual(expected, result)
# Daft, but possible
expected = "", {
"temperature": 537.2222222222222
}
result = _parse_comment_weather("t999")
result = _parse_weather_data("t999")
self.assertEqual(expected, result)
def test_rain_1h(self):
expected = "", {
"rain_1h": 0.0
}
result = _parse_comment_weather("r000")
result = _parse_weather_data("r000")
self.assertEqual(expected, result)
expected = "", {
"rain_1h": float(999 * mm_multiplier)
}
result = _parse_comment_weather("r999")
result = _parse_weather_data("r999")
self.assertEqual(expected, result)
def test_rain_24h(self):
expected = "", {
"rain_24h": 0.0
}
result = _parse_comment_weather("p000")
result = _parse_weather_data("p000")
self.assertEqual(expected, result)
expected = "", {
"rain_24h": float(999 * mm_multiplier)
}
result = _parse_comment_weather("p999")
result = _parse_weather_data("p999")
self.assertEqual(expected, result)
def test_rain_since_midnight(self):
expected = "", {
"rain_since_midnight": 0.0
}
result = _parse_comment_weather("P000")
result = _parse_weather_data("P000")
self.assertEqual(expected, result)
expected = "", {
"rain_since_midnight": float(999 * mm_multiplier)
}
result = _parse_comment_weather("P999")
result = _parse_weather_data("P999")
self.assertEqual(expected, result)
def test_humidity(self):
expected = "", {
"humidity": 0.0
}
result = _parse_comment_weather("h00")
result = _parse_weather_data("h00")
self.assertEqual(expected, result)
expected = "", {
"humidity": 99
}
result = _parse_comment_weather("h99")
result = _parse_weather_data("h99")
self.assertEqual(expected, result)
def test_pressure(self):
expected = "", {
"pressure": 0.0
}
result = _parse_comment_weather("b00000")
result = _parse_weather_data("b00000")
self.assertEqual(expected, result)
expected = "", {
"pressure": 9999.9
}
result = _parse_comment_weather("b99999")
result = _parse_weather_data("b99999")
self.assertEqual(expected, result)
def test_luminosity(self):
expected = "", {
"luminosity": 000
}
result = _parse_comment_weather("L000")
result = _parse_weather_data("L000")
self.assertEqual(expected, result)
expected = "", {
"luminosity": 999
}
result = _parse_comment_weather("L999")
result = _parse_weather_data("L999")
self.assertEqual(expected, result)
expected = "", {
"luminosity": 1123
}
result = _parse_comment_weather("l123")
result = _parse_weather_data("l123")
self.assertEqual(expected, result)
expected = "", {
"luminosity": 1999
}
result = _parse_comment_weather("l999")
result = _parse_weather_data("l999")
self.assertEqual(expected, result)
def test_snow(self):
expected = "", {
"snow": 000
}
result = _parse_comment_weather("s...s000")
result = _parse_weather_data("s...s000")
self.assertEqual(expected, result)
expected = "", {
"snow": float(5.5 * 25.4)
}
result = _parse_comment_weather("s...s5.5")
result = _parse_weather_data("s...s5.5")
self.assertEqual(expected, result)
expected = "", {
"snow": float(999 * 25.4)
}
result = _parse_comment_weather("s...s999")
result = _parse_weather_data("s...s999")
self.assertEqual(expected, result)
def test_rain_raw(self):
expected = "", {
"rain_raw": 000
}
result = _parse_comment_weather("#000")
result = _parse_weather_data("#000")
self.assertEqual(expected, result)
expected = "", {
"rain_raw": 999
}
result = _parse_comment_weather("#999")
result = _parse_weather_data("#999")
self.assertEqual(expected, result)
# Not possible in real world (rain and snow measurements)
def test_positionless_packet(self):
expected = "10090556wRSW", {
"pressure": 990.0,
"humidity": 50,
"rain_1h": float(010.0 * mm_multiplier),
"rain_24h": float(020.0 * mm_multiplier),
"rain_since_midnight": float(030.0 * mm_multiplier),
"snow": float(5.5 * 25.4),
"temperature": float((77.0 - 32) / 1.8),
"wind_direction": 220,
"wind_gust": 5.0 * wind_multiplier,
"wind_speed": 4.0 * wind_multiplier
expected = {
'comment': 'wRSW',
'format': 'wx',
'from': 'A',
'path': [],
'raw': 'A>B:_10090556c220s004g005t077r010p020P030h50b09900s5.5wRSW',
'to': 'B',
'via': '',
'wx_raw_timestamp': '10090556',
"weather": {
"pressure": 990.0,
"humidity": 50,
"rain_1h": float(010.0 * mm_multiplier),
"rain_24h": float(020.0 * mm_multiplier),
"rain_since_midnight": float(030.0 * mm_multiplier),
"snow": float(5.5 * 25.4),
"temperature": float((77.0 - 32) / 1.8),
"wind_direction": 220,
"wind_gust": 5.0 * wind_multiplier,
"wind_speed": 4.0 * wind_multiplier
}
}
result = _parse_comment_weather("10090556c220s004g005t077r010p020P030h50b09900s5.5wRSW")
self.assertEqual(expected, result)
packet = "A>B:_10090556c220s004g005t077r010p020P030h50b09900s5.5wRSW"
expected = "10090556wRSW", {
self.assertEqual(expected, parse(packet))
packet2 = "A>B:_10090556c220s112g t r h b wRSW"
expected['raw'] = packet2
expected['weather'] = {
"wind_direction": 220,
"wind_speed": 112 * wind_multiplier
}
result = _parse_comment_weather("10090556c220s112g t r p P h b wRSW")
self.assertEqual(expected, result)
self.assertEqual(expected, parse(packet2))
expected = "10090556wRSW", {
packet3 = "A>B:_10090556c220s112g...t...r...p...P...b.....wRSW"
expected['raw'] = packet3
expected['weather'] = {
"wind_direction": 220,
"wind_speed": 112 * wind_multiplier
}
result = _parse_comment_weather("10090556c220s112g...t...r...p...P...h..b.....wRSW")
self.assertEqual(expected, result)
self.assertEqual(expected, parse(packet3))
def test_position_packet(self):
expected = "eCumulusWMR100", {
@ -235,7 +253,7 @@ class ParseCommentWeather(unittest.TestCase):
"wind_speed": 1 * wind_multiplier
}
result = _parse_comment_weather("319/001g004t048r...p P000h19b10294eCumulusWMR100")
result = _parse_weather_data("319/001g004t048r...p P000h19b10294eCumulusWMR100")
self.assertEqual(expected, result)
if __name__ == '__main__':