Source code for wni.util

"""Random utilities"""


from __future__ import absolute_import, print_function

from functools import partial
from contextlib import redirect_stderr, redirect_stdout

import io
import logging.handlers
import queue
import re
import sys
import struct
import time
import threading

import msgpack
import nanomsg
import numpy as np


import wni.config as config
import wni.data_client
from wni.waveforms import Chirp
from wni.nano import get_current_connections


logger = logging.getLogger(__name__)


# the instances of these classes can be
PACKABLE_CLASSES = (
    type(None),
    int, float,
    bytes, str,
    np.ndarray,
    Chirp,
)


[docs]def nested_dict_merged(d1, d2): """ Return a dict representing a merged dict of d1 and d2. The values in d2 supersede the values in d1, but there is an important case here: if the value is another dict, this method will be called again for the two dicts. """ # nb: dicts can contain *themselves*, isn't that terrible? # we need to handle that case. def recurse(d1, d2, seen: set): if id(d1) in seen or id(d2) in seen: raise ValueError('Cannot merge nested eicts') seen.add(id(d1)) seen.add(id(d2)) out = {} all_keys = d1.keys() | d2.keys() sentinel = object() for k in all_keys: v1 = d1.get(k, sentinel) v2 = d2.get(k, sentinel) if isinstance(v1, dict) and isinstance(v2, dict): out[k] = recurse(v1, v2, seen) elif v1 is sentinel: out[k] = v2 elif v2 is sentinel: out[k] = v1 else: # v2 wins when both are present out[k] = v2 return out return recurse(d1, d2, set())
[docs]class nullcontext: # This exists in Python 3.7+, but that's too new for now def __init__(self, enter_result=None): self.enter_result = enter_result def __enter__(self): return self.enter_result def __exit__(self, *exc_info): return
class _ProxyInfo(object): """Used by NanoClient/NanoServer for proxying items or names.""" def __init__(self, name=None, item=None): self.name = name self.item = item class _AttrProxy(object): """Attribute Proxy, used by NanoClient/NanoServer""" _MY_ATTRS = 'name', 'client' def __init__(self, client, name): self.name = name self.client = client def __call__(self, *args, **kwargs): cmd = {'cmd': 'call', 'name': self.name, 'args': args, 'kwargs': kwargs } return self.client._send_and_recv(cmd) def __getattr__(self, name): data = { 'cmd': 'getattr', 'name': self.name + '.' + name, } return self.client._send_and_recv(data) def __getitem__(self, name): data = { 'cmd': 'getattr', 'name': self.name + '[' + repr(name) + ']' } return self.client._send_and_recv(data) def __setattr__(self, name, value): if name in self._MY_ATTRS: super().__setattr__(name, value) else: data = { 'cmd': 'setattr', 'head': self.name, 'name': name, 'value': value, } return self.client._send_and_recv(data) def __dir__(self): cmd = { 'cmd': 'call', 'name': self.name + '.__dir__', 'args': [], 'kwargs': {}, } return self.client._send_and_recv(cmd) def __repr__(self): cmd = {'cmd': 'call', 'name': self.name + '.__str__', 'args': [], 'kwargs': {} } objrepr = self.client._send_and_recv(cmd) return "proxy of %s" % objrepr def __len__(self): cmd = { 'cmd': 'call', 'name': self.name + '.__len__', 'args': [], 'kwargs': {}, } return self.client._send_and_recv(cmd) def _pack_uint(n): return struct.pack("I", n)
[docs]class EXT_TYPES: def __new__(cls, *_, **__): raise TypeError("this class should not be instantiated.") complex = 0 ndarray = 1 exception = 2 proxy = 3 chirp = 4 slice = 5
[docs]def default_pack(o): """ Serialize (certain) Python objects using msgpack. Currently supported objects: 1. complex numbers 2. Exceptions (limited support) 3. :class:`_ProxyInfo` objects, for use in ``NanoClient``/``NanoServer`` communication 4. numpy.ndarray """ if isinstance(o, complex): packed = struct.pack("ff", o.real, o.imag) ext = msgpack.ExtType(EXT_TYPES.complex, packed) elif isinstance(o, Exception): import traceback msg = ''.join(traceback.format_exception(type(o), o, o.__traceback__)) d = { 'name': o.__class__.__name__, 'args': o.args, 'traceback': msg, } packed = packb(d) ext = msgpack.ExtType(EXT_TYPES.exception, packed) elif isinstance(o, _ProxyInfo): # _ProxyInfo used for NanoServer/NanoClient proxy objects d = { 'name': o.name, 'item': o.item, } packed = packb(d) ext = msgpack.ExtType(EXT_TYPES.proxy, packed) elif isinstance(o, np.ndarray): dt = o.dtype.num shape = o.shape packed = packb([dt, shape, o.tostring()]) ext = msgpack.ExtType(EXT_TYPES.ndarray, packed) elif isinstance(o, Chirp): kwargs = { 'nsamples': o.nsamples, 'center': o.center, 'bw': o.bw, 'window': o.window, 'clk_freq': o.clk, 'scale': o.scale, } packed = msgpack.packb(kwargs) ext = msgpack.ExtType(EXT_TYPES.chirp, packed) elif isinstance(o, slice): slice_info = slice.start, slice.stop, slice.step packed = msgpack.packb(slice_info) ext = msgpack.ExtType(EXT_TYPES.slice, packed) else: raise TypeError("Unkown type: %r" % o) return ext
[docs]class ServerException(Exception): """Whenever a NanoServer would have raised an exception, the client receives this exception and raises it.""" def __init__(self, exc_info): self.exc_name = exc_info['name'] try: self.class_ = __builtins__[self.exc_name] except KeyError as e: self.class_ = Exception self.traceback = exc_info['traceback'] def __repr__(self): return "{}({}){}".format(self.exc_name, self.args, self.traceback)
[docs]def ext_hook_unpack(code, data): """ Unpack the Python objects that are known to :func:`default_pack` """ if code == EXT_TYPES.complex: r, i = struct.unpack("ff", data) ret = complex(r, i) elif code == EXT_TYPES.exception: d = unpackb(data) ret = ServerException(d) elif code == EXT_TYPES.proxy: # _ProxyInfo used for NanoServer/NanoClient proxy objects d = unpackb(data) name = d['name'] ret = _ProxyInfo(name) elif code == EXT_TYPES.ndarray: dtype, shape, array_data = msgpack.unpackb( data, max_bin_len=_LIMIT, max_str_len=_LIMIT, max_array_len=_LIMIT, max_map_len=_LIMIT, max_ext_len=_LIMIT, ) flat_array = np.frombuffer(array_data, dtype=np.sctypeDict[dtype]) ret = flat_array.reshape(shape) elif code == EXT_TYPES.chirp: chirp_kwargs = unpackb(data) ret = Chirp(**chirp_kwargs) elif code == EXT_TYPES.slice: slice_info = unpackb(data) ret = slice(*slice_info) else: raise TypeError("Unkown code: %r" % code) return ret
_LIMIT = 2**31 - 1 packb = partial(msgpack.packb, default=default_pack, use_bin_type=True) unpackb = partial( msgpack.unpackb, ext_hook=ext_hook_unpack, raw=False, max_bin_len=_LIMIT, max_str_len=_LIMIT, max_array_len=_LIMIT, max_map_len=_LIMIT, max_ext_len=_LIMIT, )
[docs]def setter(fset): return property(None, fset)
[docs]class NanoClient(object): """ A client for a NanoServer. Messages between client and server are encoded and decoded via msgpack. """ _MY_ATTRS = 'addr sock class_ _nanoclient_lock'.split() def __init__(self, connect_addr, class_=None, recv_timeout=10000): """if class_ is provided, it will be introspected to avoid unnecessary queries to the server. This only applies to functions. """ self.addr = connect_addr self.class_ = class_ self.sock = nanomsg.Socket(nanomsg.REQ) self.sock.connect(connect_addr) self.sock.recv_timeout = recv_timeout self._nanoclient_lock = threading.Lock() def __setattr__(self, name, value): if name in self._MY_ATTRS: super().__setattr__(name, value) else: data = { 'cmd': 'setattr', 'head': '', 'name': name, 'value': value, } return self._send_and_recv(data) def _send_and_recv(self, data): """ Send a dict (data) and return the response. """ with self._nanoclient_lock: self._send_dict(data) return self._get_resp()
[docs] def getattr(self, name): """ Return the attribute from the server """ data = { 'cmd': 'getattr', 'name': name, } return self._send_and_recv(data)
def _getattrs(self, attrs): data = { 'cmd': 'getattrs', 'attrs': attrs } return self._send_and_recv(data) def __getattr__(self, name): # check self.class_ to see if the name given is callable. # NOTE this is not perfect; just because something is callable doesn't # mean that the user actually _wants_ to call it, but for me, for now, # this is the right thing. # # this optimization removes the need for a round-trip to the server. if self.class_ is not None: # hasattr() can fail sometimes whenever it shouldn't... has_attr = False try: has_attr = hasattr(self.class_, name) except Exception: pass if has_attr: if callable(getattr(self.class_, name)): return _AttrProxy(self, name) return self.getattr(name)
[docs] def kill_the_server(self): data = { 'cmd': 'meta', 'meta_cmd': 'kill_the_server' } return self._send_and_recv(data)
def _send_dict(self, d): packed = packb(d) self.sock.send(packed)
[docs] def close_nanoclient(self): """ Closes the underlying socket. This isn't just called close() because it is too likely that we're proxying an object with a close() method. """ self.sock.close()
def _get_resp(self): """Waits for a response from the server. If the server returned an Exception, that is raised. """ try: stdout = '' stderr = '' data = self.sock.recv() except nanomsg.NanoMsgAPIError as exc: # we would use errno, EXCEPT there is a bug in nanomsg-python that # totally mangles the number. str_err = str(exc) if str_err.startswith("Connection timed out"): msg = ('NanoClient timed out. Probably, the associated server ' 'is either not running, or it is listening on a different ' 'IP address. If everything is set up correctly, you may ' 'just need to increase the NanoClient recv_timeout.') raise RuntimeError(msg) from exc raise resp = unpackb(data) retval = resp['return'] stderr = resp['stderr'] stdout = resp['stdout'] if stderr: print(stderr, end='', file=sys.stderr) if stdout: print(stdout, end='') if isinstance(retval, ServerException): raise retval.class_(retval.traceback) elif isinstance(retval, _ProxyInfo): return _AttrProxy(self, retval.name) return retval def __dir__(self): recvd = self._send_and_recv({ 'cmd': 'meta', 'meta_cmd': 'dir', }) return recvd def __del__(self): self.sock.close() def __enter__(self): return self def __exit__(self, *exc_info): self.close_nanoclient() def __getitem__(self, item): data = { 'cmd': 'getattr', # this is a pretty bad way to serialize the item... 'name': '[' + repr(item) + ']', } return self._send_and_recv(data)
[docs]def is_leaf(obj): if isinstance(obj, PACKABLE_CLASSES): return True if isinstance(obj, (list, tuple)): return all(is_leaf(v) for v in obj) if isinstance(obj, dict): return all(is_leaf(k) and is_leaf(v) for k, v in obj.items()) return False
[docs]class NanoServer(object): """ Manage an object remotely via nanomsg. Messages between client and server are encoded and decoded via msgpack. """ # sentinel object so that meta commands can unambiguously be recognized _META_COMMAND = object() def __init__(self, obj, bind_addr, capture_stdstreams=True): """ Serve ``obj``, listening with a nanomsg.REQ socket at ``bind_addr``. """ self.obj = obj self.addr = bind_addr self.sock = nanomsg.Socket(nanomsg.REP) self.sock.bind(bind_addr) self.sock.send_timeout = 1000 self.stdout = io.StringIO() self.stderr = io.StringIO() self.capture_stdstreams = capture_stdstreams
[docs] def serve(self): self.running = True stderr = '' stdout = '' while self.running: data = self.sock.recv() try: d = unpackb(data) stderr_pos = self.stderr.tell() stdout_pos = self.stdout.tell() if self.capture_stdstreams: r_err = redirect_stderr r_out = redirect_stdout else: r_err = nullcontext r_out = nullcontext try: with r_err(self.stderr), r_out(self.stdout): resp = self.handle_request(d) except Exception as e: resp = e if self.capture_stdstreams: self.stderr.seek(stderr_pos) self.stdout.seek(stdout_pos) stderr = self.stderr.read() stdout = self.stdout.read() else: stderr = stdout = '' except Exception as e: self._send_obj(e, stdout, stderr) else: if resp is self._META_COMMAND: # we're unpacking it a again which is inefficient, but meta # commands are uncommon and it keeps the handle_request() # function fairly clean cmd = unpackb(data) self._handle_meta_request(cmd['meta_cmd']) # killing the server is a special case because it has to # cleanly exit this loop if cmd['meta_cmd'] == 'kill_the_server': self.running = False else: self._send_obj(resp, stdout, stderr)
def _handle_meta_request(self, req): """Handles meta requests (requests that don't directly pass through to the client) """ if req == 'kill_the_server': self._send_obj(None) self.sock.close() elif req == 'dir': self._send_obj(dir(self.obj)) else: resp = ValueError('The command %s is unrecognized', req) self._send_obj(resp) def _get_obj_attr(self, name): """Returns the correct child attribute of self.obj. `name` is allowed to have dots separating names, so name == 'child.attr' will return up self.obj.child.attr. """ if not name: return self.obj name = name.replace(' ', '') parts = re.split(r'([[].*?])|\.', name) obj = self.obj for part in parts: if not part or part == '.': continue elif part.startswith('['): # not a good way to deserialize... val = eval(part)[0] obj = obj[val] else: obj = getattr(obj, part) return obj
[docs] def handle_request(self, d): """Handle a client's request""" cmd = d['cmd'] if cmd == 'setattr': name = d['name'] head = d['head'] obj = self._get_obj_attr(head) setattr(obj, name, d['value']) return None elif cmd == 'call': func = self._get_obj_attr(d['name']) args = d['args'] kwargs = d['kwargs'] ret = func(*args, **kwargs) return ret elif cmd == 'getattr' or cmd == 'getitem': attr = self._get_obj_attr(d['name']) # return the thing directly if msgpack can deal with it return self._fixup_obj(attr, d['name']) elif cmd == 'getattrs': attrs = d['attrs'] ret = {} for attrname in attrs: attr = self._get_obj_attr(attrname) ret[attrname] = self._fixup_obj(attr, attrname) return ret elif cmd == 'meta': return self._META_COMMAND else: raise ValueError('Unrecognized command %s', cmd)
def _fixup_obj(self, obj, name): if is_leaf(obj): return obj else: return _ProxyInfo(name)
[docs] def close(self): self.sock.close()
def _send_obj(self, obj, stdout='', stderr=''): send_obj(self.sock, obj, stdout, stderr) def __del__(self): self.sock.close() def _peaceful_exit(self): """ Stop the serve() loop. The use case is to call this from a separate thread than the serve() thread. This was created for testing, so that if a client screws up, the server can still be killed """ c = NanoClient(self.addr) c.kill_the_server() self.close()
[docs]def send_obj(sock, data, stdout='', stderr=''): try: obj = packb({'return': data, 'stdout': stdout, 'stderr': stderr}) sock.send(obj) except TypeError as e: obj = packb({'return': e, 'stdout': stdout, 'stderr': stderr}) sock.send(obj)
# make all the nanoclients and servers here, to avoid circular imports on # things that require util.py
[docs]class DpramReaderClient(NanoClient): def __init__(self, addr=config.WAVEFORM_ADDR, **kwargs): from wni.axi import DpramReader super().__init__(addr, DpramReader, **kwargs)
[docs]class ScanConfClient(NanoClient): def __init__(self, addr=config.SCAN_CONF_ADDR, **kwargs): from wni.scan_config import ScanConf super().__init__(addr, ScanConf, **kwargs)
[docs]def scan_conf_server(bind_addr=config.SCAN_CONF_ADDR): from wni.scan_config import ScanConf scan_conf = ScanConf() server = NanoServer(scan_conf, bind_addr) server.serve()
[docs]class ActuatorClient(NanoClient): def __init__(self, addr=config.ACTUATOR_ADDR, **kwargs): from wni.actuator import Actuator super().__init__(addr, Actuator, **kwargs)
[docs]def actuator_server(bind_addr=config.ACTUATOR_ADDR): """ This is meant to be run as its own process, and should be the only thing on the machine that communicates directly with the actuator. This process provides a request/respoonse nanomsg way of working with the actuator, to provide for faster responses for the position. The message is either of the form "get[value]" or "set[value##]"; if the value is set, an "ok" response is sent back. """ from wni.actuator import Actuator, FakeActuator try: act = Actuator() except ValueError as e: logger.exception('using fake actuator') act = FakeActuator() server = NanoServer(act, bind_addr) server.serve()
[docs]def ad9361_server(bind_addr=config.AD9361_ADDR, spidev_name=config.AD9361_SPI): from wni.ad9361.ad9361 import AD9361 ad9361 = AD9361(spidev_name) server = NanoServer(ad9361, bind_addr) server.serve()
[docs]class AD9361Client(NanoClient): def __init__(self, addr=config.AD9361_ADDR, **kwargs): from wni.ad9361.ad9361 import AD9361 super().__init__(addr, AD9361, **kwargs)
[docs]def scan_timer_server(bind_addr=config.TIMER_ADDR): from wni.scan_timer import _ScanTimer scan_timer = _ScanTimer() server = NanoServer(scan_timer, bind_addr) server.serve()
[docs]def radar_server(bind_addr='tcp://0.0.0.0:{}'.format(config.RADAR_DAEMON_PORT)): from wni.radar import Radar radar = Radar() server = NanoServer(radar, bind_addr) server.serve()
[docs]class ScanTimerClient(NanoClient): def __init__(self, addr=config.TIMER_ADDR, **kwargs): from wni.scan_timer import _ScanTimer super().__init__(addr, _ScanTimer, **kwargs) self.sock.set_int_option(nanomsg.REQ, nanomsg.REQ_RESEND_IVL, 2**31 - 1)
[docs]def wait_for_connection(sock, timeout): """ Busy-wait for connection on `sock`. If it does not happen within specified time, `assert False` is raised. """ start = time.monotonic() iters = 0 while get_current_connections(sock) == 0: iters += 1 later = time.monotonic() if later - start > timeout: raise ValueError() return iters
[docs]def r356_server(bind_addr=config.MOTOR_ADDR, use_fake=False): """ This is meant to be run as its own process, and should be the only thing on the machine that communicates directly with the motor. This process provides a request/response nanomsg way of working with the motor, to provide for faster responses for the position. Usage: p = multiprocessing.Process(target=R356NanomsgServer) p.start() """ from wni.util import NanoServer import serial from wni.motor.r356 import Motor, R356, FakeR356 if not use_fake: try: r356 = R356() except serial.SerialException: logger.exception('Motor not found: Using FakeR356.') r356 = FakeR356() else: r356 = FakeR356() motor = Motor(r356) server = NanoServer(motor, bind_addr) server.serve()
[docs]class Notifier: """ Connects to the timer surveyor. """ def __init__(self, surveyor_addr=config.TIMER_SURVEY_ADDR): # set self.sock early, so that if connecting fails for whatever reason, # the __del__ method won't raise another exception self.sock = None self.sock = nanomsg.Socket(nanomsg.RESPONDENT) self.sock.connect(surveyor_addr) self.subsock = nanomsg.Socket(nanomsg.SUB) self.sock.recv_timeout = 6000 self.subsock.set_string_option(nanomsg.SUB, nanomsg.SUB_SUBSCRIBE, b'') self.subsock.connect(config.TIMER_SYNC_ADDR) # we have to make sure we are constantly ready to send/receive # so that self.bgthread = threading.Thread(target=self.bg) self.queue = queue.Queue() self.bgthread.daemon = True self.bgthread.start() def __enter__(self): return self def __exit__(self, *exc_info): self.close()
[docs] def bg(self): while True: try: msg = self.sock.recv() self.sock.send(b'k thx') except nanomsg.NanoMsgAPIError as e: if e.errno == nanomsg.EBADF: # the main thread closed the socket break self.queue.put(msg)
[docs] def close(self): if self.sock is not None: self.sock.close()
[docs] def flush_queue(self): to = self.subsock.recv_timeout try: self.subsock.recv_timeout = 0 while True: try: self.subsock.recv() except nanomsg.NanoMsgAPIError: break finally: self.subsock.recv_timeout = to while True: try: self.queue.get_nowait() except queue.Empty: break
[docs] def wait_for_msg(self, timeout=None): msg = self.queue.get(timeout=timeout) return msg
[docs] def sync_start(self): """ Should be called after getting the surveyor message if you want to know exactly when a scan starts. """ self.subsock.recv()
def __del__(self): self.close()
[docs]class MotorClient(NanoClient): def __init__(self, addr=config.MOTOR_ADDR, **kwargs): from wni.motor.r356 import Motor super().__init__(addr, Motor, **kwargs)
[docs]def system_config_server(bind_addr=config.SYSTEM_CONFIG_ADDR): from wni.system_config import SystemConfig settings = SystemConfig() server = NanoServer(settings, bind_addr) server.serve()
[docs]class SystemConfigClient(NanoClient): def __init__(self, addr=config.SYSTEM_CONFIG_ADDR, **kwargs): from wni.system_config import SystemConfig super().__init__(addr, SystemConfig, **kwargs)
[docs]class PositionIndicatorClient(NanoClient): def __init__(self, addr=config.POS_INDICATOR_ADDR, **kwargs): from wni.position_indicator import PositionIndicator super().__init__(addr, PositionIndicator, **kwargs)
[docs]def data_client1(): """ Return a data client for channel 1 that can only be used from the processing computer or Microzed """ return wni.data_client.DataClient(connect_addr=config.CH1_LOCAL_MOMENT_ADDR)
[docs]def data_client2(): """ Return a data client for channel 2 that can only be used from the processing computer or Microzed """ return wni.data_client.DataClient(connect_addr=config.CH2_LOCAL_MOMENT_ADDR)
[docs]def pcpm_client(timeout=1000): """ Return a client for the processing computer process manager that can only be used from the processing computer or Microzed. """ import wni.processes return wni.processes.NanoProcessManagerClient(config.PC_PM_ADDR, recv_timeout=timeout)
[docs]def uzpm_client(timeout=1000): """ Return a client for the Microzed process manager that can only be used from the processing computer or Microzed. """ import wni.processes return wni.processes.NanoProcessManagerClient(config.UZ_PM_ADDR, recv_timeout=timeout)