"""
Open client connection to the data servers.
"""
import os
import msgpack
import nanomsg
import wni.util
[docs]class DataClientError(Exception):
pass
[docs]class DataClient:
"""Sends requests to the system, asking for moment data.
Call the method ``latest`` to receive only the most recent data from the
server. Call the method ``all`` to receive all the data that the client
has not yet received.
"""
def __init__(self,
connect_addr,
recv_timeout=1000,
send_timeout=1000):
self.sock = sock = nanomsg.Socket(nanomsg.REQ)
self.sock.recv_timeout = recv_timeout
self.sock.send_timeout = send_timeout
self.sock.set_int_option(
nanomsg.SOL_SOCKET, nanomsg.RCVBUF, int(64E6)
)
sock.connect(connect_addr)
sock.set_int_option(nanomsg.SOL_SOCKET, nanomsg.RCVMAXSIZE, -1)
# a self-assigned ID is necessary so the rep sock can differentiate
# between clients. With a 32-bit number there is a 1/2**32 chance of
# collisions
self.id = os.urandom(32)
self._closed = False
self.sync()
def _make_request(self, request):
"""
Make a request, but do not decode the response
"""
_limit = 2**31 - 1
b = msgpack.packb(request, use_bin_type=True)
self.sock.send(b)
resp = self.sock.recv()
return msgpack.unpackb(
resp, raw=True,
max_bin_len=_limit, max_str_len=_limit, max_array_len=_limit,
max_map_len=_limit, max_ext_len=_limit,
)
[docs] def decode(self, response):
"""
Decode response and return the Python objects represented.
"""
_limit = 2**31 - 1
unpacker = msgpack.Unpacker(
raw=False,
ext_hook=wni.util.ext_hook_unpack,
max_bin_len=_limit, max_str_len=_limit, max_array_len=_limit,
max_map_len=_limit, max_ext_len=_limit,
)
unpacker.feed(response)
parts = [obj for obj in unpacker]
return parts
def _make_request_and_decode(self, request: dict):
"""
Sends ``request`` and returns decoded data from the data server.
"""
resp = self._make_request(request)
type_ = resp[b'type']
if type_ != b'data':
msg = 'Bad response type: {} (value "{}")'
value = resp.get(b'value', b'')
msg = msg.format(type_.decode(), value.decode())
raise DataClientError(msg)
return self.decode(resp[b'value'])
[docs] def latest(self):
"""
Returns the most recent radial data. If no data is available, returns
None
"""
request = {
b'id': self.id,
b'type': b'latest'
}
# could be either a single-element list (if new data was available) or
# an empty list, if no new data was available.
data = self._make_request_and_decode(request)
if data:
return data[0]
else:
return None
[docs] def all(self):
"""
Request all data on the server that the client has not received yet.
Returns a list of radial data.
"""
request = {
b'id': self.id,
b'type': b'all'
}
data = self._make_request_and_decode(request)
return data
[docs] def next_n(self, n):
"""
Return the next ``n`` not yet received radials.
"""
request = {
b'id': self.id,
b'type': b'next_n',
b'n': n
}
data = self._make_request_and_decode(request)
return data
[docs] def all_bytes(self):
"""
Request all data on the server that the client has not received yet.
Does not decode the response; the response is the bytes exactly
returned by the data server.
Returns a tuple of (num_radials, raw_bytes).
"""
request = {
b'id': self.id,
b'type': b'all'
}
data = self._make_request(request)
length = data[b'length']
raw_bytes = data[b'value']
return length, raw_bytes
[docs] def sync(self):
"""
Tell the data server that we don't want any data older than the current
point in time.
"""
request = {
b'id': self.id,
b'type': b'sync'
}
data = self._make_request(request)
if data[b'type'] != b'synced':
raise DataClientError('Unexpected response from server')
return None
def _close(self):
request = {
b'id': self.id,
b'type': b'close'
}
data = self._make_request(request)
if data[b'type'] == b'closed':
return None
else:
raise DataClientError('unexpected response {!r:}'.format(data))
[docs] def close(self):
"""
Closes the underlying connection to the data server
"""
if not self._closed:
self._close()
self.sock.close()
self._closed = True
@property
def recv_timeout(self):
"""recv timeout in ms"""
return self.sock.recv_timeout
@recv_timeout.setter
def recv_timeout(self, val):
self.sock.recv_timeout = val
def __enter__(self):
return self
def __exit__(self, *exc_info):
self.close()
[docs]class DataClients:
"""
Like DataClient, but makes requests of both channels each time.
"""
def __init__(self, ch1_addr, ch2_addr, recv_timeout=1000,
send_timeout=1000):
self._ch1_addr = ch1_addr
self._ch2_addr = ch2_addr
self._data_client1 = DataClient(ch1_addr, recv_timeout, send_timeout)
self._data_client2 = DataClient(ch2_addr, recv_timeout, send_timeout)
self._closed = False
[docs] def close(self):
if not self._closed:
self._data_client1.close()
self._data_client2.close()
self._closed = True
[docs] def all_bytes(self):
"""
Request all data on the server that the client has not received yet.
Does not decode the response; the response is the bytes exactly
returned by the data server.
"""
return self._data_client1.all_bytes(), self._data_client2.all_bytes()
[docs] def sync(self):
return self._data_client1.sync(), self._data_client2.all_bytes()
[docs] def all(self):
return self._data_client1.all(), self._data_client2.all()
[docs] def latest(self):
return self._data_client1.latest(), self._data_client2.latest()
decode = DataClient.decode