diff --git a/aprslib/IS.py b/aprslib/IS.py index fc0d16b..8f99d62 100644 --- a/aprslib/IS.py +++ b/aprslib/IS.py @@ -127,6 +127,31 @@ class IS(object): if self.sock is not None: self.sock.close() + def sendall(self, line): + """ + Send a line, or multiple lines sperapted by '\\r\\n' + """ + if not self._connected: + raise ConnectionError("not connected") + + if isinstance(line, unicode): + line = line.encode('utf8') + elif not isinstance(line, str): + line = str(line) + + if line == "": + return + + line = line.rstrip("\r\n") + "\r\n" + + try: + self.sock.setblocking(1) + self.sock.settimeout(5) + self.sock.sendall(line) + except socket.error as exp: + self.close() + raise ConnectionError(str(exp)) + def consumer(self, callback, blocking=True, immortal=False, raw=False): """ When a position sentence is received, it will be passed to the callback function diff --git a/tests/test_IS.py b/tests/test_IS.py index 186d043..b7aa382 100644 --- a/tests/test_IS.py +++ b/tests/test_IS.py @@ -7,7 +7,7 @@ import sys import os -class TestCase_IS(unittest.TestCase): +class TC_IS(unittest.TestCase): def setUp(self): self.ais = aprslib.IS("LZ1DEV-99", "testpwd", "127.0.0.1", "11111") self.m = mox.Mox() @@ -260,8 +260,61 @@ class TestCase_IS(unittest.TestCase): self.m.VerifyAll() + def test_sendall(self): + self.ais.sock = mox.MockAnything() + self.m.StubOutWithMock(self.ais, "close") -class TestCase_IS_consumer(unittest.TestCase): + # part 1 not connected + # + # part 2 socket.error + self.ais.sock.setblocking(mox.IgnoreArg()) + self.ais.sock.settimeout(mox.IgnoreArg()) + self.ais.sock.sendall(mox.IgnoreArg()).AndRaise(socket.error) + self.ais.close() + # part 3 empty input + # + + mox.Replay(self.ais.sock) + self.m.ReplayAll() + + # part 1 + self.ais._connected = False + with self.assertRaises(aprslib.ConnectionError): + self.ais.sendall("test") + # part 2 + self.ais._connected = True + with self.assertRaises(aprslib.ConnectionError): + self.ais.sendall("test") + + # part 3 + self.ais._connected = True + self.ais.sendall("") + + # verify so far + mox.Verify(self.ais.sock) + self.m.VerifyAll() + + # rest + self.ais._connected = True + for line in [ + "test", # no \r\n + "test\r\n", # with \r\n + u"test", # unicode + 5, # number or anything with __str__ + ]: + # setup + self.ais.sock = mox.MockAnything() + self.ais.sock.setblocking(mox.IgnoreArg()) + self.ais.sock.settimeout(mox.IgnoreArg()) + self.ais.sock.sendall("%s\r\n" % str(line).rstrip('\r\n')).AndReturn(None) + mox.Replay(self.ais.sock) + + self.ais.sendall(line) + + mox.Verify(self.ais.sock) + + +class TC_IS_consumer(unittest.TestCase): def setUp(self): self.ais = aprslib.IS("LZ1DEV-99") self.ais._connected = True