Source code for wni.scan_config

"""
Set up scan configuration.  The hierarchy presented here reflects the hierarchy
in the FPGA and the kernel driver.

* ScanConf:  This is the top-level object. Configures and reads top-level
  settings from the FPGA.  Contains reference to the two channels of the AD9361
  as the list ``ch``, or individually as ``ch1`` and ``ch2``.

* ChannelConfig: Per-channel configuration settings.  Each channel contains a
  list of ``ScanConfSet`` called ``sets``.  Many settings can be set on the
  ``sets``, but a few are per-channel: ``fir_enable``

"""

from collections import OrderedDict
import glob
import logging
import math
import os
import re
import time

import nanomsg
import numpy as np

import wni.config as config
from wni.util import ScanTimerClient, nested_dict_merged
import wni.waveforms
import wni.fir_coef as fir_coef
from wni.signal_util import check_normalized


logger = logging.getLogger(__name__)
c = 3E8

# determined emperically that _RX_LAG should be 154 clk cycles (1.283 us)
_RX_LAG = 154 / 120
# set to ensure that Amp stays high while waveform is active.
_TX_AMP_EXTRA = 1.25


[docs]def us_to_clk(us, clock_freq=config.TXRX_CLK): """Convert microseconds to number of clock cycles.""" return int(math.ceil(us * clock_freq / 1E6))
[docs]def clk_to_us(cycles, clock_freq=config.TXRX_CLK): """Convert clk cycles to number of microseconds.""" return 1E6 * cycles / clock_freq
[docs]class SysfsBinAttr: """Return Return a binary sysfs attribute. gets/sets with raw bytes.""" def __init__(self, path): """ Args: path (str): path to the sysfs attribute """ self._dir = os.path.dirname(path) self._basename = os.path.basename(path)
[docs] def get_path(self, obj): """ Build the sysfs path """ d = self._dir base = self._basename subdir = '' if hasattr(obj, 'subdir'): subdir = obj.subdir path = os.path.join(d, subdir, base) return path
def __get__(self, obj, type): path = self.get_path(obj) try: with open(path, 'rb') as f: return f.read() except FileNotFoundError as fnfe: raise AttributeError() from fnfe except OSError as oe: msg = 'could not read "{}"'.format(path) raise OSError(msg) from oe def __set__(self, obj, value): path = self.get_path(obj) try: with open(path, 'wb') as f: f.write(value) except OSError as oe: msg = 'Could not write {} to "{}"' msg = msg.format(value, path) raise ValueError(msg) from oe
[docs]class IntAttr(SysfsBinAttr): """Get/set sysfs attribute as an int""" def __init__(self, path): super().__init__(path) def __get__(self, obj, type): bytes = super().__get__(obj, type) v = int(bytes) return v def __set__(self, obj, value): to_write = b'%d' % (value) super().__set__(obj, to_write)
[docs]class UsAttr(IntAttr): """ Get and set sysfs files in a microseconds way """ def __init__(self, path, offset=0, clamp=True): """ Adds an offset argument, to account for delays in a system. (So a tx_delay of 0 and an rx_delay of zero means the first sample you receive see is actually the first tx sample) offset is provided in microseconds. if clamp == True, then values < 0 get truncated to 0. """ super().__init__(path) self.offset = offset self.clamp = clamp def __get__(self, obj, type): val = super().__get__(obj, type) us = clk_to_us(val) # we don't need to return the full precision of the float value; # really, we want to ensure that values will "round trip", such that # the following code will always work: # # x.value = 1 # assert x.value == 1 # # We'll do our best effort to make that work. The microseconds can # only be set in increments of 1 / 120, which is a little over three # decimal places of precision. Round to 4 decimal places. ret = us - self.offset return round(ret, 4) def __set__(self, obj, value): v = value + self.offset if v < 0 and self.clamp: v = 0 clk = us_to_clk(v) super().__set__(obj, clk)
[docs]class BoolAttr(IntAttr): """Sysfs file that represents a Boolean as a 0 or 1""" def __get__(self, obj, type): int = super().__get__(obj, type) return bool(int) def __set__(self, obj, value): super().__set__(obj, int(value))
[docs]class StrAttr(SysfsBinAttr): """Decodes / encodes sysfs files as utf-8""" def __get__(self, obj, type): return super().__get__(obj, type).decode().strip() def __set__(self, obj, value): super().__set__(obj, value.encode())
[docs]def p(fname, root=config.WNI_SYSFS): """Return the path to a WNI sysfs file""" if root is None: root = './' return os.path.join(root, fname)
[docs]def iq_to_tx_amp_len(iq): return len(iq) * config.TICKS_PER_SAMPLE + 152
[docs]class ScanConfigSet: """ A single set of scan settings for a single channel. The following attributes have units of microseconds. The scan config set also has attributes with the same name but with the suffix ``_clk``, which has units of clock cycles. Attributes: * delay: the delay at the start of the cpi * tx_amp_delay: microseconds to delay transmit amplifier trigger after prt trigger goes high * tx_amp_length: number of microseconds that the amplfiier trigger is high * tx_delay: microseconds to delay tx waveform trigger after prt trigger goes high * tx_length: number of microseconds the waveform trigger is high * rx_delay: microseconds to delay receive window after prt trigger goes high * rx_length: number of microseconds to count rx data as valid * prt: the pulse repetition time, in microseconds The following attributes are also on each scan set: Attributes * pulses: number of pulses in the scan set * add_jitter: Whether to add a random delay at the end of a CPI * last: whether this is the last scan configuration option in the set * next_set: the index of the next scan set * wfm_addr: waveform start address in block RAM in FPGA * pc_fir_taps: Filter taps to be applied on the processing computer for this scan set's rx data. """ delay = UsAttr(p('delay')) tx_amp_delay = UsAttr(p('tx_amp_delay')) tx_amp_length = UsAttr(p('tx_amp_length'), offset=_TX_AMP_EXTRA) tx_delay = UsAttr(p('tx_delay')) tx_length = UsAttr(p('tx_length')) rx_delay = UsAttr(p('rx_delay'), offset=_RX_LAG) rx_length = UsAttr(p('rx_length')) prt = UsAttr(p('prt')) delay_clk = IntAttr(p('delay')) tx_amp_delay_clk = IntAttr(p('tx_amp_delay')) tx_amp_length_clk = IntAttr(p('tx_amp_length')) tx_delay_clk = IntAttr(p('tx_delay')) tx_length_clk = IntAttr(p('tx_length')) rx_delay_clk = IntAttr(p('rx_delay')) rx_length_clk = IntAttr(p('rx_length')) prt_clk = IntAttr(p('prt')) scan_set_id = IntAttr(p('scan_set_id')) pulses = IntAttr(p('pulses')) add_jitter = BoolAttr(p('add_jitter')) last = BoolAttr(p('last')) next_set = IntAttr(p('next_set')) wfm_addr = IntAttr(p('wfm_addr')) def __init__(self, channel, set_id): # have to go through object.__setattr__ to avoid AttributeError in this # class's __setattr__ self.channel = channel self.subdir = 'ch{}/set{}'.format(channel.id, set_id) def _set_tx_lengths(self, iq): """ Set the length of the tx triggers based on iq samples """ amp_length = iq_to_tx_amp_len(iq) if len(iq) != 0: self.tx_amp_length_clk = amp_length self.tx_length_clk = 1 else: self.tx_amp_length_clk = 0 self.tx_length_clk = 0
[docs] def wfm_set_iq(self, iq, normalized=None): self.wfm.set_iq(iq, normalized) self._set_tx_lengths(iq) return iq
[docs] def wfm_no_tx(self): self.wfm_set_iq([]) self._set_tx_lengths([]) return np.asarray([], dtype='complex64')
[docs] def wfm_chirp(self, nsamples, center, bw, window='boxcar', scale=1, clk_freq=config.SAMPLE_CLK / 1E6): chirp = self.wfm.chirp(nsamples, center, bw, window, scale, clk_freq) self._set_tx_lengths(chirp.iq) return chirp
[docs] def wfm_get_iq(self, normalized=True): return self.wfm.get_iq(normalized)
@property def rx_samples(self): """ Return and set the number of samples to receive. This is the number of I/Q samples that will get DMA'd from the FPGA to the processor. """ rx_length = self.rx_length_clk if rx_length % 4 != 0: logger.warning('rx_length in clock cycles is not a multiple of 4') nsamples_in = rx_length // 4 fir_length = self.channel.fir_len if self.channel.fir_enable: if nsamples_in < fir_length: nsamples_out = 0 else: nsamples_out = (nsamples_in - fir_length) // 6 else: nsamples_out = nsamples_in return nsamples_out @rx_samples.setter def rx_samples(self, val): if val < 0: raise ValueError('Cannot receive less than 0 samples') elif val == 0: self.rx_length_clk = 0 if self.channel.fir_enable: fir_len = self.channel.fir_len nsamples_in = val * 6 + fir_len rx_len = nsamples_in * 4 self.rx_length_clk = rx_len else: self.rx_length_clk = val * 4 @property def wfm(self): try: return self.channel._waveforms[self.scan_set_id] except KeyError: wfm_file = '/dev/ch{}_wfm{}' wfm_file = wfm_file.format(self.channel.id, self.scan_set_id) wfm = wni.waveforms.Waveform(wfm_file) self.channel._waveforms[self.scan_set_id] = wfm return wfm @property def pc_fir_taps(self): """ Set the FIR taps that will be used by the data processor to do additional filtering of the data. """ # see comment in ChannelConfig class for why this is accessed this way. taps = self.channel._pc_fir_taps.get(self.scan_set_id, None) return taps @pc_fir_taps.setter def pc_fir_taps(self, taps): key = self.scan_set_id if taps is None: self.channel._pc_fir_taps[key] = None return taps = np.asarray(taps) taps = taps.astype('complex64') self.channel._pc_fir_taps[key] = taps
[docs] def settings_as_dict(self): """ Return scan settings as a dict """ settings = {} attrs = ( 'add_jitter', 'delay', 'last', 'next_set', 'prt', 'pulses', 'rx_delay', 'rx_length', 'rx_samples', 'tx_amp_delay', 'tx_amp_length', 'tx_delay', 'tx_length', 'wfm_addr', 'pc_fir_taps', ) for attr in attrs: settings[attr] = getattr(self, attr) settings['waveform'] = self.wfm.settings_as_dict() return settings
def _apply_fir_config(self, settings): """ Configure the taps for the FPGA fir filter. This works at a higher level than setting the taps directly with ``pc_fir_taps``. It takes into account the current waveform if ``matched_filter`` is selected. """ settings = settings.copy() fir_type = settings.get('type', 'arb') if fir_type not in ('arb', 'bpf', 'matched_filter'): raise ValueError('bad type for fir_coef: "{}"'.format(fir_type)) if fir_type == 'arb': if 'taps' not in settings: raise ValueError('If you are setting arb coefficients, ' 'you must specify "taps"') self.pc_fir_taps = settings['taps'] elif fir_type == 'bpf': ntaps = settings.get('ntaps', 120) start = settings['start'] stop = settings['stop'] window = settings.get('window', 'hamming') taps = fir_coef.bpf(ntaps, start, stop, window) self.pc_fir_taps = taps elif fir_type == 'matched_filter': # this is probably what will be used most frequently decimation = 1 if self.channel.fir_enable: # assume it's a bandpass filter, since nothing else makes sense decimation = 6 tx_wfm = self.wfm_get_iq() taps = fir_coef.matched_filter_taps(tx_wfm) taps = taps[::decimation] self.pc_fir_taps = taps
[docs] def apply_dict_config(self, config): """ Apply configuration settings from a dict. """ config = config.copy() config.pop('last', None) # validate that all keys are attributes wfm_config = config.pop('waveform', None) if wfm_config: iq = self.wfm.apply_dict_config(wfm_config) self._set_tx_lengths(iq) fir_config = config.pop('fir_config', None) if fir_config is not None: self._apply_fir_config(fir_config) for k in config: if not hasattr(self, k): msg = 'Bad configuration for ScanConfigSet. No key "{}"' msg = msg.format(k) raise ValueError(msg) for attr, value in config.items(): setattr(self, attr, value)
[docs]class ScanSets: """ Getting/setting scan sets. Since the user can change the number of scan settings out from under us, we need to dynamically create them as needed. """ my_attrs = 'channel slices chan_dir'.split() def __init__(self, channel, slices=()): self.channel = channel self.chan_dir = self.channel.subdir # The user should never pass slices in, should only happen in # __getitem__ self.slices = slices def _get_sliced_paths(self): """ Returns sorted sysfs scan sets """ pattern = '{}/{}/set*'.format(config.WNI_SYSFS, self.chan_dir) files = glob.glob(pattern) def sorter(x): return int(x[x.rfind('t') + 1:]) # sort based on set index files.sort(key=sorter) for s in self.slices: files = files[s] return files def __getitem__(self, index): if isinstance(index, slice): new_slices = self.slices + (index,) return ScanSets(self.channel, slices=new_slices) files = self._get_sliced_paths() file = files[index] idx_str = file[file.rfind('t') + 1:] idx = int(idx_str) return ScanConfigSet(self.channel, idx) def __len__(self): files = self._get_sliced_paths() for s in self.slices: files = files[s] return len(files) def __getattr__(self, name): return [getattr(set, name) for set in self] def __setattr__(self, name, value): if name in self.my_attrs: super().__setattr__(name, value) else: for s in self: setattr(s, name, value)
[docs]class ChannelConfig: """ Holder of all channel configuration settings. Attributes: sets: ScanSets instance scan_start_set: Index (within sets) of the first scan set. fir_len: The number of taps of the FIR filter in the FPGA fir_enable: Whether the Rx data goes through the decimating FIR filter in the FPGA fir_aresetn: Active low reset to the FIR filters in the FPGA. num_sets: number of scan sets possible num_used_sets: number of scan sets used by the current scan setup bytes_read: Number of rx data bytes read by userspace bytes_written: Number of rx data bytes written by the FPGA overflow: Number of CPIs lost because userspace couldn't keep up scan_settings_errormessage: a string describing errors with the current scan setup scheduler_mode: the mode of operation of the scheduler. The scheduler can either do its sequence N times ('run_n') or it can run as long as the input trigger is high ('run_while_enabled') enable_packet_tagging: whether packet tagging is enabled. Don't change this fir_normalizer: the scaler applied to the Rx data in the FPGA nloops: Number of times to run the sequence, if scheduler_mode == 'run_n' samples_per_dma_transfer: Number of bytes transferred per DMA transfer """ # TODO: gotta actually instantiate one per set once we have more stuff fir_len = IntAttr(p('fir_len')) fir_enable = IntAttr(p('fir_enable')) fir_normalizer = IntAttr(p('fir_normalizer')) enable_packet_tagging = BoolAttr(p('enable_packet_tagging')) fir_aresetn = IntAttr(p('fir_aresetn')) num_sets = IntAttr(p('num_sets')) nloops = IntAttr(p('nloops')) scan_start_set = IntAttr(p('scan_start_set')) scheduler_mode = StrAttr(p('scheduler_mode')) samples_per_dma_transfer = IntAttr(p('samples_per_dma_transfer')) bytes_per_loop = IntAttr(p('bytes_per_loop')) bytes_read = IntAttr(p('bytes_read')) bytes_written = IntAttr(p('bytes_written')) overflow = IntAttr(p('overflow')) scan_settings_errormessage = StrAttr(p('scan_settings_errormessage')) _available_scheduler_modes = StrAttr(p('available_scheduler_modes')) num_used_sets = IntAttr(p('num_used_sets')) def __init__(self, scan_conf, channel_id): # dynamically get number of sets self.id = channel_id self.subdir = 'ch{}'.format(channel_id) self.channel_dir = '{}/{}'.format(config.WNI_SYSFS, self.subdir) self.sets = ScanSets(self) self.scan_conf = scan_conf self.fpga_fir = fir_coef.FPGAFIRCoef(self) # _pc_fir_taps is weird. Ideally, we would store pc_fir_taps on # ScanConfigSet, but since ScanConfigSet is instantiated whenever it is # accessed through the ScanSets class, it would blow away _pc_fir_taps. # So this dict exists to map {scan_set_id: fir info}. # P. S. pc_fir_taps is the fir filter taps for the processing computer. self._pc_fir_taps = {} # same story with _waveforms as _pc_fir_taps self._waveforms = {}
[docs] def rx_only(self): for set in self.sets: set.tx_length = 0 set.tx_amp_length_clk = 0
def _apply_set_config(self, config): """ Does the Right Thing™ whenever "set*" and "setN" are given as keys. Only applies the settings a single time, and uses nested_dict_merged to merge the dicts. """ # general idea here: figure out the sets we need to update the settings # of. If "set*" was provided, we have to update all the scan sets # settings. Otherwise, we *only* update the "set0", "set1" etc. # settings that were passed in. common_settings = config.pop('set*', {}) if common_settings: # we need to apply the common settings to *all* scan sets. to_set = range(self.num_sets) else: # get the individual scan configurations that are set now. to_set = [] regex = re.compile(r'set(\d+)') for k in config: match = re.match(regex, k) if match: # need to keep track of the indices set_index = int(match.groups()[0]) to_set.append(set_index) for set_index in to_set: set_name = 'set{}'.format(set_index) scan_config = config.pop(set_name, {}) scan_config = nested_dict_merged(common_settings, scan_config) self.sets[set_index].apply_dict_config(scan_config)
[docs] def apply_dict_config(self, config): """ Apply configuration to a channel from a dict. The keys are channel attributes, and the values are the values they will be set to. The keys matching the regular expression "set\\d+" are special. They configure scan settings. The key ``set*`` applies scan settings to all scan sets. Individual scan set settings will override ``set*`` settings. """ config = config.copy() # support round-tripping self.apply_dict_config(self.settings_as_dict()) for key in ['bytes_per_loop', 'bytes_read', 'bytes_written', 'num_used_sets', 'overflow', 'samples_per_dma_transfer']: config.pop(key, None) # set num_sets first so that when we apply scan settings, the correct # number of sets will already be populated num_sets = config.pop('num_sets', None) if num_sets: self.num_sets = num_sets # Also need to get the setting of fir_enable out of the way fir_enable = config.pop('fir_enable', None) if fir_enable is not None: self.fir_enable = fir_enable # Setting the FIR filter stuff can affect the rx_samples of the scan # sets, so we have to set it before setting set* stuff. fir_coef_settings = config.pop('fir_fpga', None) if fir_coef_settings is not None: self.fpga_fir.apply_dict_config(fir_coef_settings) # same thing with FIR length; if it gets overridden we need to set it # now. fir_len = config.pop('fir_len', None) if fir_len is not None: self.fir_len = fir_len # Apply the scan settings for set*, set0, set1, ..., setN. self._apply_set_config(config) for k in config: if not hasattr(self, k): msg = 'Bad configuration for ChannelConfig. No key "{}"' msg = msg.format(k) raise ValueError(msg) for attr, value in config.items(): setattr(self, attr, value)
@property def available_scheduler_modes(self): return self._available_scheduler_modes.split()
[docs] def settings_as_dict(self): """ Return channel settings as a dict """ settings = {} attrs = ( 'bytes_per_loop', 'bytes_read', 'bytes_written', 'enable_packet_tagging', 'fir_aresetn', 'fir_enable', 'fir_len', 'fir_normalizer', 'nloops', 'num_sets', 'num_used_sets', 'overflow', 'samples_per_dma_transfer', 'scan_start_set', 'scheduler_mode', ) for attr in attrs: settings[attr] = getattr(self, attr) for i, set in enumerate(self.sets): setname = 'set{}'.format(i) settings[setname] = set.settings_as_dict() fpga_fir_settings = self.fpga_fir.settings_as_dict() settings['fir_fpga'] = fpga_fir_settings return settings
[docs] def fir_set_bpf(self, numtaps=120, start=0, stop=3, window='hamming', nyq=config.NYQ_MHZ): """ Set the FPGA FIR filter to use a bandpass filter. If start == 0, a low-pass filter is created. If stop == 0, a high-pass filter is created. Otherwise, a band-pass filter is created. """ return self.fpga_fir.bpf(numtaps=numtaps, start=start, stop=stop, window=window, nyq=nyq)
[docs] def fir_set_matched_filter(self, tx_wfm): """ Based on the tx waveform, set the FPGA matched filter coefficients. Args: tx_wfm: complex numpy array of the transmit waveform. """ return self.fpga_fir.set_matched_filter_coef(tx_wfm)
[docs] def fir_get_coef(self, normalized=True): """ Return the complex array of FPGA coefficients Args: normalized: If True (the default), the coefficients are scaled between [-1, 1]. Otherwise, the Int16 representation is returned """ fir_coef = self.fpga_fir.fir_coef if normalized: fir_coef = fir_coef / config.INT_16_MAX return fir_coef
[docs] def fir_set_coef(self, coef, normalized=None): """ Set the FPGA FIR coefficients Args: coef: the complex FIR coefficients. normalized: If None (the default), the function will determine if ``coef`` is normalized or not based on the values of the array. If True, the values must be in the range of [-1, 1]. If False, all the numbers must be between [-INT16_MAX, INT_16_MAX]. """ coef = np.asarray(coef) if normalized is not None: normalized = check_normalized(coef) return self.fpga_fir.set_fir_coef(coef, normalized)
def __getattr__(self, name): """ Allow sets to be accessed by attribute name. So doing channel.sets[0] is equivalent to channel.set0. """ m = re.match(r'set(\d+)$', name) if m is None: raise AttributeError('No scan set "{}"'.format(name)) set_ind = int(m.groups()[0]) return self.sets[set_ind]
class _BroadcastList: """ Applies get/set operations to all elements of a list """ def __init__(self, lst): self.lst = lst def __getattr__(self, name): return [getattr(item, name) for item in self.lst] def __setattr__(self, name, value): if name == 'lst': super().__setattr__(name, value) return for s in self.lst: setattr(s, name, value)
[docs]class Channels: """ Broadcasts settings between channels """ my_attrs = '_ch1 _ch2 _ch scan_conf'.split() def __init__(self, scan_conf): self._ch1 = ChannelConfig(scan_conf, 1) self._ch2 = ChannelConfig(scan_conf, 2) self.scan_conf = scan_conf self._ch = [self._ch1, self._ch2] def __getitem__(self, index): if isinstance(index, slice): raise IndexError('Channels class does not support slicing') return self._ch[index] def __len__(self): return len(self._ch) def __getattr__(self, name): return [getattr(ch, name) for ch in self] @property def sets(self): """ Return a _BroadcastList which passes through all get/set operations """ sets = [self._ch1.sets, self._ch2.sets] return _BroadcastList(sets) def __setattr__(self, name, value): if name in self.my_attrs: super().__setattr__(name, value) else: for s in self: setattr(s, name, value)
[docs]class ScanConf(object): """Holds scan configuration and can initiate a software trigger Uses the sysfs interface. Writing the registers directly will make the kernel module become out of sync with the hardware, which would be bad. Do not write the registers directly. Attributes: ch: list of channels ch1: first channel (equivalent to ch[0]) ch2: second channel (equivalent to ch[1]) fir_config_mode: Whether the FPGA is configuring the FIR filters busy: Whether the system is busy. Usually indicates a running scan revision: version of FPGA code clk_counter: 64-bit timestamp from the FPGA """ fir_config_mode = BoolAttr(p('fir_config_mode')) busy = BoolAttr(p('busy')) revision = IntAttr(p('revision')) clk_counter = IntAttr(p('clk_counter')) max_tx_amp_length = UsAttr(p('max_tx_amp_length')) max_tx_amp_length_clk = IntAttr(p('max_tx_amp_length')) max_duty_cycle = IntAttr(p('max_duty_cycle')) jitter_min = UsAttr(p('jitter_min')) jitter_min_clk = IntAttr(p('jitter_min')) jitter_mask = IntAttr(p('jitter_mask')) min_prt = UsAttr(p('min_prt')) min_prt_clk = IntAttr(p('min_prt')) def __init__(self, sysfs_dir=config.WNI_SYSFS): """ sysfs: the path to the directory containing the sysfs files for the device. """ self.sysfs = sysfs_dir self.ch = Channels(self) self.ch1 = self.ch[0] self.ch2 = self.ch[1] self._interrupt_sock = nanomsg.Socket(nanomsg.REQ) # set the resend interval to the max allowed. This prevents the # request getting sent once a minute in perpetuity. self._interrupt_sock.set_int_option(nanomsg.REQ, nanomsg.REQ_RESEND_IVL, 2**31 - 1) self._interrupt_sock.connect(config.SCAN_INTERRUPT_ADDR) self._clear_buffer_path = os.path.join(sysfs_dir, 'clear_buffer') self.scan_timer = ScanTimerClient() # scan time in seconds (by default, really short) self.scan_time = 0.001
[docs] def trigger(self): """Start the software trigger; run for `self._scan_time` ms. This function immediately returns.""" if not self.ch1.scan_settings_errormessage \ and not self.ch2.scan_settings_errormessage: self.scan_timer.trigger(self.scan_time)
[docs] def interrupt(self): self._interrupt_sock.send_timeout = 20 self._interrupt_sock.recv_timeout = 20 try: self._interrupt_sock.send(b'interrupt') ret = self._interrupt_sock.recv() logger.info('Scan interrupted with message %s', ret.decode()) except nanomsg.NanoMsgAPIError: # You sang a sad song, but no one cried msg = "Attempted to stop a scan, but got no response." logger.warning(msg)
[docs] def wait_for_scan_completion(self): """ Wait for the scan to complete. """ while self.busy: # sleeping for 0.005 seconds results in ~ 0.7% cpu usage. time.sleep(0.005)
[docs] def clear_buffer(self): with open(self._clear_buffer_path, 'wb') as f: f.write(b'1')
[docs] def rx_only(self): for ch in self.ch: for set in ch.sets: set.tx_length = 0 set.tx_amp_length = 0
def __str__(self): return self.__class__.__name__
[docs] def settings_as_dict(self): """Return the scan settings as a nested dict. The returned dict will have the following form: .. code-block:: python returned_d = { 'clk_counter': 3131414141, 'fir_config_mode': False, 'busy': False, 'ch1': { 'fir_enable': True, 'scheduler_mode': "run_while_enabled", 'set0': { 'rx_length': 3, 'waveform': { 'iq': [[0.01 + 0.1j], [0.02 + 0.02j], ...], ...: ... }, 'set1': {...: ...}, }, 'ch2': {...: ...}, } """ settings = {} attrs = ('clk_counter', 'fir_config_mode', 'busy', 'revision') for attrname in attrs: settings[attrname] = getattr(self, attrname) settings['ch1'] = self.ch1.settings_as_dict() settings['ch2'] = self.ch2.settings_as_dict() return settings
[docs] def apply_dict_config(self, config): """ Apply scan settings from a dict. The keys are attributes on scan_conf. The values are the values to set to. There are three special keys: ch1, ch2, and ch*. The value for each of these keys is a set. ch1 configures channel 1. ch2 configures channel 2. ch* configures both channels. """ config = config.copy() # we want to support applying the dict config of the dict returned by # settings_as_dict, so we remove keys that are read-only for key in ['busy', 'fir_config_mode', 'clk_counter', 'revision']: config.pop(key, None) # apply settings to *both* channels first, so that individual settings # can be overwritten. both = config.pop('ch*', {}) ch1 = config.pop('ch1', {}) ch2 = config.pop('ch2', {}) ch1 = nested_dict_merged(both, ch1) ch2 = nested_dict_merged(both, ch2) self.ch1.apply_dict_config(ch1) self.ch2.apply_dict_config(ch2) for k in config: if not hasattr(self, k): msg = 'Bad configuration dict. No key {}' msg = msg.format(k) raise ValueError(msg) for attr, value in config.items(): setattr(self, attr, value)