Source code for wni.data

"""Classes and functions for working with WeatherNews data."""
from __future__ import print_function, absolute_import, division


from collections import namedtuple
import logging
import mmap
import struct

import numpy as np

import wni.config as config
import wni.scan_config
from wni.scan_config import clk_to_us
from wni.util import unpackb

logger = logging.getLogger(__name__)

# size of each range gate, in meters
GATE_SIZE = config.GATE_SIZE


AZEL_FORMAT = 'dd'
AZEL_SIZE = struct.calcsize(AZEL_FORMAT)


[docs]class PacketHeader(object): """Packet header data; includes info about scan setup.""" struct_format = '4sIQ12I' def __init__(self, data: bytes): """ Parse out header information in ``data``. Only the first 72 bytes of ``data`` will be read. """ format = self.struct_format size = struct.calcsize(format) if len(data) < size: msg = ('length of header was {}, should have been {}' .format(len(data), size)) raise ValueError(msg) self.bytes = h = data[:size] # header: clock, revision, clock counter, lots of other fields header = struct.unpack(format, h) self.header_id = header[0] if self.header_id != b'OMG!': msg = 'packet start should be OMG!, not {}'.format(self.header_id) raise ValueError(msg) (self.header_id, self.revision, self.clktime, self.tx_amp_delay, self.tx_amp_length, self.tx_delay, self.tx_length, self.rx_delay, self.rx_length, self.pulses_per_cpi, self.prt_interval, self.fir_length, self.current_pulse, self.fir_normalizer, self.fir_enable) = header # the length in bytes of the packet, excluding the header. # every 4 RX pulses pushes data to the FIFO, and each time 8 bytes # get pushed. So the number of bytes in the packet is # rx_length * 8 (bytes / pulse) * 4 (pulses / clk_cycle). Since # rx_length is in clock cycles, the units come out as bytes. nsamples_in = self.rx_length // 4 if self.fir_enable: if nsamples_in < self.fir_length: nsamples_out = 0 else: nsamples_out = (nsamples_in - self.fir_length) // 6 else: nsamples_out = nsamples_in self.data_length = nsamples_out * 4 def __repr__(self): return 'PacketHeader({})'.format(repr(self.bytes)) def __str__(self): """Return a string with lots of the pulse information.""" return 'PacketHeader for pulse {}/{}: tx_amp {}, {}; tx {}, {}; rx {}, {}; prt {}; clktime {}'.format( self.pulse_num, self.pulses, self.tx_amp_delay, self.tx_amp_length, self.tx_delay, self.tx_length, self.rx_delay, self.rx_length, self.prt, self.clktime, )
[docs]def parse_radials(data, nradials=1): """ Parse specified number of radials from ``data``. Args: data: the buffer of radar data. nradials: the number of radials to return. If the number of radials in the ``data`` set is less than nradials, an exception is raised. Returns: (tuple): tuple containing: **radials**: a list of Radials. **bytes_read**: bytes read from ``data``. """ header = PacketHeader(data) packet_length = header.data_length + header.HEADER_LENGTH found = 0 end_byte = len(data) packet_start = 0 radials, packets = [], [] while len(radials) < nradials: packet_end = packet_start + packet_length this_data = data[packet_start:packet_end] header = this_data[:4] if header == b'OMG!': packet = Packet(this_data) packets.append(packet) packet_start += packet_length found += 1 else: azel_bytes = data[packet_start:packet_start + AZEL_SIZE] az, el = struct.unpack(AZEL_FORMAT, azel_bytes) packet_start += AZEL_SIZE radials.append(Radial(packets, az, el)) if packet_start == end_byte: break return radials, packet_start
[docs]class IQFileHeader: """ Parse header data dumped by :func:`wni.processes.data_dumper`. """ def __init__(self, bytes): feed_bytes, = struct.unpack('I', bytes[:4]) header_data = bytes[4:feed_bytes + 4] scan_info = unpackb(header_data) # waveform stuff here wfm1 = scan_info['wfm1'] wfm2 = scan_info['wfm2'] self.scan_length = scan_info['scan_length'] self.waveforms = np.array([wfm1, wfm2]) self.start_time = scan_info['start_time'] self.bytes_consumed = feed_bytes + 4
[docs] @classmethod def fromfile(cls, fname): with open(fname, 'rb') as f: # 30 kB header_len, = struct.unpack('I', f.read(4)) f.seek(0) bytes = f.read(header_len + 4) return cls(bytes)
Radial = namedtuple('Radial', 'packets az el'.split())
[docs]class IQData(object): """ Parse data from the FPGA into a Python object. """ def __init__(self, radar_data, waveforms=None, scaninfo=()): """ Args: radar_data (bytes): Pulse data from FPGA waveforms (np.array or None): Tx waveforms scaninfo: Any metadata associated with the scan. """ self._raw_data = memoryview(radar_data) self.az = [] self.el = [] self.packets = [] read = self._extract_packets() self._bytes_consumed = read # Matched filters self.waveforms = waveforms self.i = np.zeros([2, len(self.packets), len(self.packets[0].i1)], dtype=np.int16) self.q = np.zeros(self.i.shape, dtype=np.int16) for idx, packet in enumerate(self.packets): self.i[0, idx, :] = packet.i1 self.i[1, idx, :] = packet.i2 self.q[0, idx, :] = packet.q1 self.q[1, idx, :] = packet.q2 self.numgates = self.i.shape[2] # set info for scan based on first header (it _should_ be the same for # every pulse, but the hardware does not enforce this) h = self.packets[0].header self.header = h self.scaninfo = scaninfo self.pulses = self.num_pulses # this is the delay between the tx start and rx start. _hardware_delay = h.rx_delay - h.tx_delay rx_delay = _hardware_delay - wni.scan_config._RX_LAG start_gate = (rx_delay // 4) * GATE_SIZE if h.fir_enable: gate_size = GATE_SIZE * config.FIR_DECIMATION else: gate_size = GATE_SIZE end_gate = start_gate + (self.numgates - 1) * gate_size self.gates = np.linspace(start_gate, end_gate, self.numgates) self._iq = None self._range_correction = None def _calc_filtered_iq(self, channel): filtered = self._iq_filtered_conv(channel) return filtered
[docs] def savemat(self, fname): """Dumps iq (unfiltered) data to a format readable by Matlab.""" import scipy.io data = { 'iq': self.i + 1j*self.q, 'num_pulses': self.num_pulses, 'prt': self.header.prt, 'gates': self.gates, 'az': self.az, 'el': self.el, 'header': [p.header for p in self.packets], 'scaninfo': self.scaninfo, } scipy.io.savemat(fname, data)
def _extract_packets(self): radials, offset = parse_radials(self._raw_data) for radial in radials: self.packets.extend(radial.packets) self.az.append(radial.az) self.el.append(radial.el) self.num_pulses = len(self.packets) return offset def _az_to_degrees(self, az): return (az % 8000) * 360 / 8000 def _el_to_degrees(self, el): return el % 8000 @property def iq(self): if self._iq is None: self._iq = self.i + 1j*self.q return self._iq def _iq_filtered_fft(self, channel): """Implement matched filter using fft.""" if channel != 0 and channel != 1: raise ValueError('channel must be 0 or 1, not {}'.format(channel)) iq = self.iq[channel] h = self.h[channel] nbins = nextpow2(iq.shape[1]) X = np.fft.fft(iq, nbins) H = np.fft.fft(h, nbins) H_2 = np.matlib.repmat(H, iq.shape[0], 1) Y = X * H_2 y = np.fft.ifft(Y) start = h.shape[0] // 2 - 1 y = y[:, start: start + iq.shape[1]] y.real /= np.sum(np.abs(h.real)) y.imag /= np.sum(np.abs(h.imag)) return y def _iq_filtered_conv(self, channel): """Implement matched filter using convolution.""" if channel != 0 and channel != 1: raise ValueError('channel must be 0 or 1, not {}'.format(channel)) iq = self.iq[channel] h = self.h[channel] y = np.array([np.convolve(row, h, 'same') for row in iq]) y.real /= np.sum(np.abs(h.real)) y.imag /= np.sum(np.abs(h.imag)) return y def _calc_doppler_spectrum(self, iq, n, normalize): """Implementation of _doppler_spectrum_unfiltered and doppler_spectrum""" fft = np.fft.fft(iq, n, 0) shifted = np.fft.fftshift(fft, 0) magnitude = np.abs(shifted) magnitude = magnitude.astype('float32') if normalize: magnitude /= magnitude.max() return magnitude def _doppler_spectrum_unfiltered(self, n=32, normalize=False): """Return the Doppler spectrum of the unfiltered iq data.""" if n < len(self.packets): n = nextpow2(len(self.packets)) return [self._calc_doppler_spectrum(iq, n, normalize) for iq in self.iq]
[docs] def doppler_spectrum(self, n=32, normalize=False): """Return the Doppler spectrum of the data.""" if n < len(self.packets): n = nextpow2(len(self.packets)) return [self._calc_doppler_spectrum(iq, n, normalize) for iq in self.iq_filtered]
[docs] def doppler_spectrum_windowed(self, n=32, normalize=False): if n < len(self.packets): n = nextpow2(len(self.packets)) window = np.hanning(self.iq_filtered[0].shape[0]) windowed_iq = [(iq.T * window).T for iq in self.iq_filtered] return [self._calc_doppler_spectrum(iq, n, normalize) for iq in windowed_iq]
[docs] def calc_doppler_velocity(self, channel, wavelength=config.WAVELENGTH): """Calculate the doppler velocity""" # use eqn. 6.18 and 6.19 of "Doppler Weather and Radar Observations" iq = self.iq[channel, :, :] if False: iq0 = iq[:-1, :] iq1 = iq[1:, :] i0 = iq0.real q0 = iq0.imag i1 = iq1.real q1 = iq1.imag real_part = i0 * i1 + q0 * q1 imag_part = i0 * q1 - q0 * i1 r = np.sum(real_part, 0) i = np.sum(imag_part, 0) # divide by the number of pulses - 1 r /= (iq.shape[0] - 1) i /= (iq.shape[0] - 1) phase = np.arctan2(i, r) else: iq0 = iq[:-1, :] iq1 = iq[1:, :] rotations = iq0.conj() * iq1 total_rotation = np.mean(rotations, 0) phase = np.arctan2(total_rotation.imag, total_rotation.real) prt_us = clk_to_us(self.header.prt, config.TXRX_CLK) prt = prt_us / 1E6 factor = -wavelength / (4 * np.pi * prt) return phase * factor
[docs] def calc_reflectivity(self, channel, cal=0, range_correct=True): """Calculate the reflectity. """ iq = self.iq[channel] power = iq.real**2 + iq.imag**2 power_avg = np.mean(power, 0) # range correction if range_correct: power_avg *= self.gates**2 power_db = 10 * np.log10(power_avg) reflectivity = power_db + cal return reflectivity
[docs] @classmethod def fromfile(cls, filename, *args, **kwargs): with open(filename, 'rb') as f: data = f.read() return cls(data, filename, *args, **kwargs)
[docs]class IQDataReader: """ Iterate over a file dumped by :func:``wni.processees.data_dumper``. Yields a :class:IQData instance for every iteration. """ def __init__(self, fname, has_header=True): self.fname = fname # set self.map to None becuase __del__ gets called if the open() call # fails, and raises AttributeError on self.map. self.map = None self._file = open(fname, 'rb') self.map = mmap.mmap(self._file.fileno(), length=0, access=mmap.ACCESS_READ) if has_header: self.header = IQFileHeader.fromfile(fname) self.data_start = self.header.bytes_consumed else: self.header = None self.data_start = 0 self._current_offset = self.data_start self._read_size = int(self.__guess_radial_size() * 1.2) # a list of offsets at which radials can be parsed. self.radial_offsets = [] # list of radials' offsets that could not be parsed due to a malformed # packet. self._bad_radials = [] def __guess_radial_size(self): """Guess how much space (in bytes) a radial takes up.""" bytes = self.map[self.data_start:self.data_start + PacketHeader.HEADER_LENGTH] header = PacketHeader(bytes) length = (header.data_length + header.HEADER_LENGTH) * header.pulses return length
[docs] def parse_radial_at(self, pos=None): """ Parse radial at the specified offset in the memorymapped file. If pos is None, parse the radial at ``self._current_offset`` """ if pos is None: pos = self._current_offset bytes = self.map[pos:pos + self._read_size] iq_data = IQData(bytes, scaninfo=self.header) return iq_data
def __next__(self): try: # check to see if we're actually done with the file. if self._current_offset == len(self.map): raise StopIteration iqdata = self.parse_radial_at(self._current_offset) except ValueError as ve: if str(ve).startswith('packet start should be OMG!'): logger.warning('Malformed I/Q data! Skipping ahead.') self._bad_radials.append(self._current_offset) self.map.seek(self._current_offset) next_start = self.map.find(b'OMG!') if next_start == -1: raise StopIteration self._current_offset = next_start # recursive call here is dangerous if I haven't thought through # edge cases; it could a StackOverflow return self.__next__() else: raise self.radial_offsets.append(self._current_offset) self._current_offset += iqdata._bytes_consumed return iqdata def __iter__(self): return self
[docs] def close(self): if self.map is not None: self.map.close() self._file.close()
def __del__(self): self.close()
[docs]class Packet(object): """ Parse a single packet in the format it is in coming from the FPGA to the PS. A packet consists of a packet header and I/Q data. """ def __init__(self, packet_data): self.header = PacketHeader(packet_data) start = self.header.HEADER_LENGTH stop = start + self.header.data_length data = np.frombuffer(packet_data[start:stop], dtype='int16') self.i1 = data[::4] self.q1 = data[1::4] self.i2 = data[2::4] self.q2 = data[3::4] assert self.i1.size == self.i2.size == self.q1.size == self.q2.size def __str__(self): return str(self.header).replace('Header', '')
[docs]def nextpow2(x): """Return the next power of 2 greater than or equal to x.""" return int(np.power(2, np.ceil(np.log(x) / np.log(2))))
assert nextpow2(2) == 2 assert nextpow2(3) == 4 assert nextpow2(255) == 256 assert nextpow2(257) == 512