# Source code for wni.datautil

"""
Utilities for scripting with the data produced by the WNI radar.
"""

import abc
import collections
import datetime
import functools
import mmap
import os
import time

import msgpack
import nanomsg

import wni.util
from wni.util import ext_hook_unpack, packb, unpackb
import wni.config

# speed of light in m/s
C = 299792458.0

OUTPUT_DATA_FORMATS = 'matlab', 'hdf5', 'msgpack'

# pylint: disable=invalid-name

# we don't need scipy.io or h5py unless we're running certain scripts.
try:
    import scipy.io
except ImportError:
    pass

try:
    import h5py
except ImportError:
    pass

TYPEID = '__type__'


def remove_none(waveforms):
    """Return copies of the waveform dicts with ``None``-valued entries dropped."""
    return [
        {key: value for key, value in wfm.items() if value is not None}
        for wfm in waveforms
    ]
def extract_keys(key, list_of_dicts, replace_none=()):
    """Extract `key` from every dict in a list of dicts.

    Entries that are missing or ``None`` are replaced by ``replace_none``.
    """
    def _lookup(mapping):
        found = mapping.get(key)
        return replace_none if found is None else found

    return [_lookup(d) for d in list_of_dicts]
def radials_to_dict(radials: list, scan_set=0):
    """
    Return a consolidated dict of radials.

    Args:
        radials: the list of radials, as returned from a ``DataClient``.
        scan_set: Index into each radial within radials.
    """
    selected = [radial[scan_set] for radial in radials]

    def column(name):
        # Missing or None values become an empty tuple so every column has
        # exactly one entry per radial.
        return [r.get(name) if r.get(name) is not None else () for r in selected]

    first = selected[0]
    return {
        'az': column('az'),
        'el': column('el'),
        'ref': column('ref'),
        'velocity': column('velocity'),
        'mag_R1': column('mag_R1'),
        'iq': column('iq'),
        # Per-scan values come from the first radial only.
        'gates': first['gates'],
        'channel': first['channel'],
        'timestamp': column('timestamp'),
        'npulses': column('npulses'),
        'radial_number': column('radial_number'),
        'scan_id': column('scan_id'),
        'rev': first['rev'],
    }
def dt_from_str(string):
    """Parse a ``YYYY/mm/dd_HH:MM`` formatted string into a ``datetime``."""
    return datetime.datetime.strptime(string, '%Y/%m/%d_%H:%M')
def get_unpacker(fname):
    """Return a ``msgpack.Unpacker`` which can read the data saved by pulse
    processor.

    If ``fname`` is a string or bytes, a file will be opened for it.
    Otherwise, ``fname`` is treated as a file opened in binary read-only
    mode.
    """
    # msgpack rejects oversized objects unless the limits are raised; radar
    # radials can be large, so allow anything up to 2**31 - 1.
    limit = 2**31 - 1
    stream = open(fname, 'rb') if isinstance(fname, (str, bytes)) else fname
    return msgpack.Unpacker(
        stream,
        ext_hook=ext_hook_unpack,
        raw=False,
        max_bin_len=limit,
        max_str_len=limit,
        max_array_len=limit,
        max_map_len=limit,
        max_ext_len=limit,
    )
def pack_radials(radials):
    """
    Return msgpacked bytes in the same format produced by the client
    applications.
    """
    return b''.join(packb(radial) for radial in radials)
[docs]def skip_to_time(time, unpacker): """Skip to the radial that occurs after the specified time. """ while True: d = unpacker.get_next_radial() radial_time = datetime.datetime.fromtimestamp(d['timestamp']) if radial_time > time: break
[docs]class MsgpackIter: """Iterates over a msgpack.Unpacker object""" def __init__(self, unpacker): self.unpacker = unpacker def __iter__(self): return self def __next__(self): return self.unpacker.unpack()
class RadialIter(abc.ABC):
    """
    ABC for iterating over radials produced by the pulse processor.

    Subclasses must provide a method "get_next_radial" for providing data.
    """
    def __init__(self):
        # One-item lookahead buffer used by peek(); holds at most one radial
        # (which may be None once the underlying source is exhausted).
        self.peeked = []

    def peek(self):
        """
        Examine the next value without consuming the iterator.  Multiple
        calls to this function return the same value.

        returns None if there is no more data.
        """
        if self.peeked:
            assert len(self.peeked) == 1
            return self.peeked[0]
        else:
            radial = self.get_next_radial()
            self.peeked.append(radial)
            return radial

    @abc.abstractmethod
    def get_next_radial(self):
        """
        Return the next radial.  If there is no data left, return None.
        """

    def __iter__(self):
        return self

    def __next__(self):
        # Serve the buffered (peeked) radial first so peek() + next() never
        # skips data.
        # NOTE(review): if peek() buffered a None (exhausted source), this
        # returns that None instead of raising StopIteration; callers in this
        # module peek before calling next(), but confirm before relying on
        # for-loop termination alone.
        if self.peeked:
            return self.peeked.pop()
        else:
            radial = self.get_next_radial()
            if radial is None:
                raise StopIteration
            return radial

    def iterate_scan(self):
        """
        Yield radials from the current scan.

        Iteration stops (without consuming) at the first radial whose
        'scan_id' differs from the first yielded radial's.
        """
        first = next(self)
        scan_id = first['scan_id']
        yield first
        while True:
            peeked = self.peek()
            if peeked is None or peeked['scan_id'] != scan_id:
                break
            yield next(self)

    def get_next_ppi(self, nradials=4000, scan_id=None):
        """
        Return a list of radials associated with a single ppi.

        If the scan id changes while this function is running, the function
        will return a partial PPI.  If ``nradials`` radials are yielded
        before a PPI is complete, yield nradials, regardless of whether they
        complete a PPI.

        If ``scan_id`` is passed and is not None, the function will return
        ``None`` if the first radial's scan id does not match ``scan_id``.

        Returns a ``(scan_id, radials)`` tuple, or None if no data is
        available.

        NOTE(review): here each item from the iterator is indexed as
        ``radial[0]`` (a list of per-scan-set dicts), whereas iterate_scan
        treats items as plain dicts -- presumably different subclasses yield
        different shapes; confirm against the data source used.
        """
        if scan_id is not None:
            peeked = self.peek()
            if peeked is None:
                return None
            if peeked[0]['scan_id'] != scan_id:
                return None
        try:
            first_loop = next(self)
        except StopIteration:
            return None
        if first_loop is None:
            return None
        first = first_loop[0]
        # get the first azimuth angle as a reference point. When we detect
        # that the reference point has been crossed, we'll break and return
        # the radials we have.
        first_az = first['az']
        scan_id = first['scan_id']
        radials = [first_loop]
        next_rad = self.peek()
        if next_rad is None:
            return None
        next_rad = next_rad[0]
        # Infer sweep direction from the first two azimuths (modulo 360).
        if (next_rad['az'] - first_az) % 360 < 180:
            direction = 1
        else:
            direction = -1
        # We fudge the position of the next radial, so that we don't end up
        # with a teeny tiny radial slice. At the expense of having a slightly
        # larger final radial.
        fudge = direction * abs(next_rad['az'] - first_az) * 0.1
        for i, radial in enumerate(self, 1):
            radials.append(radial)
            if i == nradials:
                break
            az = (radial[0]['az'] - first_az) % 360
            next_radial = self.peek()
            if next_radial is None:
                # Return whatever we've got so far
                break
            next_ = next_radial[0]
            if next_['scan_id'] != scan_id:
                # Return whatever we've got so far
                break
            # The PPI is complete when the (fudged) next azimuth wraps back
            # past the starting azimuth in the sweep direction.
            next_az = (next_['az'] + fudge - (first_az)) % 360
            diff = az - (next_az)
            if direction == 1 and diff > 0:
                break
            elif direction == -1 and diff < 0:
                break
            elif len(radials) == nradials:
                break
        return scan_id, radials
class MsgpackRadialIter(RadialIter):
    """Iterate over radials read from a msgpack unpacker."""

    def __init__(self, unpacker):
        super().__init__()
        self.unpacker = unpacker
        self.iterable = MsgpackIter(unpacker)
        # Files written on the radar begin with a settings chunk; consume it
        # into ``scan_info`` so iteration yields only radials.
        head = self.peek()
        self.scan_info = next(self) if 'system_settings' in head else None

    def get_next_radial(self):
        # Record where this value starts so tell() can report the offset of
        # a radial that has been peeked but not yet consumed.
        self._last_seek_pos = self.unpacker.tell()
        try:
            return next(self.iterable)
        except msgpack.OutOfData:
            return None

    @classmethod
    def from_file(cls, path):
        """Build an iterator reading from the file at ``path``."""
        return cls(get_unpacker(path))

    def tell(self):
        """
        Tell bytes offset in file.
        """
        return self._last_seek_pos if self.peeked else self.unpacker.tell()
class NanomsgSubRadialIter(RadialIter):
    """
    Listens on a Nanomsg Subscribe socket and provides radials as they are
    ready.

    Uses a nanomsg.SUBSCRIBE socket; for live data, you should use the
    NanomsgRadialIter class.
    """

    def __init__(self, addr=wni.config.CH1_LOCAL_MOMENT_ADDR, timeout=3000):
        super().__init__()
        sock = nanomsg.Socket(nanomsg.SUB)
        sock.connect(addr)
        # Subscribe to everything (empty topic prefix).
        sock.set_string_option(nanomsg.SUB, nanomsg.SUB_SUBSCRIBE, b'')
        # No cap on incoming message size.
        sock.set_int_option(nanomsg.SOL_SOCKET, nanomsg.RCVMAXSIZE, -1)
        # 64 MB receive buffer.
        sock.set_int_option(nanomsg.SOL_SOCKET, nanomsg.RCVBUF, int(64E6))
        sock.recv_timeout = timeout
        self.socket = sock

    def get_next_radial(self):
        # A receive timeout (NanoMsgAPIError) is treated as end-of-data.
        try:
            data = self.socket.recv()
        except nanomsg.NanoMsgAPIError:
            return None
        return wni.util.unpackb(data)
class DataClientRadialIter(RadialIter):
    """
    Make requests for data using a ``wni.data_client.DataClient``.
    """

    def __init__(self, addr=wni.config.CH1_LOCAL_MOMENT_ADDR, timeout=3000):
        super().__init__()
        # Radials already fetched from the server but not yet handed out.
        self._backlog = collections.deque(maxlen=1000)
        self._timeout = timeout
        # NOTE(review): relies on ``wni.data_client`` being importable even
        # though only ``wni.util``/``wni.config`` are imported at the top of
        # this module -- presumably the package __init__ provides it; confirm.
        self.client = wni.data_client.DataClient(
            addr, recv_timeout=timeout, send_timeout=timeout
        )

    def get_next_radial(self):
        """
        Return the next radial, polling the server as needed.

        Returns None when no data arrives before ``timeout`` milliseconds
        elapse, or when the server appears to be down.
        """
        # This is a bit trickier to implement, because we cannot rely on the
        # timeout on the socket to indicate there's no more data; the server
        # always gives a response.

        # If we have data we haven't given out yet, give it now.
        if self._backlog:
            return self._backlog.popleft()
        if self._timeout >= 0:
            too_late = time.time() + (self._timeout / 1000)
        else:
            # Negative timeout means wait "forever"; approximate with a year.
            too_late = time.time() + 3600 * 24 * 365
        try:
            # Let up to 1/10 of a second's worth of data accumulate between
            # requests so we don't overwhelm the server.
            while True:
                sleep_time = min(0.1, too_late - time.time())
                # In case we're negative, sleep for 0 seconds.
                time.sleep(max(sleep_time, 0))
                data = self.client.all()
                if data:
                    self._backlog.extend(data)
                    return self._backlog.popleft()
                if time.time() > too_late:
                    return None
            # BUGFIX: the original ended with an unreachable
            # ``return wni.util.unpackb(data)`` after this loop; the loop can
            # only exit via the returns above, so that dead code was removed.
        except nanomsg.NanoMsgAPIError:
            # This is probably a real problem, indicating that the server is
            # not running.
            return None
def get_scan_metadata(radial):
    """
    Return scan metadata from radial.  (Basically returns the same dict with
    the data keys removed.)
    """
    return {
        'rx_gain': [radial['ch1_rx_gain'], radial['ch2_rx_gain']],
        'ref_cal': [radial['ch1_ref_cal'], radial['ch2_ref_cal']],
        'time': datetime.datetime.fromtimestamp(radial['timestamp']),
        'npulses': radial['npulses'],
        'prt': radial['prt'],
        'scan_id': radial['scan_id'],
        'waveforms': radial['waveforms'],
    }
class PPISelector:
    """
    Select PPIs from a data file.

    Indexing (``selector[i]``) returns the list of radials belonging to the
    i-th PPI in the file.
    """

    def __init__(self, fname):
        self.fname = fname
        # NOTE(review): the file object and mmap are never closed; fine for
        # short-lived scripts, but a close()/context-manager API may be
        # wanted for long-running processes.
        self._f = open(self.fname, 'rb')
        self.map = mmap.mmap(
            self._f.fileno(),
            length=0,
            access=mmap.ACCESS_READ,
        )
        # ppi locations: list of dicts. each dict has the fields
        # "offset", "length", "num_radials", and "timestamp"
        self._ppis = []
        self.__find_ppis()

    def __find_ppis(self):
        # Scan the entire file once, recording each PPI's byte offset and
        # radial count so __getitem__ can seek straight to it later.
        riter = MsgpackRadialIter.from_file(self.fname)
        first = riter.peek()
        first = first[0]
        if 'az' not in first.keys():
            # this is a data file written on the radar, so the first chunk of
            # data is a bunch of metadata
            next(riter)
        curpos = riter.tell()
        while True:
            ppi = riter.get_next_ppi()
            if ppi is None:
                break
            # ppi is a (scan_id, radials) tuple; see RadialIter.get_next_ppi.
            num_radials = len(ppi[1])
            lastpos = curpos
            curpos = riter.tell()
            self._ppis.append({
                'offset': lastpos,
                'length': curpos - lastpos,
                'num_radials': num_radials,
                'time': ppi[1][0]['timestamp']
            })

    @functools.lru_cache(maxsize=50)
    def __getitem__(self, idx):
        # NOTE(review): lru_cache on an instance method keys on ``self`` and
        # keeps the instance (and its mmap) alive for the cache's lifetime;
        # a per-instance cache would avoid that.
        _limit = 2**31 - 1
        if isinstance(idx, tuple):
            raise ValueError('Only 1 PPI may be selected at a time')
        info = self._ppis[idx]
        # Re-decode the PPI's radials directly from the mapped file.
        self.map.seek(info['offset'])
        unpacker = msgpack.Unpacker(
            self.map,
            raw=False,
            ext_hook=ext_hook_unpack,
            max_bin_len=_limit,
            max_str_len=_limit,
            max_array_len=_limit,
            max_map_len=_limit,
            max_ext_len=_limit,
        )
        radials = [next(unpacker) for _ in range(info['num_radials'])]
        return radials
[docs]class PerScanRadialIter: """ Iterate radials within a scan. There is no public constructor for this class; an instance is returned from ``ScanIter.__next__`` """ def __init__(self, radial_iterator): self.riter = radial_iterator # figure out what scan id we need. first = self.riter.peek()[0] self.current_id = first['scan_id'] def __iter__(self): return self def __next__(self): # if the peeked value is part of the current scan, return next # else return a new PerScanRadialIter (with new scan settings) peeked = self.riter.peek()[0] if peeked['scan_id'] != self.current_id: raise StopIteration else: return next(self.riter)
[docs] def ppi_iter(self): """ Iterate over PPIs of the scan. """ while True: ppi = self.riter.get_next_ppi(scan_id=self.current_id) if ppi is None: return else: yield ppi
class ScanIter:
    """
    ScanIter allows the following pattern:

    .. code-block:: python3

        radial_iterator = NanomsgReqRadialIter(addr=...)
        for scan_settings, special_radial_iter in ScanIter():
            # special_radial_iter will yield radials until the scan is done
            # do something with scan_settings, e.g. open a new file
            for radial in special_radial_iter:
                # do something with each radial
            # if you want to work with ppis, use this for loop instead of the
            # above one:
            # for scan_id, ppi in special_radial_iter.ppi_iter():
            #     # do something cool with each ppi
    """

    def __init__(self, radial_iterator: RadialIter, info_sock: nanomsg.Socket):
        self.riter = radial_iterator
        self.info_sock = info_sock

    def __iter__(self):
        return self

    def __next__(self):
        # Fetch the settings for the upcoming scan first, then hand back an
        # iterator restricted to that scan's radials.
        info = self._current_scan_info()
        return info, PerScanRadialIter(self.riter)

    def _current_scan_info(self):
        # The request payload doesn't actually matter; any message prompts
        # the server to reply with the current scan settings.
        self.info_sock.send(b'give me data')
        return unpackb(self.info_sock.recv())
def unpack_dataset(item):
    """Reconstruct a hdfdict dataset.

    Parameters
    ----------
    item : h5py.Dataset

    Returns
    -------
    value : Unpacked Data
    """
    value = item[()]
    if TYPEID not in item.attrs:
        return value
    if item.attrs[TYPEID].astype(str) != 'datetime':
        return value
    # Datetimes were stored as POSIX timestamps (scalar or sequence).
    if hasattr(value, '__iter__'):
        return [datetime.datetime.fromtimestamp(ts) for ts in value]
    return datetime.datetime.fromtimestamp(value)
# Code for loading/saving hdf5 from dicts from # https://github.com/SiggiGue/hdfdict/blob/master/hdfdict/hdfdict.py # # Original code licensed under MIT. # Some modifications were made; mostly, no support for laziness.
def load_hdf(hdf, unpacker=unpack_dataset, *args, **kwargs):
    """Returns a dictionary containing the groups as keys and the datasets as
    values from given hdf file.

    Parameters
    ----------
    hdf : `h5py.File()` or `h5py.Group()`
    unpacker : callable
        Unpack function gets `value` of type h5py.Dataset.
        Must return the data you would like to have it in the returned
        dict.

    Returns
    -------
    d : dict
        The dictionary containing all groupnames as keys and
        datasets as values.
    """
    def _recurse(hdfobject, datadict):
        # Groups become nested dicts; datasets are converted by ``unpacker``.
        for key, value in hdfobject.items():
            if isinstance(value, h5py.Group):
                datadict[key] = _recurse(value, {})
            elif isinstance(value, h5py.Dataset):
                datadict[key] = unpacker(value)
        return datadict

    return _recurse(hdf, {})
[docs]def pack_hdf5_dataset(hdfobject, key, value): """Packs a given key value pair into a dataset in the given hdfobject.""" isdt = None if isinstance(value, datetime.datetime): value = value.timestamp() isdt = True if hasattr(value, '__iter__'): if all(isinstance(i, datetime.datetime) for i in value): value = [item.timestamp() for item in value] isdt = True ds = hdfobject.create_dataset(name=key, data=value) if isdt: ds.attrs.create( name=TYPEID, data=b'datetime')
def dump_hdf(data, hdf, packer=pack_hdf5_dataset, *args, **kwargs):
    """Adds keys of given dict as groups and values as datasets to the given
    hdf-file (by string or object) or group object.

    Parameters
    ----------
    data : dict
        The dictionary containing only string keys and data values or dicts
        again.
    hdf : string (path to file) or `h5py.File()` or `h5py.Group()`
    packer : callable
        Callable gets `hdfobject, key, value` as input.
        `hdfobject` is considered to be either a h5py.File or a h5py.Group.
        `key` is the name of the dataset.
        `value` is the dataset to be packed and accepted by h5py.

    Returns
    -------
    hdf : obj
        `h5py.Group()` or `h5py.File()` instance
    """
    def _recurse(datadict, hdfobject):
        for key, value in datadict.items():
            # Tuple keys are flattened to a single underscore-joined name.
            if isinstance(key, tuple):
                key = '_'.join(str(part) for part in key)
            if isinstance(value, dict):
                _recurse(value, hdfobject.create_group(key))
            else:
                packer(hdfobject, key, value)

    _recurse(data, hdf)
    return hdf
def _fixup_settings(settings):
    """
    Modify the scan_settings dict passed in to add fields with expected
    units.  The following modifications are made:

    * "prt_seconds" is added to each scan set in scan settings
    * "wavelength" is added to transceiver settings

    Since it modifies the settings in place, ``None`` is returned.
    """
    scan_conf = settings['scan_settings']
    for channel in ('ch1', 'ch2'):
        channel_conf = scan_conf[channel]
        for index in range(channel_conf['num_sets']):
            scan_set = channel_conf['set{}'.format(index)]
            # prt is stored in microseconds; provide it in seconds too.
            scan_set['prt_seconds'] = scan_set['prt'] / 1e6
            # Normalize "no pulse-compression taps" to an empty list.
            if scan_set['pc_fir_taps'] is None:
                scan_set['pc_fir_taps'] = []
    transceiver = settings['transceiver']
    # wavelength = c / f
    transceiver['wavelength'] = C / transceiver['tx_frequency']
def write_scan_settings(f, scan_settings, file_format):
    """Serialize ``scan_settings`` to ``f`` as msgpack, hdf5, or matlab.

    Raises NotImplementedError for an unrecognized ``file_format``.
    """
    # Add derived unit fields (prt_seconds, wavelength) in place first.
    _fixup_settings(scan_settings)
    payload = {'settings': scan_settings}
    if file_format == 'msgpack':
        f.write(wni.util.packb(payload))
    elif file_format == 'hdf5':
        dump_hdf(payload, f)
    elif file_format == 'matlab':
        scipy.io.savemat(f, payload)
    else:
        raise NotImplementedError("Unknown format {}".format(file_format))
def write_ppi(f, ppi, file_format):
    """Write one PPI (all scan-setting sets) to ``f`` in ``file_format``.

    Raises NotImplementedError for an unrecognized ``file_format``.
    """
    # Each radial holds one dict per scan set; consolidate per set.
    sets = {
        'set{}'.format(i): radials_to_dict(ppi, i)
        for i in range(len(ppi[0]))
    }
    payload = {'data': sets}
    if file_format == 'msgpack':
        f.write(wni.util.packb(payload))
    elif file_format == 'hdf5':
        dump_hdf(payload, f)
    elif file_format == 'matlab':
        scipy.io.savemat(f, payload)
    else:
        raise NotImplementedError("Unknown format {}".format(file_format))
def _mat_to_dict(toplevel_data):
    """Recursively convert a scipy.io mat_struct tree into nested dicts."""
    def _recurse(node, out):
        if not isinstance(node, scipy.io.matlab.mio5_params.mat_struct):
            # Plain values (arrays, scalars, strings) are leaves.
            return node
        for field in node._fieldnames:
            out[field] = _recurse(getattr(node, field), {})
        return out

    return _recurse(toplevel_data, {})
def read_dumped_data(fname):
    """Reads a data file created by the script ``wni.scripts.dump_ppi``.

    Returns a dict with two keys: ``"scan_info" and "data")``.  ``scan_info``
    is a nested dict, corresponding to all scan settings for the PPI.
    ``data`` is a dict of scan setting sets.  The keys of ``data`` are
    "set0", "set1", ... "set[n]", for n == the number of configured scan
    settings.  If the radar is using a single prt and waveform, "set0" will
    likely be the only set available.

    Raises:
        ValueError: if the file extension is not one of .msgpack, .mat,
            or .hdf5.  (Previously an unknown extension silently returned
            None.)
    """
    ext = os.path.splitext(fname)[1]
    if ext == '.msgpack':
        with open(fname, 'rb') as f:
            unpacker = get_unpacker(f)
            settings, data = list(unpacker)
        settings.update(data)
        return settings
    elif ext == '.mat':
        loaded = scipy.io.loadmat(fname, squeeze_me=True,
                                  struct_as_record=False)
        return {
            'settings': _mat_to_dict(loaded['settings']),
            'data': _mat_to_dict(loaded['data']),
        }
    elif ext == '.hdf5':
        # Open read-only explicitly rather than relying on h5py's
        # version-dependent default mode.
        with h5py.File(fname, 'r') as f:
            return load_hdf(f)
    else:
        raise ValueError('Unrecognized data file extension: {!r}'.format(ext))