from __future__ import print_function, division, absolute_import
import logging
import time
from enum import IntEnum
import os
from . import ad9361_swig
import wni.config as config
from wni.gpio import GPIO
from wni.txrx_spi import HMC807
logger = logging.getLogger(__name__)
[docs]class BistMode(IntEnum):
INJ_TX = 1
INJ_RX = 2
[docs]class ENSMMode(IntEnum):
TX = 0
RX = 1
FDD = 3
WAIT = 4
def _get_bist_mode(mode):
"""Convert human-passed BistMode to an enum value"""
bist_mode_map = {
'rx': BistMode.INJ_RX,
'tx': BistMode.INJ_TX,
'disable': BistMode.DISABLE,
bist_modes = list(BistMode)
if mode in bist_modes:
return mode
# better be a string
mode = mode.lower()
return bist_mode_map[mode]
[docs]class AD9361(object):
Implements some functions from C API that can be called from Python.
# Reference clock, in Hz.
REF_CLK = int(40E6)
# change the min attenuation if you need to!
# throw some flags on the class so we don't have to import anything other
# than the class when using this class
# dig_tune flags
DO_ODELAY = ad9361_swig.DO_ODELAY
DO_IDELAY = ad9361_swig.DO_IDELAY
MAX_SAMPLING_FREQ = int(61.44e6)
# number of times to try tuning digital interface when sampling rate is
# changed.
_retries = 5
def __init__(self, spidev_name=config.AD9361_SPI):
self.spidev_name = spidev_name
self.init_param = ad9361_swig.get_init_param()
self.tx_fir_config = ad9361_swig.get_tx_fir_config()
self.rx_fir_config = ad9361_swig.get_rx_fir_config()
self._rf_loopback = False
[docs] def init(self):
"""Initializes the ad9361, configuring clocks and stuff."""
# initialize the AD9361. This relies on all the C code in this
# directory.
self.phy = ad9361_swig.get_ad9361_phy()
# make the gain control manual so the output is consistent.
# in ad9361_globals.h, this attribute is set to 2, to skip tuning both
# interfaces. This is because at the top speeds, it may have issues
# doing the tuning. Now we set skipmode to 0, and the digital
# interface will get tuned by setting the tx_sampling_freq.
self.phy.pdata.dig_interface_tune_skipmode = 0
# sometimes the first setting is funky; we're INTENTIONALLY setting
# tx sampling frequency twice.
self.tx_sampling_freq = config.SAMPLE_CLK
self.tx_sampling_freq = config.SAMPLE_CLK
def __setup_external_hw(self):
Get access to the GPIOs that are external to the AD9361 but are used to
control the AD9361.
# Make sure the gpios are unexported before we try to use them. The
# process *should* close them whenever it exits, but ya know what,
# sometimes things just don't go as planned.
# enable RX channel on AD9361
self._rx_enable = GPIO(961, force_export=True)
self._rf_loopback_ctrl1 = GPIO(962, force_export=True)
self._rf_loopback_ctrl2 = GPIO(963, force_export=True)
self.rf_loopback = False
# In the ad9361 datasheet this pip is called TXNRX
self._tx_enable = GPIO(915, force_export=True)
# sets the LO on RF board
if config.HAS_XKU_BOARD:
self.hmc0 = HMC807.from_chardev(config.XKU_SPI0)
if config.XKU_SPI1 is None:
self.hmc1 = None
self.hmc1 = HMC807.from_chardev(config.XKU_SPI1)
self.hmc0 = None
self.hmc1 = None
def tx_frequency(self):
"""The transmit center frequency of the radar, taking into account a
UDC if it is present.
ad9361_freq = self.tx_lo_freq
if self.hmc0 is not None:
return self.hmc0.frequency_mult * 100E6 - ad9361_freq
return float(ad9361_freq)
def tx_frequency(self, freq):
if self.hmc0 is not None:
desired = self.hmc0.frequency_mult * 100E6 - freq
desired = freq
self.tx_lo_freq = int(round(desired))
def rx_frequency(self):
"""The receive center frequency of the radar, taking into account a UDC
if it is present.
ad9361_freq = self.rx_lo_freq
if self.hmc0 is not None:
return self.hmc0.frequency_mult * 100E6 - ad9361_freq
return float(ad9361_freq)
def rx_frequency(self, freq):
if self.hmc0 is not None:
desired = self.hmc0.frequency_mult * 100E6 - freq
desired = freq
self.rx_lo_freq = int(round(desired))
def _set_loopback(self, gpio_ctrl, enable):
"""Sets RF loopback on the TXRX board.
gpio_ctrl (GPIO): gpio control to tx_sw_ctrl[1 or 2] on txrx board
enable (bool): if True, enable loopback; else enable normal operation
direction = 'high' if enable else 'low'
gpio_ctrl.direction = direction
def rf_loopback(self):
return self._rf_loopback
def rf_loopback(self, val):
if not (val == 0 or val == 1):
msg = 'rf_loopback must be set to either True or False, not "%s"' % val
raise ValueError(msg)
self._set_loopback(self._rf_loopback_ctrl1, val)
self._set_loopback(self._rf_loopback_ctrl2, val)
self._rf_loopback = val
def digital_loopback(self):
return bool(self.mask_read(ad9361_swig.REG_OBSERVE_CONFIG, 1))
def digital_loopback(self, val):
if not (val == 0 or val == 1):
msg = 'digital_loopback must be set to either True or False, not "%s"' % val
raise ValueError(msg)
self.mask_write(ad9361_swig.REG_OBSERVE_CONFIG, 1, int(val))
[docs] def disable_rx(self):
"""Disable rx chain by setting GPIO low."""
self._rx_enable.direction = 'low'
[docs] def disable_tx(self):
"""Disable rx chain by setting GPIO low."""
self._tx_enable.direction = 'low'
[docs] def enable_rx(self):
"""Disable rx chain by setting GPIO low."""
self._rx_enable.direction = 'high'
[docs] def enable_tx(self):
"""Disable rx chain by setting GPIO low."""
self._tx_enable.direction = 'high'
[docs] def reg_read(self, reg):
"""Read the specified register."""
return ad9361_swig.spi_read(reg)
[docs] def reg_write(self, reg, val):
"""Write `val` at `reg` address."""
return ad9361_swig.spi_write(reg, val)
[docs] def set_tx_fir_config(self):
return ad9361_swig.ad9361_set_tx_fir_config(self.phy, self.tx_fir_config)
[docs] def set_rx_fir_config(self):
return ad9361_swig.ad9361_set_rx_fir_config(self.phy, self.rx_fir_config)
[docs] def set_rx_gain_control_mode(self, mode):
"""Sets the gain control mode for both channels."""
ad9361_swig.ad9361_set_rx_gain_control_mode(self.phy, 0, mode)
ad9361_swig.ad9361_set_rx_gain_control_mode(self.phy, 1, mode)
def _bbpll_integer_frequency_word(self):
"""This word represents floor(BBPLL_FREQ / REF_CLOCK_FREQ)
See page 20 of the register map manual.
return self.reg_read(0x44)
def _bbpll_fractional_frequency_word(self):
This word represents
floor((BBPLL_FREQ/REF_CLK_FREQ - floor(BBPLL_FREQ / REF_CLK_FREQ)) * 2088960)
# least to most significant bytes
least = self.reg_read(0x43)
mid = self.reg_read(0x42)
most = self.reg_read(0x41)
joined = (most << 16) | (mid << 8) | least
return joined
def bbpll_freq(self):
Calculated based on equations on pg 20 of the AD9361 register map manual.
N_integer = self._bbpll_integer_frequency_word
N_fractional = self._bbpll_fractional_frequency_word
return self.REF_CLK * (N_integer - N_fractional / 2088960)
def bist_loopback(self):
return ad9361_swig.get_bist_loopback()
def bist_loopback(self, mode):
return ad9361_swig.bist_loopback(mode)
[docs] def bist_prbs(self, mode='Rx'):
mode = _get_bist_mode(mode)
def tx_lo_freq(self):
"""Get or set the current transmit LO frequency (Hz)"""
return ad9361_swig.ad9361_get_tx_lo_freq(self.phy)[1]
def tx_lo_freq(self, freq_Hz):
freq_Hz = int(round(freq_Hz))
return ad9361_swig.ad9361_set_tx_lo_freq(self.phy, freq_Hz)
def rx_lo_freq(self):
"""Get or set the current receive LO frequency (Hz)"""
return ad9361_swig.ad9361_get_rx_lo_freq(self.phy)[1]
def rx_lo_freq(self, freq_Hz):
freq_Hz = int(round(freq_Hz))
return ad9361_swig.ad9361_set_rx_lo_freq(self.phy, freq_Hz)
def _get_tx_attenuation(self, channel):
"""Return tx attenuation in dBm"""
return ad9361_swig.ad9361_get_tx_attenuation(self.phy, channel)[1]
def _set_tx_attenuation(self, channel, attenuation_mdb):
"""Return tx attenuation for channel in dBm"""
if attenuation_mdb < self.MIN_ATTENUATION:
attenuation_mdb = self.MIN_ATTENUATION
return ad9361_swig.ad9361_set_tx_attenuation(self.phy, channel, attenuation_mdb)
def ch1_tx_attenuation(self):
"""Get and set channel 1 tx attenuation in dBm"""
return self._get_tx_attenuation(0)
def ch1_tx_attenuation(self, value):
return self._set_tx_attenuation(0, value)
def ch2_tx_attenuation(self):
"""Get and set channel 2 tx attenuation in dBm"""
return self._get_tx_attenuation(1)
def ch2_tx_attenuation(self, value):
return self._set_tx_attenuation(1, value)
def tx_rf_bandwidth(self):
"""Get the tx bandwidth in Hz."""
return ad9361_swig.ad9361_get_tx_rf_bandwidth(self.phy)[1]
def tx_rf_bandwidth(self, bandwidth_Hz):
"""Sets the tx bandwidth in Hz.
The tx bandwidth will be set to the nearest value it can be.
return ad9361_swig.ad9361_set_tx_rf_bandwidth(self.phy, bandwidth_Hz)
def rx_rf_bandwidth(self):
return ad9361_swig.ad9361_get_rx_rf_bandwidth(self.phy)[1]
def rx_rf_bandwidth(self, bandwidth_Hz):
return ad9361_swig.ad9361_set_rx_rf_bandwidth(self.phy, bandwidth_Hz)
def tx_sampling_freq(self):
"""Get or set the tx sampling frequency.
The sample clock from the AD9361 goes into the FPGA. Setting the
sampling frequency must always be accompanied by resetting the
sysfs file fir_aresetn.
ret, freq = ad9361_swig.ad9361_get_tx_sampling_freq(self.phy)
if ret != 0:
raise ValueError('tx sampling frequency could not be gotten.')
return freq
def tx_sampling_freq(self, freq_Hz):
if freq_Hz > self.MAX_SAMPLING_FREQ:
msg = "Maximum sampling frequency is {:.2f} MHz; received {} MHz"
msg = msg.format(self.MAX_SAMPLING_FREQ / 1e6, freq_Hz / 1e6)
raise ValueError(msg)
freq_Hz = int(freq_Hz)
ad9361_swig.ad9361_set_tx_sampling_freq(self.phy, freq_Hz)
for _ in range(self._retries):
ret = self.dig_tune(0)
if ret == 0:
msg = 'could not tune sampling frequency to {:.3f} MHz'
raise RuntimeError(msg.format(freq_Hz / 1e6,))
def tx_auto_cal(self):
"""Get or set tx auto cal state"""
return ad9361_swig.ad9361_get_tx_auto_cal_en_dis(self.phy)[1]
def tx_auto_cal(self, val):
val = int(bool(val))
ad9361_swig.ad9361_set_tx_auto_cal_en_dis(self.phy, val)
def rx_rfdc_tracking(self):
"""Get or set tx auto cal state"""
return ad9361_swig.ad9361_get_rx_rfdc_track_en_dis(self.phy)[1]
def rx_rfdc_tracking(self, val):
val = int(bool(val))
ad9361_swig.ad9361_set_rx_rfdc_track_en_dis(self.phy, val)
def rx_bbdc_tracking(self):
"""Get or set tx auto cal state"""
return ad9361_swig.ad9361_get_rx_bbdc_track_en_dis(self.phy)[1]
def rx_bbdc_tracking(self, val):
val = int(bool(val))
ad9361_swig.ad9361_set_rx_bbdc_track_en_dis(self.phy, val)
def rx_quad_tracking(self):
"""Get or set tx auto cal state"""
return ad9361_swig.ad9361_get_rx_quad_track_en_dis(self.phy)[1]
def rx_quad_tracking(self, val):
val = int(bool(val))
ad9361_swig.ad9361_set_rx_quad_track_en_dis(self.phy, val)
def _adjust_signal_delays(self, frequency):
"""Adjusts rx and tx data and clk signals and delays to the values they
need to be based on the selected sampling frequency.
if frequency < 15E6:
msg = 'Attempting to set the signal delays for a value less '
msg += 'than 15 MHz. No guarantees.'
# let the previous check fall through to here on purpose; in other
# words this is _not_ supposed to be an `elif`
if frequency <= 20E6:
self.rx_data_delay = 0xF
elif frequency <= 25E6:
self.rx_data_delay = 0xC
elif frequency <= 30E6:
self.rx_data_delay = 0x9
msg = 'Attempting to set the signal delays for a value '
msg += 'greater than 30 MHz. No guarantees.'
self.rx_data_delay = 0x9
[docs] def mask_write(self, reg, mask, value):
current = self.reg_read(reg)
new = (value & mask) | (current & ~mask)
self.reg_write(reg, new)
[docs] def mask_read(self, reg, mask):
current = self.reg_read(reg)
masked = current & mask
return masked
[docs] def bist_tone(self, freq=1, dB=0, mask=0, mode='rx'):
"""Run the built-in self-test with some defaults."""
freq = round(int(freq))
mode = _get_bist_mode(mode)
ad9361_swig.set_bist_tone(mode, freq, dB, mask)
[docs] def bist_disable(self):
ad9361_swig.set_bist_tone(BistMode.DISABLE, 0, 0, 0)
def _axi_setup(self):
Ensure that the AXI registers are set up in the correct way.
os.system('env -i bash /home/root/wni/sw/scripts/')
[docs] def dig_tune(self, max_freq=False, flags=BE_VERBOSE | BE_MOREVERBOSE):
"""Tune the AD9361 digital interface. if max_freq == False, the current tx
sampling frequency is the only frequency that will be tuned.
Otherwise, all frequencies will be stepped through.
# swig expects an integer
max_freq = int(max_freq)
ret = ad9361_swig.ad9361_dig_tune(self.phy, max_freq, flags)
# tuning the digital interface causes issues.
return ret
[docs] def reset(self):
"""Resets the ad9361 using GPIO if possible"""
return ad9361_swig.ad9361_reset(self.phy)
def ensm_mode(self):
return ENSMMode(ad9361_swig.ad9361_get_en_state_machine_mode(self.phy)[1])
def ensm_mode(self, mode):
return ad9361_swig.ad9361_set_en_state_machine_mode(self.phy, mode)
def _get_rx_rf_gain(self, channel):
"""Get the gain of the specified channel in dB"""
errcode, gain = ad9361_swig.ad9361_get_rx_rf_gain(self.phy, channel)
return gain
def _set_rx_rf_gain(self, channel, gain):
"""Sets the gain of the specified channel in dB"""
ret = ad9361_swig.ad9361_set_rx_rf_gain(self.phy, channel, gain)
return ret
def ch1_rx_rf_gain(self):
"""Get and set channel 1 rx rf gain """
gain = self._get_rx_rf_gain(0)
return gain
def ch1_rx_rf_gain(self, value):
self._set_rx_rf_gain(0, value)
def ch2_rx_rf_gain(self):
"""Get and set channel 2 rx rf gain """
return self._get_rx_rf_gain(1)
def ch2_rx_rf_gain(self, value):
self._set_rx_rf_gain(1, value)
[docs] def set_rx_lo_internal_external(self, int_ext):
"""Set whether the LO is internal or external"""
return ad9361_swig.ad9361_get_rx_lo_int_ext(self.phy, int_ext)
[docs] def set_tx_lo_internal_external(self, int_ext):
"""Set whether the LO is internal or external"""
return ad9361_swig.ad9361_get_tx_lo_int_ext(self.phy, int_ext)
[docs] def tx_quad_calib(self):
ad9361_swig.ad9361_do_calib(self.phy, ad9361_swig.TX_QUAD_CAL, -1)
[docs] def rf_dc_calib(self):
ad9361_swig.ad9361_do_calib(self.phy, ad9361_swig.RFDC_CAL, -1)
[docs] def recalibrate(self):
def _get_rx_data_clk_delay(self):
return self.reg_read(ad9361_swig.REG_RX_CLOCK_DATA_DELAY)
def _get_tx_data_clk_delay(self):
return self.reg_read(ad9361_swig.REG_TX_CLOCK_DATA_DELAY)
def rx_data_delay(self):
"""Get and set the rx data delay amounts. When the sampling clock
changes, these values need to change as well.
data_clk_delay = self._get_rx_data_clk_delay()
return data_clk_delay & 0xF
def rx_data_delay(self, value):
clk_delay = self.rx_clk_delay
writeback = (clk_delay << 4) | value
self.reg_write(ad9361_swig.REG_RX_CLOCK_DATA_DELAY, writeback)
def rx_clk_delay(self):
"""Get and set the rx clk delay amounts. When the sampling clock
changes, these values need to change as well.
data_clk_delay = self._get_rx_data_clk_delay()
return (data_clk_delay & 0xF0) >> 4
def rx_clk_delay(self, value):
data_delay = self.rx_data_delay
writeback = (value << 4) | data_delay
self.reg_write(ad9361_swig.REG_RX_CLOCK_DATA_DELAY, writeback)
def tx_data_delay(self):
"""Get and set the tx data delay amounts. When the sampling clock
changes, these values need to change as well.
data_clk_delay = self._get_tx_data_clk_delay()
return data_clk_delay & 0xF
def tx_data_delay(self, value):
clk_delay = self.tx_clk_delay
writeback = (clk_delay << 4) | value
self.reg_write(ad9361_swig.REG_TX_CLOCK_DATA_DELAY, writeback)
def tx_clk_delay(self):
"""Get and set the tx clk delay amounts. When the sampling clock
changes, these values need to change as well.
data_clk_delay = self._get_tx_data_clk_delay()
return (data_clk_delay & 0xF0) >> 4
def tx_clk_delay(self, value):
data_delay = self.tx_data_delay
writeback = (value << 4) | data_delay
self.reg_write(ad9361_swig.REG_TX_CLOCK_DATA_DELAY, writeback)
[docs] def settings_as_dict(self):
Return current transceiver settings as a dict.
attrs = (
'tx_frequency', 'rx_frequency', 'ch1_rx_rf_gain', 'ch2_rx_rf_gain',
'ch1_tx_attenuation', 'ch2_tx_attenuation', 'rf_loopback',
'digital_loopback', 'rx_lo_freq', 'tx_lo_freq', 'tx_rf_bandwidth',
'rx_rf_bandwidth', 'tx_sampling_freq', 'rx_rfdc_tracking',
'rx_bbdc_tracking', 'rx_quad_tracking', 'rx_data_delay',
'rx_clk_delay', 'tx_data_delay', 'tx_clk_delay',
settings = {attr: getattr(self, attr) for attr in attrs}
return settings
[docs] def apply_dict_config(self, config):
Apply configuration from a dict.
config = config.copy()
# changing the LO causes other things to happen (the Rx gain can be
# changed automatically) so we need to make sure we apply that first
# check for invalid settings: can't set both LO and rx_frequency, since
# they're really the same thing. If they're both provided rx_frequency
# and tx_frequency will be the values that are actually used.
set_first = 'rx_lo_freq', 'rx_frequency', 'tx_lo_freq', 'tx_frequency'
for attr in set_first:
value = config.pop(attr, None)
if value is not None:
setattr(self, attr, value)
for attr, value in config.items():
if not hasattr(self, attr):
msg = 'Bad configuration for AD9361. No attribute "{}"'
msg = msg.format(attr)
raise ValueError(msg)
setattr(self, attr, value)