Source code for wni.waveforms

"""
Provides LFM chirp generation and an interface to the Tx waveform device
files exposed by the kernel.
"""

import logging
import os

import numpy as np
import wni.signal as signal

import wni.config as config
from wni.signal_util import fix_window, check_normalized


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Find the module that carries the private window lookup table.
try:
    win_module = signal.windows.windows
except AttributeError:
    # In older versions of scipy, windows is a module instead of a package
    win_module = signal.windows

# As used below: each key is a sequence of window names (the first one is
# taken as the canonical name) and the second item of each value is truthy
# when the window needs extra parameters.
wins = win_module._win_equiv_raw

# Windows that can be requested by name alone.
NO_PARAM_WINDOWS = [names[0] for names, entry in wins.items() if not entry[1]]
NO_PARAM_WINDOWS.sort()
NO_PARAM_WINDOWS.append('rectangular')

# Windows that need at least one parameter.
PARAM_WINDOWS = [names[0] for names, entry in wins.items() if entry[1]]
PARAM_WINDOWS.sort()

# The lookup table is only needed while building the lists above.
del wins

# Dropped because it is the only window that requires two parameters.
PARAM_WINDOWS.remove('general gaussian')

# Every supported window name: parameterless windows first.
WINDOWS = NO_PARAM_WINDOWS + PARAM_WINDOWS


def to_int12(iq):
    """
    Scale I/Q samples from the normalized range [-1, 1] to int12 values.

    Args:
        iq: array-like of I/Q samples; converted to a complex128 array.

    Returns:
        complex128 numpy array holding ``iq * config.INT_12_MAX`` with the
        real and imaginary parts rounded to whole numbers.
    """
    samples = np.asarray(iq, dtype='complex128')
    scaled = samples * config.INT_12_MAX
    return np.round(scaled)
def calc_tx_length(nsamples):
    """
    Calculate the tx time in microseconds.

    Args:
        nsamples: number of samples in the waveform.

    Returns:
        The time spanned by ``nsamples`` samples minus one sample period,
        but never less than 0.1.
    """
    sample_period = config.SAMPLE_TIME_US
    # The trigger has to stop before the samples are finished or the
    # waveform will be repeated again, so one sample period is taken off.
    duration = nsamples * sample_period - sample_period
    return 0.1 if duration < 0.1 else duration
class Chirp:
    """
    A bag of values representing a linear frequency modulated (LFM) chirp.

    Attributes set by the constructor:
        nsamples, center, bw, scale, phase: the constructor arguments.
        window: the ``window`` argument after passing through ``fix_window``.
        clk: the ``clk_freq`` argument.
        t: sample times, ``np.arange(nsamples) / clk``.
        i, q: windowed in-phase and quadrature samples (before scaling).
        iq: ``(i + 1j * q) * scale``.
    """

    def __init__(self, nsamples, center, bw, window='boxcar', scale=1,
                 clk_freq=config.SAMPLE_CLK / 1E6, phase=0):
        """
        Store the chirp parameters and compute its samples.

        Args:
            nsamples: number of samples in the chirp.
            center: center frequency of the sweep.
            bw: bandwidth of the sweep; the sweep runs from
                ``center - bw / 2`` to ``center + bw / 2``.
            window: amplitude window, normalized through ``fix_window``.
            scale: factor applied to the complex samples.
            clk_freq: sample clock frequency; must use the same frequency
                unit as ``center`` and ``bw``.
            phase: phase offset handed to ``signal.chirp`` as ``phi``.
        """
        self.nsamples = nsamples
        self.center = center
        self.bw = bw
        self.window = fix_window(window)
        self.clk = clk_freq
        self.scale = scale
        self.phase = phase
        self.__calc_chirp()

    def __calc_chirp(self):
        """Compute the windowed, scaled I/Q samples and store them on self."""
        half_bw = self.bw / 2
        f_start = self.center - half_bw
        f_stop = self.center + half_bw
        t = self.t = np.arange(self.nsamples) / self.clk
        # The sweep reaches f_stop at the time of the last sample.
        i = self.i = signal.chirp(t, f_start, t[-1], f_stop, phi=self.phase)
        # Q is the same sweep with phi reduced by 90 (quadrature component,
        # assuming signal.chirp takes phi in degrees as scipy's does).
        q = self.q = signal.chirp(t, f_start, t[-1], f_stop,
                                  phi=self.phase - 90)
        taper = signal.get_window(self.window, self.nsamples)
        # In-place, so self.i / self.q hold the windowed samples.
        i *= taper
        q *= taper
        self.iq = self.i + 1j * self.q
        self.iq *= self.scale

    def __repr__(self):
        # phase is included so chirps that differ only in phase are
        # distinguishable when printed.
        fmt = ('Chirp(nsamples=%d, center=%f, bw=%f, window=%r, scale=%f, '
               'clk_freq=%f, phase=%f)')
        return fmt % (self.nsamples, self.center, self.bw, self.window,
                      self.scale, self.clk, self.phase)
class Waveform:
    """
    Get and set Tx waveforms.

    The waveform file holds interleaved int16 samples (I, Q, I, Q, ...),
    i.e. four bytes per complex I/Q sample.
    """

    def __init__(self, path):
        """
        Args:
            path: path of the waveform file; it must already exist.

        Raises:
            ValueError: if ``path`` does not exist.
        """
        if not os.path.exists(path):
            raise ValueError('Path "{}" does not exist'.format(path))
        self.path = path
        # Chirp most recently written through chirp(); None whenever the
        # current waveform was written some other way.
        self._chirp = None

    def no_tx(self):
        """
        Write an empty waveform (zero samples) to the waveform file.

        Returns:
            An empty list.
        """
        # Nothing to validate for zero samples, so write directly.
        self._write_iq(np.asarray([], dtype='complex64'), normalized=False)
        return []

    def set_iq(self, iq, normalized=None):
        """
        Write I/Q samples to the waveform file.

        Args:
            iq: complex numpy array of I/Q samples
            normalized: whether the I/Q samples are normalized (all samples
                between [-1, 1]) or not (all samples fit in 12-bit ints).
                Passed to ``check_normalized`` as the expected value; its
                result decides how the samples are packed.
        """
        iq = np.asarray(iq, dtype='complex64')
        normalized = check_normalized(iq, base=12, expected=normalized)
        self._write_iq(iq, normalized)

    def set_iq_int12(self, iq):
        """
        Write I/Q samples to the waveform file.

        Args:
            iq: complex numpy array of I/Q samples, which all fit in 12-bit
                ints
        """
        return self.set_iq(iq, normalized=False)

    def set_iq_normalized(self, iq):
        """
        Write normalized I/Q samples to the waveform file.

        Args:
            iq: complex numpy array of I/Q samples.  All samples must be
                between [-1, 1] inclusive.
        """
        self.set_iq(iq, normalized=True)

    def get_iq(self, normalized=True):
        """
        Read the I/Q samples currently stored in the waveform file.

        Args:
            normalized: if true, divide the stored values by
                ``config.INT_12_MAX``; otherwise return them as stored.

        Returns:
            complex64 numpy array of I/Q samples.
        """
        with open(self.path, 'rb') as f:
            # note: can't use fromfile() because char device won't let it
            raw = f.read()
        # frombuffer replaces the deprecated binary mode of np.fromstring.
        packed = np.frombuffer(raw, dtype='int16')
        iq = packed[::2] + 1j * packed[1::2]
        if normalized:
            iq /= config.INT_12_MAX
        return iq.astype('complex64')

    def get_iq_int12(self):
        """Return the stored I/Q samples as raw (unscaled) values."""
        return self.get_iq(normalized=False)

    def get_iq_normalized(self):
        """Return the stored I/Q samples divided by ``config.INT_12_MAX``."""
        return self.get_iq(normalized=True)

    def _pack_iq(self, iq, normalized):
        """
        Packs iq in a way that can be written to /dev/chx_wfmy files.

        Args:
            iq: complex numpy array of I/Q samples.
            normalized: if true, the samples are first multiplied by
                ``config.INT_12_MAX``.

        Returns:
            int16 numpy array with I at even and Q at odd indices.
        """
        if normalized:
            iq = iq * config.INT_12_MAX
        iq = np.round(iq)
        i = iq.real.astype('int16')
        q = iq.imag.astype('int16')
        packed = np.empty((i.size + q.size,), dtype=i.dtype)
        packed[::2] = i
        packed[1::2] = q
        return packed

    def _write_iq(self, iq, normalized):
        """Pack ``iq`` and write it to the waveform file."""
        packed = self._pack_iq(iq, normalized)
        with open(self.path, 'wb') as f:
            f.write(packed)
        # Whatever was just written replaces any earlier chirp; chirp()
        # records the new chirp again after its write completes.
        self._chirp = None

    def chirp(self, nsamples, center, bw, window='boxcar', scale=1,
              clk_freq=config.SAMPLE_CLK / 1E6, phase=0):
        """
        Set the current waveform to the specified LFM chirp.

        Args:
            nsamples: Number of samples in the chirp.
            center: The center frequency in MHz
            bw: The bandwidth in MHz
            window: The amplitude window
            scale: A number between [0, 1].  Scale the Tx waveform by this
                factor.
            clk_freq: Clock frequency in MHz.
            phase: Phase offset passed on to ``Chirp``.

        Returns:
            The ``Chirp`` whose samples were written.
        """
        new_chirp = Chirp(nsamples, center, bw, window, scale, clk_freq,
                          phase=phase)
        self.set_iq_normalized(new_chirp.iq)
        # Recorded only after a successful write, so a failed write never
        # leaves a chirp registered that is not in the file.
        self._chirp = new_chirp
        return new_chirp

    @property
    def waveform(self):
        """Return the scaled waveform, so that max amplitude corresponds to [-1, 1]."""
        return self.get_iq_normalized()

    @waveform.setter
    def waveform(self, iq):
        self.set_iq(iq)

    def apply_dict_config(self, config):
        """
        Apply waveform configuration from a dict.

        Returns the I/Q samples that were set.

        Configuration keys:

        * "type": either "chirp" or "arb".  If "type" is not specified,
          "arb" is assumed.

        If "chirp" is given, the following keys are required:

        * "nsamples": Number of samples in the chirp.
        * "center": Center frequency in the chirp, in MHz.  The value can be
          between +/-15.
        * "bandwidth": The bandwidth of the chirp, in MHz.  The value can be
          between +/-15.

        The following keys are optional for chirp:

        * "window": The name of the amplitude window to apply.  Defaults to
          "hanning".
        * "phase": The phase offset of the waveform.
        * "scale": Scale to apply to the digital waveform.  Defaults to
          0.25.  The DAC performs better whenever this value is not near 1.

        The following keys are required if the type is "arb":

        * "iq": A complex numpy array of samples.  If all values fall
          between -1 and 1, they are scaled appropriately to fit in the
          12-bit DAC.  Otherwise, the values are used directly as is.
          "scale" is not applied to "arb" waveforms.

        Raises:
            ValueError: if "type" is not "chirp" or "arb", or if a required
                chirp key is missing.
            KeyError: if the type is "arb" and "iq" is missing.
        """
        # NOTE: the parameter shadows the module-level ``config`` import
        # inside this method; only the dict is used here.
        config = config.copy()
        wfm_type = config.get('type', 'arb')
        if wfm_type not in ('chirp', 'arb'):
            raise ValueError('Bad type for waveform: "{}"'.format(wfm_type))
        scale = config.get('scale', 0.25)

        if wfm_type == 'chirp':
            needs_keys = 'nsamples', 'center', 'bandwidth'
            missing_keys = [x for x in needs_keys if x not in config]
            if missing_keys:
                msg = "Chirp requires parameters '{}'".format(missing_keys)
                raise ValueError(msg)
            center = config['center']
            bw = config['bandwidth']
            nsamples = config['nsamples']
            window = config.get('window', 'hanning')
            phase = config.get('phase', 0)
            chirp = self.chirp(nsamples, center, bw, window, scale,
                               phase=phase)
            iq = chirp.iq
        elif wfm_type == 'arb':
            iq = np.asarray(config['iq'])
            normalized = check_normalized(iq)
            self.set_iq(iq, normalized)
        return iq

    def settings_as_dict(self):
        """Return the current waveform information serialized as a dict"""
        settings = {}
        if self._chirp is not None:
            settings['type'] = 'chirp'
            settings.update({
                'nsamples': self._chirp.nsamples,
                'center': self._chirp.center,
                'bandwidth': self._chirp.bw,
                'window': self._chirp.window,
                'clk': self._chirp.clk,
                'scale': self._chirp.scale,
                'phase': self._chirp.phase,
            })
        else:
            settings['type'] = 'arb'
            settings['scale'] = 1
            settings['iq'] = self.waveform
        return settings

    def __len__(self):
        """Return the number of complex I/Q samples stored in the file."""
        with open(self.path, 'rb') as f:
            raw = f.read()
        # Each sample is two int16 values (I and Q): four bytes.
        return len(raw) // 4