# Source code for wni.ad9361.ad9361

from __future__ import print_function, division, absolute_import

import logging
import time
from enum import IntEnum
import os

from . import ad9361_swig

import wni.config as config
from wni.gpio import GPIO
from wni.txrx_spi import HMC807

logger = logging.getLogger(__name__)


class BistMode(IntEnum):
    """Built-in self-test (BIST) modes accepted by the ``ad9361_swig`` BIST calls.

    The members are plain integers so they can be handed straight to the
    wrapped C functions.
    """

    # BIST switched off.
    DISABLE = 0
    # Selected by the human-readable name 'tx' in ``_get_bist_mode``.
    INJ_TX = 1
    # Selected by the human-readable name 'rx' in ``_get_bist_mode``.
    INJ_RX = 2
class ENSMMode(IntEnum):
    """Enable-state-machine (ENSM) modes of the AD9361.

    ``AD9361.ensm_mode`` converts the integer reported by the C API into a
    member of this enum.
    """

    TX = 0
    RX = 1
    ALERT = 2
    FDD = 3
    WAIT = 4
    SLEEP = 5
    PINCTRL = 6
    PINCTRL_FDD_INDEP = 7
def _get_bist_mode(mode):
    """Convert a human-passed BIST mode to a ``BistMode`` value.

    Args:
        mode: either a ``BistMode`` member (or an equal integer), which is
            returned unchanged, or one of the strings ``'rx'``, ``'tx'``,
            ``'disable'`` in any letter case.

    Returns:
        The matching ``BistMode`` member, or ``mode`` itself when it already
        compares equal to one.

    Raises:
        KeyError: if ``mode`` is a string that is not a known mode name.
    """
    # Already an enum value (or an int equal to one): nothing to translate.
    if mode in list(BistMode):
        return mode

    names_to_modes = {
        'rx': BistMode.INJ_RX,
        'tx': BistMode.INJ_TX,
        'disable': BistMode.DISABLE,
    }
    # better be a string
    return names_to_modes[mode.lower()]
class AD9361(object):
    """Python handle for the AD9361 transceiver.

    Implements some functions from the C API (exposed through
    ``ad9361_swig``) so that they can be called from Python, and drives the
    GPIOs and optional HMC807 synthesizers that sit next to the chip.
    """

    # Reference clock, in Hz.
    REF_CLK = int(40E6)

    # change the min attenuation if you need to!
    MIN_ATTENUATION = 0

    # throw some flags on the class so we don't have to import anything other
    # than the class when using this class
    # dig_tune flags
    BE_VERBOSE = ad9361_swig.BE_VERBOSE
    BE_MOREVERBOSE = ad9361_swig.BE_MOREVERBOSE
    DO_ODELAY = ad9361_swig.DO_ODELAY
    DO_IDELAY = ad9361_swig.DO_IDELAY

    # Upper bound enforced by the ``tx_sampling_freq`` setter, in Hz.
    MAX_SAMPLING_FREQ = int(61.44e6)

    # number of times to try tuning digital interface when sampling rate is
    # changed.
    _retries = 5

    def __init__(self, spidev_name=config.AD9361_SPI):
        """Fetch the default C-side parameters, claim the GPIOs, and init.

        Args:
            spidev_name: SPI device handed to ``ad9361_swig.init``.
        """
        self.spidev_name = spidev_name
        self.init_param = ad9361_swig.get_init_param()
        self.tx_fir_config = ad9361_swig.get_tx_fir_config()
        self.rx_fir_config = ad9361_swig.get_rx_fir_config()
        # Backing store for the ``rf_loopback`` property.
        self._rf_loopback = False
        self.__setup_external_hw()

        self.init()

    def init(self):
        """Initializes the ad9361, configuring clocks and stuff."""
        # initialize the AD9361.  This relies on all the C code in this
        # directory.
        ad9361_swig.init(
            self.spidev_name,
            self.init_param,
            self.tx_fir_config,
            self.rx_fir_config
        )
        self.phy = ad9361_swig.get_ad9361_phy()

        # make the gain control manual so the output is consistent.
        self.set_rx_gain_control_mode(ad9361_swig.RF_GAIN_MGC)

        # in ad9361_globals.h, this attribute is set to 2, to skip tuning both
        # interfaces.  This is because at the top speeds, it may have issues
        # doing the tuning.  Now we set skipmode to 0, and the digital
        # interface will get tuned by setting the tx_sampling_freq.
        self.phy.pdata.dig_interface_tune_skipmode = 0

        # sometimes the first setting is funky; we're INTENTIONALLY setting
        # tx sampling frequency twice.
        self.tx_sampling_freq = config.SAMPLE_CLK
        self.tx_sampling_freq = config.SAMPLE_CLK

    def __setup_external_hw(self):
        """
        Get access to the GPIOs that are external to the AD9361 but are used
        to control the AD9361.
        """
        # Make sure the gpios are unexported before we try to use them.  The
        # process *should* close them whenever it exits, but ya know what,
        # sometimes things just don't go as planned.

        # enable RX channel on AD9361
        self._rx_enable = GPIO(961, force_export=True)
        self._rf_loopback_ctrl1 = GPIO(962, force_export=True)
        self._rf_loopback_ctrl2 = GPIO(963, force_export=True)
        self.rf_loopback = False

        # In the ad9361 datasheet this pin is called TXNRX
        self._tx_enable = GPIO(915, force_export=True)

        self.enable_tx()
        self.enable_rx()

        # sets the LO on RF board
        if config.HAS_XKU_BOARD:
            self.hmc0 = HMC807.from_chardev(config.XKU_SPI0)
            self.hmc0.init_registers()
            if config.XKU_SPI1 is None:
                self.hmc1 = None
            else:
                self.hmc1 = HMC807.from_chardev(config.XKU_SPI1)
                self.hmc1.init_registers()
        else:
            self.hmc0 = None
            self.hmc1 = None

    def _through_udc(self, freq):
        """Map a frequency across the UDC, if one is present.

        With ``hmc0`` present the result is
        ``hmc0.frequency_mult * 100E6 - freq``; the same expression converts
        in both directions (AD9361 LO <-> radar frequency).  Without ``hmc0``
        the frequency is returned unchanged.
        """
        if self.hmc0 is None:
            return freq
        # NOTE(review): 100E6 is presumably the HMC807 reference frequency in
        # Hz -- confirm against the RF board documentation.
        return self.hmc0.frequency_mult * 100E6 - freq

    @property
    def tx_frequency(self):
        """The transmit center frequency of the radar, taking into account a
        UDC if it is present.
        """
        return float(self._through_udc(self.tx_lo_freq))

    @tx_frequency.setter
    def tx_frequency(self, freq):
        self.tx_lo_freq = int(round(self._through_udc(freq)))

    @property
    def rx_frequency(self):
        """The receive center frequency of the radar, taking into account a
        UDC if it is present.
        """
        return float(self._through_udc(self.rx_lo_freq))

    @rx_frequency.setter
    def rx_frequency(self, freq):
        self.rx_lo_freq = int(round(self._through_udc(freq)))

    def _set_loopback(self, gpio_ctrl, enable):
        """Sets RF loopback on the TXRX board.

        Args:
            gpio_ctrl (GPIO): gpio control to tx_sw_ctrl[1 or 2] on txrx board
            enable (bool): if True, enable loopback; else enable normal
                operation
        """
        gpio_ctrl.direction = 'high' if enable else 'low'

    @property
    def rf_loopback(self):
        """Get or set RF loopback; both loopback control GPIOs are driven
        together.

        Raises:
            ValueError: when set to anything other than True/False (or 1/0).
        """
        return self._rf_loopback

    @rf_loopback.setter
    def rf_loopback(self, val):
        if val not in (0, 1):
            msg = 'rf_loopback must be set to either True or False, not "%s"' % val
            raise ValueError(msg)
        self._set_loopback(self._rf_loopback_ctrl1, val)
        self._set_loopback(self._rf_loopback_ctrl2, val)
        self._rf_loopback = val

    @property
    def digital_loopback(self):
        """Get or set bit 0 of ``REG_OBSERVE_CONFIG``.

        Raises:
            ValueError: when set to anything other than True/False (or 1/0).
        """
        return bool(self.mask_read(ad9361_swig.REG_OBSERVE_CONFIG, 1))

    @digital_loopback.setter
    def digital_loopback(self, val):
        if val not in (0, 1):
            msg = 'digital_loopback must be set to either True or False, not "%s"' % val
            raise ValueError(msg)
        self.mask_write(ad9361_swig.REG_OBSERVE_CONFIG, 1, int(val))

    def disable_rx(self):
        """Disable rx chain by setting GPIO low."""
        self._rx_enable.direction = 'low'

    def disable_tx(self):
        """Disable tx chain by setting GPIO low."""
        self._tx_enable.direction = 'low'

    def enable_rx(self):
        """Enable rx chain by setting GPIO high."""
        self._rx_enable.direction = 'high'

    def enable_tx(self):
        """Enable tx chain by setting GPIO high."""
        self._tx_enable.direction = 'high'

    def reg_read(self, reg):
        """Read the specified register."""
        return ad9361_swig.spi_read(reg)

    def reg_write(self, reg, val):
        """Write `val` at `reg` address."""
        return ad9361_swig.spi_write(reg, val)

    def set_tx_fir_config(self):
        """Apply ``self.tx_fir_config`` through the C API."""
        return ad9361_swig.ad9361_set_tx_fir_config(self.phy, self.tx_fir_config)

    def set_rx_fir_config(self):
        """Apply ``self.rx_fir_config`` through the C API."""
        return ad9361_swig.ad9361_set_rx_fir_config(self.phy, self.rx_fir_config)

    def set_rx_gain_control_mode(self, mode):
        """Sets the gain control mode for both channels."""
        for channel in (0, 1):
            ad9361_swig.ad9361_set_rx_gain_control_mode(self.phy, channel, mode)

    @property
    def _bbpll_integer_frequency_word(self):
        """This word represents floor(BBPLL_FREQ / REF_CLOCK_FREQ)

        See page 20 of the register map manual.
        """
        return self.reg_read(0x44)

    @property
    def _bbpll_fractional_frequency_word(self):
        """
        This word represents
        floor((BBPLL_FREQ/REF_CLK_FREQ - floor(BBPLL_FREQ / REF_CLK_FREQ)) * 2088960)
        """
        # least to most significant bytes
        least = self.reg_read(0x43)
        mid = self.reg_read(0x42)
        most = self.reg_read(0x41)
        return (most << 16) | (mid << 8) | least

    @property
    def bbpll_freq(self):
        """
        Calculated based on equations on pg 20 of the AD9361 register map
        manual.
        """
        n_integer = self._bbpll_integer_frequency_word
        n_fractional = self._bbpll_fractional_frequency_word
        # The fractional word encodes the *fractional part* of
        # BBPLL_FREQ / REF_CLK scaled by 2088960, so it is added to (not
        # subtracted from) the integer word.
        return self.REF_CLK * (n_integer + n_fractional / 2088960)

    @property
    def bist_loopback(self):
        """Get or set the BIST loopback mode through the C API."""
        return ad9361_swig.get_bist_loopback()

    @bist_loopback.setter
    def bist_loopback(self, mode):
        return ad9361_swig.bist_loopback(mode)

    def bist_prbs(self, mode='Rx'):
        """Start the BIST PRBS in ``mode`` ('rx', 'tx', 'disable' or a
        ``BistMode``).
        """
        ad9361_swig.bist_prbs(_get_bist_mode(mode))

    @property
    def tx_lo_freq(self):
        """Get or set the current transmit LO frequency (Hz)"""
        return ad9361_swig.ad9361_get_tx_lo_freq(self.phy)[1]

    @tx_lo_freq.setter
    def tx_lo_freq(self, freq_Hz):
        freq_Hz = int(round(freq_Hz))
        return ad9361_swig.ad9361_set_tx_lo_freq(self.phy, freq_Hz)

    @property
    def rx_lo_freq(self):
        """Get or set the current receive LO frequency (Hz)"""
        return ad9361_swig.ad9361_get_rx_lo_freq(self.phy)[1]

    @rx_lo_freq.setter
    def rx_lo_freq(self, freq_Hz):
        freq_Hz = int(round(freq_Hz))
        return ad9361_swig.ad9361_set_rx_lo_freq(self.phy, freq_Hz)

    def _get_tx_attenuation(self, channel):
        """Return tx attenuation for ``channel`` as reported by the C API
        (milli-dB, going by the setter's ``attenuation_mdb`` parameter).
        """
        return ad9361_swig.ad9361_get_tx_attenuation(self.phy, channel)[1]

    def _set_tx_attenuation(self, channel, attenuation_mdb):
        """Set tx attenuation for ``channel``.

        Values below ``MIN_ATTENUATION`` are raised to ``MIN_ATTENUATION``.
        """
        if attenuation_mdb < self.MIN_ATTENUATION:
            attenuation_mdb = self.MIN_ATTENUATION
        return ad9361_swig.ad9361_set_tx_attenuation(self.phy, channel, attenuation_mdb)

    @property
    def ch1_tx_attenuation(self):
        """Get and set channel 1 tx attenuation (milli-dB)"""
        return self._get_tx_attenuation(0)

    @ch1_tx_attenuation.setter
    def ch1_tx_attenuation(self, value):
        return self._set_tx_attenuation(0, value)

    @property
    def ch2_tx_attenuation(self):
        """Get and set channel 2 tx attenuation (milli-dB)"""
        return self._get_tx_attenuation(1)

    @ch2_tx_attenuation.setter
    def ch2_tx_attenuation(self, value):
        return self._set_tx_attenuation(1, value)

    @property
    def tx_rf_bandwidth(self):
        """Get or set the tx bandwidth in Hz.

        When set, the tx bandwidth will be set to the nearest value it can
        be.
        """
        return ad9361_swig.ad9361_get_tx_rf_bandwidth(self.phy)[1]

    @tx_rf_bandwidth.setter
    def tx_rf_bandwidth(self, bandwidth_Hz):
        return ad9361_swig.ad9361_set_tx_rf_bandwidth(self.phy, bandwidth_Hz)

    @property
    def rx_rf_bandwidth(self):
        """Get or set the rx bandwidth in Hz."""
        return ad9361_swig.ad9361_get_rx_rf_bandwidth(self.phy)[1]

    @rx_rf_bandwidth.setter
    def rx_rf_bandwidth(self, bandwidth_Hz):
        return ad9361_swig.ad9361_set_rx_rf_bandwidth(self.phy, bandwidth_Hz)

    @property
    def tx_sampling_freq(self):
        """Get or set the tx sampling frequency.

        Warning:
            The sample clock from the AD9361 goes into the FPGA.  Setting the
            sampling frequency must always be accompanied by resetting the
            sysfs file fir_aresetn.

        Raises:
            ValueError: on get, when the C API reports an error; on set, when
                the frequency exceeds ``MAX_SAMPLING_FREQ``.
            RuntimeError: on set, when the digital interface could not be
                tuned within ``_retries`` attempts.
        """
        ret, freq = ad9361_swig.ad9361_get_tx_sampling_freq(self.phy)
        if ret != 0:
            raise ValueError('tx sampling frequency could not be gotten.')
        return freq

    @tx_sampling_freq.setter
    def tx_sampling_freq(self, freq_Hz):
        if freq_Hz > self.MAX_SAMPLING_FREQ:
            msg = "Maximum sampling frequency is {:.2f} MHz; received {} MHz"
            msg = msg.format(self.MAX_SAMPLING_FREQ / 1e6, freq_Hz / 1e6)
            raise ValueError(msg)
        freq_Hz = int(freq_Hz)
        ad9361_swig.ad9361_set_tx_sampling_freq(self.phy, freq_Hz)

        # Re-tune the digital interface for the new rate; the ``else`` branch
        # belongs to the loop and only runs when every attempt failed.
        for _ in range(self._retries):
            if self.dig_tune(0) == 0:
                break
        else:
            msg = 'could not tune sampling frequency to {:.3f} MHz'
            raise RuntimeError(msg.format(freq_Hz / 1e6,))

    @property
    def tx_auto_cal(self):
        """Get or set tx auto cal state"""
        return ad9361_swig.ad9361_get_tx_auto_cal_en_dis(self.phy)[1]

    @tx_auto_cal.setter
    def tx_auto_cal(self, val):
        ad9361_swig.ad9361_set_tx_auto_cal_en_dis(self.phy, int(bool(val)))

    @property
    def rx_rfdc_tracking(self):
        """Get or set rx RF DC tracking state"""
        return ad9361_swig.ad9361_get_rx_rfdc_track_en_dis(self.phy)[1]

    @rx_rfdc_tracking.setter
    def rx_rfdc_tracking(self, val):
        ad9361_swig.ad9361_set_rx_rfdc_track_en_dis(self.phy, int(bool(val)))

    @property
    def rx_bbdc_tracking(self):
        """Get or set rx baseband DC tracking state"""
        return ad9361_swig.ad9361_get_rx_bbdc_track_en_dis(self.phy)[1]

    @rx_bbdc_tracking.setter
    def rx_bbdc_tracking(self, val):
        ad9361_swig.ad9361_set_rx_bbdc_track_en_dis(self.phy, int(bool(val)))

    @property
    def rx_quad_tracking(self):
        """Get or set rx quadrature tracking state"""
        return ad9361_swig.ad9361_get_rx_quad_track_en_dis(self.phy)[1]

    @rx_quad_tracking.setter
    def rx_quad_tracking(self, val):
        ad9361_swig.ad9361_set_rx_quad_track_en_dis(self.phy, int(bool(val)))

    def _adjust_signal_delays(self, frequency):
        """Adjusts rx and tx data and clk signals and delays to the values
        they need to be based on the selected sampling frequency.
        """
        if frequency < 15E6:
            msg = 'Attempting to set the signal delays for a value less '
            msg += 'than 15 MHz.  No guarantees.'
            logger.warning(msg)

        # let the previous check fall through to here on purpose; in other
        # words this is _not_ supposed to be an `elif`
        if frequency <= 20E6:
            self.rx_data_delay = 0xF
        elif frequency <= 25E6:
            self.rx_data_delay = 0xC
        elif frequency <= 30E6:
            self.rx_data_delay = 0x9
        else:
            msg = 'Attempting to set the signal delays for a value '
            msg += 'greater than 30 MHz.  No guarantees.'
            logger.warning(msg)
            self.rx_data_delay = 0x9

    def mask_write(self, reg, mask, value):
        """Read-modify-write ``reg``: bits set in ``mask`` are taken from
        ``value``, all other bits keep their current contents.
        """
        current = self.reg_read(reg)
        new = (value & mask) | (current & ~mask)
        self.reg_write(reg, new)

    def mask_read(self, reg, mask):
        """Return the contents of ``reg`` ANDed with ``mask``."""
        return self.reg_read(reg) & mask

    def bist_tone(self, freq=1, dB=0, mask=0, mode='rx'):
        """Run the built-in self-test with some defaults."""
        # Round to the nearest integer (int() first would truncate and make
        # the rounding a no-op).
        freq = int(round(freq))
        mode = _get_bist_mode(mode)
        ad9361_swig.set_bist_tone(mode, freq, dB, mask)

    def bist_disable(self):
        """Turn the BIST tone off."""
        ad9361_swig.set_bist_tone(BistMode.DISABLE, 0, 0, 0)

    def _axi_setup(self):
        """
        Ensure that the AXI registers are set up in the correct way.
        """
        status = os.system('env -i bash /home/root/wni/sw/scripts/axi_ad9361_setup.sh')
        if status != 0:
            # Best effort: report the failure instead of silently ignoring it.
            logger.warning('axi_ad9361_setup.sh exited with status %s', status)

    def dig_tune(self, max_freq=False, flags=BE_VERBOSE | BE_MOREVERBOSE):
        """Tune the AD9361 digital interface.

        if max_freq == False, the current tx sampling frequency is the only
        frequency that will be tuned.  Otherwise, all frequencies will be
        stepped through.

        Returns:
            The return code of ``ad9361_dig_tune`` (0 is treated as success
            by the ``tx_sampling_freq`` setter).
        """
        # swig expects an integer
        max_freq = int(max_freq)
        ret = ad9361_swig.ad9361_dig_tune(self.phy, max_freq, flags)

        # tuning the digital interface causes issues.
        self._axi_setup()
        return ret

    def reset(self):
        """Resets the ad9361 using GPIO if possible"""
        return ad9361_swig.ad9361_reset(self.phy)

    @property
    def ensm_mode(self):
        """Get (as an ``ENSMMode``) or set the enable state machine mode."""
        return ENSMMode(ad9361_swig.ad9361_get_en_state_machine_mode(self.phy)[1])

    @ensm_mode.setter
    def ensm_mode(self, mode):
        return ad9361_swig.ad9361_set_en_state_machine_mode(self.phy, mode)

    def _get_rx_rf_gain(self, channel):
        """Get the gain of the specified channel in dB"""
        # The error code returned alongside the gain is not inspected.
        _errcode, gain = ad9361_swig.ad9361_get_rx_rf_gain(self.phy, channel)
        return gain

    def _set_rx_rf_gain(self, channel, gain):
        """Sets the gain of the specified channel in dB"""
        return ad9361_swig.ad9361_set_rx_rf_gain(self.phy, channel, gain)

    @property
    def ch1_rx_rf_gain(self):
        """Get and set channel 1 rx rf gain
        """
        return self._get_rx_rf_gain(0)

    @ch1_rx_rf_gain.setter
    def ch1_rx_rf_gain(self, value):
        self._set_rx_rf_gain(0, value)

    @property
    def ch2_rx_rf_gain(self):
        """Get and set channel 2 rx rf gain
        """
        return self._get_rx_rf_gain(1)

    @ch2_rx_rf_gain.setter
    def ch2_rx_rf_gain(self, value):
        self._set_rx_rf_gain(1, value)

    def set_rx_lo_internal_external(self, int_ext):
        """Set whether the rx LO is internal or external"""
        # Use the C API setter; a ``get`` entry point must not be handed the
        # value to apply.
        return ad9361_swig.ad9361_set_rx_lo_int_ext(self.phy, int_ext)

    def set_tx_lo_internal_external(self, int_ext):
        """Set whether the tx LO is internal or external"""
        # Use the C API setter; a ``get`` entry point must not be handed the
        # value to apply.
        return ad9361_swig.ad9361_set_tx_lo_int_ext(self.phy, int_ext)

    def tx_quad_calib(self):
        """Run the ``TX_QUAD_CAL`` calibration."""
        ad9361_swig.ad9361_do_calib(self.phy, ad9361_swig.TX_QUAD_CAL, -1)

    def rf_dc_calib(self):
        """Run the ``RFDC_CAL`` calibration."""
        ad9361_swig.ad9361_do_calib(self.phy, ad9361_swig.RFDC_CAL, -1)

    def recalibrate(self):
        """Run the RF DC calibration, wait half a second, then run the tx
        quadrature calibration.
        """
        self.rf_dc_calib()
        time.sleep(0.5)
        self.tx_quad_calib()

    def _get_rx_data_clk_delay(self):
        """Read the raw ``REG_RX_CLOCK_DATA_DELAY`` register."""
        return self.reg_read(ad9361_swig.REG_RX_CLOCK_DATA_DELAY)

    def _get_tx_data_clk_delay(self):
        """Read the raw ``REG_TX_CLOCK_DATA_DELAY`` register."""
        return self.reg_read(ad9361_swig.REG_TX_CLOCK_DATA_DELAY)

    @property
    def rx_data_delay(self):
        """Get and set the rx data delay amounts (low nibble of the rx
        clock/data delay register).

        When the sampling clock changes, these values need to change as well.
        """
        return self._get_rx_data_clk_delay() & 0xF

    @rx_data_delay.setter
    def rx_data_delay(self, value):
        # Keep the clock delay (high nibble) as it currently is.
        writeback = (self.rx_clk_delay << 4) | value
        self.reg_write(ad9361_swig.REG_RX_CLOCK_DATA_DELAY, writeback)

    @property
    def rx_clk_delay(self):
        """Get and set the rx clk delay amounts (high nibble of the rx
        clock/data delay register).

        When the sampling clock changes, these values need to change as well.
        """
        return (self._get_rx_data_clk_delay() & 0xF0) >> 4

    @rx_clk_delay.setter
    def rx_clk_delay(self, value):
        # Keep the data delay (low nibble) as it currently is.
        writeback = (value << 4) | self.rx_data_delay
        self.reg_write(ad9361_swig.REG_RX_CLOCK_DATA_DELAY, writeback)

    @property
    def tx_data_delay(self):
        """Get and set the tx data delay amounts (low nibble of the tx
        clock/data delay register).

        When the sampling clock changes, these values need to change as well.
        """
        return self._get_tx_data_clk_delay() & 0xF

    @tx_data_delay.setter
    def tx_data_delay(self, value):
        # Keep the clock delay (high nibble) as it currently is.
        writeback = (self.tx_clk_delay << 4) | value
        self.reg_write(ad9361_swig.REG_TX_CLOCK_DATA_DELAY, writeback)

    @property
    def tx_clk_delay(self):
        """Get and set the tx clk delay amounts (high nibble of the tx
        clock/data delay register).

        When the sampling clock changes, these values need to change as well.
        """
        return (self._get_tx_data_clk_delay() & 0xF0) >> 4

    @tx_clk_delay.setter
    def tx_clk_delay(self, value):
        # Keep the data delay (low nibble) as it currently is.
        writeback = (value << 4) | self.tx_data_delay
        self.reg_write(ad9361_swig.REG_TX_CLOCK_DATA_DELAY, writeback)

    def settings_as_dict(self):
        """
        Return current transceiver settings as a dict.
        """
        attrs = (
            'tx_frequency',
            'rx_frequency',
            'ch1_rx_rf_gain',
            'ch2_rx_rf_gain',
            'ch1_tx_attenuation',
            'ch2_tx_attenuation',
            'rf_loopback',
            'digital_loopback',
            'rx_lo_freq',
            'tx_lo_freq',
            'tx_rf_bandwidth',
            'rx_rf_bandwidth',
            'tx_sampling_freq',
            'rx_rfdc_tracking',
            'rx_bbdc_tracking',
            'rx_quad_tracking',
            'rx_data_delay',
            'rx_clk_delay',
            'tx_data_delay',
            'tx_clk_delay',
        )
        return {attr: getattr(self, attr) for attr in attrs}

    def apply_dict_config(self, config):
        """
        Apply configuration from a dict.

        Args:
            config (dict): maps attribute names of this object to the values
                to assign.  The dict passed in is not modified.

        Raises:
            ValueError: if a key does not name an attribute of this object.
        """
        # Work on a copy so the caller's dict is left intact.  (The parameter
        # shadows the ``config`` module inside this method.)
        config = config.copy()

        # changing the LO causes other things to happen (the Rx gain can be
        # changed automatically) so we need to make sure we apply that first

        # check for invalid settings: can't set both LO and rx_frequency, since
        # they're really the same thing.  If they're both provided rx_frequency
        # and tx_frequency will be the values that are actually used.
        set_first = 'rx_lo_freq', 'rx_frequency', 'tx_lo_freq', 'tx_frequency'
        for attr in set_first:
            value = config.pop(attr, None)
            if value is not None:
                setattr(self, attr, value)

        for attr, value in config.items():
            if not hasattr(self, attr):
                msg = 'Bad configuration for AD9361.  No attribute "{}"'
                msg = msg.format(attr)
                raise ValueError(msg)
            setattr(self, attr, value)