"""
Set up scan configuration. The hierarchy presented here reflects the hierarchy
in the FPGA and the kernel driver.
* ScanConf: This is the top-level object. Configures and reads top-level
settings from the FPGA. Contains reference to the two channels of the AD9361
as the list ``ch``, or individually as ``ch1`` and ``ch2``.
* ChannelConfig: Per-channel configuration settings. Each channel contains a
list of ``ScanConfSet`` called ``sets``. Many settings can be set on the
``sets``, but a few are per-channel: ``fir_enable``
"""
from collections import OrderedDict
import glob
import logging
import math
import os
import re
import time
import nanomsg
import numpy as np
import wni.config as config
from wni.util import ScanTimerClient, nested_dict_merged
import wni.waveforms
import wni.fir_coef as fir_coef
from wni.signal_util import check_normalized
logger = logging.getLogger(__name__)
c = 3E8
# determined emperically that _RX_LAG should be 154 clk cycles (1.283 us)
_RX_LAG = 154 / 120
# set to ensure that Amp stays high while waveform is active.
_TX_AMP_EXTRA = 1.25
[docs]def us_to_clk(us, clock_freq=config.TXRX_CLK):
"""Convert microseconds to number of clock cycles."""
return int(math.ceil(us * clock_freq / 1E6))
[docs]def clk_to_us(cycles, clock_freq=config.TXRX_CLK):
"""Convert clk cycles to number of microseconds."""
return 1E6 * cycles / clock_freq
[docs]class SysfsBinAttr:
"""Return Return a binary sysfs attribute. gets/sets with raw bytes."""
def __init__(self, path):
"""
Args:
path (str): path to the sysfs attribute
"""
self._dir = os.path.dirname(path)
self._basename = os.path.basename(path)
[docs] def get_path(self, obj):
"""
Build the sysfs path
"""
d = self._dir
base = self._basename
subdir = ''
if hasattr(obj, 'subdir'):
subdir = obj.subdir
path = os.path.join(d, subdir, base)
return path
def __get__(self, obj, type):
path = self.get_path(obj)
try:
with open(path, 'rb') as f:
return f.read()
except FileNotFoundError as fnfe:
raise AttributeError() from fnfe
except OSError as oe:
msg = 'could not read "{}"'.format(path)
raise OSError(msg) from oe
def __set__(self, obj, value):
path = self.get_path(obj)
try:
with open(path, 'wb') as f:
f.write(value)
except OSError as oe:
msg = 'Could not write {} to "{}"'
msg = msg.format(value, path)
raise ValueError(msg) from oe
[docs]class IntAttr(SysfsBinAttr):
"""Get/set sysfs attribute as an int"""
def __init__(self, path):
super().__init__(path)
def __get__(self, obj, type):
bytes = super().__get__(obj, type)
v = int(bytes)
return v
def __set__(self, obj, value):
to_write = b'%d' % (value)
super().__set__(obj, to_write)
[docs]class UsAttr(IntAttr):
"""
Get and set sysfs files in a microseconds way
"""
def __init__(self, path, offset=0, clamp=True):
"""
Adds an offset argument, to account for delays in a system.
(So a tx_delay of 0 and an rx_delay of zero means the first sample you
receive see is actually the first tx sample)
offset is provided in microseconds.
if clamp == True, then values < 0 get truncated to 0.
"""
super().__init__(path)
self.offset = offset
self.clamp = clamp
def __get__(self, obj, type):
val = super().__get__(obj, type)
us = clk_to_us(val)
# we don't need to return the full precision of the float value;
# really, we want to ensure that values will "round trip", such that
# the following code will always work:
#
# x.value = 1
# assert x.value == 1
#
# We'll do our best effort to make that work. The microseconds can
# only be set in increments of 1 / 120, which is a little over three
# decimal places of precision. Round to 4 decimal places.
ret = us - self.offset
return round(ret, 4)
def __set__(self, obj, value):
v = value + self.offset
if v < 0 and self.clamp:
v = 0
clk = us_to_clk(v)
super().__set__(obj, clk)
[docs]class BoolAttr(IntAttr):
"""Sysfs file that represents a Boolean as a 0 or 1"""
def __get__(self, obj, type):
int = super().__get__(obj, type)
return bool(int)
def __set__(self, obj, value):
super().__set__(obj, int(value))
[docs]class StrAttr(SysfsBinAttr):
"""Decodes / encodes sysfs files as utf-8"""
def __get__(self, obj, type):
return super().__get__(obj, type).decode().strip()
def __set__(self, obj, value):
super().__set__(obj, value.encode())
[docs]def p(fname, root=config.WNI_SYSFS):
"""Return the path to a WNI sysfs file"""
if root is None:
root = './'
return os.path.join(root, fname)
[docs]def iq_to_tx_amp_len(iq):
return len(iq) * config.TICKS_PER_SAMPLE + 152
[docs]class ScanConfigSet:
"""
A single set of scan settings for a single channel.
The following attributes have units of microseconds. The scan config set
also has attributes with the same name but with the suffix ``_clk``, which
has units of clock cycles.
Attributes:
* delay: the delay at the start of the cpi
* tx_amp_delay: microseconds to delay transmit amplifier trigger after
prt trigger goes high
* tx_amp_length: number of microseconds that the amplfiier trigger is
high
* tx_delay: microseconds to delay tx waveform trigger after prt trigger
goes high
* tx_length: number of microseconds the waveform trigger is high
* rx_delay: microseconds to delay receive window after prt trigger goes
high
* rx_length: number of microseconds to count rx data as valid
* prt: the pulse repetition time, in microseconds
The following attributes are also on each scan set:
Attributes
* pulses: number of pulses in the scan set
* add_jitter: Whether to add a random delay at the end of a CPI
* last: whether this is the last scan configuration option in the set
* next_set: the index of the next scan set
* wfm_addr: waveform start address in block RAM in FPGA
* pc_fir_taps: Filter taps to be applied on the processing computer for
this scan set's rx data.
"""
delay = UsAttr(p('delay'))
tx_amp_delay = UsAttr(p('tx_amp_delay'))
tx_amp_length = UsAttr(p('tx_amp_length'), offset=_TX_AMP_EXTRA)
tx_delay = UsAttr(p('tx_delay'))
tx_length = UsAttr(p('tx_length'))
rx_delay = UsAttr(p('rx_delay'), offset=_RX_LAG)
rx_length = UsAttr(p('rx_length'))
prt = UsAttr(p('prt'))
delay_clk = IntAttr(p('delay'))
tx_amp_delay_clk = IntAttr(p('tx_amp_delay'))
tx_amp_length_clk = IntAttr(p('tx_amp_length'))
tx_delay_clk = IntAttr(p('tx_delay'))
tx_length_clk = IntAttr(p('tx_length'))
rx_delay_clk = IntAttr(p('rx_delay'))
rx_length_clk = IntAttr(p('rx_length'))
prt_clk = IntAttr(p('prt'))
scan_set_id = IntAttr(p('scan_set_id'))
pulses = IntAttr(p('pulses'))
add_jitter = BoolAttr(p('add_jitter'))
last = BoolAttr(p('last'))
next_set = IntAttr(p('next_set'))
wfm_addr = IntAttr(p('wfm_addr'))
def __init__(self, channel, set_id):
# have to go through object.__setattr__ to avoid AttributeError in this
# class's __setattr__
self.channel = channel
self.subdir = 'ch{}/set{}'.format(channel.id, set_id)
def _set_tx_lengths(self, iq):
"""
Set the length of the tx triggers based on iq samples
"""
amp_length = iq_to_tx_amp_len(iq)
if len(iq) != 0:
self.tx_amp_length_clk = amp_length
self.tx_length_clk = 1
else:
self.tx_amp_length_clk = 0
self.tx_length_clk = 0
[docs] def wfm_set_iq(self, iq, normalized=None):
self.wfm.set_iq(iq, normalized)
self._set_tx_lengths(iq)
return iq
[docs] def wfm_no_tx(self):
self.wfm_set_iq([])
self._set_tx_lengths([])
return np.asarray([], dtype='complex64')
[docs] def wfm_chirp(self, nsamples, center, bw, window='boxcar', scale=1,
clk_freq=config.SAMPLE_CLK / 1E6):
chirp = self.wfm.chirp(nsamples, center, bw, window, scale, clk_freq)
self._set_tx_lengths(chirp.iq)
return chirp
[docs] def wfm_get_iq(self, normalized=True):
return self.wfm.get_iq(normalized)
@property
def rx_samples(self):
"""
Return and set the number of samples to receive. This is the number of
I/Q samples that will get DMA'd from the FPGA to the processor.
"""
rx_length = self.rx_length_clk
if rx_length % 4 != 0:
logger.warning('rx_length in clock cycles is not a multiple of 4')
nsamples_in = rx_length // 4
fir_length = self.channel.fir_len
if self.channel.fir_enable:
if nsamples_in < fir_length:
nsamples_out = 0
else:
nsamples_out = (nsamples_in - fir_length) // 6
else:
nsamples_out = nsamples_in
return nsamples_out
@rx_samples.setter
def rx_samples(self, val):
if val < 0:
raise ValueError('Cannot receive less than 0 samples')
elif val == 0:
self.rx_length_clk = 0
if self.channel.fir_enable:
fir_len = self.channel.fir_len
nsamples_in = val * 6 + fir_len
rx_len = nsamples_in * 4
self.rx_length_clk = rx_len
else:
self.rx_length_clk = val * 4
@property
def wfm(self):
try:
return self.channel._waveforms[self.scan_set_id]
except KeyError:
wfm_file = '/dev/ch{}_wfm{}'
wfm_file = wfm_file.format(self.channel.id, self.scan_set_id)
wfm = wni.waveforms.Waveform(wfm_file)
self.channel._waveforms[self.scan_set_id] = wfm
return wfm
@property
def pc_fir_taps(self):
"""
Set the FIR taps that will be used by the data processor to do
additional filtering of the data.
"""
# see comment in ChannelConfig class for why this is accessed this way.
taps = self.channel._pc_fir_taps.get(self.scan_set_id, None)
return taps
@pc_fir_taps.setter
def pc_fir_taps(self, taps):
key = self.scan_set_id
if taps is None:
self.channel._pc_fir_taps[key] = None
return
taps = np.asarray(taps)
taps = taps.astype('complex64')
self.channel._pc_fir_taps[key] = taps
[docs] def settings_as_dict(self):
"""
Return scan settings as a dict
"""
settings = {}
attrs = (
'add_jitter',
'delay',
'last',
'next_set',
'prt',
'pulses',
'rx_delay',
'rx_length',
'rx_samples',
'tx_amp_delay',
'tx_amp_length',
'tx_delay',
'tx_length',
'wfm_addr',
'pc_fir_taps',
)
for attr in attrs:
settings[attr] = getattr(self, attr)
settings['waveform'] = self.wfm.settings_as_dict()
return settings
def _apply_fir_config(self, settings):
"""
Configure the taps for the FPGA fir filter. This works at a higher
level than setting the taps directly with ``pc_fir_taps``. It takes
into account the current waveform if ``matched_filter`` is selected.
"""
settings = settings.copy()
fir_type = settings.get('type', 'arb')
if fir_type not in ('arb', 'bpf', 'matched_filter'):
raise ValueError('bad type for fir_coef: "{}"'.format(fir_type))
if fir_type == 'arb':
if 'taps' not in settings:
raise ValueError('If you are setting arb coefficients, '
'you must specify "taps"')
self.pc_fir_taps = settings['taps']
elif fir_type == 'bpf':
ntaps = settings.get('ntaps', 120)
start = settings['start']
stop = settings['stop']
window = settings.get('window', 'hamming')
taps = fir_coef.bpf(ntaps, start, stop, window)
self.pc_fir_taps = taps
elif fir_type == 'matched_filter':
# this is probably what will be used most frequently
decimation = 1
if self.channel.fir_enable:
# assume it's a bandpass filter, since nothing else makes sense
decimation = 6
tx_wfm = self.wfm_get_iq()
taps = fir_coef.matched_filter_taps(tx_wfm)
taps = taps[::decimation]
self.pc_fir_taps = taps
[docs] def apply_dict_config(self, config):
"""
Apply configuration settings from a dict.
"""
config = config.copy()
config.pop('last', None)
# validate that all keys are attributes
wfm_config = config.pop('waveform', None)
if wfm_config:
iq = self.wfm.apply_dict_config(wfm_config)
self._set_tx_lengths(iq)
fir_config = config.pop('fir_config', None)
if fir_config is not None:
self._apply_fir_config(fir_config)
for k in config:
if not hasattr(self, k):
msg = 'Bad configuration for ScanConfigSet. No key "{}"'
msg = msg.format(k)
raise ValueError(msg)
for attr, value in config.items():
setattr(self, attr, value)
[docs]class ScanSets:
"""
Getting/setting scan sets. Since the user can change the number of scan
settings out from under us, we need to dynamically create them as needed.
"""
my_attrs = 'channel slices chan_dir'.split()
def __init__(self, channel, slices=()):
self.channel = channel
self.chan_dir = self.channel.subdir
# The user should never pass slices in, should only happen in
# __getitem__
self.slices = slices
def _get_sliced_paths(self):
"""
Returns sorted sysfs scan sets
"""
pattern = '{}/{}/set*'.format(config.WNI_SYSFS, self.chan_dir)
files = glob.glob(pattern)
def sorter(x):
return int(x[x.rfind('t') + 1:])
# sort based on set index
files.sort(key=sorter)
for s in self.slices:
files = files[s]
return files
def __getitem__(self, index):
if isinstance(index, slice):
new_slices = self.slices + (index,)
return ScanSets(self.channel, slices=new_slices)
files = self._get_sliced_paths()
file = files[index]
idx_str = file[file.rfind('t') + 1:]
idx = int(idx_str)
return ScanConfigSet(self.channel, idx)
def __len__(self):
files = self._get_sliced_paths()
for s in self.slices:
files = files[s]
return len(files)
def __getattr__(self, name):
return [getattr(set, name) for set in self]
def __setattr__(self, name, value):
if name in self.my_attrs:
super().__setattr__(name, value)
else:
for s in self:
setattr(s, name, value)
[docs]class ChannelConfig:
"""
Holder of all channel configuration settings.
Attributes:
sets: ScanSets instance
scan_start_set: Index (within sets) of the first scan set.
fir_len: The number of taps of the FIR filter in the FPGA
fir_enable: Whether the Rx data goes through the decimating FIR filter in
the FPGA
fir_aresetn: Active low reset to the FIR filters in the FPGA.
num_sets: number of scan sets possible
num_used_sets: number of scan sets used by the current scan setup
bytes_read: Number of rx data bytes read by userspace
bytes_written: Number of rx data bytes written by the FPGA
overflow: Number of CPIs lost because userspace couldn't keep up
scan_settings_errormessage: a string describing errors with the current
scan setup
scheduler_mode: the mode of operation of the scheduler. The scheduler can
either do its sequence N times ('run_n') or it can run as long as the
input trigger is high ('run_while_enabled')
enable_packet_tagging: whether packet tagging is enabled. Don't change
this
fir_normalizer: the scaler applied to the Rx data in the FPGA
nloops: Number of times to run the sequence, if scheduler_mode ==
'run_n'
samples_per_dma_transfer: Number of bytes transferred per DMA transfer
"""
# TODO: gotta actually instantiate one per set once we have more stuff
fir_len = IntAttr(p('fir_len'))
fir_enable = IntAttr(p('fir_enable'))
fir_normalizer = IntAttr(p('fir_normalizer'))
enable_packet_tagging = BoolAttr(p('enable_packet_tagging'))
fir_aresetn = IntAttr(p('fir_aresetn'))
num_sets = IntAttr(p('num_sets'))
nloops = IntAttr(p('nloops'))
scan_start_set = IntAttr(p('scan_start_set'))
scheduler_mode = StrAttr(p('scheduler_mode'))
samples_per_dma_transfer = IntAttr(p('samples_per_dma_transfer'))
bytes_per_loop = IntAttr(p('bytes_per_loop'))
bytes_read = IntAttr(p('bytes_read'))
bytes_written = IntAttr(p('bytes_written'))
overflow = IntAttr(p('overflow'))
scan_settings_errormessage = StrAttr(p('scan_settings_errormessage'))
_available_scheduler_modes = StrAttr(p('available_scheduler_modes'))
num_used_sets = IntAttr(p('num_used_sets'))
def __init__(self, scan_conf, channel_id):
# dynamically get number of sets
self.id = channel_id
self.subdir = 'ch{}'.format(channel_id)
self.channel_dir = '{}/{}'.format(config.WNI_SYSFS, self.subdir)
self.sets = ScanSets(self)
self.scan_conf = scan_conf
self.fpga_fir = fir_coef.FPGAFIRCoef(self)
# _pc_fir_taps is weird. Ideally, we would store pc_fir_taps on
# ScanConfigSet, but since ScanConfigSet is instantiated whenever it is
# accessed through the ScanSets class, it would blow away _pc_fir_taps.
# So this dict exists to map {scan_set_id: fir info}.
# P. S. pc_fir_taps is the fir filter taps for the processing computer.
self._pc_fir_taps = {}
# same story with _waveforms as _pc_fir_taps
self._waveforms = {}
[docs] def rx_only(self):
for set in self.sets:
set.tx_length = 0
set.tx_amp_length_clk = 0
def _apply_set_config(self, config):
"""
Does the Right Thing™ whenever "set*" and "setN" are given as keys.
Only applies the settings a single time, and uses nested_dict_merged to
merge the dicts.
"""
# general idea here: figure out the sets we need to update the settings
# of. If "set*" was provided, we have to update all the scan sets
# settings. Otherwise, we *only* update the "set0", "set1" etc.
# settings that were passed in.
common_settings = config.pop('set*', {})
if common_settings:
# we need to apply the common settings to *all* scan sets.
to_set = range(self.num_sets)
else:
# get the individual scan configurations that are set now.
to_set = []
regex = re.compile(r'set(\d+)')
for k in config:
match = re.match(regex, k)
if match:
# need to keep track of the indices
set_index = int(match.groups()[0])
to_set.append(set_index)
for set_index in to_set:
set_name = 'set{}'.format(set_index)
scan_config = config.pop(set_name, {})
scan_config = nested_dict_merged(common_settings, scan_config)
self.sets[set_index].apply_dict_config(scan_config)
[docs] def apply_dict_config(self, config):
"""
Apply configuration to a channel from a dict.
The keys are channel attributes, and the values are the values they
will be set to. The keys matching the regular expression "set\\d+" are
special. They configure scan settings. The key ``set*`` applies scan
settings to all scan sets. Individual scan set settings will override
``set*`` settings.
"""
config = config.copy()
# support round-tripping self.apply_dict_config(self.settings_as_dict())
for key in ['bytes_per_loop', 'bytes_read', 'bytes_written',
'num_used_sets', 'overflow', 'samples_per_dma_transfer']:
config.pop(key, None)
# set num_sets first so that when we apply scan settings, the correct
# number of sets will already be populated
num_sets = config.pop('num_sets', None)
if num_sets:
self.num_sets = num_sets
# Also need to get the setting of fir_enable out of the way
fir_enable = config.pop('fir_enable', None)
if fir_enable is not None:
self.fir_enable = fir_enable
# Setting the FIR filter stuff can affect the rx_samples of the scan
# sets, so we have to set it before setting set* stuff.
fir_coef_settings = config.pop('fir_fpga', None)
if fir_coef_settings is not None:
self.fpga_fir.apply_dict_config(fir_coef_settings)
# same thing with FIR length; if it gets overridden we need to set it
# now.
fir_len = config.pop('fir_len', None)
if fir_len is not None:
self.fir_len = fir_len
# Apply the scan settings for set*, set0, set1, ..., setN.
self._apply_set_config(config)
for k in config:
if not hasattr(self, k):
msg = 'Bad configuration for ChannelConfig. No key "{}"'
msg = msg.format(k)
raise ValueError(msg)
for attr, value in config.items():
setattr(self, attr, value)
@property
def available_scheduler_modes(self):
return self._available_scheduler_modes.split()
[docs] def settings_as_dict(self):
"""
Return channel settings as a dict
"""
settings = {}
attrs = (
'bytes_per_loop',
'bytes_read',
'bytes_written',
'enable_packet_tagging',
'fir_aresetn',
'fir_enable',
'fir_len',
'fir_normalizer',
'nloops',
'num_sets',
'num_used_sets',
'overflow',
'samples_per_dma_transfer',
'scan_start_set',
'scheduler_mode',
)
for attr in attrs:
settings[attr] = getattr(self, attr)
for i, set in enumerate(self.sets):
setname = 'set{}'.format(i)
settings[setname] = set.settings_as_dict()
fpga_fir_settings = self.fpga_fir.settings_as_dict()
settings['fir_fpga'] = fpga_fir_settings
return settings
[docs] def fir_set_bpf(self, numtaps=120, start=0, stop=3, window='hamming', nyq=config.NYQ_MHZ):
"""
Set the FPGA FIR filter to use a bandpass filter.
If start == 0, a low-pass filter is created.
If stop == 0, a high-pass filter is created.
Otherwise, a band-pass filter is created.
"""
return self.fpga_fir.bpf(numtaps=numtaps, start=start, stop=stop, window=window, nyq=nyq)
[docs] def fir_set_matched_filter(self, tx_wfm):
"""
Based on the tx waveform, set the FPGA matched filter coefficients.
Args:
tx_wfm: complex numpy array of the transmit waveform.
"""
return self.fpga_fir.set_matched_filter_coef(tx_wfm)
[docs] def fir_get_coef(self, normalized=True):
"""
Return the complex array of FPGA coefficients
Args:
normalized: If True (the default), the coefficients are scaled
between [-1, 1]. Otherwise, the Int16 representation is
returned
"""
fir_coef = self.fpga_fir.fir_coef
if normalized:
fir_coef = fir_coef / config.INT_16_MAX
return fir_coef
[docs] def fir_set_coef(self, coef, normalized=None):
"""
Set the FPGA FIR coefficients
Args:
coef: the complex FIR coefficients.
normalized: If None (the default), the function will determine if
``coef`` is normalized or not based on the values of the array.
If True, the values must be in the range of [-1, 1]. If False,
all the numbers must be between [-INT16_MAX, INT_16_MAX].
"""
coef = np.asarray(coef)
if normalized is not None:
normalized = check_normalized(coef)
return self.fpga_fir.set_fir_coef(coef, normalized)
def __getattr__(self, name):
"""
Allow sets to be accessed by attribute name.
So doing channel.sets[0] is equivalent to channel.set0.
"""
m = re.match(r'set(\d+)$', name)
if m is None:
raise AttributeError('No scan set "{}"'.format(name))
set_ind = int(m.groups()[0])
return self.sets[set_ind]
class _BroadcastList:
"""
Applies get/set operations to all elements of a list
"""
def __init__(self, lst):
self.lst = lst
def __getattr__(self, name):
return [getattr(item, name) for item in self.lst]
def __setattr__(self, name, value):
if name == 'lst':
super().__setattr__(name, value)
return
for s in self.lst:
setattr(s, name, value)
[docs]class Channels:
"""
Broadcasts settings between channels
"""
my_attrs = '_ch1 _ch2 _ch scan_conf'.split()
def __init__(self, scan_conf):
self._ch1 = ChannelConfig(scan_conf, 1)
self._ch2 = ChannelConfig(scan_conf, 2)
self.scan_conf = scan_conf
self._ch = [self._ch1, self._ch2]
def __getitem__(self, index):
if isinstance(index, slice):
raise IndexError('Channels class does not support slicing')
return self._ch[index]
def __len__(self):
return len(self._ch)
def __getattr__(self, name):
return [getattr(ch, name) for ch in self]
@property
def sets(self):
"""
Return a _BroadcastList which passes through all get/set operations
"""
sets = [self._ch1.sets, self._ch2.sets]
return _BroadcastList(sets)
def __setattr__(self, name, value):
if name in self.my_attrs:
super().__setattr__(name, value)
else:
for s in self:
setattr(s, name, value)
[docs]class ScanConf(object):
"""Holds scan configuration and can initiate a software trigger
Uses the sysfs interface. Writing the registers directly will make
the kernel module become out of sync with the hardware, which would
be bad. Do not write the registers directly.
Attributes:
ch: list of channels
ch1: first channel (equivalent to ch[0])
ch2: second channel (equivalent to ch[1])
fir_config_mode: Whether the FPGA is configuring the FIR filters
busy: Whether the system is busy. Usually indicates a running scan
revision: version of FPGA code
clk_counter: 64-bit timestamp from the FPGA
"""
fir_config_mode = BoolAttr(p('fir_config_mode'))
busy = BoolAttr(p('busy'))
revision = IntAttr(p('revision'))
clk_counter = IntAttr(p('clk_counter'))
max_tx_amp_length = UsAttr(p('max_tx_amp_length'))
max_tx_amp_length_clk = IntAttr(p('max_tx_amp_length'))
max_duty_cycle = IntAttr(p('max_duty_cycle'))
jitter_min = UsAttr(p('jitter_min'))
jitter_min_clk = IntAttr(p('jitter_min'))
jitter_mask = IntAttr(p('jitter_mask'))
min_prt = UsAttr(p('min_prt'))
min_prt_clk = IntAttr(p('min_prt'))
def __init__(self, sysfs_dir=config.WNI_SYSFS):
"""
sysfs: the path to the directory containing the sysfs files for
the device.
"""
self.sysfs = sysfs_dir
self.ch = Channels(self)
self.ch1 = self.ch[0]
self.ch2 = self.ch[1]
self._interrupt_sock = nanomsg.Socket(nanomsg.REQ)
# set the resend interval to the max allowed. This prevents the
# request getting sent once a minute in perpetuity.
self._interrupt_sock.set_int_option(nanomsg.REQ,
nanomsg.REQ_RESEND_IVL, 2**31 - 1)
self._interrupt_sock.connect(config.SCAN_INTERRUPT_ADDR)
self._clear_buffer_path = os.path.join(sysfs_dir, 'clear_buffer')
self.scan_timer = ScanTimerClient()
# scan time in seconds (by default, really short)
self.scan_time = 0.001
[docs] def trigger(self):
"""Start the software trigger; run for `self._scan_time` ms. This
function immediately returns."""
if not self.ch1.scan_settings_errormessage \
and not self.ch2.scan_settings_errormessage:
self.scan_timer.trigger(self.scan_time)
[docs] def interrupt(self):
self._interrupt_sock.send_timeout = 20
self._interrupt_sock.recv_timeout = 20
try:
self._interrupt_sock.send(b'interrupt')
ret = self._interrupt_sock.recv()
logger.info('Scan interrupted with message %s', ret.decode())
except nanomsg.NanoMsgAPIError:
# You sang a sad song, but no one cried
msg = "Attempted to stop a scan, but got no response."
logger.warning(msg)
[docs] def wait_for_scan_completion(self):
"""
Wait for the scan to complete.
"""
while self.busy:
# sleeping for 0.005 seconds results in ~ 0.7% cpu usage.
time.sleep(0.005)
[docs] def clear_buffer(self):
with open(self._clear_buffer_path, 'wb') as f:
f.write(b'1')
[docs] def rx_only(self):
for ch in self.ch:
for set in ch.sets:
set.tx_length = 0
set.tx_amp_length = 0
def __str__(self):
return self.__class__.__name__
[docs] def settings_as_dict(self):
"""Return the scan settings as a nested dict. The returned dict will
have the following form:
.. code-block:: python
returned_d = {
'clk_counter': 3131414141,
'fir_config_mode': False,
'busy': False,
'ch1': {
'fir_enable': True,
'scheduler_mode': "run_while_enabled",
'set0': {
'rx_length': 3,
'waveform': {
'iq': [[0.01 + 0.1j], [0.02 + 0.02j], ...],
...: ...
},
'set1': {...: ...},
},
'ch2': {...: ...},
}
"""
settings = {}
attrs = ('clk_counter', 'fir_config_mode', 'busy', 'revision')
for attrname in attrs:
settings[attrname] = getattr(self, attrname)
settings['ch1'] = self.ch1.settings_as_dict()
settings['ch2'] = self.ch2.settings_as_dict()
return settings
[docs] def apply_dict_config(self, config):
"""
Apply scan settings from a dict.
The keys are attributes on scan_conf. The values are the values to set
to. There are three special keys: ch1, ch2, and ch*. The value for
each of these keys is a set. ch1 configures channel 1. ch2 configures
channel 2. ch* configures both channels.
"""
config = config.copy()
# we want to support applying the dict config of the dict returned by
# settings_as_dict, so we remove keys that are read-only
for key in ['busy', 'fir_config_mode', 'clk_counter', 'revision']:
config.pop(key, None)
# apply settings to *both* channels first, so that individual settings
# can be overwritten.
both = config.pop('ch*', {})
ch1 = config.pop('ch1', {})
ch2 = config.pop('ch2', {})
ch1 = nested_dict_merged(both, ch1)
ch2 = nested_dict_merged(both, ch2)
self.ch1.apply_dict_config(ch1)
self.ch2.apply_dict_config(ch2)
for k in config:
if not hasattr(self, k):
msg = 'Bad configuration dict. No key {}'
msg = msg.format(k)
raise ValueError(msg)
for attr, value in config.items():
setattr(self, attr, value)