Source code for wni.nano.util

"""Utility functions for working with nanomsg."""
import sys
import struct

import cffi
import nanomsg

__all__ = ['subscribe', 'unsubscribe', 'Subscriber', 'get_current_connections']


[docs]def Subscriber(addr, topic, method='connect'): """Returns a nanomsg.Socket that is connected to `addr` and subscribed to `topic`. `method` can be either "connect", "bind", or None """ sock = nanomsg.Socket(nanomsg.SUB) subscribe(sock, topic) if method == "connect": sock.connect(addr) elif method == "bind": sock.bind(addr) elif method.lower() == "none" or method is None: pass else: msg = 'Invalid option for method: "{}". '.format(method) msg += 'method must be either "connect", "bind", or None' raise ValueError(msg) return sock
[docs]def subscribe(socket, topic): socket.set_string_option(nanomsg.SUB, nanomsg.SUB_SUBSCRIBE, topic)
[docs]def unsubscribe(socket, topic): socket.set_string_option(nanomsg.SUB, nanomsg.SUB_UNSUBSCRIBE, topic)
def _pack_uint(n): return struct.pack('I', n) if sys.platform == 'win32': def get_current_connections(sock): """NOT IMPLEMENTED ON WINDOWS""" raise NotImplementedError("This is not implemented on Windows") else: # use cffi to check nanomsg statistics. # NN_STAT_CURRENT_CONNECTIONS = 201 ffi = cffi.FFI() lib = ffi.dlopen('libnanomsg.so') ffi.cdef('uint64_t nn_get_statistic(int s, int stat);')
[docs] def get_current_connections(socket): """ Return the number of accepted connections on the socket. Not implemented on Windows. """ return lib.nn_get_statistic(socket.fd, NN_STAT_CURRENT_CONNECTIONS)