Compare commits

...

6 Commits

Author SHA1 Message Date
ua1zbe cd3724696d Загрузил(а) файлы в 'aprslib/parsing' 2023-03-24 15:55:32 +03:00
ua1zbe 32c7577fb8 Удалить 'aprslib/parsing/weather.py' 2023-03-24 15:55:16 +03:00
Rossen c2a0f18ce0 fix timestamp in test_parse_position 2022-09-21 07:10:47 +00:00
Rossen Georgiev ef88e3493a bump to v0.7.2 2022-07-10 13:35:26 +01:00
Rossen Georgiev 5347b43106 test_parse_position: clean up left over vars 2022-07-10 13:34:53 +01:00
Rossen Georgiev 2a27dc4416 parse_position: fix data ext and weather parsing
Fix #72
2022-07-10 13:33:45 +01:00
7 changed files with 236 additions and 24 deletions

2
.gitignore vendored
View File

@ -3,3 +3,5 @@
*.pyc
dist
*.egg-info
env3
env2

View File

@ -41,8 +41,8 @@ from datetime import date as _date
__date__ = str(_date.today())
del _date
__version__ = "0.7.1"
version_info = (0, 7, 1)
__version__ = "0.7.2"
version_info = (0, 7, 2)
__author__ = "Rossen Georgiev"
__all__ = ['IS', 'parse', 'passcode']

View File

@ -136,17 +136,24 @@ def parse_data_extentions(body):
parsed = {}
# course speed bearing nrq
# Page 27 of the spec
# format: 111/222/333/444text
match = re.findall(r"^([0-9 .]{3})/([0-9 .]{3})", body)
match = re.findall(r"^([0-9 \.]{3})/([0-9 \.]{3})", body)
if match:
cse, spd = match[0]
body = body[7:]
parsed.update({'course': int(cse) if cse.isdigit() and 1 <= int(cse) <= 360 else 0})
if spd.isdigit():
if cse.isdigit() and cse != "000":
parsed.update({'course': int(cse) if 1 <= int(cse) <= 360 else 0})
if spd.isdigit() and spd != "000":
parsed.update({'speed': int(spd)*1.852})
match = re.findall(r"^/([0-9 .]{3})/([0-9 .]{3})", body)
# DF Report format
# Page 29 of teh spec
match = re.findall(r"^/([0-9 \.]{3})/([0-9 \.]{3})", body)
if match:
# cse=000 means stations is fixed, Page 29 of the spec
if cse == '000':
parsed.update({'course': 0})
brg, nrq = match[0]
body = body[8:]
if brg.isdigit():

View File

@ -3,7 +3,7 @@ import re
from aprslib import base91
from aprslib.exceptions import ParseError
from aprslib.parsing import logger
from aprslib.parsing.common import parse_timestamp, parse_comment
from aprslib.parsing.common import parse_timestamp, parse_comment, parse_data_extentions
from aprslib.parsing.weather import parse_weather_data
__all__ = [
@ -60,6 +60,11 @@ def parse_position(packet_type, body):
# check comment for weather information
# Page 62 of the spec
if parsed['symbol'] == '_':
# attempt to parse winddir/speed
# Page 92 of the spec
body, result = parse_data_extentions(body)
parsed.update(result)
logger.debug("Attempting to parse weather report from comment")
body, result = parse_weather_data(body)
parsed.update({

View File

@ -14,7 +14,7 @@ key_map = {
'g': 'wind_gust',
'c': 'wind_direction',
't': 'temperature',
'S': 'wind_speed',
's': 'wind_speed',
'r': 'rain_1h',
'p': 'rain_24h',
'P': 'rain_since_midnight',
@ -22,13 +22,13 @@ key_map = {
'b': 'pressure',
'l': 'luminosity',
'L': 'luminosity',
's': 'snow',
'S': 'snow',
'#': 'rain_raw',
}
val_map = {
'g': lambda x: int(x) * wind_multiplier,
'c': lambda x: int(x),
'S': lambda x: int(x) * wind_multiplier,
's': lambda x: int(x) * wind_multiplier,
't': lambda x: (float(x) - 32) / 1.8,
'r': lambda x: int(x) * rain_multiplier,
'p': lambda x: int(x) * rain_multiplier,
@ -37,7 +37,7 @@ val_map = {
'b': lambda x: float(x) / 10,
'l': lambda x: int(x) + 1000,
'L': lambda x: int(x),
's': lambda x: float(x) * 25.4,
'S': lambda x: float(x) * 25.4,
'#': lambda x: int(x),
}
@ -46,17 +46,17 @@ def parse_weather_data(body):
# parse weather data
body = re.sub(r"^([0-9]{3})/([0-9]{3})", "c\\1s\\2", body)
body = body.replace('s', 'S', 1)
#body = body.replace('s', 'S', 1)
# match as many parameters from the start, rest is comment
data = re.match(r"^([cSgtrpPlLs#][0-9\-\. ]{3}|h[0-9\. ]{2}|b[0-9\. ]{5})+", body)
data = re.match(r"^([csgtrpPlLS#][0-9\-\. ]{3}|h[0-9\. ]{2}|b[0-9\. ]{5})+", body)
if data:
data = data.group()
# split out data from comment
body = body[len(data):]
# parse all weather parameters
data = re.findall(r"([cSgtrpPlLs#]\d{3}|t-\d{2}|h\d{2}|b\d{5}|s\.\d{2}|s\d\.\d)", data)
data = re.findall(r"([csgtrpPlLS#]\d{3}|t-\d{2}|h\d{2}|b\d{5}|s\.\d{2}|s\d\.\d)", data)
data = map(lambda x: (key_map[x[0]] , val_map[x[0]](x[1:])), data)
parsed.update(dict(data))

View File

@ -256,30 +256,47 @@ class DataExtentionsTC(unittest.TestCase):
'speed': 100*1.852,
})
def test_empty_course_speed(self):
def test_course_speed_spaces(self):
body = " / /text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '/text')
self.assertEqual(parsed, {
'course': 0,
})
self.assertEqual(parsed, {})
def test_course_speed_dots(self):
body = ".../.../text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '/text')
self.assertEqual(parsed, {
'course': 0,
})
self.assertEqual(parsed, {})
def test_course_speed_zeros(self):
body = "000/000/text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '/text')
self.assertEqual(parsed, {})
def test_course_speed_valid_chars_but_invalid_values(self):
body = "22./33 /text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '/text')
self.assertEqual(parsed, {
'course': 0,
})
self.assertEqual(parsed, {})
def test_course_speed_invalid_chars_spd(self):
body = "222/33a/text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '222/33a/text')
self.assertEqual(parsed, {})
def test_course_speed_invalid_chars_cse(self):
body = "22a/333/text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '22a/333/text')
self.assertEqual(parsed, {})
def test_empty_bearing_nrq(self):
body = "111/100/ /...text"
@ -300,6 +317,17 @@ class DataExtentionsTC(unittest.TestCase):
'speed': 100*1.852,
})
def test_course_speed_bearing_nrq_empty_cse_speed(self):
body = "000/000/234/345text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'text')
self.assertEqual(parsed, {
'course': 0,
'bearing': 234,
'nrq': 345,
})
def test_course_speed_bearing_nrq(self):
body = "123/100/234/345text"
remaining, parsed = parse_data_extentions(body)

View File

@ -0,0 +1,170 @@
import unittest
from datetime import datetime
from aprslib.parsing import parse_position
class ParsePositionDataExtAndWeather(unittest.TestCase):
def setUp(self):
self.maxDiff = None
def timestamp_from_partial(self, day, hour, minute):
now = datetime.now()
corrected = now.replace(day=day, hour=hour, minute=minute, second=0, microsecond=0)
return int((corrected - datetime(1970, 1, 1)).total_seconds())
def test_position_packet_only_weather_valid(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_data_ext_and_weather_valid(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_090/001g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'course': 90,
'speed': 1*1.852,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_optional_speed(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_090/...g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'course': 90,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_optional_course(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_ /001g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'speed': 1*1.852,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_optional_speed_and_course(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_.../...g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_optional_course(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_ /001g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'speed': 1*1.852,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
if __name__ == '__main__':
unittest.main()