aprs-python/aprslib/parse.py

885 lines
26 KiB
Python

# aprslib - Python library for working with APRS
# Copyright (C) 2013-2014 Rossen Georgiev
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
This module contains all function used in parsing packets
"""
import time
import re
import math
import logging
from datetime import datetime
from .exceptions import (UnknownFormat, ParseError)
from . import base91
__all__ = ['parse']
logger = logging.getLogger(__name__)
# Mic-e message type table
MTYPE_TABLE_STD = {
"111": "M0: Off Duty",
"110": "M1: En Route",
"101": "M2: In Service",
"100": "M3: Returning",
"011": "M4: Committed",
"010": "M5: Special",
"001": "M6: Priority",
"000": "Emergency",
}
MTYPE_TABLE_CUSTOM = {
"111": "C0: Custom-0",
"110": "C1: Custom-1",
"101": "C2: Custom-2",
"100": "C3: Custom-3",
"011": "C4: Custom-4",
"010": "C5: Custom-5",
"001": "C6: Custom-6",
"000": "Emergency",
}
def parse(packet):
"""
Parses an APRS packet and returns a dict with decoded data
- All attributes are in metric units
Supports:
* normal/compressed/mic-e position reports
- including comment extentions for altitude and telemetry
* messages including bulletins, announcements and telemetry
* status message
"""
packet = packet.rstrip("\r\n")
logger.debug("Parsing: %s", packet)
if len(packet) == 0:
raise ParseError("packet is empty", packet)
# typical packet format
#
# CALL1>CALL2,CALL3,CALL4:>longtext......
# |--------header--------|-----body-------|
#
try:
(head, body) = packet.split(':', 1)
except:
raise ParseError("packet has no body", packet)
if len(body) == 0:
raise ParseError("packet body is empty", packet)
parsed = {
'raw': packet,
}
try:
parsed.update(_parse_header(head))
except ParseError, msg:
raise ParseError(str(msg), packet)
packet_type = body[0]
body = body[1:]
if len(body) == 0 and packet_type != '>':
raise ParseError("packet body is empty after packet type character", packet)
# attempt to parse the body
# ------------------------------------------------------------------------------
try:
# NOT SUPPORTED FORMATS
#
# # - raw weather report
# $ - raw gps
# % - agrelo
# & - reserved
# ( - unused
# ) - item report
# * - complete weather report
# + - reserved
# , - invalid/test format
# - - unused
# . - reserved
# < - station capabilities
# ? - general query format
# T - telemetry report
# [ - maidenhead locator beacon
# \ - unused
# ] - unused
# ^ - unused
# _ - positionless weather report
# { - user defined
# } - 3rd party traffic
if packet_type in '#$%)*,<?T[_{}':
raise UnknownFormat("format is not supported", packet)
# STATUS PACKET
#
# >DDHHMMzComments
# >Comments
elif packet_type == '>':
logger.debug("Packet is just a status message")
body, result = _parse_timestamp(body, packet_type)
parsed.update(result)
parsed.update({
'format': 'status',
'status': body.strip(' ')
})
# Mic-encoded packet
elif packet_type in "`'":
logger.debug("Attempting to parse as mic-e packet")
body, result = _parse_mice(parsed['to'], body)
parsed.update(result)
# MESSAGE PACKET
elif packet_type == ':':
logger.debug("Attempting to parse as message packet")
body, result = _parse_message(body)
parsed.update(result)
# postion report (regular or compressed)
elif (packet_type in '!=/@;' or
0 <= body.find('!') < 40): # page 28 of spec (PDF)
if packet_type not in '!=/@;':
prefix, body = body.split('!', 1)
packet_type = '!'
if packet_type == ';':
logger.debug("Attempting to parse object report format")
match = re.findall(r"^([ -~]{9})(\*|_)", body)
if match:
name, flag = match[0]
parsed.update({
'object_name': name,
'alive': flag == '*',
})
body = body[10:]
else:
raise ParseError("invalid format")
else:
parsed.update({"messagecapable": packet_type in '@='})
# decode timestamp
if packet_type in "/@;":
body, result = _parse_timestamp(body, packet_type)
parsed.update(result)
if len(body) == 0 and 'timestamp' in parsed:
raise ParseError("invalid position report format", packet)
# decode body
logger.debug("Attempting to parse as compressed position report")
body, result = _parse_compressed(body)
parsed.update(result)
if len(result) == 0:
logger.debug("Attempting to parse as normal position report")
body, result = _parse_normal(body)
parsed.update(result)
if len(result) == 0:
raise ParseError("invalid format")
# decode comment
body, result = _parse_comment(body)
parsed.update(result)
if packet_type == ';':
parsed.update({
'object_format': parsed['format'],
'format': 'object',
})
# capture ParseErrors and attach the packet
except ParseError as exp:
exp.packet = packet
raise
# if we fail all attempts to parse, try beacon packet
if 'format' not in parsed:
if not re.match(r"^(AIR.*|ALL.*|AP.*|BEACON|CQ.*|GPS.*|DF.*|DGPS.*|"
"DRILL.*|DX.*|ID.*|JAVA.*|MAIL.*|MICE.*|QST.*|QTH.*|"
"RTCM.*|SKY.*|SPACE.*|SPC.*|SYM.*|TEL.*|TEST.*|TLM.*|"
"WX.*|ZIP.*|UIDIGI)$", parsed['to']):
raise UnknownFormat("format is not supported", packet)
parsed.update({
'format': 'beacon',
'text': packet_type + body,
})
logger.debug("Parsed ok.")
return parsed
# ------------------------------------------------------------------------------
# Helper parse functions
# ------------------------------------------------------------------------------
def _validate_callsign(callsign, prefix=""):
prefix = '%s: ' % prefix if bool(prefix) else ''
match = re.findall(r"^([A-Z0-9]{1,6})(-(\d{1,2}))?$", callsign)
if not match:
raise ParseError("%sinvalid callsign" % prefix)
callsign, x, ssid = match[0]
if bool(ssid) and int(ssid) > 15:
raise ParseError("%sssid not in 0-15 range" % prefix)
def _parse_header(head):
"""
Parses the header part of packet
Returns a dict
"""
# CALL1>CALL2,CALL3,CALL4,CALL5:
# |from-|--to-|------path-------|
#
try:
(fromcall, path) = head.split('>', 1)
except:
raise ParseError("invalid packet header")
# looking at aprs.fi, the rules for from/src callsign
# are a lot looser, causing a lot of packets to fail
# this check.
#
# if len(fromcall) == 0:
# raise ParseError("no fromcallsign in header")
# _validate_callsign(fromcall, "fromcallsign")
if (not 1 <= len(fromcall) <= 9 or
not re.findall(r"^[a-z0-9]{0,9}(\-[a-z0-9]{1,8})?$", fromcall, re.I)):
raise ParseError("fromcallsign is invalid")
path = path.split(',')
if len(path) < 1 or len(path[0]) == 0:
raise ParseError("no tocallsign in header")
tocall = path[0]
path = path[1:]
_validate_callsign(tocall, "tocallsign")
for digi in path:
if not re.findall(r"^[A-Z0-9\-]{1,9}\*?$", digi, re.I):
raise ParseError("invalid callsign in path")
parsed = {
'from': fromcall,
'to': tocall,
'path': path,
}
# viacall is the callsign that gated the packet to the net
# it's located behind the q-contructed
#
# CALL1>CALL2,CALL3,qAR,CALL5:
# .....................|-via-|
#
viacall = ""
if len(path) >= 2 and re.match(r"^q..$", path[-2]):
viacall = path[-1]
parsed.update({'via': viacall})
return parsed
def _parse_timestamp(body, packet_type=''):
parsed = {}
match = re.findall(r"^((\d{6})(.))$", body[0:7])
if match:
rawts, ts, form = match[0]
utc = datetime.utcnow()
timestamp = 0
if packet_type == '>' and form != 'z':
pass
if form in "hz/":
body = body[7:]
try:
# zulu hhmmss format
if form == 'h':
timestamp = "%d%02d02d%s" % (utc.year, utc.month, utc.day, ts)
# zulu ddhhmm format
# '/' local ddhhmm format
elif form in 'z/':
timestamp = "%d%02d%s%02d" % (utc.year, utc.month, ts, utc.second)
timestamp = utc.strptime(timestamp, "%Y%m%d%H%M%S")
timestamp = time.mktime(timestamp.timetuple())
parsed.update({'raw_timestamp': rawts})
except Exception as exp:
timestamp = 0
logger.debug(exp)
parsed.update({'timestamp': int(timestamp)})
return (body, parsed)
def _parse_comment(body):
parsed = {}
# attempt to parse remaining part of the packet (comment field)
# try CRS/SPD/
match = re.findall(r"^([0-9]{3})/([0-9]{3})", body)
if match:
cse, spd = match[0]
body = body[7:]
parsed.update({
'course': int(cse),
'speed': int(spd)*1.852 # knots to kms
})
# try BRG/NRQ/
match = re.findall(r"^([0-9]{3})/([0-9]{3})", body)
if match:
brg, nrq = match[0]
body = body[7:]
parsed.update({'bearing': int(brg), 'nrq': int(nrq)})
else:
match = re.findall(r"^(PHG(\d[\x30-\x7e]\d\d[0-9A-Z]?))\/", body)
if match:
ext, phg = match[0]
body = body[len(ext):]
parsed.update({'phg': phg})
else:
match = re.findall(r"^(RNG(\d{4}))\/", body)
if match:
ext, rng = match[0]
body = body[len(ext):]
parsed.update({'rng': int(rng) * 1.609344}) # miles to km
# try find altitude in comment /A=dddddd
match = re.findall(r"^(.*?)/A=(\-\d{5}|\d{6})(.*)$", body)
if match:
body, altitude, post = match[0]
body += post # glue front and back part together, DONT ASK
parsed.update({'altitude': int(altitude)*0.3048})
body, telemetry = _parse_comment_telemetry(body)
parsed.update(telemetry)
if len(body) > 0 and body[0] == "/":
body = body[1:]
parsed.update({'comment': body.strip(' ')})
return ('', parsed)
def _parse_comment_telemetry(text):
"""
Looks for base91 telemetry found in comment field
Returns [remaining_text, telemetry]
"""
parsed = {}
match = re.findall(r"^(.*?)\|([!-{]{4,14})\|(.*)$", text)
if match and len(match[0][1]) % 2 == 0:
text, telemetry, post = match[0]
text += post
temp = [0] * 7
for i in range(7):
temp[i] = base91.to_decimal(telemetry[i*2:i*2+2])
parsed.update({
'telemetry': {
'seq': temp[0],
'vals': temp[1:6]
}
})
if temp[6] != '':
parsed['telemetry'].update({
'bits': "{0:08b}".format(temp[6] & 0xFF)[::-1]
})
return (text, parsed)
# Mic-encoded packet
#
# 'lllc/s$/......... Mic-E no message capability
# 'lllc/s$/>........ Mic-E message capability
# `lllc/s$/>........ Mic-E old posit
def _parse_mice(dstcall, body):
parsed = {'format': 'mic-e'}
dstcall = dstcall.split('-')[0]
# verify mic-e format
if len(dstcall) != 6:
raise ParseError("dstcall has to be 6 characters")
if len(body) < 8:
raise ParseError("packet data field is too short")
if not re.match(r"^[0-9A-Z]{3}[0-9L-Z]{3}$", dstcall):
raise ParseError("invalid dstcall")
if not re.match(r"^[&-\x7f][&-a][\x1c-\x7f]{2}[\x1c-\x7d]"
r"[\x1c-\x7f][\x21-\x7e][\/\\0-9A-Z]", body):
raise ParseError("invalid data format")
# get symbol table and symbol
parsed.update({
'symbol': body[6],
'symbol_table': body[7]
})
# parse latitude
# the routine translates each characters into a lat digit as described in
# 'Mic-E Destination Address Field Encoding' table
tmpdstcall = ""
for i in dstcall:
if i in "KLZ": # spaces
tmpdstcall += " "
elif ord(i) > 76: # P-Y
tmpdstcall += chr(ord(i) - 32)
elif ord(i) > 57: # A-J
tmpdstcall += chr(ord(i) - 16)
else: # 0-9
tmpdstcall += i
# determine position ambiguity
match = re.findall(r"^\d+( *)$", tmpdstcall)
if not match:
raise ParseError("invalid latitude ambiguity")
posambiguity = len(match[0])
parsed.update({
'posambiguity': posambiguity
})
# adjust the coordinates be in center of ambiguity box
tmpdstcall = list(tmpdstcall)
if posambiguity > 0:
if posambiguity >= 4:
tmpdstcall[2] = '3'
else:
tmpdstcall[6 - posambiguity] = '5'
tmpdstcall = "".join(tmpdstcall)
latminutes = float(("%s.%s" % (tmpdstcall[2:4], tmpdstcall[4:6])).replace(" ", "0"))
latitude = int(tmpdstcall[0:2]) + (latminutes / 60.0)
# determine the sign N/S
latitude = -latitude if ord(dstcall[3]) <= 0x4c else latitude
parsed.update({
'latitude': latitude
})
# parse message bits
mbits = re.sub(r"[0-9L]", "0", dstcall[0:3])
mbits = re.sub(r"[P-Z]", "1", mbits)
mbits = re.sub(r"[A-K]", "2", mbits)
parsed.update({
'mbits': mbits
})
# resolve message type
if mbits.find("2") > -1:
parsed.update({
'mtype': MTYPE_TABLE_CUSTOM[mbits.replace("2", "1")]
})
else:
parsed.update({
'mtype': MTYPE_TABLE_STD[mbits]
})
# parse longitude
longitude = ord(body[0]) - 28 # decimal part of longitude
longitude += 100 if ord(dstcall[4]) >= 0x50 else 0 # apply lng offset
longitude += -80 if longitude >= 180 and longitude <= 189 else 0
longitude += -190 if longitude >= 190 and longitude <= 199 else 0
# long minutes
lngminutes = ord(body[1]) - 28.0
lngminutes += -60 if lngminutes >= 60 else 0
# + (long hundredths of minutes)
lngminutes += ((ord(body[2]) - 28.0) / 100.0)
# apply position ambiguity
# routines adjust longitude to center of the ambiguity box
if posambiguity is 4:
lngminutes = 30
elif posambiguity is 3:
lngminutes = (math.floor(lngminutes/10) + 0.5) * 10
elif posambiguity is 2:
lngminutes = math.floor(lngminutes) + 0.5
elif posambiguity is 1:
lngminutes = (math.floor(lngminutes*10) + 0.5) / 10.0
elif posambiguity is not 0:
raise ParseError("Unsupported position ambiguity: %d" % posambiguity)
longitude += lngminutes / 60.0
# apply E/W sign
longitude = 0 - longitude if ord(dstcall[5]) >= 0x50 else longitude
parsed.update({
'longitude': longitude
})
# parse speed and course
speed = (ord(body[3]) - 28) * 10
course = ord(body[4]) - 28
quotient = int(course / 10.0)
course += -(quotient * 10)
course = course*100 + ord(body[5]) - 28
speed += quotient
speed += -800 if speed >= 800 else 0
course += -400 if course >= 400 else 0
speed *= 1.852 # knots * 1.852 = kmph
parsed.update({
'speed': speed,
'course': course
})
# the rest of the packet can contain telemetry and comment
if len(body) > 8:
body = body[8:]
# check for optional 2 or 5 channel telemetry
match = re.findall(r"^('[0-9a-f]{10}|`[0-9-af]{4})(.*)$", body)
if match:
hexdata, body = match[0]
hexdata = hexdata[1:] # remove telemtry flag
channels = len(hexdata) / 2 # determine number of channels
hexdata = int(hexdata, 16) # convert hex to int
telemetry = []
for i in range(channels):
telemetry.insert(0, int(hexdata >> 8*i & 255))
parsed.update({'telemetry': telemetry})
# check for optional altitude
match = re.findall(r"^(.*)([!-{]{3})\}(.*)$", body)
if match:
body, altitude, extra = match[0]
altitude = base91.to_decimal(altitude) - 10000
parsed.update({'altitude': altitude})
body = body + extra
# attempt to parse comment telemetry
body, telemetry = _parse_comment_telemetry(body)
parsed.update(telemetry)
#TODO !DAO! parsing
# rest is a comment
parsed.update({'comment': body.strip(' ')})
return ('', parsed)
# MESSAGE PACKET
#
# :ADDRESSEE:Message text ........{XXXXX Up to 5 char line number
# :ADDRESSEE:ackXXXXX Ack for same line number
# :ADDRESSEE:Message text ........{MM}AA Line# with REPLY ACK
#
# TELEMETRY MESSAGES
#
# :N3MIM:PARM.Battery,BTemp,AirTemp,Pres,Altude,Camra,Chute,Sun,10m,ATV
# :N3MIM:UNIT.Volts,deg.F,deg.F,Mbar,Kfeet,Clik,OPEN!,on,on,high
# :N3MIM:EQNS.0,2.6,0,0,.53,-32,3,4.39,49,-32,3,18,1,2,3
# :N3MIM:BITS.10110101,PROJECT TITLE...
def _parse_message(body):
parsed = {}
# the while loop is used to easily break out once a match is found
while True:
# try to match bulletin
match = re.findall(r"^BLN([0-9])([a-z0-9_ \-]{5}):(.{0,67})", body, re.I)
if match:
bid, identifier, text = match[0]
identifier = identifier.rstrip(' ')
mformat = 'bulletin' if identifier == "" else 'group-bulletin'
parsed.update({
'format': mformat,
'message_text': text.strip(' '),
'bid': bid,
'identifier': identifier
})
break
# try to match announcement
match = re.findall(r"^BLN([A-Z])([a-zA-Z0-9_ \-]{5}):(.{0,67})", body)
if match:
aid, identifier, text = match[0]
identifier = identifier.rstrip(' ')
parsed.update({
'format': 'announcement',
'message_text': text.strip(' '),
'aid': aid,
'identifier': identifier
})
break
# validate addresse
match = re.findall(r"^([a-zA-Z0-9_ \-]{9}):(.*)$", body)
if not match:
break
addresse, body = match[0]
parsed.update({'addresse': addresse.rstrip(' ')})
# check if it's a telemetry configuration message
match = re.findall(r"^(PARM|UNIT|EQNS|BITS)\.(.*)$", body)
if match:
logger.debug("Attempting to parse telemetry-message packet")
form, body = match[0]
parsed.update({'format': 'telemetry-message'})
if form in ["PARM", "UNIT"]:
vals = body.split(',')[:13]
for val in vals:
if not re.match(r"^(.{1,20}|)$", val):
raise ParseError("incorrect format of %s (name too long?)" % form)
defvals = [''] * 13
defvals[:len(vals)] = vals
parsed.update({
't%s' % form: defvals
})
elif form == "EQNS":
eqns = body.split(',')[:15]
teqns = [0, 1, 0] * 5
for idx, val in enumerate(eqns):
if not re.match(r"^([-]?\d*\.?\d+|)$", val):
raise ParseError("value at %d is not a number in %s" % (idx+1, form))
else:
try:
val = int(val)
except:
val = float(val) if val != "" else 0
teqns[idx] = val
# group values in 5 list of 3
teqns = [teqns[i*3:(i+1)*3] for i in range(5)]
parsed.update({
't%s' % form: teqns
})
elif form == "BITS":
match = re.findall(r"^([01]{8}),(.{0,23})$", body)
if not match:
raise ParseError("incorrect format of %s (title too long?)" % form)
bits, title = match[0]
parsed.update({
't%s' % form: bits,
'title': title.strip(' ')
})
# regular message
else:
logger.debug("Packet is just a regular message")
parsed.update({'format': 'message'})
match = re.findall(r"^(ack|rej)\{([0-9]{1,5})$", body)
if match:
response, number = match[0]
parsed.update({
'response': response,
'msgNo': number
})
else:
body = body[0:70]
match = re.findall(r"\{([0-9]{1,5})$", body)
if match:
msgid = match[0]
body = body[:len(body) - 1 - len(msgid)]
parsed.update({'msgNo': int(msgid)})
parsed.update({'message_text': body.strip(' ')})
break
return ('', parsed)
def _parse_compressed(body):
parsed = {}
if re.match(r"^[\/\\A-Za-j][!-|]{8}[!-{}][ -|]{3}", body):
logger.debug("Attempting to parse as compressed position report")
if len(body) < 13:
raise ParseError("Invalid compressed packet (less than 13 characters)")
parsed.update({'format': 'compressed'})
compressed = body[:13]
body = body[13:]
symbol_table = compressed[0]
symbol = compressed[9]
try:
latitude = 90 - (base91.to_decimal(compressed[1:5]) / 380926.0)
longitude = -180 + (base91.to_decimal(compressed[5:9]) / 190463.0)
except ValueError:
raise ParseError("invalid characters in latitude/longitude encoding")
# parse csT
# converts the relevant characters from base91
c1, s1, ctype = [ord(x) - 33 for x in compressed[10:13]]
if c1 == -1:
parsed.update({'gpsfixstatus': 1 if ctype & 0x20 == 0x20 else 0})
if -1 in [c1, s1]:
pass
elif ctype & 0x18 == 0x10:
parsed.update({'altitude': (1.002 ** (c1 * 91 + s1)) * 0.3048})
elif c1 >= 0 and c1 <= 89:
parsed.update({'course': 360 if c1 == 0 else c1 * 4})
parsed.update({'speed': (1.08 ** s1 - 1) * 1.852}) # mul = convert knts to kmh
elif c1 == 90:
parsed.update({'radiorange': (2 * 1.08 ** s1) * 1.609344}) # mul = convert mph to kmh
parsed.update({
'symbol': symbol,
'symbol_table': symbol_table,
'latitude': latitude,
'longitude': longitude,
})
return (body, parsed)
def _parse_normal(body):
parsed = {}
match = re.findall(r"^(\d{2})([0-9 ]{2}\.[0-9 ]{2})([NnSs])([\/\\0-9A-Z])"
r"(\d{3})([0-9 ]{2}\.[0-9 ]{2})([EeWw])([\x21-\x7e])(.*)$", body)
if match:
parsed.update({'format': 'uncompressed'})
(
lat_deg,
lat_min,
lat_dir,
symbol_table,
lon_deg,
lon_min,
lon_dir,
symbol,
body
) = match[0]
# position ambiguity
posambiguity = lat_min.count(' ')
parsed.update({'posambiguity': posambiguity})
# we center the position inside the ambiguity box
if posambiguity >= 4:
lat_min = "30"
lon_min = "30"
else:
lat_min = lat_min.replace(' ', '5', 1)
lon_min = lon_min.replace(' ', '5', 1)
# validate longitude and latitude
if int(lat_deg) > 89 or int(lat_deg) < 0:
raise ParseError("latitude is out of range (0-90 degrees)")
if int(lon_deg) > 179 or int(lon_deg) < 0:
raise ParseError("longitutde is out of range (0-180 degrees)")
"""
f float(lat_min) >= 60:
raise ParseError("latitude minutes are out of range (0-60)")
if float(lon_min) >= 60:
raise ParseError("longitude minutes are out of range (0-60)")
the above is commented out intentionally
apperantly aprs.fi doesn't bound check minutes
and there are actual packets that have >60min
i don't even know why that's the case
"""
# convert coordinates from DDMM.MM to decimal
latitude = int(lat_deg) + (float(lat_min) / 60.0)
longitude = int(lon_deg) + (float(lon_min) / 60.0)
latitude *= -1 if lat_dir in 'Ss' else 1
longitude *= -1 if lon_dir in 'Ww' else 1
parsed.update({
'symbol': symbol,
'symbol_table': symbol_table,
'latitude': latitude,
'longitude': longitude,
})
return (body, parsed)