Source code for wni.radar

"""
A single object which coordinates all the various clients.
"""

import collections
import time

import wni.az_el
import wni.config as config
import wni.util
import wni.data_client
import wni.processes


# the default settings for the entire system.
# Here's the overview
# * 1 kHz prf
# * 10 µs pulse width, 5 MHz chirp
# * FPGA FIR filter is enabled, with low-pass filter taps
# * does not change the LO
# * Exports 1 scan set only
_scan_conf = {
    'delay': 0,
    'prt': 1000,
    'pulses': 50,
    'rx_delay': 0,
    'rx_length': 200,
    'next_set': 0,
    'add_jitter': False,
}
_channel_conf = {
    'num_sets': 1,
    'set0': _scan_conf,
    'fir_enable': True,
    'enable_packet_tagging': True,
    'scan_start_set': 0,
    'scheduler_mode': 'run_while_enabled',
    # set nloops, so if the user changes scheduler_mode they'll have
    # *something* for nloops
    'nloops': 1,
    # set up the low-pass filter in the FPGA
    'fir_fpga': {
        'type': 'bpf',
        'ntaps': 120,
        'start': 0,  # makes it a low-pass filter
        'stop': 8,  # we need up to 7.5 MHz, 8 will not cut any off hopefully
        'window': 'hamming',
    }
}

_wfm1 = {
    'type': 'chirp',
    # 300 samples is a 10 µs pulse (it's a 30 MHz clock)
    'nsamples': 300,
    'bandwidth': 5,
    'center': 5,
}
# wfm 2 needs a separate center frequency
_wfm2 = _wfm1.copy()
_wfm2['center'] = -5
DEFAULT_SETTINGS = {
    'scan_settings': {
        'ch*': _channel_conf,
        'ch1': {
            'set0': {
                'waveform': _wfm1,
                'fir_config': {'type': 'matched_filter'},
            }
        },
        'ch2': {
            'set0': {
                'waveform': _wfm2,
                'fir_config': {'type': 'matched_filter'},
            }
        },
    },
    'system_config': {
        'ch1_ref_cal': 0,
        'ch2_ref_cal': 1,
        'dump_to_disk': False,
        'filter_cpu': True,
        'do_range_correction': True,
        'processing_in_gui': True,
        'processing_interval': 1,
        'calc_reflectivity': 1,
        'calc_velocity': True,
        'calc_mag_R1': False,
        'output_iq': False,
        'doppler_processing': False,
        'output_psd': False,
        'decimation': 1,
        'plot_power': True,
        'plot_velocity': True,
        'plot_frequency': False,
        'plot_phase': False,
        'flip_velocity_sign': config.FLIP_VELOCITY_SIGN,
    },
    'transceiver': {
        'ch1_rx_rf_gain': 0,
        'ch2_rx_rf_gain': 0,
        'ch1_tx_attenuation': 89000,
        'ch2_tx_attenuation': 89000,
        'digital_loopback': False,
        'rf_loopback': False,
        'tx_rf_bandwidth': int(15e6),
        'rx_rf_bandwidth': int(15e6),
        'tx_sampling_freq': config.SAMPLE_CLK,
        'rx_rfdc_tracking': True,
        'rx_bbdc_tracking': True,
        'rx_quad_tracking': True,
        # turn the calibration stuff on by default
    },
}

del _scan_conf
del _channel_conf
del _wfm1
del _wfm2


[docs]class Radar: """ A single class to give access to all of the radar. Args: settings (dict): If settings is given, it must be a mapping, the keys of which are described in the :ref:`documentation <configuration-dict>`. Scans can be configured by providing a settings map to the :class:`~wni.radar.Radar` object. To set the scan configuration, call the :meth:`~wni.radar.Radar.apply_dict_config` method. To get the current settings, call :meth:`~wni.radar.Radar.settings_as_dict`. """ def __init__(self, settings=None, recv_timeout=10000): # set _stuff_to_close now, so that if initialization fails for whatever # reason, the __del__ method won't raise an exception self._nanoclients = () self._just_close = () self._closed = False self.ch1_moment_client = None self.ch2_moment_client = None self.scan_conf = wni.util.ScanConfClient(recv_timeout=recv_timeout) self.system_conf = wni.util.SystemConfigClient(recv_timeout=recv_timeout) self.transceiver = wni.util.AD9361Client(recv_timeout=recv_timeout) self.notifier = wni.util.Notifier() self.pedestal = wni.az_el.AzElController(recv_timeout=recv_timeout) self.ch1_moment_client = wni.data_client.DataClient(config.CH1_LOCAL_MOMENT_ADDR) self.ch2_moment_client = wni.data_client.DataClient(config.CH2_LOCAL_MOMENT_ADDR) self.scan_timer = wni.util.ScanTimerClient(recv_timeout=recv_timeout) self.ch1_iq_client = wni.data_client.DataClient(config.CH1_LOCAL_IQ_ADDR) self.ch2_iq_client = wni.data_client.DataClient(config.CH2_LOCAL_IQ_ADDR) self.ch1 = self.scan_conf.ch1 self.ch2 = self.scan_conf.ch2 self.ch = (self.ch1, self.ch2) self.position_indicator = wni.util.PositionIndicatorClient(recv_timeout=recv_timeout) self.settings_revision = 1 self._nanoclients = ( self.scan_conf, self.system_conf, self.transceiver, ) # objects that have a close() method that need to be called self._just_close = ( self.ch1_moment_client, self.ch2_moment_client, self.notifier, ) # process manager clients self.uzpm = wni.processes.NanoProcessManagerClient(config.UZ_PM_ADDR) self.pcpm = wni.processes.NanoProcessManagerClient(config.PC_PM_ADDR) if settings is not None: self.apply_dict_config(settings)
[docs] def apply_default_settings(self): """ Apply the default settings. This is more useful for development than actually using the system, or for getting the system into a known state. """ self.apply_dict_config(DEFAULT_SETTINGS)
[docs] def apply_dict_config(self, settings): """ Apply configuration of the radar from a dict. See the :ref:`dict configuration docs <configuration-dict>` for more info. """ if self.busy: # can't apply config while a scan is running, don't even try. # This is a *huge* problem if the sampling frequency of the ad9361 # is changed through the "transceiver" config key. raise RuntimeError('The radar is running a scan and cannot apply ' 'the configuration.') settings = settings.copy() settings.pop('time', None) settings.pop('rev', None) delegate_conf = { 'scan_settings': self.scan_conf, 'system_config': self.system_conf, 'transceiver': self.transceiver, } # Protect overwriting our own attributes. if 'system_conf' in settings: msg = 'Bad key "system_conf", use "system_config" instead' raise ValueError(msg) if 'scan_conf' in settings: msg = 'Bad key "scan_conf", use "scan_settings" instead' raise ValueError(msg) for name, client in delegate_conf.items(): conf = settings.pop(name, None) if conf is None: continue client.apply_dict_config(conf) for attr, value in settings.items(): if not hasattr(self, attr): msg = 'Bad configuration option "{}"' msg = msg.format(attr) raise ValueError(msg) setattr(self, attr, value)
def current_position(self): """ Return the current (az, el) tuple. """ return self.position_indicator.current_position() def current_az_speed(self): return self.position_indicator.current_az_speed()
[docs] def settings_as_dict(self): """ Return the current radar settings as a dict. For more information about the configuration dict, see :ref:`the docs <configuration-dict>` """ d = collections.OrderedDict([ ('system_config', self.system_conf.settings_as_dict()), ('scan_settings', self.scan_conf.settings_as_dict()), ('transceiver', self.transceiver.settings_as_dict()), ]) d['rev'] = self.settings_revision return d
def rx_only(self, channel=(0, 1)): """ Puts the specified channel(s) in receive-only mode. Disables transmit and whatnot. """ if isinstance(channel, int): channel = (channel,) for ch in channel: self.scan_conf.ch[ch].rx_only() @property def digital_loopback(self): """Get or set whether the radar is in digital loopback""" return self.transceiver.digital_loopback @digital_loopback.setter def digital_loopback(self, val): self.transceiver.digital_loopback = val @property def rf_loopback(self): """Get or set whether the radar is in RF loopback""" return self.transceiver.rf_loopback @rf_loopback.setter def rf_loopback(self, val): self.transceiver.rf_loopback = val def trigger(self, scan_time, wait=True, flush_old_data=True): """ Trigger a scan to run for ``scan_time`` seconds if wait == True, do not return to the caller until everything is finished. """ if self.busy: raise RuntimeError('Scan already running') self.notifier.flush_queue() if flush_old_data: # get the latest data, but throw it away self.ch1_moment_client.sync() self.ch2_moment_client.sync() self.ch1_iq_client.sync() self.ch2_iq_client.sync() self.scan_conf.scan_time = scan_time for ch in self.scan_conf.ch: errmsg = ch.scan_settings_errormessage if errmsg: raise ValueError(errmsg) self.scan_conf.trigger() if wait: # wait for start message, stop message # add five seconds to account for other respondents which may # respond before this one. if scan_time < 0: # timeout in 2 years scan_time = 60 * 60 * 365 * 2 self.notifier.wait_for_msg(timeout=scan_time + 5) self.notifier.wait_for_msg(timeout=scan_time + 5) else: self.notifier.wait_for_msg(timeout=5) self.notifier.sync_start() def _latest(self, dataclient, timeout): latest = dataclient.latest() deadline = time.monotonic() + timeout while not latest: if time.monotonic() > deadline: raise TimeoutError('Could not fetch more recent data') latest = dataclient.latest() time.sleep(0.05) return latest def ch1_latest_moment(self, timeout=1): """ Return the latest ch1 data. """ return self._latest(self.ch1_moment_client, timeout) def ch2_latest_moment(self, timeout=1): """ Return the latest ch2 data. """ return self._latest(self.ch2_moment_client, timeout) def latest_moment(self, timeout=1): """ Return a tuple of the latest (ch1, ch2) data """ return self.ch1_latest_moment(timeout), self.ch2_latest_moment(timeout) ch1_latest = ch1_latest_moment ch2_latest = ch2_latest_moment latest = latest_moment def ch1_latest_iq(self, timeout=1): """ Return the latest I/Q data """ return self._latest(self.ch1_iq_client, timeout) def ch2_latest_iq(self, timeout=1): """ Return the latest I/Q data """ return self._latest(self.ch2_iq_client, timeout) def latest_iq(self, timeout=1): """ Return a tuple of the latest (ch1, ch2) data """ return self.ch1_latest_iq(timeout), self.ch2_latest_iq(timeout) def trigger_and_wait(self, scan_time): """Trigger a scan and do not return until scan completion""" self.trigger(scan_time) @property def sampling_frequency(self): """Get or set the sampling frequency.""" return self.transceiver.tx_sampling_freq @sampling_frequency.setter def sampling_frequency(self, value): if self.busy: msg = 'Cannot set sampling frequency while radar is running' raise RuntimeError(msg) self.transceiver.tx_sampling_freq = value self.scan_conf.ch.fir_aresetn = 0 time.sleep(0.001) self.scan_conf.ch.fir_aresetn = 1 @property def center_frequency(self): """The center RF frequency of the radar.""" return self.transceiver.tx_frequency @center_frequency.setter def center_frequency(self, freq): if self.busy: msg = 'Cannot change center frequency while a scan is running.' raise RuntimeError(msg) self.transceiver.tx_frequency = freq self.transceiver.rx_frequency = freq
[docs] def do_vcp(self, settings, scan_time=-1, wait=True): """ Do a Volume Coverage Pattern (vcp). Args: settings: a dict, with all the same keys as accepted in apply_dict_config, with an extra required key: "vcp". The value of "vcp" must be a dict that scan_timer.do_vcp() understands. block: If True, do not return until the radar is finished with its scan. """ if not self.busy: self.notifier.flush_queue() settings = settings.copy() vcp_settings = settings.pop('vcp') scan_time = vcp_settings.get('scan_time', -1) self.apply_dict_config(settings) for ch in self.scan_conf.ch: errmsg = ch.scan_settings_errormessage if errmsg: raise ValueError(errmsg) self.scan_timer.do_vcp(vcp_settings) if wait: # wait for start message, stop message # add five seconds to account for other respondents which may # respond before this one. if scan_time < 0: # timeout in 2 years scan_time = 60 * 60 * 365 * 2 self.notifier.wait_for_msg(timeout=scan_time + 5) self.notifier.wait_for_msg(timeout=scan_time + 5)
@property def busy(self): """Indicates whether the radar is busy""" return self.scan_conf.busy def stop_scanning(self): """Stop a currently running scan.""" self.scan_conf.interrupt() loops = 0 while self.busy: time.sleep(0.01) loops += 1 if loops > 500: raise RuntimeError('Scan failed to stop as it should have done') def wait_for_scan_completion(self): self.scan_conf.wait_for_scan_completion() def close(self): """ Close our connection to the radar. """ if not self._closed: try: for closable in self._nanoclients: closable.close_nanoclient() for closable in self._just_close: closable.close() finally: self._closed = True def __enter__(self): return self def __exit__(self, *exc_info): self.close() def __del__(self): self.close()