aprs-python/aprslib/parsing/__init__.py

199 lines
5.4 KiB
Python

# aprslib - Python library for working with APRS
# Copyright (C) 2013-2014 Rossen Georgiev
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
This module contains all the functions used for parsing packets
"""
import re
import logging
logger = logging.getLogger(__name__)
try:
    import chardet
except ImportError:
    # chardet is optional: when it is not installed, substitute a stub that
    # exposes the same ``detect`` entry point
    class chardet:
        """
        Stand-in for the chardet module

        ``detect`` always reports windows-1252 with zero confidence.
        """

        @staticmethod
        def detect(x):
            return dict(confidence=0.0, encoding='windows-1252')
from aprslib import string_type_parse
from aprslib.exceptions import (UnknownFormat, ParseError)
from aprslib.parsing.common import *
from aprslib.parsing.misc import *
from aprslib.parsing.position import *
from aprslib.parsing.mice import *
from aprslib.parsing.message import *
from aprslib.parsing.telemetry import *
from aprslib.parsing.weather import *
def _unicode_packet(packet):
# attempt utf-8
try:
return packet.decode('utf-8')
except UnicodeDecodeError:
pass
# attempt to detect encoding
res = chardet.detect(packet.split(b':', 1)[-1])
if res['confidence'] > 0.7 and res['encoding'] != 'EUC-TW':
try:
return packet.decode(res['encoding'])
except UnicodeDecodeError:
pass
# if everything fails
return packet.decode('latin-1')
def parse(packet):
    """
    Parses an APRS packet and returns a dict with decoded data

    - All attributes are in metric units

    ``packet`` may be text or bytes. Bytes are decoded to text with
    ``_unicode_packet`` and trailing CR/LF characters are stripped before
    parsing.

    The returned dict always contains ``raw`` (the decoded, stripped packet),
    the fields produced by ``_parse_header`` and the fields produced by the
    body parser. When no body parser recognises the packet but the ``to``
    field matches one of the generic destination patterns, the packet is
    returned with ``format`` set to ``'beacon'`` and ``text`` set to the
    whole body.

    Raises:
        TypeError: ``packet`` is not an instance of ``string_type_parse``
        ParseError: the packet is empty, has no ``:`` separator, has an empty
            body, or its header or body is malformed
        UnknownFormat: the packet format is not supported
    """
    if not isinstance(packet, string_type_parse):
        # exceptions do not interpolate their arguments, so the message
        # has to be formatted explicitly
        raise TypeError("Expected packet to be str/unicode/bytes, got %s"
                        % (type(packet),))

    if len(packet) == 0:
        raise ParseError("packet is empty", packet)

    # attempt to detect encoding
    if isinstance(packet, bytes):
        packet = _unicode_packet(packet)

    packet = packet.rstrip("\r\n")
    logger.debug("Parsing: %s", packet)

    # split into head and body
    try:
        (head, body) = packet.split(':', 1)
    except ValueError:
        # no ':' present, so the split produced a single element
        raise ParseError("packet has no body", packet)

    if len(body) == 0:
        raise ParseError("packet body is empty", packet)

    parsed = {
        'raw': packet,
    }

    # parse head; re-raise with the full packet attached to the error
    try:
        parsed.update(_parse_header(head))
    except ParseError as msg:
        raise ParseError(str(msg), packet)

    # parse body: the first character is the packet type identifier
    packet_type = body[0]
    body = body[1:]

    # a status report ('>') is the only type allowed to have nothing after
    # the packet type character
    if len(body) == 0 and packet_type != '>':
        raise ParseError("packet body is empty after packet type character", packet)

    # attempt to parse the body
    try:
        _try_to_parse_body(packet_type, body, parsed)
    # capture ParseErrors and attach the packet
    except (UnknownFormat, ParseError) as exp:
        exp.packet = packet
        raise

    # if we fail all attempts to parse, try beacon packet
    if 'format' not in parsed:
        if not re.match(r"^(AIR.*|ALL.*|AP.*|BEACON|CQ.*|GPS.*|DF.*|DGPS.*|"
                        "DRILL.*|DX.*|ID.*|JAVA.*|MAIL.*|MICE.*|QST.*|QTH.*|"
                        "RTCM.*|SKY.*|SPACE.*|SPC.*|SYM.*|TEL.*|TEST.*|TLM.*|"
                        "WX.*|ZIP.*|UIDIGI)$", parsed['to']):
            raise UnknownFormat("format is not supported", packet)

        parsed.update({
            'format': 'beacon',
            'text': packet_type + body,
        })

    logger.debug("Parsed ok.")
    return parsed
def _try_to_parse_body(packet_type, body, parsed):
result = {}
# NOT SUPPORTED FORMATS
#
# # - raw weather report
# $ - raw gps
# % - agrelo
# & - reserved
# ( - unused
# ) - item report
# * - complete weather report
# + - reserved
# , - invalid/test format
# - - unused
# . - reserved
# < - station capabilities
# ? - general query format
# T - telemetry report
# [ - maidenhead locator beacon
# \ - unused
# ] - unused
# ^ - unused
# { - user defined
# } - 3rd party traffic
if packet_type in '#$%)*,<?T[{}':
raise UnknownFormat("format is not supported")
# Status report
elif packet_type == '>':
logger.debug("Packet is just a status message")
body, result = _parse_status(packet_type, body)
# Mic-encoded packet
elif packet_type in "`'":
logger.debug("Attempting to parse as mic-e packet")
body, result = _parse_mice(parsed['to'], body)
# Message packet
elif packet_type == ':':
logger.debug("Attempting to parse as message packet")
body, result = _parse_message(body)
# Positionless weather report
elif packet_type == '_':
logger.debug("Attempting to parse as positionless weather report")
body, result = _parse_weather(body)
# postion report (regular or compressed)
elif (packet_type in '!=/@;' or
0 <= body.find('!') < 40): # page 28 of spec (PDF)
body, result = _parse_position(packet_type, body)
# we are done
parsed.update(result)