Source code for wni.nano.poll

"""Polling for nanomsg sockets"""

import logging

import nanomsg

logger = logging.getLogger(__name__)

__all__ = ['RxPoll']

[docs]class RxPoll(object): """Wrapse select.poll() to provide a convenient interface for nanomsg sockets. Note: Any nanomsg sockets registered with this poll object will have their recv_timeout set to 0, so that polling will be fast (the poll function call already blocks, so blocking on recv doesn't make any sense.) """ def __init__(self, sockets): self.watched_sockets = [] for socket in sockets: self.register(socket)
[docs] def register(self, socket): """Register a nanomsg socket for polling.""" self.watched_sockets.append(socket) socket.recv_timeout = 0
[docs] def unregister(self, socket): """Remove a nanomsg socket from the poll.""" self.watched_sockets.pop(socket)
[docs] def poll(self, timeout=-1): """Returns a list of (socket, recv_data) tuples for all sockets that had receive data ready.""" # Hack: on Windows, a negative number is the same as 0s timeout, # on Unix it is the same as infinite timeout. So we'll need given = timeout if timeout < 0: timeout = 100000 # Wrap the poll() call in an infinite loop, becuase poll() will show # that the socket has data to be receieved for subscribers even if they # are not subscribed to the topic, since the OS has no idea that the # data is just going to be ignored. while True: try: rx, _ = nanomsg.poll(self.watched_sockets, [], timeout=timeout) except nanomsg.NanoMsgAPIError: if given < 0: continue else: raise if not rx: return [] # loop through the sockets that have data; if the are not # subscribed to whatever message was sent then recv() will timeout # and we'll just re-poll until important data is actually ready. ret = [] for sock in rx: try: data = sock.recv() ret.append((sock, data)) except nanomsg.NanoMsgAPIError: continue if ret: return ret
if __name__ == '__main__': # This could be invoked as python -i -m wni.nano.poll so that interaction # can happen. Otherwise it just runs some tests. import sys from wni.nano import subscribe if len(sys.argv) > 1: f = sys.argv[1] else: f = 'ipc:///tmp/test.ipc' pub = nanomsg.Socket(nanomsg.PUB) pub.bind(f) sub1 = nanomsg.Socket(nanomsg.SUB) sub1.connect(f) subscribe(sub1, 'shared-topic') subscribe(sub1, 'topic1') sub2 = nanomsg.Socket(nanomsg.SUB) sub2.connect(f) subscribe(sub2, 'shared-topic') subscribe(sub2, 'topic2') # run some tests p = RxPoll([sub1, sub2]) pub.send('topic1') res = p.poll(1) assert res == [(sub1, 'topic1')] pub.send('topic2') res = p.poll(1) assert res == [(sub2, 'topic2')] pub.send('no one is subscribed') # set a timeout on this one res = p.poll(1) assert res == [] pub.send('shared-topic') res = p.poll(1) expected = [(sub1, 'shared-topic'), (sub2, 'shared-topic')] assert res == expected or res == list(reversed(expected))