Merge branch 'dev', v0.6.38
dc6356a- bump to v0.6.388ea78c7- mark package as zip_safe8ee87bd- raise exception on long/lat ambiguity mismatcha3a72cb- refined code using bytes type5329bf7- fix code+tests to make them work under py3d55cc91- fix format field index (py2.6 things)b5c0ce0- use unittest2, so tests run under py2.6d10860d- code and test changes for py3 compabilityb3cffc5- set verbosity=1 for make test5f487bb- fix relative import bug introduced ina0b59ee6ac5220- updated email in setup.py3caa412- test_parse code style tweaks92b22bd- removed chardet from req.txt for test suite6798a42- include latest changes in CHANGES filea0b59ee- first part of tests for parsing submodule9fa62c4- don't try to decode already unicode7a168b2- updated shields in README
This commit is contained in:
commit
1046577cb3
|
|
@ -1,11 +1,15 @@
|
|||
language: python
|
||||
python:
|
||||
- "2.6"
|
||||
- "2.7"
|
||||
- "3.2"
|
||||
- "3.3"
|
||||
- "3.4"
|
||||
install:
|
||||
- make init
|
||||
- pip install coveralls
|
||||
script:
|
||||
make build
|
||||
make test
|
||||
after_success:
|
||||
coveralls
|
||||
|
||||
|
|
|
|||
17
CHANGES
17
CHANGES
|
|
@ -1,6 +1,23 @@
|
|||
CHANGES
|
||||
-------
|
||||
|
||||
# v0.6.37
|
||||
|
||||
- charset detection, unicode for all
|
||||
- improved _parse_timestamp
|
||||
- fix mic-e dstcall decoding bug
|
||||
- tiny syntax fix in parse.py
|
||||
|
||||
# v0.6.36
|
||||
|
||||
- fix typo in timestamp format
|
||||
- tweak debug messages for position reports
|
||||
- now able to parse object report format
|
||||
|
||||
# v0.6.35
|
||||
|
||||
- rotated base91 telemetry bits in output
|
||||
|
||||
# v0.6.34
|
||||
|
||||
- fixed timestamps being parsed incorrectly
|
||||
|
|
|
|||
6
Makefile
6
Makefile
|
|
@ -11,6 +11,8 @@ Available commands:
|
|||
|
||||
endef
|
||||
|
||||
verbosity=1
|
||||
|
||||
export HELPBODY
|
||||
help:
|
||||
@echo "$$HELPBODY"
|
||||
|
|
@ -19,8 +21,8 @@ init:
|
|||
pip install -r req.txt
|
||||
|
||||
test:
|
||||
rm -f aprslib/*.pyc
|
||||
nosetests --verbosity 2 --with-coverage --cover-package=aprslib
|
||||
rm -f .coverage aprslib/*.pyc
|
||||
nosetests --verbosity $(verbosity) --with-coverage --cover-package=aprslib
|
||||
|
||||
pylint:
|
||||
pylint -r n -f colorized aprslib || true
|
||||
|
|
|
|||
20
README.rst
20
README.rst
|
|
@ -1,7 +1,7 @@
|
|||
APRS library for Python
|
||||
~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
|Build Status| |Coverage Status|
|
||||
|pypi| |coverage| |master_build| |dev_build|
|
||||
|
||||
A tiny library for dealing with APRS. It can be used to connect and listen to the APRS-IS feed as well as upload.
|
||||
Parsing of packets is also possible, but the entire spec is not fully implemented yet.
|
||||
|
|
@ -150,9 +150,19 @@ Docs
|
|||
|
||||
$ python -m pydoc aprslib
|
||||
|
||||
.. |Build Status| image:: https://travis-ci.org/rossengeorgiev/aprs-python.svg?branch=master
|
||||
:target: https://travis-ci.org/rossengeorgiev/aprs-python
|
||||
.. |Coverage Status| image:: https://coveralls.io/repos/rossengeorgiev/aprs-python/badge.png?branch=master
|
||||
:target: https://coveralls.io/r/rossengeorgiev/aprs-python?branch=master
|
||||
.. |pypi| image:: https://img.shields.io/pypi/v/aprslib.svg?style=flat&label=latest%20version
|
||||
:target: https://pypi.python.org/pypi/aprslib
|
||||
:alt: Latest version released on PyPi
|
||||
|
||||
.. |coverage| image:: https://img.shields.io/coveralls/rossengeorgiev/aprs-python/master.svg?style=flat
|
||||
:target: https://coveralls.io/r/rossengeorgiev/aprs-python?branch=master
|
||||
:alt: Test coverage
|
||||
|
||||
.. |master_build| image:: https://img.shields.io/travis/rossengeorgiev/aprs-python/master.svg?style=flat&label=master%20build
|
||||
:target: http://travis-ci.org/rossengeorgiev/aprs-python
|
||||
:alt: Build status of master branch
|
||||
|
||||
.. |dev_build| image:: https://img.shields.io/travis/rossengeorgiev/aprs-python/dev.svg?style=flat&label=dev%20build
|
||||
:target: http://travis-ci.org/rossengeorgiev/aprs-python
|
||||
:alt: Build status of dev branch
|
||||
|
||||
|
|
|
|||
|
|
@ -24,8 +24,8 @@ import time
|
|||
import logging
|
||||
import sys
|
||||
|
||||
from . import __version__
|
||||
from .parse import parse
|
||||
from . import __version__, string_type, is_py3
|
||||
from .parsing import parse
|
||||
from .exceptions import (
|
||||
GenericError,
|
||||
ConnectionDrop,
|
||||
|
|
@ -67,7 +67,12 @@ class IS(object):
|
|||
self.filter = "" # default filter, everything
|
||||
|
||||
self._connected = False
|
||||
self.buf = ''
|
||||
self.buf = b''
|
||||
|
||||
def _sendall(self, text):
|
||||
if is_py3:
|
||||
text = text.encode('utf-8')
|
||||
self.sock.sendall(text)
|
||||
|
||||
def set_filter(self, filter_text):
|
||||
"""
|
||||
|
|
@ -78,7 +83,7 @@ class IS(object):
|
|||
self.logger.info("Setting filter to: %s", self.filter)
|
||||
|
||||
if self._connected:
|
||||
self.sock.sendall("#filter %s\r\n" % self.filter)
|
||||
self._sendall("#filter %s\r\n" % self.filter)
|
||||
|
||||
def set_login(self, callsign, passwd):
|
||||
"""
|
||||
|
|
@ -122,7 +127,7 @@ class IS(object):
|
|||
"""
|
||||
|
||||
self._connected = False
|
||||
self.buf = ''
|
||||
self.buf = b''
|
||||
|
||||
if self.sock is not None:
|
||||
self.sock.close()
|
||||
|
|
@ -131,14 +136,11 @@ class IS(object):
|
|||
"""
|
||||
Send a line, or multiple lines sperapted by '\\r\\n'
|
||||
"""
|
||||
if not isinstance(line, string_type):
|
||||
raise TypeError("Expected line to be str, got %s", type(line))
|
||||
if not self._connected:
|
||||
raise ConnectionError("not connected")
|
||||
|
||||
if isinstance(line, unicode):
|
||||
line = line.encode('utf8')
|
||||
elif not isinstance(line, str):
|
||||
line = str(line)
|
||||
|
||||
if line == "":
|
||||
return
|
||||
|
||||
|
|
@ -147,7 +149,7 @@ class IS(object):
|
|||
try:
|
||||
self.sock.setblocking(1)
|
||||
self.sock.settimeout(5)
|
||||
self.sock.sendall(line)
|
||||
self._sendall(line)
|
||||
except socket.error as exp:
|
||||
self.close()
|
||||
raise ConnectionError(str(exp))
|
||||
|
|
@ -242,6 +244,8 @@ class IS(object):
|
|||
# pylint: enable=E1103
|
||||
|
||||
banner = self.sock.recv(512)
|
||||
if is_py3:
|
||||
banner = banner.decode('latin-1')
|
||||
|
||||
if banner[0] == "#":
|
||||
self.logger.debug("Banner: %s", banner.rstrip())
|
||||
|
|
@ -276,9 +280,12 @@ class IS(object):
|
|||
self.logger.info("Sending login information")
|
||||
|
||||
try:
|
||||
self.sock.sendall(login_str)
|
||||
self._sendall(login_str)
|
||||
self.sock.settimeout(5)
|
||||
test = self.sock.recv(len(login_str) + 100).rstrip()
|
||||
test = self.sock.recv(len(login_str) + 100)
|
||||
if is_py3:
|
||||
test = test.decode('latin-1')
|
||||
test = test.rstrip()
|
||||
|
||||
self.logger.debug("Server: %s", test)
|
||||
|
||||
|
|
@ -313,7 +320,8 @@ class IS(object):
|
|||
raise ConnectionDrop("connection dropped")
|
||||
|
||||
while True:
|
||||
short_buf = ''
|
||||
short_buf = b''
|
||||
newline = b'\r\n'
|
||||
|
||||
select.select([self.sock], [], [], None if blocking else 0)
|
||||
|
||||
|
|
@ -324,14 +332,14 @@ class IS(object):
|
|||
if not short_buf:
|
||||
raise ConnectionDrop("connection dropped")
|
||||
except socket.error as e:
|
||||
if "Resource temporarily unavailable" in e:
|
||||
if "Resource temporarily unavailable" in str(e):
|
||||
if not blocking:
|
||||
if len(self.buf) == 0:
|
||||
break
|
||||
|
||||
self.buf += short_buf
|
||||
|
||||
while "\r\n" in self.buf:
|
||||
line, self.buf = self.buf.split("\r\n", 1)
|
||||
while newline in self.buf:
|
||||
line, self.buf = self.buf.split(newline, 1)
|
||||
|
||||
yield line
|
||||
|
|
|
|||
|
|
@ -23,6 +23,19 @@ Currently the library provides facilities to:
|
|||
- Connect and listen to an aprs-is packet feed
|
||||
"""
|
||||
|
||||
# Py2 & Py3 compability
|
||||
import sys
|
||||
if sys.version_info[0] >= 3:
|
||||
is_py3 = True
|
||||
string_type = (str, )
|
||||
string_type_parse = string_type + (bytes, )
|
||||
int_type = int
|
||||
else:
|
||||
is_py3 = False
|
||||
string_type = (str, unicode)
|
||||
string_type_parse = string_type
|
||||
int_type = (int, long)
|
||||
|
||||
# handles reloading
|
||||
if 'IS' in globals():
|
||||
MODULES = __import__('sys').modules
|
||||
|
|
@ -37,12 +50,12 @@ from datetime import date as _date
|
|||
__date__ = str(_date.today())
|
||||
del _date
|
||||
|
||||
__version__ = "0.6.37"
|
||||
__version__ = "0.6.38"
|
||||
__author__ = "Rossen Georgiev"
|
||||
__all__ = ['IS', 'parse', 'passcode']
|
||||
|
||||
from .exceptions import *
|
||||
from .parse import parse
|
||||
from .parsing import parse
|
||||
from .passcode import passcode
|
||||
|
||||
from .IS import IS
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ Provides facilities for covertion from/to base91
|
|||
__all__ = ['to_decimal', 'from_decimal']
|
||||
from math import log
|
||||
from re import findall
|
||||
from . import string_type, int_type
|
||||
|
||||
|
||||
def to_decimal(text):
|
||||
|
|
@ -29,7 +30,7 @@ def to_decimal(text):
|
|||
Takes a base91 char string and returns decimal
|
||||
"""
|
||||
|
||||
if not isinstance(text, basestring):
|
||||
if not isinstance(text, string_type):
|
||||
raise TypeError("expected str or unicode, %s given" % type(text))
|
||||
|
||||
if findall(r"[\x00-\x20\x7c-\xff]", text):
|
||||
|
|
@ -50,13 +51,17 @@ def from_decimal(number, padding=1):
|
|||
"""
|
||||
text = []
|
||||
|
||||
if not isinstance(number, (int, long)) is not int or number < 0:
|
||||
raise ValueError("non-positive integer error")
|
||||
elif not isinstance(number, (int, long)) or padding < 1:
|
||||
raise ValueError("padding must be integer and >0")
|
||||
if not isinstance(number, int_type):
|
||||
raise TypeError("Expected number to be int, got %s", type(number))
|
||||
elif not isinstance(padding, int_type):
|
||||
raise TypeError("Expected padding to be int, got %s", type(number))
|
||||
elif number < 0:
|
||||
raise ValueError("Expected number to be positive integer")
|
||||
elif padding < 1:
|
||||
raise ValueError("Expected padding to be >0")
|
||||
elif number > 0:
|
||||
for divisor in [91**e for e in reversed(range(int(log(number) / log(91)) + 1))]:
|
||||
quotient = number / divisor
|
||||
quotient = number // divisor
|
||||
number = number % divisor
|
||||
text.append(chr(33 + quotient))
|
||||
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ class GenericError(Exception):
|
|||
"""
|
||||
def __init__(self, message):
|
||||
super(GenericError, self).__init__(message)
|
||||
self.message = message
|
||||
|
||||
|
||||
class UnknownFormat(GenericError):
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ except ImportError:
|
|||
return {'confidence': 0.0, 'encoding': 'windows-1252'}
|
||||
|
||||
from .exceptions import (UnknownFormat, ParseError)
|
||||
from . import base91
|
||||
from . import base91, string_type_parse, is_py3
|
||||
|
||||
__all__ = ['parse']
|
||||
|
||||
|
|
@ -78,16 +78,20 @@ def parse(packet):
|
|||
* status message
|
||||
"""
|
||||
|
||||
# attempt to detect encoding
|
||||
try:
|
||||
packet = packet.decode('utf-8')
|
||||
except UnicodeDecodeError:
|
||||
res = chardet.detect(packet)
|
||||
if not isinstance(packet, string_type_parse):
|
||||
raise TypeError("Epected packet to be str/unicode/bytes, got %s", type(packet))
|
||||
|
||||
if res['confidence'] > 0.7:
|
||||
packet = packet.decode(res['encoding'])
|
||||
else:
|
||||
packet = packet.decode('latin-1')
|
||||
# attempt to detect encoding
|
||||
if isinstance(packet, bytes):
|
||||
try:
|
||||
packet = packet.decode('utf-8')
|
||||
except UnicodeDecodeError:
|
||||
res = chardet.detect(packet.split(b':', 1)[-1])
|
||||
|
||||
if res['confidence'] > 0.7:
|
||||
packet = packet.decode(res['encoding'])
|
||||
else:
|
||||
packet = packet.decode('latin-1')
|
||||
|
||||
packet = packet.rstrip("\r\n")
|
||||
logger.debug("Parsing: %s", packet)
|
||||
|
|
@ -98,7 +102,7 @@ def parse(packet):
|
|||
# typical packet format
|
||||
#
|
||||
# CALL1>CALL2,CALL3,CALL4:>longtext......
|
||||
# |--------header--------|-----body-------|
|
||||
# |--------header---------|-----body------|
|
||||
#
|
||||
try:
|
||||
(head, body) = packet.split(':', 1)
|
||||
|
|
@ -864,6 +868,10 @@ def _parse_normal(body):
|
|||
|
||||
# position ambiguity
|
||||
posambiguity = lat_min.count(' ')
|
||||
|
||||
if posambiguity != lon_min.count(' '):
|
||||
raise ParseError("latitude and longitude ambiguity mismatch")
|
||||
|
||||
parsed.update({'posambiguity': posambiguity})
|
||||
|
||||
# we center the position inside the ambiguity box
|
||||
4
setup.py
4
setup.py
|
|
@ -17,7 +17,7 @@ setup(
|
|||
long_description=long_description,
|
||||
url='https://github.com/rossengeorgiev/aprs-python',
|
||||
author='Rossen Georgiev',
|
||||
author_email='zx.devel@gmail.com',
|
||||
author_email='hello@rgp.io',
|
||||
license='GPLv2',
|
||||
classifiers=[
|
||||
'Development Status :: 4 - Beta',
|
||||
|
|
@ -31,5 +31,5 @@ setup(
|
|||
keywords='aprs aprslib parse parsing aprs-is library base91',
|
||||
packages=['aprslib'],
|
||||
install_requires=[],
|
||||
zip_safe=False,
|
||||
zip_safe=True,
|
||||
)
|
||||
|
|
|
|||
146
tests/test_IS.py
146
tests/test_IS.py
|
|
@ -1,11 +1,13 @@
|
|||
import unittest
|
||||
import mox
|
||||
import aprslib
|
||||
import logging
|
||||
import unittest2 as unittest
|
||||
import socket
|
||||
import sys
|
||||
import os
|
||||
|
||||
import aprslib
|
||||
from mox3 import mox
|
||||
|
||||
|
||||
# byte shim for testing in both py2 and py3
|
||||
|
||||
class TC_IS(unittest.TestCase):
|
||||
def setUp(self):
|
||||
|
|
@ -17,7 +19,7 @@ class TC_IS(unittest.TestCase):
|
|||
|
||||
def test_initilization(self):
|
||||
self.assertFalse(self.ais._connected)
|
||||
self.assertEqual(self.ais.buf, '')
|
||||
self.assertEqual(self.ais.buf, b'')
|
||||
self.assertIsNone(self.ais.sock)
|
||||
self.assertEqual(self.ais.callsign, "LZ1DEV-99")
|
||||
self.assertEqual(self.ais.passwd, "testpwd")
|
||||
|
|
@ -25,16 +27,15 @@ class TC_IS(unittest.TestCase):
|
|||
|
||||
def test_close(self):
|
||||
self.ais._connected = True
|
||||
s = mox.MockAnything()
|
||||
s.close()
|
||||
self.ais.sock = s
|
||||
mox.Replay(s)
|
||||
self.ais.sock = mox.MockAnything()
|
||||
self.ais.sock.close()
|
||||
mox.Replay(self.ais.sock)
|
||||
|
||||
self.ais.close()
|
||||
|
||||
mox.Verify(s)
|
||||
mox.Verify(self.ais.sock)
|
||||
self.assertFalse(self.ais._connected)
|
||||
self.assertEqual(self.ais.buf, '')
|
||||
self.assertEqual(self.ais.buf, b'')
|
||||
|
||||
def test_open_socket(self):
|
||||
with self.assertRaises(socket.error):
|
||||
|
|
@ -42,7 +43,7 @@ class TC_IS(unittest.TestCase):
|
|||
|
||||
def test_socket_readlines(self):
|
||||
fdr, fdw = os.pipe()
|
||||
f = os.fdopen(fdw,'w')
|
||||
f = os.fdopen(fdw, 'w')
|
||||
f.write("something")
|
||||
f.close()
|
||||
|
||||
|
|
@ -53,78 +54,81 @@ class TC_IS(unittest.TestCase):
|
|||
# part 2 - conn drop trying to recv
|
||||
self.ais.sock.setblocking(0)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn('')
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b'')
|
||||
# part 3 - nothing to read
|
||||
self.ais.sock.setblocking(0)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(socket.error(""
|
||||
"Resource temporarily unavailable"))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(
|
||||
socket.error("Resource temporarily unavailable"))
|
||||
# part 4 - yield 3 lines (blocking False)
|
||||
self.ais.sock.setblocking(0)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn("a\r\n"*3)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"a\r\n"*3)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(socket.error(""
|
||||
"Resource temporarily unavailable"))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(
|
||||
socket.error("Resource temporarily unavailable"))
|
||||
# part 5 - yield 3 lines 2 times (blocking True)
|
||||
self.ais.sock.setblocking(0)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn("b\r\n"*3)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"b\r\n"*3)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn("b\r\n"*3)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"b\r\n"*3)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(StopIteration)
|
||||
mox.Replay(self.ais.sock)
|
||||
|
||||
next_method = '__next__' if sys.version_info[0] >= 3 else 'next'
|
||||
|
||||
# part 1
|
||||
with self.assertRaises(aprslib.exceptions.ConnectionDrop):
|
||||
self.ais._socket_readlines().next()
|
||||
getattr(self.ais._socket_readlines(), next_method)()
|
||||
# part 2
|
||||
with self.assertRaises(aprslib.exceptions.ConnectionDrop):
|
||||
self.ais._socket_readlines().next()
|
||||
getattr(self.ais._socket_readlines(), next_method)()
|
||||
# part 3
|
||||
with self.assertRaises(StopIteration):
|
||||
self.ais._socket_readlines().next()
|
||||
getattr(self.ais._socket_readlines(), next_method)()
|
||||
# part 4
|
||||
for line in self.ais._socket_readlines():
|
||||
self.assertEqual(line, 'a')
|
||||
self.assertEqual(line, b'a')
|
||||
# part 5
|
||||
for line in self.ais._socket_readlines(blocking=True):
|
||||
self.assertEqual(line, 'b')
|
||||
self.assertEqual(line, b'b')
|
||||
|
||||
mox.Verify(self.ais.sock)
|
||||
|
||||
def test_send_login(self):
|
||||
self.ais.sock = mox.MockAnything()
|
||||
self.m.StubOutWithMock(self.ais, "close")
|
||||
self.m.StubOutWithMock(self.ais, "_sendall")
|
||||
# part 1 - raises
|
||||
self.ais.sock.sendall(mox.IgnoreArg())
|
||||
self.ais._sendall(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn("invalidreply")
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"invalidreply")
|
||||
self.ais.close()
|
||||
# part 2 - raises (empty callsign)
|
||||
self.ais.sock.sendall(mox.IgnoreArg())
|
||||
self.ais._sendall(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn("# logresp verified, xx")
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"# logresp verified, xx")
|
||||
self.ais.close()
|
||||
# part 3 - raises (callsign doesn't match
|
||||
self.ais.sock.sendall(mox.IgnoreArg())
|
||||
self.ais._sendall(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn("# logresp NOMATCH verified, xx")
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"# logresp NOMATCH verified, xx")
|
||||
self.ais.close()
|
||||
# part 4 - raises (unverified, but pass is not -1)
|
||||
self.ais.sock.sendall(mox.IgnoreArg())
|
||||
self.ais._sendall(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn("# logresp CALL unverified, xx")
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"# logresp CALL unverified, xx")
|
||||
self.ais.close()
|
||||
# part 5 - normal, receive only
|
||||
self.ais.sock.sendall(mox.IgnoreArg())
|
||||
self.ais._sendall(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn("# logresp CALL unverified, xx")
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"# logresp CALL unverified, xx")
|
||||
# part 6 - normal, correct pass
|
||||
self.ais.sock.sendall(mox.IgnoreArg())
|
||||
self.ais._sendall(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn("# logresp CALL verified, xx")
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"# logresp CALL verified, xx")
|
||||
mox.Replay(self.ais.sock)
|
||||
self.m.ReplayAll()
|
||||
|
||||
|
|
@ -169,7 +173,7 @@ class TC_IS(unittest.TestCase):
|
|||
self.ais.sock.setsockopt(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
|
||||
self.ais.sock.setsockopt(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
|
||||
self.ais.sock.setsockopt(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn("junk")
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"junk")
|
||||
self.ais.close()
|
||||
# part 3 - everything going well
|
||||
self.ais._open_socket()
|
||||
|
|
@ -181,7 +185,7 @@ class TC_IS(unittest.TestCase):
|
|||
self.ais.sock.setsockopt(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
|
||||
self.ais.sock.setsockopt(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
|
||||
self.ais.sock.setsockopt(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn("# server banner")
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"# server banner")
|
||||
mox.Replay(self.ais.sock)
|
||||
self.m.ReplayAll()
|
||||
|
||||
|
|
@ -201,18 +205,17 @@ class TC_IS(unittest.TestCase):
|
|||
self.m.VerifyAll()
|
||||
|
||||
def test_filter(self):
|
||||
testFilter = "x/CALLSIGN"
|
||||
testFilter = 'x/CALLSIGN'
|
||||
|
||||
self.ais._connected = True
|
||||
s = mox.MockAnything()
|
||||
s.sendall("#filter %s\r\n" % testFilter)
|
||||
self.ais.sock = s
|
||||
mox.Replay(s)
|
||||
self.ais.sock = mox.MockAnything()
|
||||
self.ais.sock.sendall(b'#filter ' + testFilter.encode('ascii') + b'\r\n')
|
||||
mox.Replay(self.ais.sock)
|
||||
|
||||
self.ais.set_filter(testFilter)
|
||||
self.assertEqual(self.ais.filter, testFilter)
|
||||
|
||||
mox.Verify(s)
|
||||
mox.Verify(self.ais.sock)
|
||||
|
||||
def test_connect_from_notconnected(self):
|
||||
self.m.StubOutWithMock(self.ais, "_connect")
|
||||
|
|
@ -260,53 +263,62 @@ class TC_IS(unittest.TestCase):
|
|||
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_sendall(self):
|
||||
def test_sendall_type_exception(self):
|
||||
for testType in [5, 0.5, dict, list]:
|
||||
with self.assertRaises(TypeError):
|
||||
self.ais.sendall(testType)
|
||||
|
||||
def test_sendall_not_connected(self):
|
||||
self.ais._connected = False
|
||||
with self.assertRaises(aprslib.ConnectionError):
|
||||
self.ais.sendall("test")
|
||||
|
||||
def test_sendall_socketerror(self):
|
||||
self.ais.sock = mox.MockAnything()
|
||||
self.m.StubOutWithMock(self.ais, "close")
|
||||
|
||||
# part 1 not connected
|
||||
#
|
||||
# part 2 socket.error
|
||||
# setup
|
||||
self.ais.sock.setblocking(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.sendall(mox.IgnoreArg()).AndRaise(socket.error)
|
||||
self.ais.close()
|
||||
# part 3 empty input
|
||||
#
|
||||
|
||||
mox.Replay(self.ais.sock)
|
||||
self.m.ReplayAll()
|
||||
|
||||
# part 1
|
||||
self.ais._connected = False
|
||||
with self.assertRaises(aprslib.ConnectionError):
|
||||
self.ais.sendall("test")
|
||||
# part 2
|
||||
# test
|
||||
self.ais._connected = True
|
||||
with self.assertRaises(aprslib.ConnectionError):
|
||||
self.ais.sendall("test")
|
||||
|
||||
# part 3
|
||||
self.ais._connected = True
|
||||
self.ais.sendall("")
|
||||
|
||||
# verify so far
|
||||
# verify
|
||||
mox.Verify(self.ais.sock)
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_sendall_empty_input(self):
|
||||
self.ais._connected = True
|
||||
self.ais.sendall("")
|
||||
|
||||
def test_sendall_passing_to_socket(self):
|
||||
self.ais.sock = mox.MockAnything()
|
||||
self.m.StubOutWithMock(self.ais, "close")
|
||||
self.m.StubOutWithMock(self.ais, "_sendall")
|
||||
|
||||
# rest
|
||||
_unicode = str if sys.version_info[0] >= 3 else unicode
|
||||
|
||||
self.ais._connected = True
|
||||
for line in [
|
||||
"test", # no \r\n
|
||||
"test\r\n", # with \r\n
|
||||
u"test", # unicode
|
||||
5, # number or anything with __str__
|
||||
"test",
|
||||
"test\r\n",
|
||||
_unicode("test"),
|
||||
_unicode("test\r\n"),
|
||||
]:
|
||||
# setup
|
||||
self.ais.sock = mox.MockAnything()
|
||||
self.ais.sock.setblocking(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.sendall("%s\r\n" % str(line).rstrip('\r\n')).AndReturn(None)
|
||||
self.ais._sendall(b"%c" + line.rstrip('\r\n').encode('ascii') + b'\r\n').AndReturn(None)
|
||||
mox.Replay(self.ais.sock)
|
||||
|
||||
self.ais.sendall(line)
|
||||
|
|
@ -380,7 +392,7 @@ class TC_IS_consumer(unittest.TestCase):
|
|||
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_consumer_exptions(self):
|
||||
def test_consumer_exceptions(self):
|
||||
self.ais._socket_readlines(False).AndRaise(SystemExit)
|
||||
self.ais._socket_readlines(False).AndRaise(KeyboardInterrupt)
|
||||
self.ais._socket_readlines(False).AndRaise(Exception("random"))
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import unittest
|
||||
import unittest2 as unittest
|
||||
import sys
|
||||
|
||||
from aprslib import base91
|
||||
|
||||
|
|
@ -16,23 +17,29 @@ class a_FromDecimal(unittest.TestCase):
|
|||
# 91**2 = "!!
|
||||
# 91**3 = "!!!
|
||||
# etc
|
||||
testData += [[91**i, '"' + '!'*i] for i in xrange(20)]
|
||||
testData += [[91**i, '"' + '!'*i] for i in range(20)]
|
||||
|
||||
for n, expected in testData:
|
||||
self.assertEqual(expected, base91.from_decimal(n))
|
||||
|
||||
def test_invalid_input_type(self):
|
||||
testData = ['0', '1', 1.0, None, [0]]
|
||||
def test_invalid_number_type(self):
|
||||
testData = ['0', '1', 1.0, None, [0], dict]
|
||||
|
||||
for n in testData:
|
||||
self.assertRaises(ValueError, base91.from_decimal, n)
|
||||
self.assertRaises(TypeError, base91.from_decimal, n)
|
||||
|
||||
def test_invalid_input_range(self):
|
||||
def test_invalid_number_range(self):
|
||||
testData = [-10000, -5, -1]
|
||||
|
||||
for n in testData:
|
||||
self.assertRaises(ValueError, base91.from_decimal, n)
|
||||
|
||||
def test_invalid_padding_type(self):
|
||||
testData = ['0', '1', 1.0, None, [0], dict]
|
||||
|
||||
for n in testData:
|
||||
self.assertRaises(TypeError, base91.from_decimal, 0, padding=n)
|
||||
|
||||
def test_valid_padding(self):
|
||||
testData = [1, 2, 5, 10, 100]
|
||||
|
||||
|
|
@ -63,8 +70,10 @@ class b_ToDecimal(unittest.TestCase):
|
|||
# 91**2 = "!!
|
||||
# 91**3 = "!!!
|
||||
# etc
|
||||
testData += [[91**i, '"' + '!'*i] for i in xrange(20)]
|
||||
testData += [[91**i, u'"' + u'!'*i] for i in xrange(20)]
|
||||
testData += [[91**i, '"' + '!'*i] for i in range(20)]
|
||||
|
||||
if sys.version_info[0] < 3:
|
||||
testData += [[91**i, unicode('"') + unicode('!')*i] for i in range(20)]
|
||||
|
||||
for expected, n in testData:
|
||||
self.assertEqual(expected, base91.to_decimal(n))
|
||||
|
|
@ -77,7 +86,7 @@ class b_ToDecimal(unittest.TestCase):
|
|||
|
||||
def test_invalid_input(self):
|
||||
# test for every value outside of the accepted range
|
||||
testData = [chr(i) for i in range(ord('!'))+range(ord('{')+1, 256)]
|
||||
testData = [chr(i) for i in list(range(ord('!')))+list(range(ord('{')+1, 256))]
|
||||
|
||||
# same as above, except each value is prefix with a valid char
|
||||
testData += ['!'+c for c in testData]
|
||||
|
|
@ -88,14 +97,14 @@ class b_ToDecimal(unittest.TestCase):
|
|||
|
||||
class c_Both(unittest.TestCase):
|
||||
def test_from_decimal_to_decimal(self):
|
||||
for number in xrange(91**2 + 5):
|
||||
for number in range(91**2 + 5):
|
||||
text = base91.from_decimal(number)
|
||||
result = base91.to_decimal(text)
|
||||
|
||||
self.assertEqual(result, number)
|
||||
|
||||
def test_stability(self):
|
||||
for number in xrange(50):
|
||||
for number in range(50):
|
||||
largeN = 91 ** number
|
||||
text = base91.from_decimal(largeN)
|
||||
result = base91.to_decimal(text)
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
import unittest
|
||||
import unittest2 as unittest
|
||||
|
||||
from aprslib.exceptions import *
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,133 @@
|
|||
# encoding: utf-8
|
||||
|
||||
import sys
|
||||
import unittest2 as unittest
|
||||
from mox3 import mox
|
||||
|
||||
from aprslib import parse
|
||||
from aprslib import parsing
|
||||
from aprslib.exceptions import ParseError, UnknownFormat
|
||||
|
||||
|
||||
def _u(text, c='utf8'):
|
||||
if sys.version_info[0] >= 3:
|
||||
return text
|
||||
else:
|
||||
return text.decode(c)
|
||||
|
||||
|
||||
class ParseTestCase(unittest.TestCase):
|
||||
def test_unicode(self):
|
||||
_unicode = str if sys.version_info[0] >= 3 else unicode
|
||||
|
||||
# 7bit ascii
|
||||
result = parse("A>B:>status")
|
||||
|
||||
self.assertIsInstance(result['status'], _unicode)
|
||||
self.assertEqual(result['status'], _u("status"))
|
||||
|
||||
# string with degree sign
|
||||
result = parse("A>B:>status\xb0")
|
||||
|
||||
self.assertIsInstance(result['status'], _unicode)
|
||||
self.assertEqual(result['status'], _u("status\xb0", 'latin-1'))
|
||||
|
||||
# str with utf8
|
||||
result = parse("A>B:>статус")
|
||||
|
||||
self.assertIsInstance(result['status'], _unicode)
|
||||
self.assertEqual(result['status'], _u("статус"))
|
||||
|
||||
# unicode input
|
||||
result = parse(_u("A>B:>статус"))
|
||||
|
||||
self.assertIsInstance(result['status'], _unicode)
|
||||
self.assertEqual(result['status'], _u("статус"))
|
||||
|
||||
def test_empty_packet(self):
|
||||
self.assertRaises(ParseError, parse, "")
|
||||
|
||||
def test_no_body(self):
|
||||
self.assertRaises(ParseError, parse, "A>B")
|
||||
|
||||
def test_empty_body(self):
|
||||
self.assertRaises(ParseError, parse, "A>B:")
|
||||
|
||||
def test_parse_header_exception(self):
|
||||
self.assertRaises(ParseError, parse, "A:asd")
|
||||
|
||||
def test_empty_body_of_format_that_is_not_status(self):
|
||||
self.assertRaises(ParseError, parse, "A>B:!")
|
||||
|
||||
try:
|
||||
parse("A>B:>")
|
||||
except:
|
||||
self.fail("empty status packet shouldn't raise exception")
|
||||
|
||||
def test_unsupported_formats_raising(self):
|
||||
with self.assertRaises(UnknownFormat):
|
||||
for packet_type in '#$%)*,<?T[_{}':
|
||||
packet = "A>B:%saaa" % packet_type
|
||||
|
||||
try:
|
||||
parse(packet)
|
||||
except UnknownFormat as exp:
|
||||
self.assertEqual(exp.packet, packet)
|
||||
raise
|
||||
|
||||
|
||||
class ParseBranchesTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.m = mox.Mox()
|
||||
|
||||
def tearDown(self):
|
||||
self.m.UnsetStubs()
|
||||
|
||||
def test_status_format_branch(self):
|
||||
self.m.StubOutWithMock(parsing, "_parse_timestamp")
|
||||
parsing._parse_timestamp(mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(("test", {}))
|
||||
self.m.ReplayAll()
|
||||
|
||||
def _u(text, c='utf8'):
|
||||
if sys.version_info[0] >= 3:
|
||||
return text
|
||||
else:
|
||||
return text.decode(c)
|
||||
|
||||
expected = {
|
||||
'status': 'test',
|
||||
'raw': _u('A>B:>test'),
|
||||
'via': '',
|
||||
'from': _u('A'),
|
||||
'to': _u('B'),
|
||||
'path': [],
|
||||
'format': 'status'
|
||||
}
|
||||
result = parse("A>B:>test")
|
||||
|
||||
self.assertEqual(result, expected)
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_mice_format_branch(self):
|
||||
self.m.StubOutWithMock(parsing, "_parse_mice")
|
||||
parsing._parse_mice("B", "test").AndReturn(('', {'format': ''}))
|
||||
parsing._parse_mice("D", "test").AndReturn(('', {'format': ''}))
|
||||
self.m.ReplayAll()
|
||||
|
||||
parse("A>B:`test")
|
||||
parse("C>D:'test")
|
||||
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_message_format_branch(self):
|
||||
self.m.StubOutWithMock(parsing, "_parse_message")
|
||||
parsing._parse_message("test").AndReturn(('', {'format': ''}))
|
||||
self.m.ReplayAll()
|
||||
|
||||
parse("A>B::test")
|
||||
|
||||
self.m.VerifyAll()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
import unittest
|
||||
import unittest2 as unittest
|
||||
|
||||
from aprslib.parse import _parse_comment_telemetry
|
||||
from aprslib.parsing import _parse_comment_telemetry
|
||||
from aprslib import base91
|
||||
from random import randint
|
||||
|
||||
|
|
@ -42,13 +42,13 @@ class ParseCommentTelemetry(unittest.TestCase):
|
|||
return "".join(text)
|
||||
|
||||
def test_random_valid_telemetry(self):
|
||||
for i in xrange(100):
|
||||
vals = [randint(0, self.b91max) for x in xrange(randint(1, 5))]
|
||||
for i in range(100):
|
||||
vals = [randint(0, self.b91max) for x in range(randint(1, 5))]
|
||||
|
||||
bits = None
|
||||
|
||||
if len(vals) is 5 and randint(1, 10) > 5:
|
||||
bits = "{:08b}".format(randint(0, 255))[::-1]
|
||||
bits = "{0:08b}".format(randint(0, 255))[::-1]
|
||||
|
||||
testData = self.genTelem(i, vals, bits)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,19 +1,19 @@
|
|||
import unittest
|
||||
import unittest2 as unittest
|
||||
import string
|
||||
from random import randint, randrange, sample
|
||||
|
||||
from aprslib.parse import _parse_header
|
||||
from aprslib.parse import _validate_callsign
|
||||
from aprslib.parsing import _parse_header
|
||||
from aprslib.parsing import _validate_callsign
|
||||
from aprslib.exceptions import ParseError
|
||||
|
||||
|
||||
class ValidateCallsign(unittest.TestCase):
|
||||
|
||||
def test_valid_input(self):
|
||||
chars = string.letters.upper() + string.digits
|
||||
chars = string.ascii_letters.upper() + string.digits
|
||||
|
||||
def random_valid_callsigns():
|
||||
for x in xrange(0, 500):
|
||||
for x in range(0, 500):
|
||||
call = "".join(sample(chars, randrange(1, 6)))
|
||||
|
||||
if bool(randint(0, 1)):
|
||||
|
|
@ -118,7 +118,7 @@ class ParseHeader(unittest.TestCase):
|
|||
for head in testData:
|
||||
try:
|
||||
_parse_header(head)
|
||||
except ParseError, msg:
|
||||
except ParseError as msg:
|
||||
self.fail("{0}('{1}') PraseError, {2}"
|
||||
.format(_parse_header.__name__, head, msg))
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
import unittest
|
||||
import unittest2 as unittest
|
||||
from aprslib import passcode
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue