"""Polling for nanomsg sockets"""
import logging
import nanomsg
logger = logging.getLogger(__name__)
__all__ = ['RxPoll']
[docs]class RxPoll(object):
"""Wrapse select.poll() to provide a convenient interface for nanomsg
sockets.
Note:
Any nanomsg sockets registered with this poll object will have their
recv_timeout set to 0, so that polling will be fast (the poll function
call already blocks, so blocking on recv doesn't make any sense.)
"""
def __init__(self, sockets):
self.watched_sockets = []
for socket in sockets:
self.register(socket)
[docs] def register(self, socket):
"""Register a nanomsg socket for polling."""
self.watched_sockets.append(socket)
socket.recv_timeout = 0
[docs] def unregister(self, socket):
"""Remove a nanomsg socket from the poll."""
self.watched_sockets.pop(socket)
[docs] def poll(self, timeout=-1):
"""Returns a list of (socket, recv_data) tuples for all sockets that
had receive data ready."""
# Hack: on Windows, a negative number is the same as 0s timeout,
# on Unix it is the same as infinite timeout. So we'll need
given = timeout
if timeout < 0:
timeout = 100000
# Wrap the poll() call in an infinite loop, becuase poll() will show
# that the socket has data to be receieved for subscribers even if they
# are not subscribed to the topic, since the OS has no idea that the
# data is just going to be ignored.
while True:
try:
rx, _ = nanomsg.poll(self.watched_sockets, [], timeout=timeout)
except nanomsg.NanoMsgAPIError:
if given < 0:
continue
else:
raise
if not rx:
return []
# loop through the sockets that have data; if the are not
# subscribed to whatever message was sent then recv() will timeout
# and we'll just re-poll until important data is actually ready.
ret = []
for sock in rx:
try:
data = sock.recv()
ret.append((sock, data))
except nanomsg.NanoMsgAPIError:
continue
if ret:
return ret
if __name__ == '__main__':
# This could be invoked as python -i -m wni.nano.poll so that interaction
# can happen. Otherwise it just runs some tests.
import sys
from wni.nano import subscribe
if len(sys.argv) > 1:
f = sys.argv[1]
else:
f = 'ipc:///tmp/test.ipc'
pub = nanomsg.Socket(nanomsg.PUB)
pub.bind(f)
sub1 = nanomsg.Socket(nanomsg.SUB)
sub1.connect(f)
subscribe(sub1, 'shared-topic')
subscribe(sub1, 'topic1')
sub2 = nanomsg.Socket(nanomsg.SUB)
sub2.connect(f)
subscribe(sub2, 'shared-topic')
subscribe(sub2, 'topic2')
# run some tests
p = RxPoll([sub1, sub2])
pub.send('topic1')
res = p.poll(1)
assert res == [(sub1, 'topic1')]
pub.send('topic2')
res = p.poll(1)
assert res == [(sub2, 'topic2')]
pub.send('no one is subscribed')
# set a timeout on this one
res = p.poll(1)
assert res == []
pub.send('shared-topic')
res = p.poll(1)
expected = [(sub1, 'shared-topic'), (sub2, 'shared-topic')]
assert res == expected or res == list(reversed(expected))