Compare commits

...

41 Commits

Author SHA1 Message Date
ua1zbe cd3724696d Загрузил(а) файлы в 'aprslib/parsing' 2023-03-24 15:55:32 +03:00
ua1zbe 32c7577fb8 Удалить 'aprslib/parsing/weather.py' 2023-03-24 15:55:16 +03:00
Rossen c2a0f18ce0 fix timestamp in test_parse_position 2022-09-21 07:10:47 +00:00
Rossen Georgiev ef88e3493a bump to v0.7.2 2022-07-10 13:35:26 +01:00
Rossen Georgiev 5347b43106 test_parse_position: clean up left over vars 2022-07-10 13:34:53 +01:00
Rossen Georgiev 2a27dc4416 parse_position: fix data ext and weather parsing
Fix #72
2022-07-10 13:33:45 +01:00
Rossen Georgiev 2b139d1857 bump to v0.7.1 2022-03-12 14:29:26 +00:00
Rossen Georgiev 6b80e9e3ef parse: remove left over print
fix #69
2022-03-12 14:28:35 +00:00
Rossen Georgiev 5e79c81035 prase: weather: use raw string for regex 2022-03-12 14:27:29 +00:00
Rossen Georgiev c9f4b4f779 parse: fix weather_data mangling comments
fix #71
2022-03-12 14:24:28 +00:00
Rossen Georgiev 1b7da6566c parse: fix weather h00 = 100% humidity
fix #70
2022-03-12 13:54:40 +00:00
Rossen Georgiev 8674bbd143 do not log 'Resource temporarily unavailable' when blocking=False
Related #57
2021-12-20 18:05:17 +00:00
Rossen Georgiev ecb9ce71fb add sonarcloud config 2021-11-27 19:44:24 +00:00
Rossen Georgiev 3d316742e6 remove scrutinizer conf 2021-11-27 19:42:56 +00:00
Rossen Georgiev 0d5f9fbf22 tweak build process 2021-11-27 19:42:02 +00:00
Rossen Georgiev d40d7a48a1 bump to v0.7.0 2021-11-27 19:36:15 +00:00
Rossen e17efe2a29
Merge pull request #66 from rossengeorgiev/joergschultzelutter-master
Updated: Support for the 'more recent' reply/ack msg format from 1999 #61
2021-11-27 19:33:42 +00:00
Rossen Georgiev 303fe826bd parse: rework message rejack parsing + update tests 2021-11-27 19:31:07 +00:00
Rossen Georgiev 8928424a9c convert tests to use unittest 2021-11-27 16:25:01 +00:00
Joerg Schultze-Lutter 171ce425b4 Removed unnecessary comments 2021-11-27 16:10:55 +00:00
Joerg Schultze-Lutter 90f05c9a4a Added test script for new APRS 'message' format field 'ackMsgNo' 2021-11-27 16:10:55 +00:00
Joerg Schultze-Lutter bee37df76d Fixed final msg parser bug when no msgno was present 2021-11-27 16:10:55 +00:00
Joerg Schultze-Lutter a1deb2ffa3 ackMsgNo support (http://www.aprs.org/aprs11/replyacks.txt)
Decode the 'new' ackMsgNo for ack/rej responses and standard
APRS messages while still honoring the original format from aprs101.pdf

The following assumptions apply when handling APRS messages in general:

Option 1: no message ID present:
    send no ACK
    outgoing messages have no msg number attachment
        Example data exchange 1:
        DF1JSL-4>APRS,TCPIP*,qAC,T2PRT::WXBOT    :94043
        WXBOT>APRS,qAS,KI6WJP::DF1JSL-4 :Mountain View CA. Today,Sunny High 60

        Example data exchange 2:
        DF1JSL-4>APRS,TCPIP*,qAC,T2SPAIN::EMAIL-2  :blah@gmail.com Hallo
        EMAIL-2>APJIE4,TCPIP*,qAC,AE5PL-JF::DF1JSL-4 :Email sent to blah@gmail.com

Option 2: old message number format is present: (example: msg{12345)
    Send ack with message number from original message (ack12345)
    All outgoing messages have trailing msg number ( {abcde ); can be numeric or
    slphanumeric counter. See aprs101.pdf chapter 14
        Example data exchange 1:
        DF1JSL-4>APRS,TCPIP*,qAC,T2SP::EMAIL-2  :blah@gmail.com Hallo{12345
        EMAIL-2>APJIE4,TCPIP*,qAC,AE5PL-JF::DF1JSL-4 :ack12345
        EMAIL-2>APJIE4,TCPIP*,qAC,AE5PL-JF::DF1JSL-4 :Email sent to blah@gmail.com{891
        DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::EMAIL-2  :ack891

        Example data exchange 2:
        DF1JSL-4>APRS,TCPIP*,qAC,T2CSNGRAD::EMAIL-2  :blah@gmail.com{ABCDE
        EMAIL-2>APJIE4,TCPIP*,qAC,AE5PL-JF::DF1JSL-4 :ackABCDE
        EMAIL-2>APJIE4,TCPIP*,qAC,AE5PL-JF::DF1JSL-4 :Email sent to blah@gmail.com{893
        DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::EMAIL-2  :ack893

Option 3: new messages with message ID but without trailing retry msg ids: msg{AB}
    Do NOT send extra ack
    All outgoing messages have 2-character msg id, followed by message ID from original message
    Example:
    User sends message "Hello{AB}" to MPAD
    MPAD responds "Message content line 1{DE}AB" to user
    MPAD responds "Message content line 2{DF}AB" to user

    AB -> original message
    DE, DF -> message IDs generated by MPAD

        Example data exchange 1:
        DF1JSL-4>APRS,TCPIP*,qAC,T2NUERNBG::WXBOT    :99801{AB}
        WXBOT>APRS,qAS,KI6WJP::DF1JSL-4 :Lemon Creek AK. Today,Scattered Rain/Snow and Patchy Fog 50% {QL}AB
        DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::WXBOT    :ackQL}AB
        WXBOT>APRS,qAS,KI6WJP::DF1JSL-4 :High 40{QM}AB
        DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::WXBOT    :ackQM}AB

        Example data exchange 2:
        DF1JSL-4>APRS,TCPIP*,qAC,T2SPAIN::EMAIL-2  :blah@gmail.com Hallo{AB}
        EMAIL-2>APJIE4,TCPIP*,qAC,AE5PL-JF::DF1JSL-4 :Email sent to blah@gmail.com{OQ}AB
        DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::EMAIL-2  :ackOQ}AB

Option 4: new messages with message ID and with trailing retry msg ids: msg{AB}CD
    Follow the instructions as per http://www.aprs.org/aprs11/replyacks.txt

        Example data exchange 1:
        DF1JSL-4>APRS,TCPIP*,qAC,T2CZECH::WXBOT    :99801{LM}AA
        WXBOT>APRS,qAS,KI6WJP::DF1JSL-4 :Lemon Creek AK. Today,Scattered Rain/Snow and Patchy Fog 50% {QP}LM
        DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::WXBOT    :ackQP}LM
        WXBOT>APRS,qAS,KI6WJP::DF1JSL-4 :High 40{QQ}LM
        DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::WXBOT    :ackQQ}LM
        Example data exchange 2:
        DF1JSL-4>APRS,TCPIP*,qAC,T2SP::EMAIL-2  :blah@gmail.com Welt{DE}FG
        EMAIL-2>APJIE4,TCPIP*,qAC,AE5PL-JF::DF1JSL-4 :Email sent to blah@gmail.com{OS}DE
        DF1JSL-4>APOSB,TCPIP*,qAS,DF1JSL::EMAIL-2  :ackOS}DE
2021-11-27 16:10:55 +00:00
Rossen Georgiev 8d1822ec2d fix phg_gain calculation being incorrect on py2 2021-11-27 15:33:16 +00:00
Rossen Georgiev 71a97f3438 parsing: calculate power,height,gain,range for PHG 2021-11-27 15:28:55 +00:00
Rossen Georgiev bbe1f4dc61 Merge branch 'TheCranston-master'; close #64 2021-11-27 14:52:04 +00:00
Rossen Georgiev 2e9c4a9c56 parsing: refine PHGR code and update tests 2021-11-27 14:51:11 +00:00
TheCranston dc90983cb6 Update common.py
This comprehends the PHGR custom message that MUST be terminated by "/" and the standard PHG message of 4 chars.  The math "Should" be done for the "R" in terms of direction but that has been omitted.
2021-11-27 14:14:19 +00:00
Rossen Georgiev a3e205f001 Merge branch 'wsmitchell3-dev'; Close #43 2021-11-27 13:45:58 +00:00
Rossen Georgiev cf32c15a44 fix test_unsupported_formats_raising; wasn't checking every type before 2021-11-27 13:44:38 +00:00
Rossen Georgiev 09db998143 refine unsupported packet type message 2021-11-27 13:43:38 +00:00
Bill Mitchell 3c82d6463e Moved unsupported format list to larger scope, use the dictionary keys
to test if a packet is in the unsupported list.
2021-11-27 13:22:46 +00:00
Bill Mitchell e79aafe29a Roll back previous commit 2021-11-27 13:20:56 +00:00
Bill Mitchell 3a12e45342 If parsing fails on body after succeeding on head, return the
partially-parsed packet as part of the error.
2021-11-27 13:20:56 +00:00
Bill Mitchell a14da1d78d When packet type is unsupported, give error message which specifies
the packet type.
2021-11-27 13:20:56 +00:00
Rossen Georgiev 4557e44e02 remove 3rd-party from unsupport list in the comment 2021-11-27 13:19:40 +00:00
Rossen Georgiev 4d47818791 Merge branch 'wsmitchell3-third-party-v2'
Close #45
2021-11-27 13:03:37 +00:00
Rossen Georgiev e1eda0e53a fix unittest import in test_thirdparty 2021-11-27 12:57:23 +00:00
Bill Mitchell 6b96f01db2 Added ParseError to third-party packet error handling 2021-11-27 12:10:20 +00:00
Bill Mitchell 2e22a3c985 Updated testing to include third-party packets 2021-11-27 12:10:20 +00:00
Bill Mitchell 3659faa509 Added third-party packet support 2021-11-27 12:10:20 +00:00
19 changed files with 657 additions and 106 deletions

2
.gitignore vendored
View File

@ -3,3 +3,5 @@
*.pyc
dist
*.egg-info
env3
env2

View File

@ -1,10 +0,0 @@
checks:
python:
code_rating: true
filter:
excluded_paths:
- 'tests/*'
tools:
external_code_coverage:
timeout: 300
runs: 9

4
.sonarcloud.properties Normal file
View File

@ -0,0 +1,4 @@
sonar.sources=aprslib
sonar.tests=tests
sonar.sourceEncoding=UTF-8

View File

@ -41,5 +41,4 @@ dist: clean
python setup.py bdist_wheel --universal
upload: dist
python setup.py register -r pypi
twine upload -r pypi dist/*

View File

@ -41,8 +41,8 @@ from datetime import date as _date
__date__ = str(_date.today())
del _date
__version__ = "0.6.47"
version_info = (0, 6, 47)
__version__ = "0.7.2"
version_info = (0, 7, 2)
__author__ = "Rossen Georgiev"
__all__ = ['IS', 'parse', 'passcode']

View File

@ -333,11 +333,13 @@ class IS(object):
self.logger.error("socket.recv(): returned empty")
raise ConnectionDrop("connection dropped")
except socket.error as e:
self.logger.error("socket error on recv(): %s" % str(e))
if "Resource temporarily unavailable" in str(e):
if not blocking:
if len(self.buf) == 0:
break
# ignore error when blocking=false, and we attempt to read empty socket
if ("Resource temporarily unavailable" in str(e)
and not blocking
and len(self.buf) == 0):
break
else:
self.logger.error("socket error on recv(): %s" % str(e))
self.buf += short_buf

View File

@ -41,8 +41,28 @@ from aprslib.parsing.position import *
from aprslib.parsing.mice import *
from aprslib.parsing.message import *
from aprslib.parsing.telemetry import *
from aprslib.parsing.thirdparty import *
from aprslib.parsing.weather import *
unsupported_formats = {
'#':'raw weather report',
'$':'raw gps',
'%':'agrelo',
'&':'reserved',
'(':'unused',
')':'item report',
'*':'complete weather report',
'+':'reserved',
'-':'unused',
'.':'reserved',
'<':'station capabilities',
'?':'general query format',
'T':'telemetry report',
'[':'maidenhead locator beacon',
'\\':'unused',
']':'unused',
'^':'unused',
}
def _unicode_packet(packet):
# attempt utf-8
@ -138,28 +158,13 @@ def parse(packet):
def _try_toparse_body(packet_type, body, parsed):
result = {}
# NOT SUPPORTED FORMATS
#
# # - raw weather report
# $ - raw gps
# % - agrelo
# & - reserved
# ( - unused
# ) - item report
# * - complete weather report
# + - reserved
# - - unused
# . - reserved
# < - station capabilities
# ? - general query format
# T - telemetry report
# [ - maidenhead locator beacon
# \ - unused
# ] - unused
# ^ - unused
# } - 3rd party traffic
if packet_type in '#$%)*<?T[}':
raise UnknownFormat("format is not supported")
if packet_type in unsupported_formats:
raise UnknownFormat("Format is not supported: '{}' {}".format(packet_type, unsupported_formats[packet_type]))
# 3rd party traffic
elif packet_type == '}':
logger.debug("Packet is third-party")
body, result = parse_thirdparty(body)
# user defined
elif packet_type == ',':

View File

@ -1,4 +1,5 @@
import re
from math import sqrt
from datetime import datetime
from aprslib import base91
from aprslib.exceptions import ParseError
@ -133,17 +134,26 @@ def parse_comment(body, parsed):
def parse_data_extentions(body):
parsed = {}
match = re.findall(r"^([0-9 .]{3})/([0-9 .]{3})", body)
# course speed bearing nrq
# Page 27 of the spec
# format: 111/222/333/444text
match = re.findall(r"^([0-9 \.]{3})/([0-9 \.]{3})", body)
if match:
cse, spd = match[0]
body = body[7:]
parsed.update({'course': int(cse) if cse.isdigit() and 1 <= int(cse) <= 360 else 0})
if spd.isdigit():
if cse.isdigit() and cse != "000":
parsed.update({'course': int(cse) if 1 <= int(cse) <= 360 else 0})
if spd.isdigit() and spd != "000":
parsed.update({'speed': int(spd)*1.852})
match = re.findall(r"^/([0-9 .]{3})/([0-9 .]{3})", body)
# DF Report format
# Page 29 of teh spec
match = re.findall(r"^/([0-9 \.]{3})/([0-9 \.]{3})", body)
if match:
# cse=000 means stations is fixed, Page 29 of the spec
if cse == '000':
parsed.update({'course': 0})
brg, nrq = match[0]
body = body[8:]
if brg.isdigit():
@ -151,17 +161,45 @@ def parse_data_extentions(body):
if nrq.isdigit():
parsed.update({'nrq': int(nrq)})
else:
match = re.findall(r"^(PHG(\d[\x30-\x7e]\d\d[0-9A-Z]?))", body)
# PHG format: PHGabcd....
# RHGR format: RHGabcdr/....
match = re.findall(r"^(PHG(\d[\x30-\x7e]\d\d)([0-9A-Z]\/)?)", body)
if match:
ext, phg = match[0]
ext, phg, phgr = match[0]
body = body[len(ext):]
parsed.update({'phg': phg})
parsed.update({
'phg': phg,
'phg_power': int(phg[0]) ** 2, # watts
'phg_height': (10 * (2 ** (ord(phg[1]) - 0x30))) * 0.3048, # in meters
'phg_gain': 10 ** (int(phg[2]) / 10.0), # dB
})
phg_dir = int(phg[3])
if phg_dir == 0:
phg_dir = 'omni'
elif phg_dir == 9:
phg_dir = 'invalid'
else:
phg_dir = 45 * phg_dir
parsed['phg_dir'] = phg_dir
# range in km
parsed['phg_range'] = sqrt(2 * (parsed['phg_height'] / 0.3048)
* sqrt((parsed['phg_power'] / 10.0)
* (parsed['phg_gain'] / 2.0)
)
) * 1.60934
if phgr:
# PHG rate per hour
parsed['phg'] += phgr[0]
parsed.update({'phg_rate': int(phgr[0], 16)}) # as decimal
else:
match = re.findall(r"^RNG(\d{4})", body)
if match:
rng = match[0]
body = body[7:]
parsed.update({'rng': int(rng) * 1.609344}) # miles to km
match = re.findall(r"^RNG(\d{4})", body)
if match:
rng = match[0]
body = body[7:]
parsed.update({'rng': int(rng) * 1.609344}) # miles to km
return body, parsed

View File

@ -69,28 +69,66 @@ def parse_message(body):
break
# regular message
else:
logger.debug("Packet is just a regular message")
parsed.update({'format': 'message'})
# ---------------------------
logger.debug("Packet is just a regular message")
parsed.update({'format': 'message'})
match = re.search(r"^(ack|rej)([A-Za-z0-9]{1,5})$", body)
if match:
parsed.update({
'response': match.group(1),
'msgNo': match.group(2),
})
else:
body = body[0:70]
# APRS supports two different message formats:
# - the standard format which is described in 'aprs101.pdf':
# http://www.aprs.org/doc/APRS101.PDF
# - an addendum from 1999 which introduces a new format:
# http://www.aprs.org/aprs11/replyacks.txt
#
# A message (ack/rej as well as a standard msg text body) can either have:
# - no message number at all
# - a message number in the old format (1..5 characters / digits)
# - a message number in the new format (2 characters / digits) without trailing 'ack msg no'
# - a message number in the new format with trailing 'free ack msg no' (2 characters / digits)
match = re.search(r"{([A-Za-z0-9]{1,5})$", body)
if match:
msgNo = match.group(1)
body = body[:len(body) - 1 - len(msgNo)]
# ack / rej
# ---------------------------
# NEW REPLAY-ACK
# format: :AAAABBBBC:ackMM}AA
match = re.findall(r"^(ack|rej)([A-Za-z0-9]{2})}([A-Za-z0-9]{2})?$", body)
if match:
parsed['response'], parsed['msgNo'], ackMsgNo = match[0]
if ackMsgNo:
parsed['ackMsgNo'] = ackMsgNo
break
parsed.update({'msgNo': msgNo})
# ack/rej standard format as per aprs101.pdf chapter 14
# format: :AAAABBBBC:ack12345
match = re.findall(r"^(ack|rej)([A-Za-z0-9]{1,5})$", body)
if match:
parsed['response'], parsed['msgNo'] = match[0]
break
parsed.update({'message_text': body.strip(' ')})
# regular message body parser
# ---------------------------
parsed['message_text'] = body.strip(' ')
# check for ACKs
# new message format: http://www.aprs.org/aprs11/replyacks.txt
# format: :AAAABBBBC:text.....{MM}AA
match = re.findall(r"{([A-Za-z0-9]{2})}([A-Za-z0-9]{2})?$", body)
if match:
msgNo, ackMsgNo = match[0]
parsed['message_text'] = body[:len(body) - 4 - len(ackMsgNo)].strip(' ')
parsed['msgNo'] = msgNo
if ackMsgNo:
parsed['ackMsgNo'] = ackMsgNo
break
# old message format - see aprs101.pdf.
# search for: msgNo present
match = re.findall(r"{([A-Za-z0-9]{1,5})$", body)
if match:
msgNo = match[0]
parsed['message_text'] = body[:len(body) - 1 - len(msgNo)].strip(' ')
parsed['msgNo'] = msgNo
break
# break free from the eternal 'while'
break
return ('', parsed)

View File

@ -3,7 +3,7 @@ import re
from aprslib import base91
from aprslib.exceptions import ParseError
from aprslib.parsing import logger
from aprslib.parsing.common import parse_timestamp, parse_comment
from aprslib.parsing.common import parse_timestamp, parse_comment, parse_data_extentions
from aprslib.parsing.weather import parse_weather_data
__all__ = [
@ -60,6 +60,11 @@ def parse_position(packet_type, body):
# check comment for weather information
# Page 62 of the spec
if parsed['symbol'] == '_':
# attempt to parse winddir/speed
# Page 92 of the spec
body, result = parse_data_extentions(body)
parsed.update(result)
logger.debug("Attempting to parse weather report from comment")
body, result = parse_weather_data(body)
parsed.update({

View File

@ -0,0 +1,21 @@
import re
from aprslib.parsing.__init__ import parse
from aprslib.exceptions import UnknownFormat
from aprslib.exceptions import ParseError
__all__ = [
'parse_thirdparty',
]
def parse_thirdparty(body):
parsed = {'format':'thirdparty'}
# Parse sub-packet
try:
subpacket = parse(body)
except (UnknownFormat,ParseError) as ukf:
raise
parsed.update({'subpacket':subpacket})
return('',parsed)

View File

@ -14,7 +14,7 @@ key_map = {
'g': 'wind_gust',
'c': 'wind_direction',
't': 'temperature',
'S': 'wind_speed',
's': 'wind_speed',
'r': 'rain_1h',
'p': 'rain_24h',
'P': 'rain_since_midnight',
@ -22,22 +22,22 @@ key_map = {
'b': 'pressure',
'l': 'luminosity',
'L': 'luminosity',
's': 'snow',
'S': 'snow',
'#': 'rain_raw',
}
val_map = {
'g': lambda x: int(x) * wind_multiplier,
'c': lambda x: int(x),
'S': lambda x: int(x) * wind_multiplier,
's': lambda x: int(x) * wind_multiplier,
't': lambda x: (float(x) - 32) / 1.8,
'r': lambda x: int(x) * rain_multiplier,
'p': lambda x: int(x) * rain_multiplier,
'P': lambda x: int(x) * rain_multiplier,
'h': lambda x: int(x),
'h': lambda x: 100 if int(x) == 0 else int(x),
'b': lambda x: float(x) / 10,
'l': lambda x: int(x) + 1000,
'L': lambda x: int(x),
's': lambda x: float(x) * 25.4,
'S': lambda x: float(x) * 25.4,
'#': lambda x: int(x),
}
@ -46,20 +46,24 @@ def parse_weather_data(body):
# parse weather data
body = re.sub(r"^([0-9]{3})/([0-9]{3})", "c\\1s\\2", body)
body = body.replace('s', 'S', 1)
#body = body.replace('s', 'S', 1)
data = re.findall(r"([cSgtrpPlLs#]\d{3}|t-\d{2}|h\d{2}|b\d{5}|s\.\d{2}|s\d\.\d)", body)
data = map(lambda x: (key_map[x[0]] , val_map[x[0]](x[1:])), data)
# match as many parameters from the start, rest is comment
data = re.match(r"^([csgtrpPlLS#][0-9\-\. ]{3}|h[0-9\. ]{2}|b[0-9\. ]{5})+", body)
parsed.update(dict(data))
# strip weather data
body = re.sub(r"([cSgtrpPlLs#][0-9\-\. ]{3}|h[0-9\. ]{2}|b[0-9\. ]{5})", '', body)
if data:
data = data.group()
# split out data from comment
body = body[len(data):]
# parse all weather parameters
data = re.findall(r"([csgtrpPlLS#]\d{3}|t-\d{2}|h\d{2}|b\d{5}|s\.\d{2}|s\d\.\d)", data)
data = map(lambda x: (key_map[x[0]] , val_map[x[0]](x[1:])), data)
parsed.update(dict(data))
return (body, parsed)
def parse_weather(body):
match = re.match("^(\d{8})c[\. \d]{3}s[\. \d]{3}g[\. \d]{3}t[\. \d]{3}", body)
match = re.match(r"^(\d{8})c[\. \d]{3}s[\. \d]{3}g[\. \d]{3}t[\. \d]{3}", body)
if not match:
raise ParseError("invalid positionless weather report format")

View File

@ -1,4 +1,6 @@
twine
wheel
mox3
coverage>=5.0; python_version == '2.7' or python_version >= '3.5'

View File

@ -65,8 +65,8 @@ class ParseTestCase(unittest.TestCase):
self.fail("empty status packet shouldn't raise exception")
def test_unsupported_formats_raising(self):
with self.assertRaises(UnknownFormat):
for packet_type in '#$%)*,<?T[_{}':
for packet_type in parsing.unsupported_formats:
with self.assertRaises(UnknownFormat):
packet = "A>B:%saaa" % packet_type
try:

View File

@ -256,30 +256,47 @@ class DataExtentionsTC(unittest.TestCase):
'speed': 100*1.852,
})
def test_empty_course_speed(self):
def test_course_speed_spaces(self):
body = " / /text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '/text')
self.assertEqual(parsed, {
'course': 0,
})
self.assertEqual(parsed, {})
def test_course_speed_dots(self):
body = ".../.../text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '/text')
self.assertEqual(parsed, {
'course': 0,
})
self.assertEqual(parsed, {})
def test_course_speed_zeros(self):
body = "000/000/text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '/text')
self.assertEqual(parsed, {})
def test_course_speed_valid_chars_but_invalid_values(self):
body = "22./33 /text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '/text')
self.assertEqual(parsed, {
'course': 0,
})
self.assertEqual(parsed, {})
def test_course_speed_invalid_chars_spd(self):
body = "222/33a/text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '222/33a/text')
self.assertEqual(parsed, {})
def test_course_speed_invalid_chars_cse(self):
body = "22a/333/text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, '22a/333/text')
self.assertEqual(parsed, {})
def test_empty_bearing_nrq(self):
body = "111/100/ /...text"
@ -300,6 +317,17 @@ class DataExtentionsTC(unittest.TestCase):
'speed': 100*1.852,
})
def test_course_speed_bearing_nrq_empty_cse_speed(self):
body = "000/000/234/345text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'text')
self.assertEqual(parsed, {
'course': 0,
'bearing': 234,
'nrq': 345,
})
def test_course_speed_bearing_nrq(self):
body = "123/100/234/345text"
remaining, parsed = parse_data_extentions(body)
@ -312,23 +340,78 @@ class DataExtentionsTC(unittest.TestCase):
'nrq': 345,
})
def test_PHG(self):
body = "PHG1234Atext"
def test_PHGR_1(self):
body = "PHG51325/text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'text')
self.assertEqual(parsed, {
'phg': '1234A',
'phg': '51325',
'phg_rate': 5,
'phg_power': 25,
'phg_height': 6.096,
'phg_gain': 1.9952623149688795,
'phg_dir': 90,
'phg_range': 12.791023731208883,
})
body = "PHG1234text"
def test_PHGR_2(self):
body = "PHG5132F/text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'text')
self.assertEqual(parsed, {
'phg': '1234',
'phg': '5132F',
'phg_rate': 15,
'phg_power': 25,
'phg_height': 6.096,
'phg_gain': 1.9952623149688795,
'phg_dir': 90,
'phg_range': 12.791023731208883,
})
def test_PHG_1(self):
body = "PHG5132Atext"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'Atext')
self.assertEqual(parsed, {
'phg': '5132',
'phg_power': 25,
'phg_height': 6.096,
'phg_gain': 1.9952623149688795,
'phg_dir': 90,
'phg_range': 12.791023731208883,
})
def test_PHG_2(self):
body = "PHG5132text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'text')
self.assertEqual(parsed, {
'phg': '5132',
'phg_power': 25,
'phg_height': 6.096,
'phg_gain': 1.9952623149688795,
'phg_dir': 90,
'phg_range': 12.791023731208883,
})
def test_PHG_dir_omni(self):
body = "PHG0000text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'text')
self.assertEqual(parsed['phg_dir'], 'omni')
def test_PHG_dir_invalid(self):
body = "PHG0009text"
remaining, parsed = parse_data_extentions(body)
self.assertEqual(remaining, 'text')
self.assertEqual(parsed['phg_dir'], 'invalid')
def test_range(self):
body = "RNG1000text"
remaining, parsed = parse_data_extentions(body)

View File

@ -0,0 +1,124 @@
import unittest
from aprslib.parsing.message import parse_message
# ack/rej assertion tests
class AckRejTests(unittest.TestCase):
# errorneus rej
def test_errorneus_rej(self):
unparsed, result = parse_message("WXBOT :red12345")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'red12345',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# reject with "old" msgno
def test_reject_old_msgno(self):
unparsed, result = parse_message("WXBOT :rej123")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'msgNo': '123',
'response': 'rej'
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# ack with new msgNo but no ackMsgNo
def test_ack_new_msgno_but_no_ack_msgno(self):
unparsed, result = parse_message("WXBOT :ackAB}")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'response': 'ack',
'msgNo': 'AB',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# ack with new msgNo and ackMsgNo
def test_ack_new_msgno_and_ackmsgno(self):
unparsed, result = parse_message("WXBOT :ackAB}CD")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'response': 'ack',
'msgNo': 'AB',
'ackMsgNo': 'CD',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message text body tests
class MessageTestBodyTests(unittest.TestCase):
# message body without msg no
def test_message_without_msgno(self):
unparsed, result = parse_message("WXBOT :HelloWorld ")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'HelloWorld',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message body with msg no - old format
def test_message_body_with_no_msgno_oldformat(self):
unparsed, result = parse_message("WXBOT :HelloWorld {ABCDE")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'HelloWorld',
'msgNo': 'ABCDE',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message body with msgNo (new format) and ackMsgNo missing
def test_message_body_with_msgno_and_ackmsgno_missing_newformat(self):
unparsed, result = parse_message("WXBOT :HelloWorld {AB}")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'HelloWorld',
'msgNo': 'AB',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message body with msgNo and ackMsgNo (new format)
def test_message_body_with_msgno_and_ackmsgno_newformat(self):
unparsed, result = parse_message("WXBOT :HelloWorld {AB}CD")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': 'HelloWorld',
'msgNo': 'AB',
'ackMsgNo': 'CD',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)
# message body with really long message
def test_message_body_with_long_message(self):
unparsed, result = parse_message("WXBOT :00000000001111111111222222222233333333334444444444555555555566666666667777777777")
expected = {
'format': 'message',
'addresse': 'WXBOT',
'message_text': '00000000001111111111222222222233333333334444444444555555555566666666667777777777',
}
self.assertEqual(unparsed, '')
self.assertEqual(expected, result)

View File

@ -0,0 +1,170 @@
import unittest
from datetime import datetime
from aprslib.parsing import parse_position
class ParsePositionDataExtAndWeather(unittest.TestCase):
def setUp(self):
self.maxDiff = None
def timestamp_from_partial(self, day, hour, minute):
now = datetime.now()
corrected = now.replace(day=day, hour=hour, minute=minute, second=0, microsecond=0)
return int((corrected - datetime(1970, 1, 1)).total_seconds())
def test_position_packet_only_weather_valid(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_data_ext_and_weather_valid(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_090/001g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'course': 90,
'speed': 1*1.852,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_optional_speed(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_090/...g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'course': 90,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_optional_course(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_ /001g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'speed': 1*1.852,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_optional_speed_and_course(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_.../...g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
def test_position_packet_optional_course(self):
packet_type = '@'
packet = "092345z4903.50N/07201.75W_ /001g000t066r000p000...dUII"
expected = {
'messagecapable': True,
'raw_timestamp': '092345z',
'timestamp': self.timestamp_from_partial(9, 23, 45),
'format': 'uncompressed',
'posambiguity': 0,
'symbol': '_',
'symbol_table': '/',
'latitude': 49.05833333333333,
'longitude': -72.02916666666667,
'speed': 1*1.852,
'comment': '...dUII',
'weather': {
'wind_gust': 0.0,
'temperature': 18.88888888888889,
'rain_1h': 0.0,
'rain_24h': 0.0
}
}
_, result = parse_position(packet_type, packet)
self.assertEqual(expected, result)
if __name__ == '__main__':
unittest.main()

View File

@ -113,11 +113,17 @@ class ParseCommentWeather(unittest.TestCase):
def test_humidity(self):
expected = "", {
"humidity": 0.0
"humidity": 100
}
result = parse_weather_data("h00")
self.assertEqual(expected, result)
expected = "", {
"humidity": 1
}
result = parse_weather_data("h01")
self.assertEqual(expected, result)
expected = "", {
"humidity": 99
}
@ -198,11 +204,11 @@ class ParseCommentWeather(unittest.TestCase):
def test_positionless_packet(self):
expected = {
'comment': 'wRSW',
'comment': '.ABS1.2CDF',
'format': 'wx',
'from': 'A',
'path': [],
'raw': 'A>B:_10090556c220s004g005t077r010p020P030h50b09900s5.5wRSW',
'raw': 'A>B:_10090556c220s004g005t077r010p020P030h50b09900s5.5.ABS1.2CDF',
'to': 'B',
'via': '',
'wx_raw_timestamp': '10090556',
@ -220,11 +226,11 @@ class ParseCommentWeather(unittest.TestCase):
}
}
packet = "A>B:_10090556c220s004g005t077r010p020P030h50b09900s5.5wRSW"
packet = "A>B:_10090556c220s004g005t077r010p020P030h50b09900s5.5.ABS1.2CDF"
self.assertEqual(expected, parse(packet))
packet2 = "A>B:_10090556c220s112g t r h b wRSW"
packet2 = "A>B:_10090556c220s112g t r h b .ABS1.2CDF"
expected['raw'] = packet2
expected['weather'] = {
"wind_direction": 220,
@ -233,7 +239,7 @@ class ParseCommentWeather(unittest.TestCase):
self.assertEqual(expected, parse(packet2))
packet3 = "A>B:_10090556c220s112g...t...r...p...P...b.....wRSW"
packet3 = "A>B:_10090556c220s112g...t...r...p...P...b......ABS1.2CDF"
expected['raw'] = packet3
expected['weather'] = {
"wind_direction": 220,

58
tests/test_thirdparty.py Normal file
View File

@ -0,0 +1,58 @@
import unittest
from aprslib import parse
from aprslib.exceptions import ParseError, UnknownFormat
class thirdpartyTC(unittest.TestCase):
def test_empty_subpacket(self):
self.assertRaises(ParseError, parse, "A>B:}")
def test_no_body(self):
self.assertRaises(ParseError, parse, "A>B:}C>D")
def test_empty_body(self):
self.assertRaises(ParseError, parse, "A>B:}C>D:")
def testparse_header_exception(self):
self.assertRaises(ParseError, parse, "A>B:}C:asd")
def test_empty_body_of_format_that_is_not_status(self):
self.assertRaises(ParseError, parse, "A>B:}C>D:!")
try:
parse("A>B:}C>D:>")
except:
self.fail("empty status packet shouldn't raise exception")
def test_unsupported_formats_raising(self):
with self.assertRaises(UnknownFormat):
for packet_type in '#$%)*,<?T[_{':
packet = "A>B:}C>D:%saaa" % packet_type
try:
parse(packet)
except UnknownFormat as exp:
self.assertEqual(exp.packet, packet)
raise
def test_valid_thirdparty_msg(self):
packet = "A-1>APRS,B-2,WIDE1*:}C>APU25N,TCPIP,A-1*::DEF :ack56"
result = parse(packet)
self.assertEqual(result['via'],'')
self.assertEqual(result['to'],'APRS')
self.assertEqual(result['from'],'A-1')
self.assertEqual(result['format'],'thirdparty')
self.assertEqual(result['raw'],packet)
self.assertEqual(result['path'],['B-2', 'WIDE1*'])
self.assertEqual(result['subpacket']['raw'],packet[21:])
self.assertEqual(result['subpacket']['via'],'')
self.assertEqual(result['subpacket']['msgNo'],'56')
self.assertEqual(result['subpacket']['from'],'C')
self.assertEqual(result['subpacket']['path'],['TCPIP', 'A-1*'])
self.assertEqual(result['subpacket']['response'],'ack')
self.assertEqual(result['subpacket']['format'],'message')
self.assertEqual(result['subpacket']['to'],'APU25N')
self.assertEqual(result['subpacket']['addresse'],'DEF')