"""
Utilities for scripting with the data produced by the WNI radar.
"""
import abc
import collections
import datetime
import functools
import mmap
import os
import time

import msgpack
import nanomsg

import wni.config
import wni.data_client
import wni.util
from wni.util import ext_hook_unpack, packb, unpackb
# speed of light in m/s
C = 299792458.0
OUTPUT_DATA_FORMATS = 'matlab', 'hdf5', 'msgpack'
# pylint: disable=invalid-name
# we don't need scipy.io or h5py unless we're running certain scripts.
try:
import scipy.io
except ImportError:
pass
try:
import h5py
except ImportError:
pass
TYPEID = '__type__'
def remove_none(waveforms):
    """Return a new list of waveform dicts with ``None``-valued keys removed.

    The input dicts are not modified.
    """
    return [
        {key: val for key, val in wfm.items() if val is not None}
        for wfm in waveforms
    ]
def radials_to_dict(radials: list, scan_set=0):
    """
    Return a consolidated dict of radials.

    Args:
        radials: the list of radials, as returned from a ``DataClient``.
        scan_set: Index into each radial within radials.
    """
    selected = [radial[scan_set] for radial in radials]
    pull = functools.partial(extract_keys, list_of_dicts=selected)
    # per-radial fields are aggregated across all radials with
    # ``extract_keys``; scan-constant fields (gates, channel, rev) are taken
    # straight from the first radial.
    first = selected[0]
    return {
        'az': pull('az'),
        'el': pull('el'),
        'ref': pull('ref'),
        'velocity': pull('velocity'),
        'mag_R1': pull('mag_R1'),
        'iq': pull('iq'),
        'gates': first['gates'],
        'channel': first['channel'],
        'timestamp': pull('timestamp'),
        'npulses': pull('npulses'),
        'radial_number': pull('radial_number'),
        'scan_id': pull('scan_id'),
        'rev': first['rev'],
    }
def dt_from_str(string):
    """Parse a ``YYYY/mm/dd_HH:MM`` string into a ``datetime.datetime``."""
    return datetime.datetime.strptime(string, '%Y/%m/%d_%H:%M')
def get_unpacker(fname):
    """Return a ``msgpack.Unpacker`` which can read the data saved by pulse
    processor. If ``fname`` is a string or bytes, a file will be opened for
    it. Otherwise, ``fname`` is treated as a file opened in binary read-only
    mode.
    """
    if isinstance(fname, (str, bytes)):
        stream = open(fname, 'rb')
    else:
        # caller has already opened the file for us
        stream = fname
    # raise msgpack's default size limits so very large radials (up to
    # 2 GiB per object) can be decoded
    biggest = 2**31 - 1
    return msgpack.Unpacker(
        stream,
        ext_hook=ext_hook_unpack,
        raw=False,
        max_bin_len=biggest,
        max_str_len=biggest,
        max_array_len=biggest,
        max_map_len=biggest,
        max_ext_len=biggest,
    )
def pack_radials(radials):
    """
    Return msgpacked bytes in the same format produced by the client
    applications.
    """
    return b''.join(packb(radial) for radial in radials)
def skip_to_time(time, unpacker):
    """Skip to the radial that occurs after the specified time.

    Args:
        time: a ``datetime.datetime``; radials at or before this time are
            consumed and discarded.
        unpacker: an object with a ``get_next_radial`` method (e.g. a
            ``RadialIter``) yielding dicts with a ``'timestamp'`` field.

    Raises:
        ValueError: if the data ends before ``time`` is reached.
    """
    # NOTE: the parameter name shadows the ``time`` module inside this
    # function; kept for backward compatibility with existing callers.
    while True:
        d = unpacker.get_next_radial()
        if d is None:
            # BUG FIX: previously this crashed with a TypeError
            # (None['timestamp']) when the data ran out.
            raise ValueError('data ended before reaching the requested time')
        radial_time = datetime.datetime.fromtimestamp(d['timestamp'])
        if radial_time > time:
            break
class MsgpackIter:
    """Adapt a ``msgpack.Unpacker`` to the Python iterator protocol."""

    def __init__(self, unpacker):
        self.unpacker = unpacker

    def __iter__(self):
        return self

    def __next__(self):
        # msgpack.OutOfData propagates to the caller when the stream ends
        return self.unpacker.unpack()
class RadialIter(abc.ABC):
    """
    ABC for iterating over radials produced by the pulse processor. Subclasses
    must provide a method "get_next_radial" for providing data.

    ``get_next_radial`` signals end-of-data by returning ``None``; this class
    translates that into ``StopIteration`` for normal iteration.
    """

    def __init__(self):
        # one-item buffer holding a radial (or None for end-of-data) that has
        # been read from the subclass but not yet handed to the caller
        self.peeked = []

    def peek(self):
        """
        Examine the next value without consuming the iterator.
        Multiple calls to this function return the same value.
        returns None if there is no more data.
        """
        if self.peeked:
            assert len(self.peeked) == 1
            return self.peeked[0]
        else:
            radial = self.get_next_radial()
            self.peeked.append(radial)
            return radial

    @abc.abstractmethod
    def get_next_radial(self):
        """
        Return the next radial. If there is no data left, return None.
        """

    def __iter__(self):
        return self

    def __next__(self):
        if self.peeked:
            radial = self.peeked.pop()
        else:
            radial = self.get_next_radial()
        # BUG FIX: a buffered None (left behind by ``peek`` at end-of-data)
        # used to be returned as-is instead of ending iteration.
        if radial is None:
            raise StopIteration
        return radial

    def iterate_scan(self):
        """
        Yield radials from the current scan.
        """
        first = next(self)
        scan_id = first['scan_id']
        yield first
        while True:
            peeked = self.peek()
            if peeked is None or peeked['scan_id'] != scan_id:
                break
            yield next(self)

    def get_next_ppi(self, nradials=4000, scan_id=None):
        """
        Return a ``(scan_id, radials)`` tuple for a single ppi. If the scan
        id changes while this function is running, the function will return a
        partial PPI.
        If ``nradials`` radials are yielded before a PPI is complete, yield
        nradials, regardless of whether they complete a PPI.
        If ``scan_id`` is passed and is not None, the function will return
        ``None`` if the first radial's scan id does not match ``scan_id``.
        Returns ``None`` when there is no more data.
        """
        if scan_id is not None:
            peeked = self.peek()
            if peeked is None:
                return None
            if peeked[0]['scan_id'] != scan_id:
                return None
        try:
            first_loop = next(self)
        except StopIteration:
            return None
        if first_loop is None:
            return None
        first = first_loop[0]
        # get the first azimuth angle as a reference point. When we detect
        # that the reference point has been crossed, we'll break and return
        # the radials we have.
        first_az = first['az']
        scan_id = first['scan_id']
        radials = [first_loop]
        next_rad = self.peek()
        if next_rad is None:
            return None
        next_rad = next_rad[0]
        # determine whether azimuth is increasing (+1) or decreasing (-1)
        if (next_rad['az'] - first_az) % 360 < 180:
            direction = 1
        else:
            direction = -1
        # We fudge the position of the next radial, so that we don't end up
        # with a teeny tiny radial slice. At the expense of having a slightly
        # larger final radial.
        fudge = direction * abs(next_rad['az'] - first_az) * 0.1
        for i, radial in enumerate(self, 1):
            radials.append(radial)
            if i == nradials:
                break
            az = (radial[0]['az'] - first_az) % 360
            next_radial = self.peek()
            if next_radial is None:
                # Return whatever we've got so far
                break
            next_ = next_radial[0]
            if next_['scan_id'] != scan_id:
                # Return whatever we've got so far
                break
            next_az = (next_['az'] + fudge - first_az) % 360
            diff = az - next_az
            # crossing the reference azimuth (in the direction of rotation)
            # means the PPI is complete
            if direction == 1 and diff > 0:
                break
            elif direction == -1 and diff < 0:
                break
            elif len(radials) == nradials:
                break
        return scan_id, radials
class MsgpackRadialIter(RadialIter):
    """Iterate over radials decoded from a msgpack stream."""

    def __init__(self, unpacker):
        super().__init__()
        self.unpacker = unpacker
        self.iterable = MsgpackIter(unpacker)
        # Files written on the radar begin with a metadata chunk holding the
        # system settings; if present, pull it off the stream now.
        first = self.peek()
        if 'system_settings' in first:
            self.scan_info = next(self)
        else:
            self.scan_info = None

    def get_next_radial(self):
        try:
            # remember where this radial started so tell() can report the
            # position of a peeked-but-unconsumed radial
            self._last_seek_pos = self.unpacker.tell()
            return next(self.iterable)
        except msgpack.OutOfData:
            return None

    @classmethod
    def from_file(cls, path):
        """Build an iterator reading from the file at ``path``."""
        return cls(get_unpacker(path))

    def tell(self):
        """
        Tell bytes offset in file.
        """
        # if a radial is buffered, report the offset at which it began
        return self._last_seek_pos if self.peeked else self.unpacker.tell()
class NanomsgSubRadialIter(RadialIter):
    """
    Listens on a Nanomsg Subscribe socket and provides radials as they are
    ready.
    Uses a nanomsg.SUBSCRIBE socket; for live data, you should use the
    NanomsgRadialIter class.
    """
    def __init__(self, addr=wni.config.CH1_LOCAL_MOMENT_ADDR, timeout=3000):
        """
        Args:
            addr: nanomsg address of the moment publisher to connect to.
            timeout: receive timeout in milliseconds; a timed-out receive is
                reported as end-of-data by ``get_next_radial``.
        """
        super().__init__()
        self.socket = nanomsg.Socket(nanomsg.SUB)
        self.socket.connect(addr)
        # subscribe to everything (empty topic prefix)
        self.socket.set_string_option(nanomsg.SUB, nanomsg.SUB_SUBSCRIBE, b'')
        # -1 removes the limit on incoming message size
        self.socket.set_int_option(
            nanomsg.SOL_SOCKET, nanomsg.RCVMAXSIZE, -1
        )
        # large (64 MB) receive buffer for bursts of radials
        self.socket.set_int_option(
            nanomsg.SOL_SOCKET, nanomsg.RCVBUF, int(64E6)
        )
        self.socket.recv_timeout = timeout
    def get_next_radial(self):
        # Any receive error (including a timeout) is treated as end-of-data.
        try:
            data = self.socket.recv()
        except nanomsg.NanoMsgAPIError:
            return None
        return wni.util.unpackb(data)
class DataClientRadialIter(RadialIter):
    """
    Make requests for data using a ``wni.data_client.DataClient``.
    """

    def __init__(self, addr=wni.config.CH1_LOCAL_MOMENT_ADDR, timeout=3000):
        """
        Args:
            addr: address of the data server.
            timeout: how long ``get_next_radial`` waits for data, in
                milliseconds; a negative value means wait (nearly) forever.
        """
        super().__init__()
        # radials already received from the server but not yet handed out
        self._backlog = collections.deque(maxlen=1000)
        self._timeout = timeout
        self.client = wni.data_client.DataClient(
            addr, recv_timeout=timeout, send_timeout=timeout
        )

    def get_next_radial(self):
        # this is a bit trickier to implement, because we cannot rely on the
        # timeout on the socket to indicate there's no more data; the server
        # always gives a response.
        # if we have data we haven't given yet, give it now.
        if self._backlog:
            return self._backlog.popleft()
        try:
            if self._timeout >= 0:
                too_late = time.time() + (self._timeout / 1000)
            else:
                # set timeout to 1 year... surely we shouldn't need that much
                # time
                too_late = time.time() + 3600 * 24 * 365
            while True:
                # let 1/10 of a second's worth of data accumulate...
                # we're also trying not to overwhelm the server with
                # unnecessary requests.
                sleep_time = min(0.1, too_late - time.time())
                # in case we're negative, sleep for 0 seconds
                sleep_time = max(sleep_time, 0)
                time.sleep(sleep_time)
                data = self.client.all()
                if data:
                    self._backlog.extend(data)
                    return self._backlog.popleft()
                if time.time() > too_late:
                    return None
        except nanomsg.NanoMsgAPIError:
            # this is probably a real problem, indicating that the server is
            # not running.
            return None
        # BUG FIX: the original ended with an unreachable
        # ``return wni.util.unpackb(data)`` -- the loop above only exits via
        # return or exception -- so it has been removed.
class PPISelector:
    """
    Select PPIs from a data file.

    Indexing (``selector[i]``) returns the list of radials of the i-th PPI
    in the file; results are cached per instance (up to 50 PPIs).
    """

    def __init__(self, fname):
        self.fname = fname
        # the file and mmap stay open for the lifetime of the object
        self._f = open(self.fname, 'rb')
        self.map = mmap.mmap(
            self._f.fileno(),
            length=0,
            access=mmap.ACCESS_READ,
        )
        # ppi locations: list of dicts. each dict has the fields
        # "offset", "length", "num_radials", and "time"
        self._ppis = []
        self.__find_ppis()
        # BUG FIX: cache per instance instead of decorating __getitem__ with
        # functools.lru_cache. A cache on the method keys on ``self`` and
        # keeps every PPISelector instance alive for the cache's lifetime.
        self._read_ppi = functools.lru_cache(maxsize=50)(self._load_ppi)

    def __find_ppis(self):
        # index the file once: record offset/length/size/time for every PPI
        riter = MsgpackRadialIter.from_file(self.fname)
        first = riter.peek()
        first = first[0]
        if 'az' not in first.keys():
            # this is a data file written on the radar, so the first chunk of
            # data is a bunch of metadata
            next(riter)
        curpos = riter.tell()
        while True:
            ppi = riter.get_next_ppi()
            if ppi is None:
                break
            num_radials = len(ppi[1])
            lastpos = curpos
            curpos = riter.tell()
            self._ppis.append({
                'offset': lastpos,
                'length': curpos - lastpos,
                'num_radials': num_radials,
                'time': ppi[1][0]['timestamp']
            })

    def _load_ppi(self, idx):
        # decode the radials of one PPI straight out of the mmap
        _limit = 2**31 - 1
        info = self._ppis[idx]
        self.map.seek(info['offset'])
        unpacker = msgpack.Unpacker(
            self.map, raw=False, ext_hook=ext_hook_unpack,
            max_bin_len=_limit, max_str_len=_limit, max_array_len=_limit,
            max_map_len=_limit, max_ext_len=_limit,
        )
        return [next(unpacker) for _ in range(info['num_radials'])]

    def __getitem__(self, idx):
        if isinstance(idx, tuple):
            raise ValueError('Only 1 PPI may be selected at a time')
        return self._read_ppi(idx)
class PerScanRadialIter:
    """
    Iterate radials within a scan. There is no public constructor for this
    class; an instance is returned from ``ScanIter.__next__``
    """

    def __init__(self, radial_iterator):
        self.riter = radial_iterator
        # figure out what scan id we need; scan set 0 of the next radial
        # carries it
        first = self.riter.peek()[0]
        self.current_id = first['scan_id']

    def __iter__(self):
        return self

    def __next__(self):
        # BUG FIX: stop cleanly both when the underlying iterator is
        # exhausted (peek() returns None -- previously this raised a
        # TypeError) and when the next radial belongs to a different scan.
        peeked = self.riter.peek()
        if peeked is None or peeked[0]['scan_id'] != self.current_id:
            raise StopIteration
        return next(self.riter)

    def ppi_iter(self):
        """
        Iterate over PPIs of the scan.
        """
        while True:
            ppi = self.riter.get_next_ppi(scan_id=self.current_id)
            if ppi is None:
                return
            yield ppi
class ScanIter:
    """
    ScanIter allows the following pattern:
    .. code-block:: python3
        radial_iterator = NanomsgReqRadialIter(addr=...)
        for scan_settings, special_radial_iter in ScanIter():
            # special radial_iter will yield radials until it is done
            # do something with scan_settings, e.g. open a new file
            for radial in special_radial_iter:
                # do something with each radial
        # if you want to work with ppis, use this for loop instead of the
        # above one:
        # for scan_id, ppi in special_radial_iter.ppi_iter():
        #     # do something cool with each ppi
    """

    def __init__(self, radial_iterator: RadialIter, info_sock: nanomsg.Socket):
        self.riter = radial_iterator
        self.info_sock = info_sock

    def __iter__(self):
        return self

    def __next__(self):
        # pair the current scan's settings with an iterator over its radials
        settings = self._current_scan_info()
        return settings, PerScanRadialIter(self.riter)

    def _current_scan_info(self):
        # the server replies with the current settings regardless of the
        # request payload
        self.info_sock.send(b'give me data')
        return unpackb(self.info_sock.recv())
def unpack_dataset(item):
    """Reconstruct a hdfdict dataset.
    Parameters
    ----------
    item : h5py.Dataset
    Returns
    -------
    value : Unpacked Data
    """
    value = item[()]
    if TYPEID not in item.attrs:
        return value
    if item.attrs[TYPEID].astype(str) == 'datetime':
        # datetimes were stored as POSIX timestamps; convert them back
        if hasattr(value, '__iter__'):
            value = [datetime.datetime.fromtimestamp(ts) for ts in value]
        else:
            value = datetime.datetime.fromtimestamp(value)
    return value
# Code for loading/saving hdf5 from dicts from
# https://github.com/SiggiGue/hdfdict/blob/master/hdfdict/hdfdict.py
#
# Original code licensed under MIT.
# Some modifications were made; mostly, no support for laziness.
def load_hdf(hdf, unpacker=unpack_dataset, *args, **kwargs):
    """Returns a dictionary containing the groups as keys and the datasets as
    values from given hdf file.
    Parameters
    ----------
    hdf : `h5py.File()` or `h5py.Group()`
    unpacker : callable
        Unpack function gets `value` of type h5py.Dataset.
        Must return the data you would like to have it in the returned dict.
    Returns
    -------
    d : dict
        The dictionary containing all groupnames as keys and
        datasets as values.
    """
    def _recurse(hdfobject, datadict):
        # groups become nested dicts; datasets are unpacked in place
        for key, value in hdfobject.items():
            if isinstance(value, h5py.Group):
                datadict[key] = _recurse(value, {})
            elif isinstance(value, h5py.Dataset):
                datadict[key] = unpacker(value)
        return datadict
    return _recurse(hdf, {})
def pack_hdf5_dataset(hdfobject, key, value):
    """Packs a given key value pair into a dataset in the given hdfobject.

    ``datetime.datetime`` values -- a scalar, or a non-empty iterable
    containing only datetimes -- are stored as POSIX timestamps and tagged
    with a ``TYPEID`` attribute so ``unpack_dataset`` can restore them.
    One-shot iterators (generators) are not supported for the datetime path.
    """
    isdt = None
    if isinstance(value, datetime.datetime):
        value = value.timestamp()
        isdt = True
    elif hasattr(value, '__iter__'):
        # BUG FIX: ``all()`` of an empty iterable is True, so empty lists
        # used to be tagged as datetime data. Peek at the first element
        # instead; this also avoids scanning large non-datetime sequences.
        head = next(iter(value), None)
        if isinstance(head, datetime.datetime) and all(
                isinstance(i, datetime.datetime) for i in value):
            value = [item.timestamp() for item in value]
            isdt = True
    ds = hdfobject.create_dataset(name=key, data=value)
    if isdt:
        ds.attrs.create(
            name=TYPEID,
            data=b'datetime')
def dump_hdf(data, hdf, packer=pack_hdf5_dataset, *args, **kwargs):
    """Adds keys of given dict as groups and values as datasets
    to the given hdf-file (by string or object) or group object.
    Parameters
    ----------
    data : dict
        The dictionary containing only string keys and
        data values or dicts again.
    hdf : string (path to file) or `h5py.File()` or `h5py.Group()`
    packer : callable
        Callable gets `hdfobject, key, value` as input.
        `hdfobject` is considered to be either a h5py.File or a h5py.Group.
        `key` is the name of the dataset.
        `value` is the dataset to be packed and accepted by h5py.
    Returns
    -------
    hdf : obj
        `h5py.Group()` or `h5py.File()` instance
    """
    def _recurse(datadict, hdfobject):
        for key, value in datadict.items():
            # tuple keys are flattened into one underscore-joined name
            if isinstance(key, tuple):
                key = '_'.join(str(part) for part in key)
            if isinstance(value, dict):
                _recurse(value, hdfobject.create_group(key))
            else:
                packer(hdfobject, key, value)
    _recurse(data, hdf)
    return hdf
def _fixup_settings(settings):
    """
    Modify the scan_settings dict passed in to add fields with expected units.
    The following modifications are made:
    * "prt_seconds" is added to scan settings
    * "wavelength" is added to transceiver settings
    Since it modifies the settings in place, ``None`` is returned.
    """
    scan_conf = settings['scan_settings']
    for channel in ('ch1', 'ch2'):
        channel_conf = scan_conf[channel]
        for index in range(channel_conf['num_sets']):
            scan_set = channel_conf['set{}'.format(index)]
            # prt is stored in microseconds
            scan_set['prt_seconds'] = scan_set['prt'] / 1e6
            # downstream consumers expect a list here, never None
            if scan_set['pc_fir_taps'] is None:
                scan_set['pc_fir_taps'] = []
    transceiver = settings['transceiver']
    # wavelength (m) from the transmit frequency via the speed of light
    transceiver['wavelength'] = C / transceiver['tx_frequency']
def write_scan_settings(f, scan_settings, file_format):
    """Write scan settings to ``f`` in ``file_format``.

    ``file_format`` must be one of 'msgpack', 'hdf5' or 'matlab'; the
    settings dict is first fixed up in place (prt in seconds, wavelength).
    """
    _fixup_settings(scan_settings)
    to_write = {'settings': scan_settings}
    if file_format == 'msgpack':
        f.write(wni.util.packb(to_write))
    elif file_format == 'hdf5':
        dump_hdf(to_write, f)
    elif file_format == 'matlab':
        scipy.io.savemat(f, to_write)
    else:
        raise NotImplementedError("Unknown format {}".format(file_format))
def write_ppi(f, ppi, file_format):
    """Write one PPI to ``f`` in ``file_format`` ('msgpack', 'hdf5' or
    'matlab'), as one consolidated dict per scan-settings set."""
    sets = {
        'set{}'.format(i): radials_to_dict(ppi, i)
        for i in range(len(ppi[0]))
    }
    to_dump = {'data': sets}
    if file_format == 'msgpack':
        f.write(wni.util.packb(to_dump))
    elif file_format == 'hdf5':
        dump_hdf(to_dump, f)
    elif file_format == 'matlab':
        scipy.io.savemat(f, to_dump)
    else:
        raise NotImplementedError("Unknown format {}".format(file_format))
def _mat_to_dict(toplevel_data):
    """Recursively convert scipy.io mat_struct objects into nested dicts."""
    def _recurse(node, out):
        if not isinstance(node, scipy.io.matlab.mio5_params.mat_struct):
            # plain leaf value: return it untouched
            return node
        for name in node._fieldnames:
            out[name] = _recurse(getattr(node, name), {})
        return out
    return _recurse(toplevel_data, {})
def read_dumped_data(fname):
    """Reads a data file created by the script ``wni.scripts.dump_ppi``.

    Returns a dict with two keys: ``"settings"`` and ``"data"``.
    ``settings`` is a nested dict, corresponding to all scan settings for the
    PPI. ``data`` is a dict of scan setting sets. The keys of ``data`` are
    "set0", "set1", ... "set[n]", for n == the number of configured scan
    settings. If the radar is using a single prt and waveform, "set0" will
    likely be the only set available.

    Raises:
        ValueError: if ``fname``'s extension is not ``.msgpack``, ``.mat``
            or ``.hdf5``.
    """
    ext = os.path.splitext(fname)[1]
    if ext == '.msgpack':
        with open(fname, 'rb') as f:
            unpacker = get_unpacker(f)
            # the file holds exactly two msgpack objects: settings, then data
            settings, data = list(unpacker)
            settings.update(data)
            return settings
    elif ext == '.mat':
        loaded = scipy.io.loadmat(fname, squeeze_me=True,
                                  struct_as_record=False)
        return {
            'settings': _mat_to_dict(loaded['settings']),
            'data': _mat_to_dict(loaded['data']),
        }
    elif ext == '.hdf5':
        # open read-only explicitly; relying on h5py's default mode is
        # deprecated behavior in older h5py versions
        with h5py.File(fname, 'r') as f:
            return load_hdf(f)
    # BUG FIX: unknown extensions used to silently fall through and
    # return None
    raise ValueError('unrecognized data file extension: {!r}'.format(ext))