from __future__ import print_function, division, absolute_import
import logging
import time
from enum import IntEnum
import os
from . import ad9361_swig
import wni.config as config
from wni.gpio import GPIO
from wni.txrx_spi import HMC807
logger = logging.getLogger(__name__)
[docs]class BistMode(IntEnum):
DISABLE = 0
INJ_TX = 1
INJ_RX = 2
[docs]class ENSMMode(IntEnum):
TX = 0
RX = 1
ALERT = 2
FDD = 3
WAIT = 4
SLEEP = 5
PINCTRL = 6
PINCTRL_FDD_INDEP = 7
def _get_bist_mode(mode):
"""Convert human-passed BistMode to an enum value"""
bist_mode_map = {
'rx': BistMode.INJ_RX,
'tx': BistMode.INJ_TX,
'disable': BistMode.DISABLE,
}
bist_modes = list(BistMode)
if mode in bist_modes:
return mode
else:
# better be a string
mode = mode.lower()
return bist_mode_map[mode]
[docs]class AD9361(object):
"""
Implements some functions from C API that can be called from Python.
"""
# Reference clock, in Hz.
REF_CLK = int(40E6)
# change the min attenuation if you need to!
MIN_ATTENUATION = 0
# throw some flags on the class so we don't have to import anything other
# than the class when using this class
# dig_tune flags
BE_VERBOSE = ad9361_swig.BE_VERBOSE
BE_MOREVERBOSE = ad9361_swig.BE_MOREVERBOSE
DO_ODELAY = ad9361_swig.DO_ODELAY
DO_IDELAY = ad9361_swig.DO_IDELAY
MAX_SAMPLING_FREQ = int(61.44e6)
# number of times to try tuning digital interface when sampling rate is
# changed.
_retries = 5
def __init__(self, spidev_name=config.AD9361_SPI):
self.spidev_name = spidev_name
self.init_param = ad9361_swig.get_init_param()
self.tx_fir_config = ad9361_swig.get_tx_fir_config()
self.rx_fir_config = ad9361_swig.get_rx_fir_config()
self._rf_loopback = False
self.__setup_external_hw()
self.init()
[docs] def init(self):
"""Initializes the ad9361, configuring clocks and stuff."""
# initialize the AD9361. This relies on all the C code in this
# directory.
ad9361_swig.init(
self.spidev_name,
self.init_param,
self.tx_fir_config,
self.rx_fir_config
)
self.phy = ad9361_swig.get_ad9361_phy()
# make the gain control manual so the output is consistent.
self.set_rx_gain_control_mode(ad9361_swig.RF_GAIN_MGC)
# in ad9361_globals.h, this attribute is set to 2, to skip tuning both
# interfaces. This is because at the top speeds, it may have issues
# doing the tuning. Now we set skipmode to 0, and the digital
# interface will get tuned by setting the tx_sampling_freq.
self.phy.pdata.dig_interface_tune_skipmode = 0
# sometimes the first setting is funky; we're INTENTIONALLY setting
# tx sampling frequency twice.
self.tx_sampling_freq = config.SAMPLE_CLK
self.tx_sampling_freq = config.SAMPLE_CLK
def __setup_external_hw(self):
"""
Get access to the GPIOs that are external to the AD9361 but are used to
control the AD9361.
"""
# Make sure the gpios are unexported before we try to use them. The
# process *should* close them whenever it exits, but ya know what,
# sometimes things just don't go as planned.
# enable RX channel on AD9361
self._rx_enable = GPIO(961, force_export=True)
self._rf_loopback_ctrl1 = GPIO(962, force_export=True)
self._rf_loopback_ctrl2 = GPIO(963, force_export=True)
self.rf_loopback = False
# In the ad9361 datasheet this pip is called TXNRX
self._tx_enable = GPIO(915, force_export=True)
self.enable_tx()
self.enable_rx()
# sets the LO on RF board
if config.HAS_XKU_BOARD:
self.hmc0 = HMC807.from_chardev(config.XKU_SPI0)
self.hmc0.init_registers()
if config.XKU_SPI1 is None:
self.hmc1 = None
else:
self.hmc1 = HMC807.from_chardev(config.XKU_SPI1)
self.hmc1.init_registers()
else:
self.hmc0 = None
self.hmc1 = None
@property
def tx_frequency(self):
"""The transmit center frequency of the radar, taking into account a
UDC if it is present.
"""
ad9361_freq = self.tx_lo_freq
if self.hmc0 is not None:
return self.hmc0.frequency_mult * 100E6 - ad9361_freq
else:
return float(ad9361_freq)
@tx_frequency.setter
def tx_frequency(self, freq):
if self.hmc0 is not None:
desired = self.hmc0.frequency_mult * 100E6 - freq
else:
desired = freq
self.tx_lo_freq = int(round(desired))
@property
def rx_frequency(self):
"""The receive center frequency of the radar, taking into account a UDC
if it is present.
"""
ad9361_freq = self.rx_lo_freq
if self.hmc0 is not None:
return self.hmc0.frequency_mult * 100E6 - ad9361_freq
else:
return float(ad9361_freq)
@rx_frequency.setter
def rx_frequency(self, freq):
if self.hmc0 is not None:
desired = self.hmc0.frequency_mult * 100E6 - freq
else:
desired = freq
self.rx_lo_freq = int(round(desired))
def _set_loopback(self, gpio_ctrl, enable):
"""Sets RF loopback on the TXRX board.
Args:
gpio_ctrl (GPIO): gpio control to tx_sw_ctrl[1 or 2] on txrx board
enable (bool): if True, enable loopback; else enable normal operation
"""
direction = 'high' if enable else 'low'
gpio_ctrl.direction = direction
@property
def rf_loopback(self):
return self._rf_loopback
@rf_loopback.setter
def rf_loopback(self, val):
if not (val == 0 or val == 1):
msg = 'rf_loopback must be set to either True or False, not "%s"' % val
raise ValueError(msg)
self._set_loopback(self._rf_loopback_ctrl1, val)
self._set_loopback(self._rf_loopback_ctrl2, val)
self._rf_loopback = val
@property
def digital_loopback(self):
return bool(self.mask_read(ad9361_swig.REG_OBSERVE_CONFIG, 1))
@digital_loopback.setter
def digital_loopback(self, val):
if not (val == 0 or val == 1):
msg = 'digital_loopback must be set to either True or False, not "%s"' % val
raise ValueError(msg)
self.mask_write(ad9361_swig.REG_OBSERVE_CONFIG, 1, int(val))
[docs] def disable_rx(self):
"""Disable rx chain by setting GPIO low."""
self._rx_enable.direction = 'low'
[docs] def disable_tx(self):
"""Disable rx chain by setting GPIO low."""
self._tx_enable.direction = 'low'
[docs] def enable_rx(self):
"""Disable rx chain by setting GPIO low."""
self._rx_enable.direction = 'high'
[docs] def enable_tx(self):
"""Disable rx chain by setting GPIO low."""
self._tx_enable.direction = 'high'
[docs] def reg_read(self, reg):
"""Read the specified register."""
return ad9361_swig.spi_read(reg)
[docs] def reg_write(self, reg, val):
"""Write `val` at `reg` address."""
return ad9361_swig.spi_write(reg, val)
[docs] def set_tx_fir_config(self):
return ad9361_swig.ad9361_set_tx_fir_config(self.phy, self.tx_fir_config)
[docs] def set_rx_fir_config(self):
return ad9361_swig.ad9361_set_rx_fir_config(self.phy, self.rx_fir_config)
[docs] def set_rx_gain_control_mode(self, mode):
"""Sets the gain control mode for both channels."""
ad9361_swig.ad9361_set_rx_gain_control_mode(self.phy, 0, mode)
ad9361_swig.ad9361_set_rx_gain_control_mode(self.phy, 1, mode)
@property
def _bbpll_integer_frequency_word(self):
"""This word represents floor(BBPLL_FREQ / REF_CLOCK_FREQ)
See page 20 of the register map manual.
"""
return self.reg_read(0x44)
@property
def _bbpll_fractional_frequency_word(self):
"""
This word represents
floor((BBPLL_FREQ/REF_CLK_FREQ - floor(BBPLL_FREQ / REF_CLK_FREQ)) * 2088960)
"""
# least to most significant bytes
least = self.reg_read(0x43)
mid = self.reg_read(0x42)
most = self.reg_read(0x41)
joined = (most << 16) | (mid << 8) | least
return joined
@property
def bbpll_freq(self):
"""
Calculated based on equations on pg 20 of the AD9361 register map manual.
"""
N_integer = self._bbpll_integer_frequency_word
N_fractional = self._bbpll_fractional_frequency_word
return self.REF_CLK * (N_integer - N_fractional / 2088960)
@property
def bist_loopback(self):
return ad9361_swig.get_bist_loopback()
@bist_loopback.setter
def bist_loopback(self, mode):
return ad9361_swig.bist_loopback(mode)
[docs] def bist_prbs(self, mode='Rx'):
mode = _get_bist_mode(mode)
ad9361_swig.bist_prbs(mode)
@property
def tx_lo_freq(self):
"""Get or set the current transmit LO frequency (Hz)"""
return ad9361_swig.ad9361_get_tx_lo_freq(self.phy)[1]
@tx_lo_freq.setter
def tx_lo_freq(self, freq_Hz):
freq_Hz = int(round(freq_Hz))
return ad9361_swig.ad9361_set_tx_lo_freq(self.phy, freq_Hz)
@property
def rx_lo_freq(self):
"""Get or set the current receive LO frequency (Hz)"""
return ad9361_swig.ad9361_get_rx_lo_freq(self.phy)[1]
@rx_lo_freq.setter
def rx_lo_freq(self, freq_Hz):
freq_Hz = int(round(freq_Hz))
return ad9361_swig.ad9361_set_rx_lo_freq(self.phy, freq_Hz)
def _get_tx_attenuation(self, channel):
"""Return tx attenuation in dBm"""
return ad9361_swig.ad9361_get_tx_attenuation(self.phy, channel)[1]
def _set_tx_attenuation(self, channel, attenuation_mdb):
"""Return tx attenuation for channel in dBm"""
if attenuation_mdb < self.MIN_ATTENUATION:
attenuation_mdb = self.MIN_ATTENUATION
return ad9361_swig.ad9361_set_tx_attenuation(self.phy, channel, attenuation_mdb)
@property
def ch1_tx_attenuation(self):
"""Get and set channel 1 tx attenuation in dBm"""
return self._get_tx_attenuation(0)
@ch1_tx_attenuation.setter
def ch1_tx_attenuation(self, value):
return self._set_tx_attenuation(0, value)
@property
def ch2_tx_attenuation(self):
"""Get and set channel 2 tx attenuation in dBm"""
return self._get_tx_attenuation(1)
@ch2_tx_attenuation.setter
def ch2_tx_attenuation(self, value):
return self._set_tx_attenuation(1, value)
@property
def tx_rf_bandwidth(self):
"""Get the tx bandwidth in Hz."""
return ad9361_swig.ad9361_get_tx_rf_bandwidth(self.phy)[1]
@tx_rf_bandwidth.setter
def tx_rf_bandwidth(self, bandwidth_Hz):
"""Sets the tx bandwidth in Hz.
The tx bandwidth will be set to the nearest value it can be.
"""
return ad9361_swig.ad9361_set_tx_rf_bandwidth(self.phy, bandwidth_Hz)
@property
def rx_rf_bandwidth(self):
return ad9361_swig.ad9361_get_rx_rf_bandwidth(self.phy)[1]
@rx_rf_bandwidth.setter
def rx_rf_bandwidth(self, bandwidth_Hz):
return ad9361_swig.ad9361_set_rx_rf_bandwidth(self.phy, bandwidth_Hz)
@property
def tx_sampling_freq(self):
"""Get or set the tx sampling frequency.
Warning:
The sample clock from the AD9361 goes into the FPGA. Setting the
sampling frequency must always be accompanied by resetting the
sysfs file fir_aresetn.
"""
ret, freq = ad9361_swig.ad9361_get_tx_sampling_freq(self.phy)
if ret != 0:
raise ValueError('tx sampling frequency could not be gotten.')
return freq
@tx_sampling_freq.setter
def tx_sampling_freq(self, freq_Hz):
if freq_Hz > self.MAX_SAMPLING_FREQ:
msg = "Maximum sampling frequency is {:.2f} MHz; received {} MHz"
msg = msg.format(self.MAX_SAMPLING_FREQ / 1e6, freq_Hz / 1e6)
raise ValueError(msg)
freq_Hz = int(freq_Hz)
ad9361_swig.ad9361_set_tx_sampling_freq(self.phy, freq_Hz)
for _ in range(self._retries):
ret = self.dig_tune(0)
if ret == 0:
break
else:
msg = 'could not tune sampling frequency to {:.3f} MHz'
raise RuntimeError(msg.format(freq_Hz / 1e6,))
@property
def tx_auto_cal(self):
"""Get or set tx auto cal state"""
return ad9361_swig.ad9361_get_tx_auto_cal_en_dis(self.phy)[1]
@tx_auto_cal.setter
def tx_auto_cal(self, val):
val = int(bool(val))
ad9361_swig.ad9361_set_tx_auto_cal_en_dis(self.phy, val)
@property
def rx_rfdc_tracking(self):
"""Get or set tx auto cal state"""
return ad9361_swig.ad9361_get_rx_rfdc_track_en_dis(self.phy)[1]
@rx_rfdc_tracking.setter
def rx_rfdc_tracking(self, val):
val = int(bool(val))
ad9361_swig.ad9361_set_rx_rfdc_track_en_dis(self.phy, val)
@property
def rx_bbdc_tracking(self):
"""Get or set tx auto cal state"""
return ad9361_swig.ad9361_get_rx_bbdc_track_en_dis(self.phy)[1]
@rx_bbdc_tracking.setter
def rx_bbdc_tracking(self, val):
val = int(bool(val))
ad9361_swig.ad9361_set_rx_bbdc_track_en_dis(self.phy, val)
@property
def rx_quad_tracking(self):
"""Get or set tx auto cal state"""
return ad9361_swig.ad9361_get_rx_quad_track_en_dis(self.phy)[1]
@rx_quad_tracking.setter
def rx_quad_tracking(self, val):
val = int(bool(val))
ad9361_swig.ad9361_set_rx_quad_track_en_dis(self.phy, val)
def _adjust_signal_delays(self, frequency):
"""Adjusts rx and tx data and clk signals and delays to the values they
need to be based on the selected sampling frequency.
"""
if frequency < 15E6:
msg = 'Attempting to set the signal delays for a value less '
msg += 'than 15 MHz. No guarantees.'
logger.warn(msg)
# let the previous check fall through to here on purpose; in other
# words this is _not_ supposed to be an `elif`
if frequency <= 20E6:
self.rx_data_delay = 0xF
elif frequency <= 25E6:
self.rx_data_delay = 0xC
elif frequency <= 30E6:
self.rx_data_delay = 0x9
else:
msg = 'Attempting to set the signal delays for a value '
msg += 'greater than 30 MHz. No guarantees.'
logger.warn(msg)
self.rx_data_delay = 0x9
[docs] def mask_write(self, reg, mask, value):
current = self.reg_read(reg)
new = (value & mask) | (current & ~mask)
self.reg_write(reg, new)
[docs] def mask_read(self, reg, mask):
current = self.reg_read(reg)
masked = current & mask
return masked
[docs] def bist_tone(self, freq=1, dB=0, mask=0, mode='rx'):
"""Run the built-in self-test with some defaults."""
freq = round(int(freq))
mode = _get_bist_mode(mode)
ad9361_swig.set_bist_tone(mode, freq, dB, mask)
[docs] def bist_disable(self):
ad9361_swig.set_bist_tone(BistMode.DISABLE, 0, 0, 0)
def _axi_setup(self):
"""
Ensure that the AXI registers are set up in the correct way.
"""
os.system('env -i bash /home/root/wni/sw/scripts/axi_ad9361_setup.sh')
[docs] def dig_tune(self, max_freq=False, flags=BE_VERBOSE | BE_MOREVERBOSE):
"""Tune the AD9361 digital interface. if max_freq == False, the current tx
sampling frequency is the only frequency that will be tuned.
Otherwise, all frequencies will be stepped through.
"""
# swig expects an integer
max_freq = int(max_freq)
ret = ad9361_swig.ad9361_dig_tune(self.phy, max_freq, flags)
# tuning the digital interface causes issues.
self._axi_setup()
return ret
[docs] def reset(self):
"""Resets the ad9361 using GPIO if possible"""
return ad9361_swig.ad9361_reset(self.phy)
@property
def ensm_mode(self):
return ENSMMode(ad9361_swig.ad9361_get_en_state_machine_mode(self.phy)[1])
@ensm_mode.setter
def ensm_mode(self, mode):
return ad9361_swig.ad9361_set_en_state_machine_mode(self.phy, mode)
def _get_rx_rf_gain(self, channel):
"""Get the gain of the specified channel in dB"""
errcode, gain = ad9361_swig.ad9361_get_rx_rf_gain(self.phy, channel)
return gain
def _set_rx_rf_gain(self, channel, gain):
"""Sets the gain of the specified channel in dB"""
ret = ad9361_swig.ad9361_set_rx_rf_gain(self.phy, channel, gain)
return ret
@property
def ch1_rx_rf_gain(self):
"""Get and set channel 1 rx rf gain """
gain = self._get_rx_rf_gain(0)
return gain
@ch1_rx_rf_gain.setter
def ch1_rx_rf_gain(self, value):
self._set_rx_rf_gain(0, value)
@property
def ch2_rx_rf_gain(self):
"""Get and set channel 2 rx rf gain """
return self._get_rx_rf_gain(1)
@ch2_rx_rf_gain.setter
def ch2_rx_rf_gain(self, value):
self._set_rx_rf_gain(1, value)
[docs] def set_rx_lo_internal_external(self, int_ext):
"""Set whether the LO is internal or external"""
return ad9361_swig.ad9361_get_rx_lo_int_ext(self.phy, int_ext)
[docs] def set_tx_lo_internal_external(self, int_ext):
"""Set whether the LO is internal or external"""
return ad9361_swig.ad9361_get_tx_lo_int_ext(self.phy, int_ext)
[docs] def tx_quad_calib(self):
ad9361_swig.ad9361_do_calib(self.phy, ad9361_swig.TX_QUAD_CAL, -1)
[docs] def rf_dc_calib(self):
ad9361_swig.ad9361_do_calib(self.phy, ad9361_swig.RFDC_CAL, -1)
[docs] def recalibrate(self):
self.rf_dc_calib()
time.sleep(0.5)
self.tx_quad_calib()
def _get_rx_data_clk_delay(self):
return self.reg_read(ad9361_swig.REG_RX_CLOCK_DATA_DELAY)
def _get_tx_data_clk_delay(self):
return self.reg_read(ad9361_swig.REG_TX_CLOCK_DATA_DELAY)
@property
def rx_data_delay(self):
"""Get and set the rx data delay amounts. When the sampling clock
changes, these values need to change as well.
"""
data_clk_delay = self._get_rx_data_clk_delay()
return data_clk_delay & 0xF
@rx_data_delay.setter
def rx_data_delay(self, value):
clk_delay = self.rx_clk_delay
writeback = (clk_delay << 4) | value
self.reg_write(ad9361_swig.REG_RX_CLOCK_DATA_DELAY, writeback)
@property
def rx_clk_delay(self):
"""Get and set the rx clk delay amounts. When the sampling clock
changes, these values need to change as well.
"""
data_clk_delay = self._get_rx_data_clk_delay()
return (data_clk_delay & 0xF0) >> 4
@rx_clk_delay.setter
def rx_clk_delay(self, value):
data_delay = self.rx_data_delay
writeback = (value << 4) | data_delay
self.reg_write(ad9361_swig.REG_RX_CLOCK_DATA_DELAY, writeback)
@property
def tx_data_delay(self):
"""Get and set the tx data delay amounts. When the sampling clock
changes, these values need to change as well.
"""
data_clk_delay = self._get_tx_data_clk_delay()
return data_clk_delay & 0xF
@tx_data_delay.setter
def tx_data_delay(self, value):
clk_delay = self.tx_clk_delay
writeback = (clk_delay << 4) | value
self.reg_write(ad9361_swig.REG_TX_CLOCK_DATA_DELAY, writeback)
@property
def tx_clk_delay(self):
"""Get and set the tx clk delay amounts. When the sampling clock
changes, these values need to change as well.
"""
data_clk_delay = self._get_tx_data_clk_delay()
return (data_clk_delay & 0xF0) >> 4
@tx_clk_delay.setter
def tx_clk_delay(self, value):
data_delay = self.tx_data_delay
writeback = (value << 4) | data_delay
self.reg_write(ad9361_swig.REG_TX_CLOCK_DATA_DELAY, writeback)
[docs] def settings_as_dict(self):
"""
Return current transceiver settings as a dict.
"""
attrs = (
'tx_frequency', 'rx_frequency', 'ch1_rx_rf_gain', 'ch2_rx_rf_gain',
'ch1_tx_attenuation', 'ch2_tx_attenuation', 'rf_loopback',
'digital_loopback', 'rx_lo_freq', 'tx_lo_freq', 'tx_rf_bandwidth',
'rx_rf_bandwidth', 'tx_sampling_freq', 'rx_rfdc_tracking',
'rx_bbdc_tracking', 'rx_quad_tracking', 'rx_data_delay',
'rx_clk_delay', 'tx_data_delay', 'tx_clk_delay',
)
settings = {attr: getattr(self, attr) for attr in attrs}
return settings
[docs] def apply_dict_config(self, config):
"""
Apply configuration from a dict.
"""
config = config.copy()
# changing the LO causes other things to happen (the Rx gain can be
# changed automatically) so we need to make sure we apply that first
# check for invalid settings: can't set both LO and rx_frequency, since
# they're really the same thing. If they're both provided rx_frequency
# and tx_frequency will be the values that are actually used.
set_first = 'rx_lo_freq', 'rx_frequency', 'tx_lo_freq', 'tx_frequency'
for attr in set_first:
value = config.pop(attr, None)
if value is not None:
setattr(self, attr, value)
for attr, value in config.items():
if not hasattr(self, attr):
msg = 'Bad configuration for AD9361. No attribute "{}"'
msg = msg.format(attr)
raise ValueError(msg)
setattr(self, attr, value)