refined code using bytes type
This commit is contained in:
parent
5329bf782d
commit
a3a72cb4af
|
|
@ -67,10 +67,7 @@ class IS(object):
|
|||
self.filter = "" # default filter, everything
|
||||
|
||||
self._connected = False
|
||||
if is_py3:
|
||||
self.buf = b''
|
||||
else:
|
||||
self.buf = ''
|
||||
self.buf = b''
|
||||
|
||||
def _sendall(self, text):
|
||||
if is_py3:
|
||||
|
|
@ -130,10 +127,7 @@ class IS(object):
|
|||
"""
|
||||
|
||||
self._connected = False
|
||||
if is_py3:
|
||||
self.buf = b''
|
||||
else:
|
||||
self.buf = ''
|
||||
self.buf = b''
|
||||
|
||||
if self.sock is not None:
|
||||
self.sock.close()
|
||||
|
|
@ -326,12 +320,8 @@ class IS(object):
|
|||
raise ConnectionDrop("connection dropped")
|
||||
|
||||
while True:
|
||||
if is_py3:
|
||||
short_buf = b''
|
||||
newline = b'\r\n'
|
||||
else:
|
||||
short_buf = ''
|
||||
newline = '\r\n'
|
||||
short_buf = b''
|
||||
newline = b'\r\n'
|
||||
|
||||
select.select([self.sock], [], [], None if blocking else 0)
|
||||
|
||||
|
|
|
|||
|
|
@ -82,14 +82,11 @@ def parse(packet):
|
|||
raise TypeError("Epected packet to be str/unicode/bytes, got %s", type(packet))
|
||||
|
||||
# attempt to detect encoding
|
||||
if isinstance(packet, bytes if is_py3 else str):
|
||||
if isinstance(packet, bytes):
|
||||
try:
|
||||
packet = packet.decode('utf-8')
|
||||
except UnicodeDecodeError:
|
||||
if is_py3:
|
||||
res = chardet.detect(packet.split(b':', 1)[-1])
|
||||
else:
|
||||
res = chardet.detect(packet.split(':', 1)[-1])
|
||||
res = chardet.detect(packet.split(b':', 1)[-1])
|
||||
|
||||
if res['confidence'] > 0.7:
|
||||
packet = packet.decode(res['encoding'])
|
||||
|
|
|
|||
|
|
@ -8,12 +8,6 @@ from mox3 import mox
|
|||
|
||||
|
||||
# byte shim for testing in both py2 and py3
|
||||
def _b(text):
|
||||
if sys.version_info[0] >= 3:
|
||||
return text.encode('latin-1')
|
||||
else:
|
||||
return text
|
||||
|
||||
|
||||
class TC_IS(unittest.TestCase):
|
||||
def setUp(self):
|
||||
|
|
@ -25,7 +19,7 @@ class TC_IS(unittest.TestCase):
|
|||
|
||||
def test_initilization(self):
|
||||
self.assertFalse(self.ais._connected)
|
||||
self.assertEqual(self.ais.buf, _b(''))
|
||||
self.assertEqual(self.ais.buf, b'')
|
||||
self.assertIsNone(self.ais.sock)
|
||||
self.assertEqual(self.ais.callsign, "LZ1DEV-99")
|
||||
self.assertEqual(self.ais.passwd, "testpwd")
|
||||
|
|
@ -41,7 +35,7 @@ class TC_IS(unittest.TestCase):
|
|||
|
||||
mox.Verify(self.ais.sock)
|
||||
self.assertFalse(self.ais._connected)
|
||||
self.assertEqual(self.ais.buf, _b(''))
|
||||
self.assertEqual(self.ais.buf, b'')
|
||||
|
||||
def test_open_socket(self):
|
||||
with self.assertRaises(socket.error):
|
||||
|
|
@ -60,7 +54,7 @@ class TC_IS(unittest.TestCase):
|
|||
# part 2 - conn drop trying to recv
|
||||
self.ais.sock.setblocking(0)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(_b(''))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b'')
|
||||
# part 3 - nothing to read
|
||||
self.ais.sock.setblocking(0)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
|
|
@ -69,16 +63,16 @@ class TC_IS(unittest.TestCase):
|
|||
# part 4 - yield 3 lines (blocking False)
|
||||
self.ais.sock.setblocking(0)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(_b("a\r\n"*3))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"a\r\n"*3)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(
|
||||
socket.error("Resource temporarily unavailable"))
|
||||
# part 5 - yield 3 lines 2 times (blocking True)
|
||||
self.ais.sock.setblocking(0)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(_b("b\r\n"*3))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"b\r\n"*3)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(_b("b\r\n"*3))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"b\r\n"*3)
|
||||
self.ais.sock.fileno().AndReturn(fdr)
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndRaise(StopIteration)
|
||||
mox.Replay(self.ais.sock)
|
||||
|
|
@ -96,10 +90,10 @@ class TC_IS(unittest.TestCase):
|
|||
getattr(self.ais._socket_readlines(), next_method)()
|
||||
# part 4
|
||||
for line in self.ais._socket_readlines():
|
||||
self.assertEqual(line, _b('a'))
|
||||
self.assertEqual(line, b'a')
|
||||
# part 5
|
||||
for line in self.ais._socket_readlines(blocking=True):
|
||||
self.assertEqual(line, _b('b'))
|
||||
self.assertEqual(line, b'b')
|
||||
|
||||
mox.Verify(self.ais.sock)
|
||||
|
||||
|
|
@ -110,31 +104,31 @@ class TC_IS(unittest.TestCase):
|
|||
# part 1 - raises
|
||||
self.ais._sendall(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(_b("invalidreply"))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"invalidreply")
|
||||
self.ais.close()
|
||||
# part 2 - raises (empty callsign)
|
||||
self.ais._sendall(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(_b("# logresp verified, xx"))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"# logresp verified, xx")
|
||||
self.ais.close()
|
||||
# part 3 - raises (callsign doesn't match
|
||||
self.ais._sendall(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(_b("# logresp NOMATCH verified, xx"))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"# logresp NOMATCH verified, xx")
|
||||
self.ais.close()
|
||||
# part 4 - raises (unverified, but pass is not -1)
|
||||
self.ais._sendall(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(_b("# logresp CALL unverified, xx"))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"# logresp CALL unverified, xx")
|
||||
self.ais.close()
|
||||
# part 5 - normal, receive only
|
||||
self.ais._sendall(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(_b("# logresp CALL unverified, xx"))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"# logresp CALL unverified, xx")
|
||||
# part 6 - normal, correct pass
|
||||
self.ais._sendall(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(_b("# logresp CALL verified, xx"))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"# logresp CALL verified, xx")
|
||||
mox.Replay(self.ais.sock)
|
||||
self.m.ReplayAll()
|
||||
|
||||
|
|
@ -179,7 +173,7 @@ class TC_IS(unittest.TestCase):
|
|||
self.ais.sock.setsockopt(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
|
||||
self.ais.sock.setsockopt(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
|
||||
self.ais.sock.setsockopt(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(_b("junk"))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"junk")
|
||||
self.ais.close()
|
||||
# part 3 - everything going well
|
||||
self.ais._open_socket()
|
||||
|
|
@ -191,7 +185,7 @@ class TC_IS(unittest.TestCase):
|
|||
self.ais.sock.setsockopt(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
|
||||
self.ais.sock.setsockopt(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
|
||||
self.ais.sock.setsockopt(mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(_b("# server banner"))
|
||||
self.ais.sock.recv(mox.IgnoreArg()).AndReturn(b"# server banner")
|
||||
mox.Replay(self.ais.sock)
|
||||
self.m.ReplayAll()
|
||||
|
||||
|
|
@ -211,11 +205,11 @@ class TC_IS(unittest.TestCase):
|
|||
self.m.VerifyAll()
|
||||
|
||||
def test_filter(self):
|
||||
testFilter = "x/CALLSIGN"
|
||||
testFilter = 'x/CALLSIGN'
|
||||
|
||||
self.ais._connected = True
|
||||
self.ais.sock = mox.MockAnything()
|
||||
self.ais.sock.sendall(_b("#filter %s\r\n" % testFilter))
|
||||
self.ais.sock.sendall(b'#filter ' + testFilter.encode('ascii') + b'\r\n')
|
||||
mox.Replay(self.ais.sock)
|
||||
|
||||
self.ais.set_filter(testFilter)
|
||||
|
|
@ -324,7 +318,7 @@ class TC_IS(unittest.TestCase):
|
|||
self.ais.sock = mox.MockAnything()
|
||||
self.ais.sock.setblocking(mox.IgnoreArg())
|
||||
self.ais.sock.settimeout(mox.IgnoreArg())
|
||||
self.ais._sendall(_b("%s\r\n" % str(line).rstrip('\r\n'))).AndReturn(None)
|
||||
self.ais._sendall(b"%c" + line.rstrip('\r\n').encode('ascii') + b'\r\n').AndReturn(None)
|
||||
mox.Replay(self.ais.sock)
|
||||
|
||||
self.ais.sendall(line)
|
||||
|
|
|
|||
|
|
@ -9,39 +9,39 @@ from aprslib import parsing
|
|||
from aprslib.exceptions import ParseError, UnknownFormat
|
||||
|
||||
|
||||
def _u(text, c='utf8'):
|
||||
if sys.version_info[0] >= 3:
|
||||
return text
|
||||
else:
|
||||
return text.decode(c)
|
||||
|
||||
|
||||
class ParseTestCase(unittest.TestCase):
|
||||
def test_unicode(self):
|
||||
def _u(text, c='utf8'):
|
||||
if sys.version_info[0] >= 3:
|
||||
return text
|
||||
else:
|
||||
return text.decode(c)
|
||||
|
||||
|
||||
_unicode = str if sys.version_info[0] >= 3 else unicode
|
||||
|
||||
# 7bit ascii
|
||||
result = parse("A>B:>status")
|
||||
|
||||
self.assertIsInstance(result['status'],_unicode)
|
||||
self.assertIsInstance(result['status'], _unicode)
|
||||
self.assertEqual(result['status'], _u("status"))
|
||||
|
||||
# string with degree sign
|
||||
result = parse("A>B:>status\xb0")
|
||||
|
||||
self.assertIsInstance(result['status'],_unicode)
|
||||
self.assertEqual(result['status'], _u("status\xb0",'latin-1'))
|
||||
self.assertIsInstance(result['status'], _unicode)
|
||||
self.assertEqual(result['status'], _u("status\xb0", 'latin-1'))
|
||||
|
||||
# str with utf8
|
||||
result = parse("A>B:>статус")
|
||||
|
||||
self.assertIsInstance(result['status'],_unicode)
|
||||
self.assertIsInstance(result['status'], _unicode)
|
||||
self.assertEqual(result['status'], _u("статус"))
|
||||
|
||||
# unicode input
|
||||
result = parse(_u("A>B:>статус"))
|
||||
|
||||
self.assertIsInstance(result['status'],_unicode)
|
||||
self.assertIsInstance(result['status'], _unicode)
|
||||
self.assertEqual(result['status'], _u("статус"))
|
||||
|
||||
def test_empty_packet(self):
|
||||
|
|
|
|||
Loading…
Reference in New Issue