"""Utility functions for working with nanomsg."""
import sys
import struct
import cffi
import nanomsg
# Public names exported by ``from <this module> import *``.
__all__ = ['subscribe', 'unsubscribe', 'Subscriber', 'get_current_connections']
def Subscriber(addr, topic, method='connect'):
    """Return a ``nanomsg.Socket`` of type SUB that is subscribed to `topic`.

    Args:
        addr: Address handed to ``sock.connect`` or ``sock.bind``.
        topic: Topic passed to :func:`subscribe`.
        method: How to attach the socket to `addr`. ``"connect"`` calls
            ``sock.connect(addr)``, ``"bind"`` calls ``sock.bind(addr)``,
            and ``None`` (or the string ``"none"``, case-insensitive)
            leaves the socket unattached.

    Returns:
        The newly created, subscribed socket.

    Raises:
        ValueError: If `method` is not one of the accepted values.
    """
    # Validate `method` before creating the socket so that an invalid
    # argument does not leave an open, unreturned socket behind.
    # The ``is None`` test must come first: calling ``.lower()`` on None
    # would raise AttributeError.  ``str()`` makes other non-string values
    # fall through to the ValueError below instead of an AttributeError.
    if method == "connect" or method == "bind":
        attach = method
    elif method is None or str(method).lower() == "none":
        attach = None
    else:
        msg = 'Invalid option for method: "{}". '.format(method)
        msg += 'method must be either "connect", "bind", or None'
        raise ValueError(msg)

    sock = nanomsg.Socket(nanomsg.SUB)
    subscribe(sock, topic)
    if attach == "connect":
        sock.connect(addr)
    elif attach == "bind":
        sock.bind(addr)
    return sock
def subscribe(socket, topic):
    """Set the ``SUB_SUBSCRIBE`` string option to `topic` on `socket`."""
    level, option = nanomsg.SUB, nanomsg.SUB_SUBSCRIBE
    socket.set_string_option(level, option, topic)
def unsubscribe(socket, topic):
    """Set the ``SUB_UNSUBSCRIBE`` string option to `topic` on `socket`."""
    level, option = nanomsg.SUB, nanomsg.SUB_UNSUBSCRIBE
    socket.set_string_option(level, option, topic)
def _pack_uint(n):
    """Pack `n` as an unsigned int using struct format ``'I'``.

    The format has no byte-order prefix, so native size and byte order
    are used.
    """
    uint_format = struct.Struct('I')
    return uint_format.pack(n)
if sys.platform == 'win32':
    def get_current_connections(sock):
        """Not implemented on Windows; always raises NotImplementedError."""
        raise NotImplementedError("This is not implemented on Windows")
else:
    # Use cffi to call nanomsg's nn_get_statistic() directly.
    import ctypes.util

    # Statistic id passed to nn_get_statistic().
    NN_STAT_CURRENT_CONNECTIONS = 201
    ffi = cffi.FFI()
    ffi.cdef('uint64_t nn_get_statistic(int s, int stat);')
    # The shared library is not named "libnanomsg.so" on every non-Windows
    # platform (e.g. ".dylib" on macOS), so ask the system for the real
    # name first and fall back to the historical literal if that fails.
    lib = ffi.dlopen(ctypes.util.find_library('nanomsg') or 'libnanomsg.so')

    def get_current_connections(socket):
        """Return the ``NN_STAT_CURRENT_CONNECTIONS`` statistic of `socket`.

        Calls ``nn_get_statistic`` with ``socket.fd`` and returns the
        result unchanged, i.e. the number of connections currently
        established on the socket as reported by nanomsg.

        Not implemented on Windows.
        """
        return lib.nn_get_statistic(socket.fd, NN_STAT_CURRENT_CONNECTIONS)