"""Random utilities"""
from __future__ import absolute_import, print_function
from functools import partial
from contextlib import redirect_stderr, redirect_stdout
import io
import logging.handlers
import queue
import re
import sys
import struct
import time
import threading
import msgpack
import nanomsg
import numpy as np
import wni.config as config
import wni.data_client
from wni.waveforms import Chirp
from wni.nano import get_current_connections
logger = logging.getLogger(__name__)

# Instances of these classes are treated as directly sendable values:
# is_leaf() checks objects against this tuple.
PACKABLE_CLASSES = (
    type(None), int, float, bytes, str, np.ndarray, Chirp,
)
def nested_dict_merged(d1, d2):
    """
    Return a new dict holding the recursive merge of ``d1`` and ``d2``.

    Values in ``d2`` supersede values in ``d1``, with one exception: when the
    value for a key is a dict on both sides, the two dicts are merged by this
    same rule instead of one replacing the other.

    Neither input is modified.  Values that are not merged are stored in the
    result by reference (they are not copied).

    Raises:
        ValueError: if a dict that is already being merged further up the
            current recursion path is reached again (a self-referential
            dict), since that merge would never terminate.
    """
    missing = object()

    def recurse(left, right, active):
        # ``active`` holds the ids of the dicts on the *current* recursion
        # path only.  A dict that is merely shared between sibling keys is
        # not a cycle and must not be rejected, so ids are removed again
        # once their merge has finished.
        if id(left) in active or id(right) in active:
            raise ValueError('Cannot merge self-referential nested dicts')
        ids = {id(left), id(right)}
        active |= ids
        try:
            out = {}
            for k in left.keys() | right.keys():
                v1 = left.get(k, missing)
                v2 = right.get(k, missing)
                if isinstance(v1, dict) and isinstance(v2, dict):
                    out[k] = recurse(v1, v2, active)
                elif v2 is missing:
                    # only d1 has this key
                    out[k] = v1
                else:
                    # only d2 has the key, or both do and d2 wins
                    out[k] = v2
            return out
        finally:
            active -= ids

    return recurse(d1, d2, set())
class nullcontext:
    """
    Do-nothing context manager.

    ``with nullcontext(x) as y`` binds ``y`` to ``x`` and performs no other
    work on entry or exit.  The standard library only provides this from
    Python 3.7 on, which is too new for this code base for now.
    """

    def __init__(self, enter_result=None):
        self.enter_result = enter_result

    def __enter__(self):
        return self.enter_result

    def __exit__(self, *exc_info):
        # A falsy return value lets any in-flight exception propagate.
        return None
class _ProxyInfo(object):
    """
    Plain record used by NanoClient/NanoServer for proxying items or names.

    Holds an optional ``name`` and an optional ``item``; both default to
    ``None``.
    """

    def __init__(self, name=None, item=None):
        self.name, self.item = name, item
class _AttrProxy(object):
    """
    Attribute Proxy, used by NanoClient/NanoServer.

    Wraps a dotted ``name`` and a ``client``.  Calls, attribute access, item
    access, attribute assignment, ``dir()``, ``repr()`` and ``len()`` are all
    turned into request dicts and handed to ``client._send_and_recv``; the
    value that call returns is passed straight back.
    """

    # Names stored on the proxy object itself; assigning any other attribute
    # is forwarded through the client instead.
    _MY_ATTRS = 'name', 'client'

    def __init__(self, client, name):
        self.name = name
        self.client = client

    def __call__(self, *args, **kwargs):
        request = dict(cmd='call', name=self.name, args=args, kwargs=kwargs)
        return self.client._send_and_recv(request)

    def __getattr__(self, name):
        # Only reached for names that normal lookup did not find.
        request = dict(cmd='getattr', name='.'.join((self.name, name)))
        return self.client._send_and_recv(request)

    def __getitem__(self, name):
        # The key is serialized with repr() and appended in brackets.
        target = ''.join((self.name, '[', repr(name), ']'))
        request = dict(cmd='getattr', name=target)
        return self.client._send_and_recv(request)

    def __setattr__(self, name, value):
        if name not in self._MY_ATTRS:
            request = dict(cmd='setattr', head=self.name, name=name,
                           value=value)
            return self.client._send_and_recv(request)
        super().__setattr__(name, value)

    def __dir__(self):
        request = dict(cmd='call', name=self.name + '.__dir__', args=[],
                       kwargs={})
        return self.client._send_and_recv(request)

    def __repr__(self):
        request = dict(cmd='call', name=self.name + '.__str__', args=[],
                       kwargs={})
        described = self.client._send_and_recv(request)
        return "proxy of %s" % described

    def __len__(self):
        request = dict(cmd='call', name=self.name + '.__len__', args=[],
                       kwargs={})
        return self.client._send_and_recv(request)
def _pack_uint(n):
    """Return ``n`` packed with the struct format ``"I"`` (unsigned int)."""
    packed = struct.pack("I", n)
    return packed
class EXT_TYPES:
    """
    Namespace of the integer msgpack extension codes used by
    ``default_pack`` and ``ext_hook_unpack``.

    This class only groups constants; instantiating it raises ``TypeError``.
    """

    # One code per supported extension type, numbered 0 through 5.
    complex, ndarray, exception, proxy, chirp, slice = range(6)

    def __new__(cls, *_, **__):
        raise TypeError("this class should not be instantiated.")
def default_pack(o):
    """
    Serialize (certain) Python objects using msgpack.

    This is the ``default=`` hook for ``msgpack.packb``: it is handed objects
    msgpack cannot pack natively and returns a ``msgpack.ExtType`` tagged
    with one of the ``EXT_TYPES`` codes.

    Currently supported objects:

    1. complex numbers
    2. Exceptions (limited support: class name, args and formatted traceback)
    3. :class:`_ProxyInfo` objects, for use in ``NanoClient``/``NanoServer``
       communication
    4. numpy.ndarray
    5. :class:`Chirp`
    6. slice objects

    Raises:
        TypeError: if ``o`` is none of the supported types.
    """
    if isinstance(o, complex):
        # NOTE: "ff" stores both parts as 32-bit floats; this has to stay in
        # step with the format ext_hook_unpack reads.
        packed = struct.pack("ff", o.real, o.imag)
        ext = msgpack.ExtType(EXT_TYPES.complex, packed)
    elif isinstance(o, Exception):
        import traceback
        msg = ''.join(traceback.format_exception(type(o), o, o.__traceback__))
        d = {
            'name': o.__class__.__name__,
            'args': o.args,
            'traceback': msg,
        }
        packed = packb(d)
        ext = msgpack.ExtType(EXT_TYPES.exception, packed)
    elif isinstance(o, _ProxyInfo):
        # _ProxyInfo used for NanoServer/NanoClient proxy objects
        d = {
            'name': o.name,
            'item': o.item,
        }
        packed = packb(d)
        ext = msgpack.ExtType(EXT_TYPES.proxy, packed)
    elif isinstance(o, np.ndarray):
        dt = o.dtype.num
        shape = o.shape
        # tobytes() is the supported spelling of the deprecated tostring()
        packed = packb([dt, shape, o.tobytes()])
        ext = msgpack.ExtType(EXT_TYPES.ndarray, packed)
    elif isinstance(o, Chirp):
        kwargs = {
            'nsamples': o.nsamples,
            'center': o.center,
            'bw': o.bw,
            'window': o.window,
            'clk_freq': o.clk,
            'scale': o.scale,
        }
        packed = msgpack.packb(kwargs)
        ext = msgpack.ExtType(EXT_TYPES.chirp, packed)
    elif isinstance(o, slice):
        # read the bounds from the instance being packed, not from the
        # builtin ``slice`` type
        slice_info = o.start, o.stop, o.step
        packed = msgpack.packb(slice_info)
        ext = msgpack.ExtType(EXT_TYPES.slice, packed)
    else:
        raise TypeError("Unkown type: %r" % o)
    return ext
class ServerException(Exception):
    """Whenever a NanoServer would have raised an exception, the client
    receives this exception and raises it.

    Attributes set from ``exc_info`` (a dict with ``'name'`` and
    ``'traceback'`` keys):

    * ``exc_name`` -- class name of the original exception
    * ``class_`` -- the builtin exception class with that name, or
      ``Exception`` when the name is not a builtin ``Exception`` subclass
    * ``traceback`` -- the formatted traceback text
    """

    def __init__(self, exc_info):
        # ``__builtins__`` may be a dict or a module depending on how this
        # module was loaded, so look the name up on the real module instead.
        import builtins

        self.exc_name = exc_info['name']
        candidate = None
        if isinstance(self.exc_name, str):
            candidate = getattr(builtins, self.exc_name, None)
        # Only accept genuine exception classes: the name arrives over the
        # wire and must never resolve to an arbitrary builtin like ``print``.
        if isinstance(candidate, type) and issubclass(candidate, Exception):
            self.class_ = candidate
        else:
            self.class_ = Exception
        self.traceback = exc_info['traceback']

    def __repr__(self):
        return "{}({}){}".format(self.exc_name, self.args, self.traceback)
def ext_hook_unpack(code, data):
    """
    Unpack the Python objects that are known to :func:`default_pack`.

    ``code`` is the msgpack extension code (one of ``EXT_TYPES``) and
    ``data`` the extension payload.  Raises ``TypeError`` for any other code.
    """
    if code == EXT_TYPES.complex:
        real, imag = struct.unpack("ff", data)
        return complex(real, imag)

    if code == EXT_TYPES.exception:
        return ServerException(unpackb(data))

    if code == EXT_TYPES.proxy:
        # _ProxyInfo used for NanoServer/NanoClient proxy objects
        return _ProxyInfo(unpackb(data)['name'])

    if code == EXT_TYPES.ndarray:
        # payload is [dtype number, shape, raw bytes]
        dtype, shape, array_data = msgpack.unpackb(
            data,
            max_bin_len=_LIMIT, max_str_len=_LIMIT, max_array_len=_LIMIT,
            max_map_len=_LIMIT, max_ext_len=_LIMIT,
        )
        flat = np.frombuffer(array_data, dtype=np.sctypeDict[dtype])
        return flat.reshape(shape)

    if code == EXT_TYPES.chirp:
        return Chirp(**unpackb(data))

    if code == EXT_TYPES.slice:
        return slice(*unpackb(data))

    raise TypeError("Unkown code: %r" % code)
# Size limit handed to msgpack for every ``max_*_len`` unpack option.
_LIMIT = 2**31 - 1

# msgpack.packb preconfigured with the extension packer defined in this module.
packb = partial(msgpack.packb, default=default_pack, use_bin_type=True)

# msgpack.unpackb preconfigured with the matching extension hook and limits.
unpackb = partial(
    msgpack.unpackb,
    ext_hook=ext_hook_unpack,
    raw=False,
    **dict.fromkeys(
        ('max_bin_len', 'max_str_len', 'max_array_len', 'max_map_len',
         'max_ext_len'),
        _LIMIT,
    )
)
def setter(fset):
    """Return a write-only property: ``fset`` is its setter and it has no getter."""
    return property(fget=None, fset=fset)
class NanoClient(object):
    """
    A client for a NanoServer.

    Messages between client and server are encoded and decoded via msgpack.

    Attribute reads, attribute writes and item lookups on a client are
    forwarded to the server, except for the client's own bookkeeping
    attributes listed in ``_MY_ATTRS``.
    """

    # Attributes that live on the client object itself.
    _MY_ATTRS = 'addr sock class_ _nanoclient_lock'.split()

    def __init__(self, connect_addr, class_=None, recv_timeout=10000):
        """if class_ is provided, it will be introspected to avoid
        unnecessary queries to the server. This only applies to functions.

        ``recv_timeout`` is assigned to the socket's ``recv_timeout``
        (presumably milliseconds -- confirm against nanomsg).
        """
        self.addr = connect_addr
        self.class_ = class_
        self.sock = nanomsg.Socket(nanomsg.REQ)
        self.sock.connect(connect_addr)
        self.sock.recv_timeout = recv_timeout
        # serializes each send/receive pair on the socket
        self._nanoclient_lock = threading.Lock()

    def __setattr__(self, name, value):
        if name in self._MY_ATTRS:
            super().__setattr__(name, value)
        else:
            data = {
                'cmd': 'setattr',
                'head': '',
                'name': name,
                'value': value,
            }
            return self._send_and_recv(data)

    def _send_and_recv(self, data):
        """
        Send a dict (data) and return the response.
        """
        with self._nanoclient_lock:
            self._send_dict(data)
            return self._get_resp()

    def getattr(self, name):
        """
        Return the attribute from the server
        """
        data = {
            'cmd': 'getattr',
            'name': name,
        }
        return self._send_and_recv(data)

    def _getattrs(self, attrs):
        """Request several attributes (``attrs``) in a single round trip."""
        data = {
            'cmd': 'getattrs',
            'attrs': attrs
        }
        return self._send_and_recv(data)

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup failed.  If one of the
        # client's own attributes is missing (e.g. __init__ raised before
        # setting it), asking the server would need those very attributes
        # and recurse without bound -- report it as missing instead.
        if name in self._MY_ATTRS:
            raise AttributeError(name)

        # check self.class_ to see if the name given is callable.
        # NOTE this is not perfect; just because something is callable doesn't
        # mean that the user actually _wants_ to call it, but for me, for now,
        # this is the right thing.
        #
        # this optimization removes the need for a round-trip to the server.
        if self.class_ is not None:
            # hasattr() can fail sometimes whenever it shouldn't; treat any
            # failure as "unknown" and fall back to asking the server.
            try:
                has_attr = hasattr(self.class_, name)
            except Exception:
                has_attr = False
            if has_attr and callable(getattr(self.class_, name)):
                return _AttrProxy(self, name)
        return self.getattr(name)

    def kill_the_server(self):
        """Send the ``kill_the_server`` meta command and return the reply."""
        data = {
            'cmd': 'meta',
            'meta_cmd': 'kill_the_server'
        }
        return self._send_and_recv(data)

    def _send_dict(self, d):
        packed = packb(d)
        self.sock.send(packed)

    def close_nanoclient(self):
        """
        Closes the underlying socket.

        This isn't just called close() because it is too likely that we're
        proxying an object with a close() method.
        """
        self.sock.close()

    def _get_resp(self):
        """Waits for a response from the server.  If the server returned an
        Exception, that is raised.

        Any stdout/stderr text carried in the response is echoed locally.
        A ``_ProxyInfo`` return value is wrapped in an ``_AttrProxy``.
        """
        try:
            data = self.sock.recv()
        except nanomsg.NanoMsgAPIError as exc:
            # we would use errno, EXCEPT there is a bug in nanomsg-python that
            # totally mangles the number.
            str_err = str(exc)
            if str_err.startswith("Connection timed out"):
                msg = ('NanoClient timed out. Probably, the associated server '
                       'is either not running, or it is listening on a different '
                       'IP address. If everything is set up correctly, you may '
                       'just need to increase the NanoClient recv_timeout.')
                raise RuntimeError(msg) from exc
            raise
        resp = unpackb(data)
        retval = resp['return']
        stderr = resp['stderr']
        stdout = resp['stdout']
        if stderr:
            print(stderr, end='', file=sys.stderr)
        if stdout:
            print(stdout, end='')
        if isinstance(retval, ServerException):
            raise retval.class_(retval.traceback)
        elif isinstance(retval, _ProxyInfo):
            return _AttrProxy(self, retval.name)
        return retval

    def __dir__(self):
        recvd = self._send_and_recv({
            'cmd': 'meta',
            'meta_cmd': 'dir',
        })
        return recvd

    def __del__(self):
        # Look in the instance dict directly: the socket may never have been
        # created, and attribute access would go through __getattr__.
        sock = self.__dict__.get('sock')
        if sock is not None:
            sock.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close_nanoclient()

    def __getitem__(self, item):
        data = {
            'cmd': 'getattr',
            # this is a pretty bad way to serialize the item...
            'name': '[' + repr(item) + ']',
        }
        return self._send_and_recv(data)
def is_leaf(obj):
    """
    Return True when ``obj`` is built only from ``PACKABLE_CLASSES`` values.

    An instance of ``PACKABLE_CLASSES`` is a leaf.  A list or tuple is a leaf
    when every element is; a dict is a leaf when every key and every value
    is.  Anything else is not.
    """
    if isinstance(obj, PACKABLE_CLASSES):
        return True
    if isinstance(obj, (list, tuple)):
        return all(map(is_leaf, obj))
    if isinstance(obj, dict):
        # visits key, value, key, value, ... and stops at the first non-leaf
        return all(is_leaf(part) for pair in obj.items() for part in pair)
    return False
class NanoServer(object):
    """
    Manage an object remotely via nanomsg.

    Messages between client and server are encoded and decoded via msgpack.
    """
    # sentinel object so that meta commands can unambiguously be recognized
    _META_COMMAND = object()

    def __init__(self, obj, bind_addr, capture_stdstreams=True):
        """
        Serve ``obj``, listening with a nanomsg.REP socket at ``bind_addr``.

        When ``capture_stdstreams`` is true, anything a request writes to
        stdout/stderr is captured and sent back with the response.
        """
        self.obj = obj
        self.addr = bind_addr
        self.sock = nanomsg.Socket(nanomsg.REP)
        self.sock.bind(bind_addr)
        self.sock.send_timeout = 1000
        self.stdout = io.StringIO()
        self.stderr = io.StringIO()
        self.capture_stdstreams = capture_stdstreams

    def serve(self):
        """
        Receive and answer requests until a ``kill_the_server`` meta command
        arrives.

        Exceptions raised while handling a request are sent to the client as
        the response instead of stopping the loop.
        """
        self.running = True
        while self.running:
            # Reset per request so that a failure before the capture step can
            # never resend output that belonged to the previous request.
            stderr = ''
            stdout = ''
            data = self.sock.recv()
            try:
                d = unpackb(data)
                stderr_pos = self.stderr.tell()
                stdout_pos = self.stdout.tell()
                if self.capture_stdstreams:
                    r_err = redirect_stderr
                    r_out = redirect_stdout
                else:
                    r_err = nullcontext
                    r_out = nullcontext
                try:
                    with r_err(self.stderr), r_out(self.stdout):
                        resp = self.handle_request(d)
                except Exception as e:
                    resp = e
                if self.capture_stdstreams:
                    # read back only what this request appended
                    self.stderr.seek(stderr_pos)
                    self.stdout.seek(stdout_pos)
                    stderr = self.stderr.read()
                    stdout = self.stdout.read()
            except Exception as e:
                self._send_obj(e, stdout, stderr)
            else:
                if resp is self._META_COMMAND:
                    # ``d`` is the already-unpacked request; no need to
                    # unpack ``data`` a second time.
                    meta_cmd = d['meta_cmd']
                    self._handle_meta_request(meta_cmd)
                    # killing the server is a special case because it has to
                    # cleanly exit this loop
                    if meta_cmd == 'kill_the_server':
                        self.running = False
                else:
                    self._send_obj(resp, stdout, stderr)

    def _handle_meta_request(self, req):
        """Handles meta requests (requests that don't directly pass through
        to the served object)
        """
        if req == 'kill_the_server':
            self._send_obj(None)
            self.sock.close()
        elif req == 'dir':
            self._send_obj(dir(self.obj))
        else:
            # format the message here; ValueError does not interpolate args
            resp = ValueError('The command %s is unrecognized' % req)
            self._send_obj(resp)

    def _get_obj_attr(self, name):
        """Returns the correct child attribute of self.obj.  `name` is allowed
        to have dots separating names, so name == 'child.attr' will return
        self.obj.child.attr.  A bracketed part such as ``['key']`` is applied
        as an item lookup.  An empty name returns self.obj itself.
        """
        if not name:
            return self.obj
        name = name.replace(' ', '')
        # bracketed parts are captured; dots are separators (and yield None
        # or '' entries that are skipped below)
        parts = re.split(r'([[].*?])|\.', name)
        obj = self.obj
        for part in parts:
            if not part or part == '.':
                continue
            elif part.startswith('['):
                # SECURITY: ``part`` comes from the client and is passed to
                # eval().  Any peer that can reach this socket can execute
                # arbitrary code here; only bind to trusted addresses.
                # not a good way to deserialize...
                val = eval(part)[0]
                obj = obj[val]
            else:
                obj = getattr(obj, part)
        return obj

    def handle_request(self, d):
        """Handle a client's request

        ``d`` is the unpacked request dict; ``d['cmd']`` selects the action.
        Returns the value to send back, or ``_META_COMMAND`` for meta
        requests.  Raises ``ValueError`` for an unknown command.
        """
        cmd = d['cmd']
        if cmd == 'setattr':
            name = d['name']
            head = d['head']
            obj = self._get_obj_attr(head)
            setattr(obj, name, d['value'])
            return None
        elif cmd == 'call':
            func = self._get_obj_attr(d['name'])
            args = d['args']
            kwargs = d['kwargs']
            ret = func(*args, **kwargs)
            return ret
        elif cmd == 'getattr' or cmd == 'getitem':
            attr = self._get_obj_attr(d['name'])
            # return the thing directly if msgpack can deal with it
            return self._fixup_obj(attr, d['name'])
        elif cmd == 'getattrs':
            attrs = d['attrs']
            ret = {}
            for attrname in attrs:
                attr = self._get_obj_attr(attrname)
                ret[attrname] = self._fixup_obj(attr, attrname)
            return ret
        elif cmd == 'meta':
            return self._META_COMMAND
        else:
            # format the message here; ValueError does not interpolate args
            raise ValueError('Unrecognized command %s' % cmd)

    def _fixup_obj(self, obj, name):
        """Return ``obj`` itself when is_leaf() accepts it, otherwise a
        ``_ProxyInfo`` naming it."""
        if is_leaf(obj):
            return obj
        else:
            return _ProxyInfo(name)

    def close(self):
        """Close the server socket."""
        self.sock.close()

    def _send_obj(self, obj, stdout='', stderr=''):
        send_obj(self.sock, obj, stdout, stderr)

    def __del__(self):
        # the socket may not exist if __init__ failed before creating it
        sock = getattr(self, 'sock', None)
        if sock is not None:
            sock.close()

    def _peaceful_exit(self):
        """
        Stop the serve() loop.

        The use case is to call this from a separate thread than the serve()
        thread. This was created for testing, so that if a client screws up,
        the server can still be killed
        """
        c = NanoClient(self.addr)
        try:
            c.kill_the_server()
        finally:
            # always release the temporary client's socket
            c.close_nanoclient()
        self.close()
def send_obj(sock, data, stdout='', stderr=''):
    """
    Pack ``data`` together with ``stdout``/``stderr`` and send it on ``sock``.

    If that attempt raises ``TypeError``, the ``TypeError`` itself is packed
    and sent in place of ``data``.
    """
    def encode(value):
        return packb({'return': value, 'stdout': stdout, 'stderr': stderr})

    try:
        sock.send(encode(data))
    except TypeError as err:
        sock.send(encode(err))
# make all the nanoclients and servers here, to avoid circular imports on
# things that require util.py
class DpramReaderClient(NanoClient):
    """NanoClient that introspects ``DpramReader``; connects to ``config.WAVEFORM_ADDR`` by default."""

    def __init__(self, addr=config.WAVEFORM_ADDR, **kwargs):
        # imported here rather than at module level to avoid circular imports
        from wni.axi import DpramReader as proxied_class
        super().__init__(addr, proxied_class, **kwargs)
class ScanConfClient(NanoClient):
    """NanoClient that introspects ``ScanConf``; connects to ``config.SCAN_CONF_ADDR`` by default."""

    def __init__(self, addr=config.SCAN_CONF_ADDR, **kwargs):
        # imported here rather than at module level to avoid circular imports
        from wni.scan_config import ScanConf as proxied_class
        super().__init__(addr, proxied_class, **kwargs)
def scan_conf_server(bind_addr=config.SCAN_CONF_ADDR):
    """Create a ``ScanConf`` and serve it from a NanoServer bound to ``bind_addr``.

    Does not return until the server's ``serve()`` loop ends.
    """
    from wni.scan_config import ScanConf
    NanoServer(ScanConf(), bind_addr).serve()
class ActuatorClient(NanoClient):
    """NanoClient that introspects ``Actuator``; connects to ``config.ACTUATOR_ADDR`` by default."""

    def __init__(self, addr=config.ACTUATOR_ADDR, **kwargs):
        # imported here rather than at module level to avoid circular imports
        from wni.actuator import Actuator as proxied_class
        super().__init__(addr, proxied_class, **kwargs)
def actuator_server(bind_addr=config.ACTUATOR_ADDR):
    """
    Serve an actuator object from a NanoServer bound to ``bind_addr``.

    This is meant to be run as its own process, and should be the only thing on
    the machine that communicates directly with the actuator.

    This process provides a request/response nanomsg way of working with the
    actuator, to provide for faster responses for the position.  Requests and
    replies use the NanoServer protocol.

    If constructing ``Actuator`` raises ``ValueError``, the error is logged
    and a ``FakeActuator`` is served instead.  Does not return until the
    server's ``serve()`` loop ends.
    """
    from wni.actuator import Actuator, FakeActuator
    try:
        act = Actuator()
    except ValueError:
        # logger.exception records the active traceback itself
        logger.exception('using fake actuator')
        act = FakeActuator()
    server = NanoServer(act, bind_addr)
    server.serve()
def ad9361_server(bind_addr=config.AD9361_ADDR,
                  spidev_name=config.AD9361_SPI):
    """Create an ``AD9361`` from ``spidev_name`` and serve it from a NanoServer bound to ``bind_addr``.

    Does not return until the server's ``serve()`` loop ends.
    """
    from wni.ad9361.ad9361 import AD9361
    NanoServer(AD9361(spidev_name), bind_addr).serve()
class AD9361Client(NanoClient):
    """NanoClient that introspects ``AD9361``; connects to ``config.AD9361_ADDR`` by default."""

    def __init__(self, addr=config.AD9361_ADDR, **kwargs):
        # imported here rather than at module level to avoid circular imports
        from wni.ad9361.ad9361 import AD9361 as proxied_class
        super().__init__(addr, proxied_class, **kwargs)
def scan_timer_server(bind_addr=config.TIMER_ADDR):
    """Create a ``_ScanTimer`` and serve it from a NanoServer bound to ``bind_addr``.

    Does not return until the server's ``serve()`` loop ends.
    """
    from wni.scan_timer import _ScanTimer
    NanoServer(_ScanTimer(), bind_addr).serve()
def radar_server(bind_addr='tcp://0.0.0.0:{}'.format(config.RADAR_DAEMON_PORT)):
    """Create a ``Radar`` and serve it from a NanoServer bound to ``bind_addr``.

    Does not return until the server's ``serve()`` loop ends.
    """
    from wni.radar import Radar
    NanoServer(Radar(), bind_addr).serve()
class ScanTimerClient(NanoClient):
    """NanoClient that introspects ``_ScanTimer``; connects to ``config.TIMER_ADDR`` by default.

    After connecting, the socket's ``REQ_RESEND_IVL`` option is set to
    ``2**31 - 1``.
    """

    def __init__(self, addr=config.TIMER_ADDR, **kwargs):
        # imported here rather than at module level to avoid circular imports
        from wni.scan_timer import _ScanTimer as proxied_class
        super().__init__(addr, proxied_class, **kwargs)
        resend_interval = 2**31 - 1
        self.sock.set_int_option(
            nanomsg.REQ, nanomsg.REQ_RESEND_IVL, resend_interval)
def wait_for_connection(sock, timeout):
    """
    Busy-wait until ``get_current_connections(sock)`` is nonzero.

    Args:
        sock: the socket handed to ``get_current_connections``.
        timeout: maximum time to wait, in ``time.monotonic()`` units
            (seconds).

    Returns:
        The number of polls that found no connection before one appeared
        (0 if the socket was already connected).

    Raises:
        ValueError: if there is still no connection after more than
            ``timeout`` has elapsed.
    """
    start = time.monotonic()
    iters = 0
    while get_current_connections(sock) == 0:
        iters += 1
        if time.monotonic() - start > timeout:
            raise ValueError(
                'no connection on socket within %s seconds' % timeout)
    return iters
def r356_server(bind_addr=config.MOTOR_ADDR, use_fake=False):
    """
    Serve a ``Motor`` from a NanoServer bound to ``bind_addr``.

    This is meant to be run as its own process, and should be the only thing on
    the machine that communicates directly with the motor.

    This process provides a request/response nanomsg way of working with the
    motor, to provide for faster responses for the position.

    With ``use_fake`` set, or when creating ``R356`` raises
    ``serial.SerialException``, the motor is backed by a ``FakeR356``.

    Usage:
        p = multiprocessing.Process(target=r356_server)
        p.start()
    """
    from wni.util import NanoServer
    import serial
    from wni.motor.r356 import Motor, R356, FakeR356

    if use_fake:
        controller = FakeR356()
    else:
        try:
            controller = R356()
        except serial.SerialException:
            logger.exception('Motor not found: Using FakeR356.')
            controller = FakeR356()

    NanoServer(Motor(controller), bind_addr).serve()
class Notifier:
    """
    Connects to the timer surveyor.

    A RESPONDENT socket is connected to ``surveyor_addr`` and a SUB socket to
    ``config.TIMER_SYNC_ADDR``.  A daemon thread answers every survey and
    puts each received message on an internal queue, which
    :meth:`wait_for_msg` reads from.
    """

    def __init__(self, surveyor_addr=config.TIMER_SURVEY_ADDR):
        # set the socket attributes early, so that if connecting fails for
        # whatever reason, close()/__del__ won't raise another exception
        self.sock = None
        self.subsock = None
        self.sock = nanomsg.Socket(nanomsg.RESPONDENT)
        self.sock.connect(surveyor_addr)
        self.subsock = nanomsg.Socket(nanomsg.SUB)
        self.sock.recv_timeout = 6000
        # an empty subscription prefix
        self.subsock.set_string_option(nanomsg.SUB, nanomsg.SUB_SUBSCRIBE, b'')
        self.subsock.connect(config.TIMER_SYNC_ADDR)
        # surveys are received and answered continuously on a background
        # thread
        self.queue = queue.Queue()
        self.bgthread = threading.Thread(target=self.bg)
        self.bgthread.daemon = True
        self.bgthread.start()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def bg(self):
        """
        Background loop: receive a survey, reply ``b'k thx'``, queue the
        message.  Exits once the socket reports ``EBADF``.
        """
        while True:
            try:
                msg = self.sock.recv()
            except nanomsg.NanoMsgAPIError as e:
                if e.errno == nanomsg.EBADF:
                    # the main thread closed the socket
                    break
                # Nothing was received (presumably a recv timeout), so there
                # is no message to queue -- just try again.
                continue
            try:
                self.sock.send(b'k thx')
            except nanomsg.NanoMsgAPIError as e:
                if e.errno == nanomsg.EBADF:
                    # the main thread closed the socket
                    break
                # the reply failed, but the survey itself did arrive, so it
                # is still delivered below
            self.queue.put(msg)

    def close(self):
        """Close both sockets (those that were created)."""
        for sock in (getattr(self, 'sock', None),
                     getattr(self, 'subsock', None)):
            if sock is not None:
                sock.close()

    def flush_queue(self):
        """Discard everything pending on the SUB socket and in the queue."""
        to = self.subsock.recv_timeout
        try:
            # with a zero timeout, recv() raises as soon as nothing is pending
            self.subsock.recv_timeout = 0
            while True:
                try:
                    self.subsock.recv()
                except nanomsg.NanoMsgAPIError:
                    break
        finally:
            self.subsock.recv_timeout = to
        while True:
            try:
                self.queue.get_nowait()
            except queue.Empty:
                break

    def wait_for_msg(self, timeout=None):
        """Return the next queued survey message.

        Blocks for up to ``timeout`` (forever when ``None``); raises
        ``queue.Empty`` when the timeout expires.
        """
        msg = self.queue.get(timeout=timeout)
        return msg

    def sync_start(self):
        """
        Should be called after getting the surveyor message if you want to know
        exactly when a scan starts.
        """
        self.subsock.recv()

    def __del__(self):
        self.close()
class MotorClient(NanoClient):
    """NanoClient that introspects ``Motor``; connects to ``config.MOTOR_ADDR`` by default."""

    def __init__(self, addr=config.MOTOR_ADDR, **kwargs):
        # imported here rather than at module level to avoid circular imports
        from wni.motor.r356 import Motor as proxied_class
        super().__init__(addr, proxied_class, **kwargs)
def system_config_server(bind_addr=config.SYSTEM_CONFIG_ADDR):
    """Create a ``SystemConfig`` and serve it from a NanoServer bound to ``bind_addr``.

    Does not return until the server's ``serve()`` loop ends.
    """
    from wni.system_config import SystemConfig
    NanoServer(SystemConfig(), bind_addr).serve()
class SystemConfigClient(NanoClient):
    """NanoClient that introspects ``SystemConfig``; connects to ``config.SYSTEM_CONFIG_ADDR`` by default."""

    def __init__(self, addr=config.SYSTEM_CONFIG_ADDR, **kwargs):
        # imported here rather than at module level to avoid circular imports
        from wni.system_config import SystemConfig as proxied_class
        super().__init__(addr, proxied_class, **kwargs)
class PositionIndicatorClient(NanoClient):
    """NanoClient that introspects ``PositionIndicator``; connects to ``config.POS_INDICATOR_ADDR`` by default."""

    def __init__(self, addr=config.POS_INDICATOR_ADDR, **kwargs):
        # imported here rather than at module level to avoid circular imports
        from wni.position_indicator import PositionIndicator as proxied_class
        super().__init__(addr, proxied_class, **kwargs)
def data_client1():
    """
    Build a ``DataClient`` connected to ``config.CH1_LOCAL_MOMENT_ADDR``.

    This is the channel 1 data client; it can only be used from the
    processing computer or Microzed.
    """
    make_client = wni.data_client.DataClient
    return make_client(connect_addr=config.CH1_LOCAL_MOMENT_ADDR)
def data_client2():
    """
    Build a ``DataClient`` connected to ``config.CH2_LOCAL_MOMENT_ADDR``.

    This is the channel 2 data client; it can only be used from the
    processing computer or Microzed.
    """
    make_client = wni.data_client.DataClient
    return make_client(connect_addr=config.CH2_LOCAL_MOMENT_ADDR)
def pcpm_client(timeout=1000):
    """
    Build a ``NanoProcessManagerClient`` for ``config.PC_PM_ADDR``.

    This is the client for the processing computer process manager; it can
    only be used from the processing computer or Microzed.  ``timeout`` is
    passed on as the client's ``recv_timeout``.
    """
    import wni.processes
    make_client = wni.processes.NanoProcessManagerClient
    return make_client(config.PC_PM_ADDR, recv_timeout=timeout)
def uzpm_client(timeout=1000):
    """
    Build a ``NanoProcessManagerClient`` for ``config.UZ_PM_ADDR``.

    This is the client for the Microzed process manager; it can only be used
    from the processing computer or Microzed.  ``timeout`` is passed on as
    the client's ``recv_timeout``.
    """
    import wni.processes
    make_client = wni.processes.NanoProcessManagerClient
    return make_client(config.UZ_PM_ADDR, recv_timeout=timeout)