code and test changes for py3 compability
This commit is contained in:
parent
b3cffc5fca
commit
d10860db57
|
|
@ -1,11 +1,15 @@
|
|||
language: python
|
||||
python:
|
||||
- "2.6"
|
||||
- "2.7"
|
||||
- "3.2"
|
||||
- "3.3"
|
||||
- "3.4"
|
||||
install:
|
||||
- make init
|
||||
- pip install coveralls
|
||||
script:
|
||||
make build
|
||||
make test
|
||||
after_success:
|
||||
coveralls
|
||||
|
||||
|
|
|
|||
2
Makefile
2
Makefile
|
|
@ -21,7 +21,7 @@ init:
|
|||
pip install -r req.txt
|
||||
|
||||
test:
|
||||
rm -f aprslib/*.pyc
|
||||
rm -f .coverage aprslib/*.pyc
|
||||
nosetests --verbosity $(verbosity) --with-coverage --cover-package=aprslib
|
||||
|
||||
pylint:
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ import time
|
|||
import logging
|
||||
import sys
|
||||
|
||||
from . import __version__
|
||||
from . import __version__, string_type
|
||||
from .parsing import parse
|
||||
from .exceptions import (
|
||||
GenericError,
|
||||
|
|
@ -131,14 +131,11 @@ class IS(object):
|
|||
"""
|
||||
Send a line, or multiple lines sperapted by '\\r\\n'
|
||||
"""
|
||||
if not isinstance(line, string_type):
|
||||
raise TypeError("Expected line to be str, got %s", type(line))
|
||||
if not self._connected:
|
||||
raise ConnectionError("not connected")
|
||||
|
||||
if isinstance(line, unicode):
|
||||
line = line.encode('utf8')
|
||||
elif not isinstance(line, str):
|
||||
line = str(line)
|
||||
|
||||
if line == "":
|
||||
return
|
||||
|
||||
|
|
@ -324,7 +321,7 @@ class IS(object):
|
|||
if not short_buf:
|
||||
raise ConnectionDrop("connection dropped")
|
||||
except socket.error as e:
|
||||
if "Resource temporarily unavailable" in e:
|
||||
if "Resource temporarily unavailable" in str(e):
|
||||
if not blocking:
|
||||
if len(self.buf) == 0:
|
||||
break
|
||||
|
|
|
|||
|
|
@ -23,6 +23,19 @@ Currently the library provides facilities to:
|
|||
- Connect and listen to an aprs-is packet feed
|
||||
"""
|
||||
|
||||
# Py2 & Py3 compability
|
||||
import sys
|
||||
if sys.version_info[0] >= 3:
|
||||
is_py3 = True
|
||||
string_type = (str, )
|
||||
string_type_parse = string_type + (bytes, )
|
||||
int_type = int
|
||||
else:
|
||||
is_py3 = False
|
||||
string_type = (str, unicode)
|
||||
string_type_parse = string_type
|
||||
int_type = (int, long)
|
||||
|
||||
# handles reloading
|
||||
if 'IS' in globals():
|
||||
MODULES = __import__('sys').modules
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ Provides facilities for covertion from/to base91
|
|||
__all__ = ['to_decimal', 'from_decimal']
|
||||
from math import log
|
||||
from re import findall
|
||||
from . import string_type, int_type
|
||||
|
||||
|
||||
def to_decimal(text):
|
||||
|
|
@ -29,7 +30,7 @@ def to_decimal(text):
|
|||
Takes a base91 char string and returns decimal
|
||||
"""
|
||||
|
||||
if not isinstance(text, basestring):
|
||||
if not isinstance(text, string_type):
|
||||
raise TypeError("expected str or unicode, %s given" % type(text))
|
||||
|
||||
if findall(r"[\x00-\x20\x7c-\xff]", text):
|
||||
|
|
@ -50,13 +51,17 @@ def from_decimal(number, padding=1):
|
|||
"""
|
||||
text = []
|
||||
|
||||
if not isinstance(number, (int, long)) is not int or number < 0:
|
||||
raise ValueError("non-positive integer error")
|
||||
elif not isinstance(number, (int, long)) or padding < 1:
|
||||
raise ValueError("padding must be integer and >0")
|
||||
if not isinstance(number, int_type):
|
||||
raise TypeError("Expected number to be int, got %s", type(number))
|
||||
elif not isinstance(padding, int_type):
|
||||
raise TypeError("Expected padding to be int, got %s", type(number))
|
||||
elif number < 0:
|
||||
raise ValueError("Expected number to be positive integer")
|
||||
elif padding < 1:
|
||||
raise ValueError("Expected padding to be >0")
|
||||
elif number > 0:
|
||||
for divisor in [91**e for e in reversed(range(int(log(number) / log(91)) + 1))]:
|
||||
quotient = number / divisor
|
||||
quotient = number // divisor
|
||||
number = number % divisor
|
||||
text.append(chr(33 + quotient))
|
||||
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ class GenericError(Exception):
|
|||
"""
|
||||
def __init__(self, message):
|
||||
super(GenericError, self).__init__(message)
|
||||
self.message = message
|
||||
|
||||
|
||||
class UnknownFormat(GenericError):
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ except ImportError:
|
|||
return {'confidence': 0.0, 'encoding': 'windows-1252'}
|
||||
|
||||
from .exceptions import (UnknownFormat, ParseError)
|
||||
from . import base91
|
||||
from . import base91, string_type_parse, is_py3
|
||||
|
||||
__all__ = ['parse']
|
||||
|
||||
|
|
@ -78,12 +78,18 @@ def parse(packet):
|
|||
* status message
|
||||
"""
|
||||
|
||||
if not isinstance(packet, string_type_parse):
|
||||
raise TypeError("Epected packet to be str/unicode/bytes, got %s", type(packet))
|
||||
|
||||
# attempt to detect encoding
|
||||
if isinstance(packet, str):
|
||||
if isinstance(packet, bytes if is_py3 else str):
|
||||
try:
|
||||
packet = packet.decode('utf-8')
|
||||
except UnicodeDecodeError:
|
||||
res = chardet.detect(packet)
|
||||
if is_py3:
|
||||
res = chardet.detect(packet.split(b':', 1)[-1])
|
||||
else:
|
||||
res = chardet.detect(packet.split(':', 1)[-1])
|
||||
|
||||
if res['confidence'] > 0.7:
|
||||
packet = packet.decode(res['encoding'])
|
||||
|
|
|
|||
|
|
@ -1,11 +1,11 @@
|
|||
import unittest
|
||||
import mox
|
||||
import aprslib
|
||||
import logging
|
||||
import socket
|
||||
import sys
|
||||
import os
|
||||
|
||||
import aprslib
|
||||
from mox3 import mox
|
||||
|
||||
|
||||
class TC_IS(unittest.TestCase):
|
||||
def setUp(self):
|
||||
|
|
@ -42,7 +42,7 @@ class TC_IS(unittest.TestCase):
|
|||
|
||||
def test_socket_readlines(self):
|
||||
fdr, fdw = os.pipe()
|
||||
f = os.fdopen(fdw,'w')
|
||||
f = os.fdopen(fdw, 'w')
|
||||
f.write("something")
|
||||
f.close()
|
||||
|
||||
|
|
@ -57,15 +57,15 @@ class TC_IS(unittest.TestCase):
|
|||
# part 3 - nothing to read
|
||||
self.ais.sock.setblocking(0)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(socket.error(""
|
||||
"Resource temporarily unavailable"))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(
|
||||
socket.error("Resource temporarily unavailable"))
|
||||
# part 4 - yield 3 lines (blocking False)
|
||||
self.ais.sock.setblocking(0)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn("a\r\n"*3)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(socket.error(""
|
||||
"Resource temporarily unavailable"))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(
|
||||
socket.error("Resource temporarily unavailable"))
|
||||
# part 5 - yield 3 lines 2 times (blocking True)
|
||||
self.ais.sock.setblocking(0)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
|
|
@ -76,15 +76,17 @@ class TC_IS(unittest.TestCase):
|
|||
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(StopIteration)
|
||||
mox.Replay(self.ais.sock)
|
||||
|
||||
next_method = '__next__' if sys.version_info[0] >= 3 else 'next'
|
||||
|
||||
# part 1
|
||||
with self.assertRaises(aprslib.exceptions.ConnectionDrop):
|
||||
self.ais._socket_readlines().next()
|
||||
getattr(self.ais._socket_readlines(), next_method)()
|
||||
# part 2
|
||||
with self.assertRaises(aprslib.exceptions.ConnectionDrop):
|
||||
self.ais._socket_readlines().next()
|
||||
getattr(self.ais._socket_readlines(), next_method)()
|
||||
# part 3
|
||||
with self.assertRaises(StopIteration):
|
||||
self.ais._socket_readlines().next()
|
||||
getattr(self.ais._socket_readlines(), next_method)()
|
||||
# part 4
|
||||
for line in self.ais._socket_readlines():
|
||||
self.assertEqual(line, 'a')
|
||||
|
|
@ -260,47 +262,55 @@ class TC_IS(unittest.TestCase):
|
|||
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_sendall(self):
|
||||
def test_sendall_type_exception(self):
|
||||
for testType in [5, 0.5, dict, list]:
|
||||
with self.assertRaises(TypeError):
|
||||
self.ais.sendall(testType)
|
||||
|
||||
def test_sendall_not_connected(self):
|
||||
self.ais._connected = False
|
||||
with self.assertRaises(aprslib.ConnectionError):
|
||||
self.ais.sendall("test")
|
||||
|
||||
def test_sendall_socketerror(self):
|
||||
self.ais.sock = mox.MockAnything()
|
||||
self.m.StubOutWithMock(self.ais, "close")
|
||||
|
||||
# part 1 not connected
|
||||
#
|
||||
# part 2 socket.error
|
||||
# setup
|
||||
self.ais.sock.setblocking(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.sendall(mox.IgnoreArg()).AndRaise(socket.error)
|
||||
self.ais.close()
|
||||
# part 3 empty input
|
||||
#
|
||||
|
||||
mox.Replay(self.ais.sock)
|
||||
self.m.ReplayAll()
|
||||
|
||||
# part 1
|
||||
self.ais._connected = False
|
||||
with self.assertRaises(aprslib.ConnectionError):
|
||||
self.ais.sendall("test")
|
||||
# part 2
|
||||
# test
|
||||
self.ais._connected = True
|
||||
with self.assertRaises(aprslib.ConnectionError):
|
||||
self.ais.sendall("test")
|
||||
|
||||
# part 3
|
||||
self.ais._connected = True
|
||||
self.ais.sendall("")
|
||||
|
||||
# verify so far
|
||||
# verify
|
||||
mox.Verify(self.ais.sock)
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_sendall_empty_input(self):
|
||||
self.ais._connected = True
|
||||
self.ais.sendall("")
|
||||
|
||||
def test_sendall_passing_to_socket(self):
|
||||
self.ais.sock = mox.MockAnything()
|
||||
self.m.StubOutWithMock(self.ais, "close")
|
||||
|
||||
# rest
|
||||
_unicode = str if sys.version_info[0] >= 3 else unicode
|
||||
|
||||
self.ais._connected = True
|
||||
for line in [
|
||||
"test", # no \r\n
|
||||
"test\r\n", # with \r\n
|
||||
u"test", # unicode
|
||||
5, # number or anything with __str__
|
||||
"test",
|
||||
"test\r\n",
|
||||
_unicode("test"),
|
||||
_unicode("test\r\n"),
|
||||
]:
|
||||
# setup
|
||||
self.ais.sock = mox.MockAnything()
|
||||
|
|
@ -380,7 +390,7 @@ class TC_IS_consumer(unittest.TestCase):
|
|||
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_consumer_exptions(self):
|
||||
def test_consumer_exceptions(self):
|
||||
self.ais._socket_readlines(False).AndRaise(SystemExit)
|
||||
self.ais._socket_readlines(False).AndRaise(KeyboardInterrupt)
|
||||
self.ais._socket_readlines(False).AndRaise(Exception("random"))
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import unittest
|
||||
import sys
|
||||
|
||||
from aprslib import base91
|
||||
|
||||
|
|
@ -16,23 +17,29 @@ class a_FromDecimal(unittest.TestCase):
|
|||
# 91**2 = "!!
|
||||
# 91**3 = "!!!
|
||||
# etc
|
||||
testData += [[91**i, '"' + '!'*i] for i in xrange(20)]
|
||||
testData += [[91**i, '"' + '!'*i] for i in range(20)]
|
||||
|
||||
for n, expected in testData:
|
||||
self.assertEqual(expected, base91.from_decimal(n))
|
||||
|
||||
def test_invalid_input_type(self):
|
||||
testData = ['0', '1', 1.0, None, [0]]
|
||||
def test_invalid_number_type(self):
|
||||
testData = ['0', '1', 1.0, None, [0], dict]
|
||||
|
||||
for n in testData:
|
||||
self.assertRaises(ValueError, base91.from_decimal, n)
|
||||
self.assertRaises(TypeError, base91.from_decimal, n)
|
||||
|
||||
def test_invalid_input_range(self):
|
||||
def test_invalid_number_range(self):
|
||||
testData = [-10000, -5, -1]
|
||||
|
||||
for n in testData:
|
||||
self.assertRaises(ValueError, base91.from_decimal, n)
|
||||
|
||||
def test_invalid_padding_type(self):
|
||||
testData = ['0', '1', 1.0, None, [0], dict]
|
||||
|
||||
for n in testData:
|
||||
self.assertRaises(TypeError, base91.from_decimal, 0, padding=n)
|
||||
|
||||
def test_valid_padding(self):
|
||||
testData = [1, 2, 5, 10, 100]
|
||||
|
||||
|
|
@ -63,8 +70,10 @@ class b_ToDecimal(unittest.TestCase):
|
|||
# 91**2 = "!!
|
||||
# 91**3 = "!!!
|
||||
# etc
|
||||
testData += [[91**i, '"' + '!'*i] for i in xrange(20)]
|
||||
testData += [[91**i, u'"' + u'!'*i] for i in xrange(20)]
|
||||
testData += [[91**i, '"' + '!'*i] for i in range(20)]
|
||||
|
||||
if sys.version_info[0] < 3:
|
||||
testData += [[91**i, unicode('"') + unicode('!')*i] for i in range(20)]
|
||||
|
||||
for expected, n in testData:
|
||||
self.assertEqual(expected, base91.to_decimal(n))
|
||||
|
|
@ -77,7 +86,7 @@ class b_ToDecimal(unittest.TestCase):
|
|||
|
||||
def test_invalid_input(self):
|
||||
# test for every value outside of the accepted range
|
||||
testData = [chr(i) for i in range(ord('!'))+range(ord('{')+1, 256)]
|
||||
testData = [chr(i) for i in list(range(ord('!')))+list(range(ord('{')+1, 256))]
|
||||
|
||||
# same as above, except each value is prefix with a valid char
|
||||
testData += ['!'+c for c in testData]
|
||||
|
|
@ -88,14 +97,14 @@ class b_ToDecimal(unittest.TestCase):
|
|||
|
||||
class c_Both(unittest.TestCase):
|
||||
def test_from_decimal_to_decimal(self):
|
||||
for number in xrange(91**2 + 5):
|
||||
for number in range(91**2 + 5):
|
||||
text = base91.from_decimal(number)
|
||||
result = base91.to_decimal(text)
|
||||
|
||||
self.assertEqual(result, number)
|
||||
|
||||
def test_stability(self):
|
||||
for number in xrange(50):
|
||||
for number in range(50):
|
||||
largeN = 91 ** number
|
||||
text = base91.from_decimal(largeN)
|
||||
result = base91.to_decimal(text)
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
# encoding: utf-8
|
||||
|
||||
import sys
|
||||
import unittest
|
||||
import mox
|
||||
from mox3 import mox
|
||||
|
||||
from aprslib import parse
|
||||
from aprslib import parsing
|
||||
|
|
@ -10,29 +11,38 @@ from aprslib.exceptions import ParseError, UnknownFormat
|
|||
|
||||
class ParseTestCase(unittest.TestCase):
|
||||
def test_unicode(self):
|
||||
def _u(text, c='utf8'):
|
||||
if sys.version_info[0] >= 3:
|
||||
return text
|
||||
else:
|
||||
return text.decode(c)
|
||||
|
||||
|
||||
_unicode = str if sys.version_info[0] >= 3 else unicode
|
||||
|
||||
# 7bit ascii
|
||||
result = parse("A>B:>status")
|
||||
|
||||
self.assertIsInstance(result['status'], unicode)
|
||||
self.assertEqual(result['status'], u"status")
|
||||
self.assertIsInstance(result['status'],_unicode)
|
||||
self.assertEqual(result['status'], _u("status"))
|
||||
|
||||
# string with degree sign
|
||||
result = parse("A>B:>status\xb0")
|
||||
|
||||
self.assertIsInstance(result['status'], unicode)
|
||||
self.assertEqual(result['status'], u"status\xb0")
|
||||
self.assertIsInstance(result['status'],_unicode)
|
||||
self.assertEqual(result['status'], _u("status\xb0",'latin-1'))
|
||||
|
||||
# str with utf8
|
||||
result = parse("A>B:>статус")
|
||||
|
||||
self.assertIsInstance(result['status'], unicode)
|
||||
self.assertEqual(result['status'], u"статус")
|
||||
self.assertIsInstance(result['status'],_unicode)
|
||||
self.assertEqual(result['status'], _u("статус"))
|
||||
|
||||
# unicode input
|
||||
result = parse(u"A>B:>статус")
|
||||
result = parse(_u("A>B:>статус"))
|
||||
|
||||
self.assertIsInstance(result['status'], unicode)
|
||||
self.assertEqual(result['status'], u"статус")
|
||||
self.assertIsInstance(result['status'],_unicode)
|
||||
self.assertEqual(result['status'], _u("статус"))
|
||||
|
||||
def test_empty_packet(self):
|
||||
self.assertRaises(ParseError, parse, "")
|
||||
|
|
@ -78,12 +88,18 @@ class ParseBranchesTestCase(unittest.TestCase):
|
|||
parsing._parse_timestamp(mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(("test", {}))
|
||||
self.m.ReplayAll()
|
||||
|
||||
def _u(text, c='utf8'):
|
||||
if sys.version_info[0] >= 3:
|
||||
return text
|
||||
else:
|
||||
return text.decode(c)
|
||||
|
||||
expected = {
|
||||
'status': 'test',
|
||||
'raw': u'A>B:>test',
|
||||
'raw': _u('A>B:>test'),
|
||||
'via': '',
|
||||
'from': u'A',
|
||||
'to': u'B',
|
||||
'from': _u('A'),
|
||||
'to': _u('B'),
|
||||
'path': [],
|
||||
'format': 'status'
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,8 +42,8 @@ class ParseCommentTelemetry(unittest.TestCase):
|
|||
return "".join(text)
|
||||
|
||||
def test_random_valid_telemetry(self):
|
||||
for i in xrange(100):
|
||||
vals = [randint(0, self.b91max) for x in xrange(randint(1, 5))]
|
||||
for i in range(100):
|
||||
vals = [randint(0, self.b91max) for x in range(randint(1, 5))]
|
||||
|
||||
bits = None
|
||||
|
||||
|
|
|
|||
|
|
@ -10,10 +10,10 @@ from aprslib.exceptions import ParseError
|
|||
class ValidateCallsign(unittest.TestCase):
|
||||
|
||||
def test_valid_input(self):
|
||||
chars = string.letters.upper() + string.digits
|
||||
chars = string.ascii_letters.upper() + string.digits
|
||||
|
||||
def random_valid_callsigns():
|
||||
for x in xrange(0, 500):
|
||||
for x in range(0, 500):
|
||||
call = "".join(sample(chars, randrange(1, 6)))
|
||||
|
||||
if bool(randint(0, 1)):
|
||||
|
|
@ -118,7 +118,7 @@ class ParseHeader(unittest.TestCase):
|
|||
for head in testData:
|
||||
try:
|
||||
_parse_header(head)
|
||||
except ParseError, msg:
|
||||
except ParseError as msg:
|
||||
self.fail("{0}('{1}') PraseError, {2}"
|
||||
.format(_parse_header.__name__, head, msg))
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue