Source code for wni.data_client

"""
Open client connection to the data servers.
"""

import os

import msgpack
import nanomsg

import wni.util


class DataClientError(Exception):
    """Raised when the data server answers with an unexpected response."""
class DataClient:
    """
    Sends requests to the system, asking for moment data.

    Call the method ``latest`` to receive only the most recent data from the
    server.  Call the method ``all`` to receive all the data that the client
    has not yet received.

    Instances can be used as context managers; ``close`` is called on exit.
    """

    def __init__(self, connect_addr, recv_timeout=1000, send_timeout=1000):
        """
        Open a REQ socket to ``connect_addr`` and sync with the data server.

        ``recv_timeout`` and ``send_timeout`` are handed to the nanomsg
        socket unchanged (``recv_timeout`` is in ms, see the property below).
        If anything fails during setup the socket is closed before the
        exception propagates, so a failed constructor does not leak it.
        """
        self.sock = sock = nanomsg.Socket(nanomsg.REQ)
        self._closed = False
        try:
            sock.recv_timeout = recv_timeout
            sock.send_timeout = send_timeout
            # 64 MB receive buffer.
            sock.set_int_option(
                nanomsg.SOL_SOCKET, nanomsg.RCVBUF, int(64E6)
            )
            sock.connect(connect_addr)
            # NOTE(review): -1 presumably lifts the limit on received message
            # size -- confirm against the nanomsg socket option documentation.
            sock.set_int_option(nanomsg.SOL_SOCKET, nanomsg.RCVMAXSIZE, -1)

            # A self-assigned ID is necessary so the rep sock can
            # differentiate between clients.  os.urandom(32) yields 32 random
            # bytes (256 bits), so a collision between two clients is
            # practically impossible.
            self.id = os.urandom(32)
            self.sync()
        except BaseException:
            # Release the socket on *any* failure (including an interrupt
            # while waiting for the sync reply), then re-raise unchanged.
            self._closed = True
            sock.close()
            raise

    def _make_request(self, request):
        """
        Make a request, but do not decode the response.

        ``request`` is packed with msgpack and sent; the reply is unpacked
        with ``raw=True``, so map keys and strings in the returned object are
        ``bytes`` (hence the ``b'type'`` / ``b'value'`` lookups elsewhere).
        """
        _limit = 2**31 - 1
        payload = msgpack.packb(request, use_bin_type=True)
        self.sock.send(payload)
        resp = self.sock.recv()
        return msgpack.unpackb(
            resp,
            raw=True,
            max_bin_len=_limit,
            max_str_len=_limit,
            max_array_len=_limit,
            max_map_len=_limit,
            max_ext_len=_limit,
        )

    def decode(self, response):
        """
        Decode response and return the Python objects represented.

        ``response`` is a bytes object holding zero or more concatenated
        msgpack messages; a list with one decoded object per message is
        returned.
        """
        # This method must not read any state from ``self``: DataClients
        # reuses it directly as ``decode = DataClient.decode``.
        _limit = 2**31 - 1
        unpacker = msgpack.Unpacker(
            raw=False,
            ext_hook=wni.util.ext_hook_unpack,
            max_bin_len=_limit,
            max_str_len=_limit,
            max_array_len=_limit,
            max_map_len=_limit,
            max_ext_len=_limit,
        )
        unpacker.feed(response)
        return list(unpacker)

    def _make_request_and_decode(self, request: dict):
        """
        Sends ``request`` and returns decoded data from the data server.

        Raises DataClientError if the response type is not ``b'data'``.
        """
        resp = self._make_request(request)
        type_ = resp[b'type']
        if type_ != b'data':
            value = resp.get(b'value', b'')
            # Build the message defensively: a malformed error reply must
            # still surface as DataClientError, not as a decoding error.
            if isinstance(type_, bytes):
                type_ = type_.decode(errors='replace')
            if isinstance(value, bytes):
                value = value.decode(errors='replace')
            msg = 'Bad response type: {} (value "{}")'.format(type_, value)
            raise DataClientError(msg)
        return self.decode(resp[b'value'])

    def latest(self):
        """
        Returns the most recent radial data.  If no data is available,
        returns None.
        """
        request = {
            b'id': self.id,
            b'type': b'latest'
        }
        # could be either a single-element list (if new data was available)
        # or an empty list, if no new data was available.
        data = self._make_request_and_decode(request)
        return data[0] if data else None

    def all(self):
        """
        Request all data on the server that the client has not received yet.
        Returns a list of radial data.
        """
        request = {
            b'id': self.id,
            b'type': b'all'
        }
        return self._make_request_and_decode(request)

    def next_n(self, n):
        """
        Return the next ``n`` not yet received radials.
        """
        request = {
            b'id': self.id,
            b'type': b'next_n',
            b'n': n
        }
        return self._make_request_and_decode(request)

    def all_bytes(self):
        """
        Request all data on the server that the client has not received yet.
        Does not decode the response; the response is the bytes exactly
        returned by the data server.

        Returns a tuple of (num_radials, raw_bytes).
        """
        request = {
            b'id': self.id,
            b'type': b'all'
        }
        data = self._make_request(request)
        return data[b'length'], data[b'value']

    def sync(self):
        """
        Tell the data server that we don't want any data older than the
        current point in time.

        Raises DataClientError if the server does not answer ``b'synced'``.
        """
        request = {
            b'id': self.id,
            b'type': b'sync'
        }
        data = self._make_request(request)
        if data[b'type'] != b'synced':
            raise DataClientError('Unexpected response from server')
        return None

    def _close(self):
        """
        Send the ``close`` request to the server.

        Raises DataClientError if the server does not answer ``b'closed'``.
        """
        request = {
            b'id': self.id,
            b'type': b'close'
        }
        data = self._make_request(request)
        if data[b'type'] != b'closed':
            raise DataClientError('unexpected response {!r:}'.format(data))
        return None

    def close(self):
        """
        Closes the underlying connection to the data server.

        Safe to call more than once.  The local socket is always released,
        even if notifying the server fails (e.g. the request times out); in
        that case the error from the notification is still propagated.
        """
        if self._closed:
            return
        try:
            self._close()
        finally:
            self._closed = True
            self.sock.close()

    @property
    def recv_timeout(self):
        """recv timeout in ms"""
        return self.sock.recv_timeout

    @recv_timeout.setter
    def recv_timeout(self, val):
        self.sock.recv_timeout = val

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()
class DataClients:
    """
    Like DataClient, but makes requests of both channels each time.

    Every request method returns a 2-tuple ``(channel1_result,
    channel2_result)``.  Instances can be used as context managers; ``close``
    is called on exit.
    """

    def __init__(self, ch1_addr, ch2_addr, recv_timeout=1000,
                 send_timeout=1000):
        """
        Open one DataClient per channel, both with the same timeouts.

        If the second client cannot be created, the first one is closed
        again before the exception propagates.
        """
        self._ch1_addr = ch1_addr
        self._ch2_addr = ch2_addr
        self._data_client1 = DataClient(ch1_addr, recv_timeout, send_timeout)
        try:
            self._data_client2 = DataClient(
                ch2_addr, recv_timeout, send_timeout
            )
        except BaseException:
            # Don't leave channel 1 connected when channel 2 failed.
            self._data_client1.close()
            raise
        self._closed = False

    def close(self):
        """
        Close both underlying clients.  Safe to call more than once.

        The second client is closed even if closing the first one raises.
        """
        if self._closed:
            return
        try:
            self._data_client1.close()
        finally:
            self._data_client2.close()
            self._closed = True

    def all_bytes(self):
        """
        Request all data on the server that the client has not received yet.
        Does not decode the response; the response is the bytes exactly
        returned by the data server.
        """
        return self._data_client1.all_bytes(), self._data_client2.all_bytes()

    def sync(self):
        """
        Tell both data servers that we don't want any data older than the
        current point in time.
        """
        # Both channels must be synced; the second call used to be
        # ``all_bytes()``, which left channel 2 unsynced and consumed its
        # pending data.
        return self._data_client1.sync(), self._data_client2.sync()

    def all(self):
        """
        Request all not-yet-received data from both channels.
        """
        return self._data_client1.all(), self._data_client2.all()

    def latest(self):
        """
        Request the most recent radial data from both channels.
        """
        return self._data_client1.latest(), self._data_client2.latest()

    def next_n(self, n):
        """
        Request the next ``n`` not yet received radials from both channels.
        """
        return self._data_client1.next_n(n), self._data_client2.next_n(n)

    # Reuse DataClient's decoder unchanged.
    decode = DataClient.decode

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()