"""
A single object which coordinates all the various clients.
"""
import collections
import time
import wni.az_el
import wni.config as config
import wni.util
import wni.data_client
import wni.processes
# System-wide default configuration, assembled section by section below.
# Overview of what the defaults give you:
# * 1 kHz prf
# * 10 µs pulse width, 5 MHz chirp
# * FPGA FIR filter is enabled, with low-pass filter taps
# * does not change the LO
# * Exports 1 scan set only

# The single scan set that every channel starts out with.
_default_set = {
    'delay': 0,
    'prt': 1000,
    'pulses': 50,
    'rx_delay': 0,
    'rx_length': 200,
    'next_set': 0,
    'add_jitter': False,
}

# Settings stored under the 'ch*' key of the scan settings.
_all_channels = {
    'num_sets': 1,
    'set0': _default_set,
    'fir_enable': True,
    'enable_packet_tagging': True,
    'scan_start_set': 0,
    'scheduler_mode': 'run_while_enabled',
    # nloops is set so that a user who changes scheduler_mode still has
    # *something* for nloops
    'nloops': 1,
    # the low-pass filter in the FPGA
    'fir_fpga': {
        'type': 'bpf',
        'ntaps': 120,
        'start': 0,  # makes it a low-pass filter
        'stop': 8,  # we need up to 7.5 MHz, 8 will not cut any off hopefully
        'window': 'hamming',
    },
}

_ch1_waveform = {
    'type': 'chirp',
    # 300 samples is a 10 µs pulse (it's a 30 MHz clock)
    'nsamples': 300,
    'bandwidth': 5,
    'center': 5,
}
# The ch2 waveform is identical except for its own center frequency.
_ch2_waveform = dict(_ch1_waveform, center=-5)

_scan_settings = {
    'ch*': _all_channels,
    'ch1': {
        'set0': {
            'waveform': _ch1_waveform,
            'fir_config': {'type': 'matched_filter'},
        },
    },
    'ch2': {
        'set0': {
            'waveform': _ch2_waveform,
            'fir_config': {'type': 'matched_filter'},
        },
    },
}

_system_config = {
    'ch1_ref_cal': 0,
    'ch2_ref_cal': 1,
    'dump_to_disk': False,
    'filter_cpu': True,
    'do_range_correction': True,
    'processing_in_gui': True,
    'processing_interval': 1,
    'calc_reflectivity': 1,
    'calc_velocity': True,
    'calc_mag_R1': False,
    'output_iq': False,
    'doppler_processing': False,
    'output_psd': False,
    'decimation': 1,
    'plot_power': True,
    'plot_velocity': True,
    'plot_frequency': False,
    'plot_phase': False,
    'flip_velocity_sign': config.FLIP_VELOCITY_SIGN,
}

_transceiver = {
    'ch1_rx_rf_gain': 0,
    'ch2_rx_rf_gain': 0,
    'ch1_tx_attenuation': 89000,
    'ch2_tx_attenuation': 89000,
    'digital_loopback': False,
    'rf_loopback': False,
    'tx_rf_bandwidth': int(15e6),
    'rx_rf_bandwidth': int(15e6),
    'tx_sampling_freq': config.SAMPLE_CLK,
    # turn the calibration stuff on by default
    'rx_rfdc_tracking': True,
    'rx_bbdc_tracking': True,
    'rx_quad_tracking': True,
}

DEFAULT_SETTINGS = {
    'scan_settings': _scan_settings,
    'system_config': _system_config,
    'transceiver': _transceiver,
}

# The building blocks are only needed while assembling DEFAULT_SETTINGS;
# drop them so they do not linger in the module namespace.
del _default_set
del _all_channels
del _ch1_waveform
del _ch2_waveform
del _scan_settings
del _system_config
del _transceiver
class Radar:
    """
    A single class to give access to all of the radar.

    Scans can be configured by providing a settings map to the
    :class:`~wni.radar.Radar` object.  To set the scan configuration, call
    the :meth:`~wni.radar.Radar.apply_dict_config` method.  To get the
    current settings, call :meth:`~wni.radar.Radar.settings_as_dict`.

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`close`.

    Args:
        settings (dict): If settings is given, it must be a mapping, the
            keys of which are described in the :ref:`documentation
            <configuration-dict>`.  It is applied with
            :meth:`apply_dict_config` at the end of construction.
        recv_timeout: Forwarded as the ``recv_timeout`` keyword to the
            ``wni.util`` clients and the az/el controller created here.
    """

    # Timeout base (in seconds, about two years) used when a scan has no
    # finite duration, i.e. when a negative scan time was requested.
    _UNBOUNDED_SCAN_TIMEOUT = 2 * 365 * 24 * 60 * 60

    # Extra seconds added to every wait, to account for other respondents
    # which may respond before this one.
    _WAIT_MARGIN = 5

    def __init__(self, settings=None, recv_timeout=10000):
        # Set the close-tracking attributes first, so that if initialization
        # fails for whatever reason, close()/__del__ won't raise.
        self._nanoclients = ()
        self._just_close = ()
        self._closed = False
        self.ch1_moment_client = None
        self.ch2_moment_client = None

        self.scan_conf = wni.util.ScanConfClient(recv_timeout=recv_timeout)
        self.system_conf = wni.util.SystemConfigClient(recv_timeout=recv_timeout)
        self.transceiver = wni.util.AD9361Client(recv_timeout=recv_timeout)
        self.notifier = wni.util.Notifier()
        self.pedestal = wni.az_el.AzElController(recv_timeout=recv_timeout)
        self.ch1_moment_client = wni.data_client.DataClient(config.CH1_LOCAL_MOMENT_ADDR)
        self.ch2_moment_client = wni.data_client.DataClient(config.CH2_LOCAL_MOMENT_ADDR)
        self.scan_timer = wni.util.ScanTimerClient(recv_timeout=recv_timeout)
        self.ch1_iq_client = wni.data_client.DataClient(config.CH1_LOCAL_IQ_ADDR)
        self.ch2_iq_client = wni.data_client.DataClient(config.CH2_LOCAL_IQ_ADDR)

        self.ch1 = self.scan_conf.ch1
        self.ch2 = self.scan_conf.ch2
        self.ch = (self.ch1, self.ch2)

        self.position_indicator = wni.util.PositionIndicatorClient(recv_timeout=recv_timeout)
        self.settings_revision = 1

        # objects that are shut down through close_nanoclient()
        self._nanoclients = (
            self.scan_conf,
            self.system_conf,
            self.transceiver,
        )
        # objects that have a close() method that needs to be called.  The
        # I/Q clients are DataClient instances just like the moment clients,
        # so they have to be closed as well or they are leaked.
        self._just_close = (
            self.ch1_moment_client,
            self.ch2_moment_client,
            self.ch1_iq_client,
            self.ch2_iq_client,
            self.notifier,
        )

        # process manager clients
        self.uzpm = wni.processes.NanoProcessManagerClient(config.UZ_PM_ADDR)
        self.pcpm = wni.processes.NanoProcessManagerClient(config.PC_PM_ADDR)

        if settings is not None:
            self.apply_dict_config(settings)

    def apply_default_settings(self):
        """
        Apply the default settings.

        This is more useful for development than actually using the system, or
        for getting the system into a known state.
        """
        self.apply_dict_config(DEFAULT_SETTINGS)

    def apply_dict_config(self, settings):
        """
        Apply configuration of the radar from a dict. See the :ref:`dict
        configuration docs <configuration-dict>` for more info.

        The ``'scan_settings'``, ``'system_config'`` and ``'transceiver'``
        entries are handed to the matching client's ``apply_dict_config``.
        The ``'time'`` and ``'rev'`` entries are ignored.  Every remaining
        key must name an existing attribute of this object, which is then
        set to the given value.  The caller's dict is not modified.

        Raises:
            RuntimeError: if a scan is currently running.
            ValueError: if ``settings`` contains the misspelled keys
                ``"system_conf"`` / ``"scan_conf"`` or a key that is not an
                attribute of this object.
        """
        if self.busy:
            # can't apply config while a scan is running, don't even try.
            # This is a *huge* problem if the sampling frequency of the ad9361
            # is changed through the "transceiver" config key.
            raise RuntimeError('The radar is running a scan and cannot apply '
                               'the configuration.')

        # work on a shallow copy so the pops below don't touch the caller's dict
        settings = settings.copy()
        settings.pop('time', None)
        settings.pop('rev', None)

        delegate_conf = {
            'scan_settings': self.scan_conf,
            'system_config': self.system_conf,
            'transceiver': self.transceiver,
        }

        # Protect overwriting our own attributes: these two keys are valid
        # attribute names, so the generic setattr loop below would otherwise
        # replace the clients themselves.
        if 'system_conf' in settings:
            msg = 'Bad key "system_conf", use "system_config" instead'
            raise ValueError(msg)
        if 'scan_conf' in settings:
            msg = 'Bad key "scan_conf", use "scan_settings" instead'
            raise ValueError(msg)

        for name, client in delegate_conf.items():
            conf = settings.pop(name, None)
            if conf is None:
                continue
            client.apply_dict_config(conf)

        # whatever is left is treated as attribute assignments on self
        for attr, value in settings.items():
            if not hasattr(self, attr):
                msg = 'Bad configuration option "{}"'
                msg = msg.format(attr)
                raise ValueError(msg)
            setattr(self, attr, value)

    def current_position(self):
        """
        Return the current (az, el) tuple.
        """
        return self.position_indicator.current_position()

    def current_az_speed(self):
        """Return ``position_indicator.current_az_speed()``."""
        return self.position_indicator.current_az_speed()

    def settings_as_dict(self):
        """
        Return the current radar settings as a dict. For more information
        about the configuration dict, see :ref:`the docs <configuration-dict>`

        The result is an ordered dict with the keys ``system_config``,
        ``scan_settings``, ``transceiver`` and ``rev`` (the current
        ``settings_revision``).
        """
        d = collections.OrderedDict([
            ('system_config', self.system_conf.settings_as_dict()),
            ('scan_settings', self.scan_conf.settings_as_dict()),
            ('transceiver', self.transceiver.settings_as_dict()),
        ])
        d['rev'] = self.settings_revision
        return d

    def rx_only(self, channel=(0, 1)):
        """
        Puts the specified channel(s) in receive-only mode. Disables transmit
        and whatnot.

        Args:
            channel: a single index into ``scan_conf.ch`` or an iterable of
                such indices.
        """
        if isinstance(channel, int):
            channel = (channel,)
        for ch in channel:
            self.scan_conf.ch[ch].rx_only()

    @property
    def digital_loopback(self):
        """Get or set whether the radar is in digital loopback"""
        return self.transceiver.digital_loopback

    @digital_loopback.setter
    def digital_loopback(self, val):
        self.transceiver.digital_loopback = val

    @property
    def rf_loopback(self):
        """Get or set whether the radar is in RF loopback"""
        return self.transceiver.rf_loopback

    @rf_loopback.setter
    def rf_loopback(self, val):
        self.transceiver.rf_loopback = val

    def _check_scan_settings(self):
        """Raise ValueError if any channel reports a scan settings error."""
        for ch in self.scan_conf.ch:
            errmsg = ch.scan_settings_errormessage
            if errmsg:
                raise ValueError(errmsg)

    def _wait_for_start_and_stop(self, scan_time):
        """
        Wait for two notifier messages (the start message and the stop
        message), each with a timeout derived from ``scan_time``.  A negative
        ``scan_time`` is replaced by a roughly two year timeout.
        """
        if scan_time < 0:
            scan_time = self._UNBOUNDED_SCAN_TIMEOUT
        timeout = scan_time + self._WAIT_MARGIN
        self.notifier.wait_for_msg(timeout=timeout)
        self.notifier.wait_for_msg(timeout=timeout)

    def trigger(self, scan_time, wait=True, flush_old_data=True):
        """
        Trigger a scan to run for ``scan_time`` seconds

        Args:
            scan_time: value assigned to ``scan_conf.scan_time``.  When it is
                negative and ``wait`` is true, the wait uses a timeout of
                about two years.
            wait: if True, do not return to the caller until everything is
                finished.  If False, only wait (up to five seconds) for one
                notifier message and then call ``notifier.sync_start()``.
            flush_old_data: if True, ``sync()`` all four data clients before
                triggering, so data from before the scan is thrown away.

        Raises:
            RuntimeError: if a scan is already running.
            ValueError: if a channel reports a scan settings error.
        """
        if self.busy:
            raise RuntimeError('Scan already running')
        self.notifier.flush_queue()
        if flush_old_data:
            # get the latest data, but throw it away
            self.ch1_moment_client.sync()
            self.ch2_moment_client.sync()
            self.ch1_iq_client.sync()
            self.ch2_iq_client.sync()
        self.scan_conf.scan_time = scan_time
        self._check_scan_settings()
        self.scan_conf.trigger()
        if wait:
            self._wait_for_start_and_stop(scan_time)
        else:
            self.notifier.wait_for_msg(timeout=self._WAIT_MARGIN)
            # NOTE(review): the nesting of this call was reconstructed; it is
            # assumed to belong to the non-blocking branch only - confirm.
            self.notifier.sync_start()

    def _latest(self, dataclient, timeout):
        """
        Poll ``dataclient.latest()`` until it returns something truthy and
        return that value.

        Raises:
            TimeoutError: if nothing arrived within ``timeout`` seconds.
        """
        deadline = time.monotonic() + timeout
        latest = dataclient.latest()
        while not latest:
            if time.monotonic() > deadline:
                raise TimeoutError('Could not fetch more recent data')
            # sleep *before* asking again, so that data is handed back as
            # soon as it has been fetched
            time.sleep(0.05)
            latest = dataclient.latest()
        return latest

    def ch1_latest_moment(self, timeout=1):
        """
        Return the latest ch1 data.
        """
        return self._latest(self.ch1_moment_client, timeout)

    def ch2_latest_moment(self, timeout=1):
        """
        Return the latest ch2 data.
        """
        return self._latest(self.ch2_moment_client, timeout)

    def latest_moment(self, timeout=1):
        """
        Return a tuple of the latest (ch1, ch2) data
        """
        return self.ch1_latest_moment(timeout), self.ch2_latest_moment(timeout)

    # shorter aliases for the moment getters
    ch1_latest = ch1_latest_moment
    ch2_latest = ch2_latest_moment
    latest = latest_moment

    def ch1_latest_iq(self, timeout=1):
        """
        Return the latest ch1 I/Q data
        """
        return self._latest(self.ch1_iq_client, timeout)

    def ch2_latest_iq(self, timeout=1):
        """
        Return the latest ch2 I/Q data
        """
        return self._latest(self.ch2_iq_client, timeout)

    def latest_iq(self, timeout=1):
        """
        Return a tuple of the latest (ch1, ch2) I/Q data
        """
        return self.ch1_latest_iq(timeout), self.ch2_latest_iq(timeout)

    def trigger_and_wait(self, scan_time):
        """Trigger a scan and do not return until scan completion"""
        self.trigger(scan_time)

    @property
    def sampling_frequency(self):
        """Get or set the sampling frequency."""
        return self.transceiver.tx_sampling_freq

    @sampling_frequency.setter
    def sampling_frequency(self, value):
        if self.busy:
            msg = 'Cannot set sampling frequency while radar is running'
            raise RuntimeError(msg)
        self.transceiver.tx_sampling_freq = value
        # pulse fir_aresetn low and back high after the frequency change
        self.scan_conf.ch.fir_aresetn = 0
        time.sleep(0.001)
        self.scan_conf.ch.fir_aresetn = 1

    @property
    def center_frequency(self):
        """The center RF frequency of the radar."""
        return self.transceiver.tx_frequency

    @center_frequency.setter
    def center_frequency(self, freq):
        if self.busy:
            msg = 'Cannot change center frequency while a scan is running.'
            raise RuntimeError(msg)
        # tx and rx are always moved together
        self.transceiver.tx_frequency = freq
        self.transceiver.rx_frequency = freq

    def do_vcp(self, settings, scan_time=-1, wait=True):
        """
        Do a Volume Coverage Pattern (vcp).

        Args:
            settings: a dict, with all the same keys as accepted in
                apply_dict_config, with an extra required key: "vcp". The value
                of "vcp" must be a dict that scan_timer.do_vcp() understands.
                The caller's dict is not modified.
            scan_time: only used to derive the wait timeout, and only when
                the "vcp" dict has no "scan_time" entry of its own.  A
                negative value means a timeout of about two years.
            wait: If True, do not return until the radar is finished with its
                scan.

        Raises:
            KeyError: if ``settings`` has no "vcp" key.
            RuntimeError: if a scan is currently running.
            ValueError: if the settings are rejected or a channel reports a
                scan settings error.
        """
        if not self.busy:
            self.notifier.flush_queue()
        settings = settings.copy()
        vcp_settings = settings.pop('vcp')
        # the vcp's own scan_time wins; the argument is the fallback
        scan_time = vcp_settings.get('scan_time', scan_time)
        self.apply_dict_config(settings)
        self._check_scan_settings()
        self.scan_timer.do_vcp(vcp_settings)
        if wait:
            self._wait_for_start_and_stop(scan_time)

    @property
    def busy(self):
        """Indicates whether the radar is busy"""
        return self.scan_conf.busy

    def stop_scanning(self):
        """
        Stop a currently running scan.

        Raises:
            RuntimeError: if the radar is still busy after more than 500
                polls spaced 10 ms apart.
        """
        self.scan_conf.interrupt()
        loops = 0
        while self.busy:
            time.sleep(0.01)
            loops += 1
            if loops > 500:
                raise RuntimeError('Scan failed to stop as it should have done')

    def wait_for_scan_completion(self):
        """Delegate to ``scan_conf.wait_for_scan_completion()``."""
        self.scan_conf.wait_for_scan_completion()

    def close(self):
        """
        Close our connection to the radar.

        Calling this more than once is harmless.  It is also safe on an
        object whose ``__init__`` never ran.
        """
        if getattr(self, '_closed', False):
            return
        try:
            for client in getattr(self, '_nanoclients', ()):
                client.close_nanoclient()
            for closable in getattr(self, '_just_close', ()):
                closable.close()
        finally:
            # never retry, even if one of the close calls raised
            self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def __del__(self):
        self.close()