Source code for wni.waveforms
"""
Provides an interface to the waveforms stuff exposed by the kernel
"""
import logging
import os
import numpy as np
import wni.signal as signal
import wni.config as config
from wni.signal_util import fix_window, check_normalized
logger = logging.getLogger(__name__)

# Find the module that holds the window-name lookup table.
try:
    win_module = signal.windows.windows
except AttributeError:
    # In older versions of scipy, windows is a module instead of a package
    win_module = signal.windows

# NOTE(review): _win_equiv_raw is a private table. As used here, each key is
# a sequence of alias names (the first one is kept) and the second item of
# each value is truthy when the window needs a parameter -- confirm against
# the scipy version in use.
win_table = win_module._win_equiv_raw
NO_PARAM_WINDOWS = sorted(
    aliases[0] for aliases, spec in win_table.items() if not spec[1]
)
NO_PARAM_WINDOWS.append('rectangular')
PARAM_WINDOWS = sorted(
    aliases[0] for aliases, spec in win_table.items() if spec[1]
)
del win_table

# 'general gaussian' is dropped because it is the only window that requires
# two parameters.
PARAM_WINDOWS.remove('general gaussian')

# Every supported window name: parameterless ones first.
WINDOWS = NO_PARAM_WINDOWS + PARAM_WINDOWS
def to_int12(iq):
    """
    Rescale I/Q samples from the [-1, 1] range to int12 values.

    The input is converted to a complex128 array, multiplied by
    ``config.INT_12_MAX`` and rounded. The rounded values are returned
    as-is; no cast to an integer dtype happens here.
    """
    scaled = np.asarray(iq, dtype='complex128') * config.INT_12_MAX
    return np.round(scaled)
def calc_tx_length(nsamples):
    """
    Calculate the tx time in microseconds.

    The result is one sample period shorter than the full duration of
    ``nsamples`` samples, and is never less than 0.1.
    """
    sample_time = config.SAMPLE_TIME_US
    # The trigger has to stop before the samples are finished or the
    # waveform will be repeated again, hence the one-sample reduction.
    tx_time = nsamples * sample_time - sample_time
    # Clamp to the minimum transmit time.
    return 0.1 if tx_time < 0.1 else tx_time
class Chirp:
    """
    A bag of values representing a linear frequency modulated (LFM) chirp.

    The chirp sweeps from ``center - bw / 2`` to ``center + bw / 2`` over
    the duration of the generated time vector.

    Attributes:
        nsamples, center, bw, scale, phase: The constructor arguments,
            stored unchanged.
        window: The ``window`` argument after passing through
            ``fix_window``.
        clk: The ``clk_freq`` argument.
        t: Sample times, ``arange(nsamples) / clk``.
        i, q: Windowed (but unscaled) in-phase and quadrature samples.
        iq: Complex samples ``(i + 1j * q) * scale``.

    Raises:
        IndexError: If ``nsamples`` produces an empty time vector (for
            example ``nsamples <= 0``), because the sweep end time is
            taken from the last time sample.
    """

    def __init__(self, nsamples, center, bw, window='boxcar', scale=1,
                 clk_freq=config.SAMPLE_CLK / 1E6, phase=0):
        self.nsamples = nsamples
        self.center = center
        self.bw = bw
        self.window = fix_window(window)
        self.clk = clk_freq
        self.scale = scale
        self.phase = phase
        self.__calc_chirp()

    def __calc_chirp(self):
        """Compute the chirp and store it in ``t``, ``i``, ``q`` and ``iq``."""
        half_bw = self.bw / 2
        f_start = self.center - half_bw
        f_stop = self.center + half_bw

        t = self.t = np.arange(self.nsamples) / self.clk
        t_end = t[-1]

        # The quadrature component is the same sweep with the phase shifted
        # by -90 (presumably degrees, as for scipy's chirp -- TODO confirm).
        i = self.i = signal.chirp(t, f_start, t_end, f_stop, phi=self.phase)
        q = self.q = signal.chirp(t, f_start, t_end, f_stop,
                                  phi=self.phase - 90)

        # Apply the amplitude window in place so that self.i and self.q hold
        # the windowed samples.
        windowed = signal.get_window(self.window, self.nsamples)
        i *= windowed
        q *= windowed

        self.iq = self.i + 1j * self.q
        self.iq *= self.scale

    def __repr__(self):
        # Include every constructor argument (phase was previously missing)
        # so that chirps differing only in phase are distinguishable.
        fmt = ('Chirp(nsamples=%d, center=%f, bw=%f, window=%r, scale=%f, '
               'clk_freq=%f, phase=%f)')
        return fmt % (self.nsamples, self.center, self.bw, self.window,
                      self.scale, self.clk, self.phase)
class Waveform:
    """
    Get and set Tx waveforms.

    Samples are stored in the file at ``path`` as interleaved int16 values
    (I first, then Q), i.e. four bytes per complex sample.

    Attributes:
        path: Path of the waveform file.
        _chirp: The ``Chirp`` most recently written through ``chirp()``,
            or None if the last write was made of arbitrary samples.
    """

    def __init__(self, path):
        """
        Args:
            path: Path to an existing waveform file.

        Raises:
            ValueError: If ``path`` does not exist.
        """
        if not os.path.exists(path):
            raise ValueError('Path "{}" does not exist'.format(path))
        self.path = path
        self._chirp = None

    def set_iq(self, iq, normalized=None):
        """
        Write I/Q samples to the waveform file.

        Any previously recorded chirp description is discarded, since the
        file no longer necessarily holds that chirp.

        Args:
            iq: complex numpy array of I/Q samples
            normalized: whether the I/Q samples are normalized (all samples
                between [-1, 1]) or not (all samples fit in 12-bit ints).
                Passed to ``check_normalized`` as the expected value.
        """
        iq = np.asarray(iq, dtype='complex64')
        normalized = check_normalized(iq, base=12, expected=normalized)
        packed = self._pack_iq(iq, normalized)
        with open(self.path, 'wb') as f:
            f.write(packed)
        # The file content is now arbitrary samples; chirp() re-records its
        # Chirp after this call returns.
        self._chirp = None

    def set_iq_int12(self, iq):
        """
        Write I/Q samples to the waveform file.

        Args:
            iq: complex numpy array of I/Q samples, which all fit in 12-bit
                ints
        """
        self.set_iq(iq, normalized=False)

    def set_iq_normalized(self, iq):
        """
        Write normalized I/Q samples to the waveform file.

        Args:
            iq: complex numpy array of I/Q samples.  All samples must be
                between [-1, 1] inclusive.
        """
        self.set_iq(iq, normalized=True)

    def get_iq(self, normalized=True):
        """
        Read the I/Q samples back from the waveform file.

        Args:
            normalized: If true, divide the stored integer values by
                ``config.INT_12_MAX``; otherwise return them unscaled.

        Returns:
            complex64 numpy array of I/Q samples.
        """
        with open(self.path, 'rb') as f:
            # note: can't use fromfile() because char device won't let it
            raw = f.read()
        # frombuffer replaces the deprecated binary mode of np.fromstring.
        packed = np.frombuffer(raw, dtype='int16')
        # Samples are interleaved: even positions are I, odd positions are Q.
        iq = packed[::2] + 1j * packed[1::2]
        if normalized:
            iq /= config.INT_12_MAX
        return iq.astype('complex64')

    def _pack_iq(self, iq, normalized):
        """
        Packs iq in a way that can be written to /dev/chx_wfmy files

        Args:
            iq: complex numpy array of I/Q samples.
            normalized: If true, the samples are scaled by
                ``config.INT_12_MAX`` before rounding.

        Returns:
            int16 numpy array with I and Q interleaved (I first).
        """
        if normalized:
            iq = iq * config.INT_12_MAX
        iq = np.round(iq)
        i = iq.real.astype('int16')
        q = iq.imag.astype('int16')
        packed = np.empty((i.size + q.size,), dtype=i.dtype)
        packed[::2] = i
        packed[1::2] = q
        return packed

    def chirp(self, nsamples, center, bw, window='boxcar', scale=1,
              clk_freq=config.SAMPLE_CLK / 1E6, phase=0):
        """
        Set the current waveform to the specified LFM chirp.

        Args:
            nsamples: Number of samples in the chirp.
            center: The center frequency in MHz
            bw: The bandwidth in MHz
            window: The amplitude window
            scale: A number between [0, 1].  Scale the Tx waveform by this
                factor.
            clk_freq: Clock frequency in MHz.
            phase: The phase offset of the chirp, passed through to
                ``Chirp``.

        Returns:
            The ``Chirp`` that was written.
        """
        new_chirp = Chirp(nsamples, center, bw, window, scale, clk_freq,
                          phase=phase)
        self.set_iq_normalized(new_chirp.iq)
        # Record the chirp only after the write succeeded; set_iq() clears
        # any previous record.
        self._chirp = new_chirp
        return new_chirp

    @property
    def waveform(self):
        """Return the scaled waveform, so that max amplitude corresponds to
        [-1, 1]."""
        return self.get_iq(normalized=True)

    @waveform.setter
    def waveform(self, iq):
        self.set_iq(iq)

    def apply_dict_config(self, config):
        """
        Apply waveform configuration from a dict.  Returns the I/Q samples
        that were set.

        Configuration keys:

        * "type": either "chirp" or "arb".  If "type" is not specified,
          "arb" is assumed.

        If "chirp" is given, the following keys are required:

        * "nsamples": Number of samples in the chirp.
        * "center": Center frequency in the chirp, in MHz.  The value
          can be between +/-15.
        * "bandwidth": The bandwidth of the chirp, in MHz.  The value
          can be between +/-15.

        The following keys are optional for chirp:

        * "window": The name of the amplitude window to apply.
          Defaults to "hanning".
        * "phase": The phase offset of the waveform.  Defaults to 0.
        * "scale": Scale to apply to the digital waveform.  Defaults to
          0.25.  The DAC performs better whenever this value is not near 1.

        The following keys are required if the type is "arb":

        * "iq": A complex numpy array of samples.  If all values fall
          between -1 and 1, they are scaled appropriately to fit in the
          12-bit DAC.  Otherwise, the values are used directly as is.
          "scale" is not applied to "arb" waveforms.

        Raises:
            ValueError: If "type" is not "chirp" or "arb", or if a required
                chirp key is missing.
            KeyError: If the type is "arb" and "iq" is missing.
        """
        # NOTE: the ``config`` parameter shadows the module-level config
        # import inside this method.
        wfm_type = config.get('type', 'arb')
        if wfm_type not in ('chirp', 'arb'):
            raise ValueError('Bad type for waveform: "{}"'.format(wfm_type))

        if wfm_type == 'chirp':
            needs_keys = 'nsamples', 'center', 'bandwidth'
            missing_keys = [x for x in needs_keys if x not in config]
            if missing_keys:
                msg = "Chirp requires parameters '{}'".format(missing_keys)
                raise ValueError(msg)
            # chirp() already writes the samples to the file, so no second
            # write is needed here.
            chirp = self.chirp(
                config['nsamples'],
                config['center'],
                config['bandwidth'],
                config.get('window', 'hanning'),
                config.get('scale', 0.25),
                phase=config.get('phase', 0),
            )
            return chirp.iq

        # wfm_type == 'arb'
        iq = np.asarray(config['iq'])
        normalized = check_normalized(iq)
        self.set_iq(iq, normalized)
        return iq

    def settings_as_dict(self):
        """Return the current waveform information serialized as a dict"""
        if self._chirp is not None:
            return {
                'type': 'chirp',
                'nsamples': self._chirp.nsamples,
                'center': self._chirp.center,
                'bandwidth': self._chirp.bw,
                'window': self._chirp.window,
                'clk': self._chirp.clk,
                'scale': self._chirp.scale,
                'phase': self._chirp.phase,
            }
        return {
            'type': 'arb',
            'scale': 1,
            'iq': self.waveform,
        }

    def __len__(self):
        """Return the number of complex samples stored in the file."""
        with open(self.path, 'rb') as f:
            raw = f.read()
        # Each sample is an int16 I value plus an int16 Q value: 4 bytes.
        return len(raw) // 4