code and test changes for py3 compability

This commit is contained in:
Rossen Georgiev 2015-05-16 21:50:11 +01:00
parent b3cffc5fca
commit d10860db57
13 changed files with 140 additions and 80 deletions

View File

@ -1,11 +1,15 @@
language: python
python:
- "2.6"
- "2.7"
- "3.2"
- "3.3"
- "3.4"
install:
- make init
- pip install coveralls
script:
make build
make test
after_success:
coveralls

View File

@ -21,7 +21,7 @@ init:
pip install -r req.txt
test:
rm -f aprslib/*.pyc
rm -f .coverage aprslib/*.pyc
nosetests --verbosity $(verbosity) --with-coverage --cover-package=aprslib
pylint:

View File

@ -24,7 +24,7 @@ import time
import logging
import sys
from . import __version__
from . import __version__, string_type
from .parsing import parse
from .exceptions import (
GenericError,
@ -131,14 +131,11 @@ class IS(object):
"""
Send a line, or multiple lines sperapted by '\\r\\n'
"""
if not isinstance(line, string_type):
raise TypeError("Expected line to be str, got %s", type(line))
if not self._connected:
raise ConnectionError("not connected")
if isinstance(line, unicode):
line = line.encode('utf8')
elif not isinstance(line, str):
line = str(line)
if line == "":
return
@ -324,7 +321,7 @@ class IS(object):
if not short_buf:
raise ConnectionDrop("connection dropped")
except socket.error as e:
if "Resource temporarily unavailable" in e:
if "Resource temporarily unavailable" in str(e):
if not blocking:
if len(self.buf) == 0:
break

View File

@ -23,6 +23,19 @@ Currently the library provides facilities to:
- Connect and listen to an aprs-is packet feed
"""
# Py2 & Py3 compability
import sys
if sys.version_info[0] >= 3:
is_py3 = True
string_type = (str, )
string_type_parse = string_type + (bytes, )
int_type = int
else:
is_py3 = False
string_type = (str, unicode)
string_type_parse = string_type
int_type = (int, long)
# handles reloading
if 'IS' in globals():
MODULES = __import__('sys').modules

View File

@ -22,6 +22,7 @@ Provides facilities for covertion from/to base91
__all__ = ['to_decimal', 'from_decimal']
from math import log
from re import findall
from . import string_type, int_type
def to_decimal(text):
@ -29,7 +30,7 @@ def to_decimal(text):
Takes a base91 char string and returns decimal
"""
if not isinstance(text, basestring):
if not isinstance(text, string_type):
raise TypeError("expected str or unicode, %s given" % type(text))
if findall(r"[\x00-\x20\x7c-\xff]", text):
@ -50,13 +51,17 @@ def from_decimal(number, padding=1):
"""
text = []
if not isinstance(number, (int, long)) is not int or number < 0:
raise ValueError("non-positive integer error")
elif not isinstance(number, (int, long)) or padding < 1:
raise ValueError("padding must be integer and >0")
if not isinstance(number, int_type):
raise TypeError("Expected number to be int, got %s", type(number))
elif not isinstance(padding, int_type):
raise TypeError("Expected padding to be int, got %s", type(number))
elif number < 0:
raise ValueError("Expected number to be positive integer")
elif padding < 1:
raise ValueError("Expected padding to be >0")
elif number > 0:
for divisor in [91**e for e in reversed(range(int(log(number) / log(91)) + 1))]:
quotient = number / divisor
quotient = number // divisor
number = number % divisor
text.append(chr(33 + quotient))

View File

@ -35,6 +35,7 @@ class GenericError(Exception):
"""
def __init__(self, message):
super(GenericError, self).__init__(message)
self.message = message
class UnknownFormat(GenericError):

View File

@ -35,7 +35,7 @@ except ImportError:
return {'confidence': 0.0, 'encoding': 'windows-1252'}
from .exceptions import (UnknownFormat, ParseError)
from . import base91
from . import base91, string_type_parse, is_py3
__all__ = ['parse']
@ -78,12 +78,18 @@ def parse(packet):
* status message
"""
if not isinstance(packet, string_type_parse):
raise TypeError("Epected packet to be str/unicode/bytes, got %s", type(packet))
# attempt to detect encoding
if isinstance(packet, str):
if isinstance(packet, bytes if is_py3 else str):
try:
packet = packet.decode('utf-8')
except UnicodeDecodeError:
res = chardet.detect(packet)
if is_py3:
res = chardet.detect(packet.split(b':', 1)[-1])
else:
res = chardet.detect(packet.split(':', 1)[-1])
if res['confidence'] > 0.7:
packet = packet.decode(res['encoding'])

View File

@ -1,4 +1,3 @@
pylint
nose
coverage
mox
mox3

View File

@ -1,11 +1,11 @@
import unittest
import mox
import aprslib
import logging
import socket
import sys
import os
import aprslib
from mox3 import mox
class TC_IS(unittest.TestCase):
def setUp(self):
@ -42,7 +42,7 @@ class TC_IS(unittest.TestCase):
def test_socket_readlines(self):
fdr, fdw = os.pipe()
f = os.fdopen(fdw,'w')
f = os.fdopen(fdw, 'w')
f.write("something")
f.close()
@ -57,15 +57,15 @@ class TC_IS(unittest.TestCase):
# part 3 - nothing to read
self.ais.sock.setblocking(0)
self.ais.sock.fileno().AndReturn(fdr)
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(socket.error(""
"Resource temporarily unavailable"))
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(
socket.error("Resource temporarily unavailable"))
# part 4 - yield 3 lines (blocking False)
self.ais.sock.setblocking(0)
self.ais.sock.fileno().AndReturn(fdr)
self.ais.sock.recv(mox.IgnoreArg()).AndReturn("a\r\n"*3)
self.ais.sock.fileno().AndReturn(fdr)
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(socket.error(""
"Resource temporarily unavailable"))
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(
socket.error("Resource temporarily unavailable"))
# part 5 - yield 3 lines 2 times (blocking True)
self.ais.sock.setblocking(0)
self.ais.sock.fileno().AndReturn(fdr)
@ -76,15 +76,17 @@ class TC_IS(unittest.TestCase):
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(StopIteration)
mox.Replay(self.ais.sock)
next_method = '__next__' if sys.version_info[0] >= 3 else 'next'
# part 1
with self.assertRaises(aprslib.exceptions.ConnectionDrop):
self.ais._socket_readlines().next()
getattr(self.ais._socket_readlines(), next_method)()
# part 2
with self.assertRaises(aprslib.exceptions.ConnectionDrop):
self.ais._socket_readlines().next()
getattr(self.ais._socket_readlines(), next_method)()
# part 3
with self.assertRaises(StopIteration):
self.ais._socket_readlines().next()
getattr(self.ais._socket_readlines(), next_method)()
# part 4
for line in self.ais._socket_readlines():
self.assertEqual(line, 'a')
@ -260,47 +262,55 @@ class TC_IS(unittest.TestCase):
self.m.VerifyAll()
def test_sendall(self):
def test_sendall_type_exception(self):
for testType in [5, 0.5, dict, list]:
with self.assertRaises(TypeError):
self.ais.sendall(testType)
def test_sendall_not_connected(self):
self.ais._connected = False
with self.assertRaises(aprslib.ConnectionError):
self.ais.sendall("test")
def test_sendall_socketerror(self):
self.ais.sock = mox.MockAnything()
self.m.StubOutWithMock(self.ais, "close")
# part 1 not connected
#
# part 2 socket.error
# setup
self.ais.sock.setblocking(mox.IgnoreArg())
self.ais.sock.settimeout(mox.IgnoreArg())
self.ais.sock.sendall(mox.IgnoreArg()).AndRaise(socket.error)
self.ais.close()
# part 3 empty input
#
mox.Replay(self.ais.sock)
self.m.ReplayAll()
# part 1
self.ais._connected = False
with self.assertRaises(aprslib.ConnectionError):
self.ais.sendall("test")
# part 2
# test
self.ais._connected = True
with self.assertRaises(aprslib.ConnectionError):
self.ais.sendall("test")
# part 3
self.ais._connected = True
self.ais.sendall("")
# verify so far
# verify
mox.Verify(self.ais.sock)
self.m.VerifyAll()
def test_sendall_empty_input(self):
self.ais._connected = True
self.ais.sendall("")
def test_sendall_passing_to_socket(self):
self.ais.sock = mox.MockAnything()
self.m.StubOutWithMock(self.ais, "close")
# rest
_unicode = str if sys.version_info[0] >= 3 else unicode
self.ais._connected = True
for line in [
"test", # no \r\n
"test\r\n", # with \r\n
u"test", # unicode
5, # number or anything with __str__
"test",
"test\r\n",
_unicode("test"),
_unicode("test\r\n"),
]:
# setup
self.ais.sock = mox.MockAnything()
@ -380,7 +390,7 @@ class TC_IS_consumer(unittest.TestCase):
self.m.VerifyAll()
def test_consumer_exptions(self):
def test_consumer_exceptions(self):
self.ais._socket_readlines(False).AndRaise(SystemExit)
self.ais._socket_readlines(False).AndRaise(KeyboardInterrupt)
self.ais._socket_readlines(False).AndRaise(Exception("random"))

View File

@ -1,4 +1,5 @@
import unittest
import sys
from aprslib import base91
@ -16,23 +17,29 @@ class a_FromDecimal(unittest.TestCase):
# 91**2 = "!!
# 91**3 = "!!!
# etc
testData += [[91**i, '"' + '!'*i] for i in xrange(20)]
testData += [[91**i, '"' + '!'*i] for i in range(20)]
for n, expected in testData:
self.assertEqual(expected, base91.from_decimal(n))
def test_invalid_input_type(self):
testData = ['0', '1', 1.0, None, [0]]
def test_invalid_number_type(self):
testData = ['0', '1', 1.0, None, [0], dict]
for n in testData:
self.assertRaises(ValueError, base91.from_decimal, n)
self.assertRaises(TypeError, base91.from_decimal, n)
def test_invalid_input_range(self):
def test_invalid_number_range(self):
testData = [-10000, -5, -1]
for n in testData:
self.assertRaises(ValueError, base91.from_decimal, n)
def test_invalid_padding_type(self):
testData = ['0', '1', 1.0, None, [0], dict]
for n in testData:
self.assertRaises(TypeError, base91.from_decimal, 0, padding=n)
def test_valid_padding(self):
testData = [1, 2, 5, 10, 100]
@ -63,8 +70,10 @@ class b_ToDecimal(unittest.TestCase):
# 91**2 = "!!
# 91**3 = "!!!
# etc
testData += [[91**i, '"' + '!'*i] for i in xrange(20)]
testData += [[91**i, u'"' + u'!'*i] for i in xrange(20)]
testData += [[91**i, '"' + '!'*i] for i in range(20)]
if sys.version_info[0] < 3:
testData += [[91**i, unicode('"') + unicode('!')*i] for i in range(20)]
for expected, n in testData:
self.assertEqual(expected, base91.to_decimal(n))
@ -77,7 +86,7 @@ class b_ToDecimal(unittest.TestCase):
def test_invalid_input(self):
# test for every value outside of the accepted range
testData = [chr(i) for i in range(ord('!'))+range(ord('{')+1, 256)]
testData = [chr(i) for i in list(range(ord('!')))+list(range(ord('{')+1, 256))]
# same as above, except each value is prefix with a valid char
testData += ['!'+c for c in testData]
@ -88,14 +97,14 @@ class b_ToDecimal(unittest.TestCase):
class c_Both(unittest.TestCase):
def test_from_decimal_to_decimal(self):
for number in xrange(91**2 + 5):
for number in range(91**2 + 5):
text = base91.from_decimal(number)
result = base91.to_decimal(text)
self.assertEqual(result, number)
def test_stability(self):
for number in xrange(50):
for number in range(50):
largeN = 91 ** number
text = base91.from_decimal(largeN)
result = base91.to_decimal(text)

View File

@ -1,7 +1,8 @@
# encoding: utf-8
import sys
import unittest
import mox
from mox3 import mox
from aprslib import parse
from aprslib import parsing
@ -10,29 +11,38 @@ from aprslib.exceptions import ParseError, UnknownFormat
class ParseTestCase(unittest.TestCase):
def test_unicode(self):
def _u(text, c='utf8'):
if sys.version_info[0] >= 3:
return text
else:
return text.decode(c)
_unicode = str if sys.version_info[0] >= 3 else unicode
# 7bit ascii
result = parse("A>B:>status")
self.assertIsInstance(result['status'], unicode)
self.assertEqual(result['status'], u"status")
self.assertIsInstance(result['status'],_unicode)
self.assertEqual(result['status'], _u("status"))
# string with degree sign
result = parse("A>B:>status\xb0")
self.assertIsInstance(result['status'], unicode)
self.assertEqual(result['status'], u"status\xb0")
self.assertIsInstance(result['status'],_unicode)
self.assertEqual(result['status'], _u("status\xb0",'latin-1'))
# str with utf8
result = parse("A>B:>статус")
self.assertIsInstance(result['status'], unicode)
self.assertEqual(result['status'], u"статус")
self.assertIsInstance(result['status'],_unicode)
self.assertEqual(result['status'], _u("статус"))
# unicode input
result = parse(u"A>B:>статус")
result = parse(_u("A>B:>статус"))
self.assertIsInstance(result['status'], unicode)
self.assertEqual(result['status'], u"статус")
self.assertIsInstance(result['status'],_unicode)
self.assertEqual(result['status'], _u("статус"))
def test_empty_packet(self):
self.assertRaises(ParseError, parse, "")
@ -78,12 +88,18 @@ class ParseBranchesTestCase(unittest.TestCase):
parsing._parse_timestamp(mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(("test", {}))
self.m.ReplayAll()
def _u(text, c='utf8'):
if sys.version_info[0] >= 3:
return text
else:
return text.decode(c)
expected = {
'status': 'test',
'raw': u'A>B:>test',
'raw': _u('A>B:>test'),
'via': '',
'from': u'A',
'to': u'B',
'from': _u('A'),
'to': _u('B'),
'path': [],
'format': 'status'
}

View File

@ -42,8 +42,8 @@ class ParseCommentTelemetry(unittest.TestCase):
return "".join(text)
def test_random_valid_telemetry(self):
for i in xrange(100):
vals = [randint(0, self.b91max) for x in xrange(randint(1, 5))]
for i in range(100):
vals = [randint(0, self.b91max) for x in range(randint(1, 5))]
bits = None

View File

@ -10,10 +10,10 @@ from aprslib.exceptions import ParseError
class ValidateCallsign(unittest.TestCase):
def test_valid_input(self):
chars = string.letters.upper() + string.digits
chars = string.ascii_letters.upper() + string.digits
def random_valid_callsigns():
for x in xrange(0, 500):
for x in range(0, 500):
call = "".join(sample(chars, randrange(1, 6)))
if bool(randint(0, 1)):
@ -118,7 +118,7 @@ class ParseHeader(unittest.TestCase):
for head in testData:
try:
_parse_header(head)
except ParseError, msg:
except ParseError as msg:
self.fail("{0}('{1}') PraseError, {2}"
.format(_parse_header.__name__, head, msg))