wni

Submodules

wni.actuator

Control the actuator from Python! Wow.

class wni.actuator.Actuator(idVendor=1240, idProduct=64607)[source]

Bases: object

Control Firgelli USB from Microzed

Call Actuator.position to get the position reported by the actuator directly, or Actuator.cached_position to get the cached position. It takes about 100 ms to talk to the hardware, so the caching is necessary.

ACCURACY = 20
EXTEND_LIMIT = 200
ID_PRODUCT = 64607
ID_VENDOR = 1240
RETRACT_LIMIT = 150
SPEED = 100
accuracy
extend_limit
get_feedbacks(n)[source]
position
retract_limit
set_defaults()[source]
speed
write(addr, value)[source]
class wni.actuator.FakeActuator(*args, **kwargs)[source]

Bases: object

ACCURACY = 20
EXTEND_LIMIT = 200
RETRACT_LIMIT = 150
SPEED = 100

wni.axi

Python interface for AXI IP blocks on the FPGA

class wni.axi.MmapRegion(base, size)[source]

Bases: object

read_bytes(addr, size)[source]
read_int(addr)

Returns a 32-bit signed integer at addr.

read_int16(addr)[source]

Returns a 16-bit signed integer at addr.

read_int32(addr)[source]

Returns a 32-bit signed integer at addr.

read_int8(addr)[source]

Returns an 8-bit signed integer at addr.

read_uint(addr)

Returns a 32-bit unsigned integer at addr.

read_uint16(addr)[source]

Returns a 16-bit unsigned integer at addr.

read_uint32(addr)[source]

Returns a 32-bit unsigned integer at addr.

read_uint8(addr)[source]

Returns an 8-bit unsigned integer at addr.

write_bytes(addr, bytes)[source]
write_int(addr, value)

Writes a 32-bit signed integer value to addr

write_int16(addr, value)[source]

Writes a 16-bit signed integer value to addr

write_int32(addr, value)[source]

Writes a 32-bit signed integer value to addr

write_int8(addr, value)[source]

Writes an 8-bit signed integer value to addr

write_uint(addr, value)

Writes a 32-bit unsigned integer value to addr

write_uint16(addr, value)[source]

Writes a 16-bit unsigned integer value to addr

write_uint32(addr, value)[source]

Writes a 32-bit unsigned integer value to addr

write_uint8(addr, value)[source]

Writes an 8-bit unsigned integer value to addr

wni.axi.float_to_int12(x)[source]

Change a float (between between -1 and 1) to an int12.

wni.axi.int12_to_float(x)[source]

wni.az_el

class wni.az_el.AzElController(recv_timeout=-1)[source]

Bases: object

Provides abstraction layer over the motor and actuator.

acceleration
degrees_per_second
elevation
go_home()[source]
home_angle
is_spinning
point
spin()[source]
stop()[source]

wni.config

wni.data

Classes and functions for working with WeatherNews data.

class wni.data.IQData(radar_data, waveforms=None, scaninfo=())[source]

Bases: object

Parse data from the FPGA into a Python object.

calc_doppler_velocity(channel, wavelength=0.03179)[source]

Calculate the doppler velocity

calc_reflectivity(channel, cal=0, range_correct=True)[source]

Calculate the reflectity.

doppler_spectrum(n=32, normalize=False)[source]

Return the Doppler spectrum of the data.

doppler_spectrum_windowed(n=32, normalize=False)[source]
classmethod fromfile(filename, *args, **kwargs)[source]
iq
savemat(fname)[source]

Dumps iq (unfiltered) data to a format readable by Matlab.

class wni.data.IQDataReader(fname, has_header=True)[source]

Bases: object

Iterate over a file dumped by :func:wni.processees.data_dumper.

Yields a :class:IQData instance for every iteration.

close()[source]
parse_radial_at(pos=None)[source]

Parse radial at the specified offset in the memorymapped file. If pos is None, parse the radial at self._current_offset

class wni.data.IQFileHeader(bytes)[source]

Bases: object

Parse header data dumped by wni.processes.data_dumper().

classmethod fromfile(fname)[source]
class wni.data.Packet(packet_data)[source]

Bases: object

Parse a single packet in the format it is in coming from the FPGA to the PS. A packet consists of a packet header and I/Q data.

class wni.data.PacketHeader(data: bytes)[source]

Bases: object

Packet header data; includes info about scan setup.

struct_format = '4sIQ12I'
class wni.data.Radial(packets, az, el)

Bases: tuple

az

Alias for field number 1

el

Alias for field number 2

packets

Alias for field number 0

wni.data.nextpow2(x)[source]

Return the next power of 2 greater than or equal to x.

wni.data.parse_radials(data, nradials=1)[source]

Parse specified number of radials from data.

Parameters:
  • data – the buffer of radar data.
  • nradials – the number of radials to return. If the number of radials in the data set is less than nradials, an exception is raised.
Returns:

tuple containing:

radials: a list of Radials. bytes_read: bytes read from data.

Return type:

(tuple)

wni.data.unpackb(packed, object_hook=None, list_hook=None, bool use_list=True, bool raw=True, encoding=None, unicode_errors=None, object_pairs_hook=None, ext_hook=ExtType, Py_ssize_t max_str_len=2147483647, Py_ssize_t max_bin_len=2147483647, Py_ssize_t max_array_len=2147483647, Py_ssize_t max_map_len=2147483647, Py_ssize_t max_ext_len=2147483647)

Unpack packed_bytes to object. Returns an unpacked object.

Raises ValueError when packed contains extra bytes.

See Unpacker for options.

wni.data_client

Open client connection to the data servers.

class wni.data_client.DataClient(connect_addr, recv_timeout=1000, send_timeout=1000)[source]

Bases: object

Sends requests to the system, asking for moment data.

Call the method latest to receive only the most recent data from the server. Call the method all to receive all the data that the client has not yet received.

all()[source]

Request all data on the server that the client has not received yet.

Returns a list of radial data.

all_bytes()[source]

Request all data on the server that the client has not received yet.

Does not decode the response; the response is the bytes exactly returned by the data server.

Returns a tuple of (num_radials, raw_bytes).

close()[source]

Closes the underlying connection to the data server

decode(response)[source]

Decode response and return the Python objects represented.

latest()[source]

Returns the most recent radial data. If no data is available, returns None

next_n(n)[source]

Return the next n not yet received radials.

recv_timeout

recv timeout in ms

sync()[source]

Tell the data server that we don’t want any data older than the current point in time.

exception wni.data_client.DataClientError[source]

Bases: Exception

class wni.data_client.DataClients(ch1_addr, ch2_addr, recv_timeout=1000, send_timeout=1000)[source]

Bases: object

Like DataClient, but makes requests of both channels each time.

all()[source]
all_bytes()[source]

Request all data on the server that the client has not received yet.

Does not decode the response; the response is the bytes exactly returned by the data server.

close()[source]
decode(response)

Decode response and return the Python objects represented.

latest()[source]
sync()[source]

wni.datautil

Utilities for scripting with the data produced by the WNI radar.

class wni.datautil.DataClientRadialIter(addr='tcp://None:None', timeout=3000)[source]

Bases: wni.datautil.RadialIter

Make requests for data using a wni.data_client.DataClient.

get_next_radial()[source]

Return the next radial. If there is no data left, return None.

class wni.datautil.MsgpackIter(unpacker)[source]

Bases: object

Iterates over a msgpack.Unpacker object

class wni.datautil.MsgpackRadialIter(unpacker)[source]

Bases: wni.datautil.RadialIter

Return an object that can iterate over radials.

classmethod from_file(path)[source]
get_next_radial()[source]

Return the next radial. If there is no data left, return None.

tell()[source]

Tell bytes offset in file.

class wni.datautil.NanomsgSubRadialIter(addr='tcp://None:None', timeout=3000)[source]

Bases: wni.datautil.RadialIter

Listens on a Nanomsg Subscribe socket and provides radials as they are ready.

Uses a nanomsg.SUBSCRIBE socket; for live data, you should use the NanomsgRadialIter class.

get_next_radial()[source]

Return the next radial. If there is no data left, return None.

class wni.datautil.PPISelector(fname)[source]

Bases: object

Select PPIs from a data file.

class wni.datautil.PerScanRadialIter(radial_iterator)[source]

Bases: object

Iterate radials within a scan. There is no public constructor for this class; an instance is returned from ScanIter.__next__

ppi_iter()[source]

Iterate over PPIs of the scan.

class wni.datautil.RadialIter[source]

Bases: abc.ABC

ABC for iterating over radials produced by the pulse processor. Subclasses must provide a method “get_next_radial” for providing data.

get_next_ppi(nradials=4000, scan_id=None)[source]

Return a list of radials associated with a single ppi. If the scan id changes while this function is running, the function will return a partial PPI.

If nradials radials are yielded before a PPI is complete, yield nradials, regardless of whether they complete a PPI.

If scan_id is passed and is not None, the function will return None if the first radial’s scan id does not match scan_id.

get_next_radial()[source]

Return the next radial. If there is no data left, return None.

iterate_scan()[source]

Yield radials from the current scan.

peek()[source]

Examine the next value without consuming the iterator. Multiple calls to this function return the same value.

returns None if there is no more data.

class wni.datautil.ScanIter(radial_iterator: wni.datautil.RadialIter, info_sock: nanomsg.Socket)[source]

Bases: object

ScanIter allows the following pattern:

radial_iterator = NanomsgReqRadialIter(addr=...)
for scan_settings, special_radial_iter in ScanIter():
    # special radial_iter will yield radials until it is dones
    # do something with scan_settings, e.g. open a new file
    for radial in special_radial_iter:
        # do something with each radial

    # if you want to work with ppis, use this for loop instead of the
    # above one:
    # for scan_id, ppi in special_radial_iter.ppi_iter():
    #     # do something cool with each ppi
wni.datautil.dt_from_str(string)[source]

Return datetime object from string formatted correctly

wni.datautil.dump_hdf(data, hdf, packer=<function pack_hdf5_dataset>, *args, **kwargs)[source]

Adds keys of given dict as groups and values as datasets to the given hdf-file (by string or object) or group object.

Parameters:
  • data (dict) – The dictionary containing only string keys and data values or dicts again.
  • hdf (string (path to file) or h5py.File() or h5py.Group()) –
  • packer (callable) – Callable gets hdfobject, key, value as input. hdfobject is considered to be either a h5py.File or a h5py.Group. key is the name of the dataset. value is the dataset to be packed and accepted by h5py.
Returns:

hdfh5py.Group() or h5py.File() instance

Return type:

obj

wni.datautil.extract_keys(key, list_of_dicts, replace_none=())[source]

Extract key from every dict in a list of dicts.

wni.datautil.get_scan_metadata(radial)[source]

Return scan metadata from radial.

(Basically returns the same dict with the data keys removed)

wni.datautil.get_unpacker(fname)[source]

Return a msgpack.Unpacker which can read the data saved by pulse processor. If fname is a string or bytes, a file will be opened for it. Otherwise, fname is treated as a file opened in binary read-only mode.

wni.datautil.load_hdf(hdf, unpacker=<function unpack_dataset>, *args, **kwargs)[source]

Returns a dictionary containing the groups as keys and the datasets as values from given hdf file.

Parameters:
  • hdf (h5py.File() or h5py.Group()) –
  • upacker (callable) – Unpack function gets value of type h5py.Dataset. Must return the data you would like to have it in the returned dict.
Returns:

d – The dictionary containing all groupnames as keys and datasets as values.

Return type:

dict

wni.datautil.pack_hdf5_dataset(hdfobject, key, value)[source]

Packs a given key value pair into a dataset in the given hdfobject.

wni.datautil.pack_radials(radials)[source]

Return msgpacked bytes in the same format produced by the client applications.

wni.datautil.packb(o, **kwargs)

Pack object o and return packed bytes

See Packer for options.

wni.datautil.radials_to_dict(radials: list, scan_set=0)[source]

Return a consolidated dict of radials.

Parameters:
  • radials – the list of radials, as returned from a DataClient.
  • scan_set – Index into each radial within radials.
wni.datautil.read_dumped_data(fname)[source]

Reads a data file created by the script wni.scripts.dump_ppi.

Returns a dict with two keys: "scan_info" and "data"). scan_info is a nested dict, corresponding to all scan settings for the PPI. data is a dict of scan setting sets. The keys of data are “set0”, “set1”, … “set[n]”, for n == the number of configured scan settings. If the radar is using a single prt and waveform, “set0” will likely be the only set available.

wni.datautil.remove_none(waveforms)[source]
wni.datautil.skip_to_time(time, unpacker)[source]

Skip to the radial that occurs after the specified time.

wni.datautil.unpack_dataset(item)[source]

Reconstruct a hdfdict dataset.

Parameters:item (h5py.Dataset) –
Returns:value
Return type:Unpacked Data
wni.datautil.unpackb(packed, object_hook=None, list_hook=None, bool use_list=True, bool raw=True, encoding=None, unicode_errors=None, object_pairs_hook=None, ext_hook=ExtType, Py_ssize_t max_str_len=2147483647, Py_ssize_t max_bin_len=2147483647, Py_ssize_t max_array_len=2147483647, Py_ssize_t max_map_len=2147483647, Py_ssize_t max_ext_len=2147483647)

Unpack packed_bytes to object. Returns an unpacked object.

Raises ValueError when packed contains extra bytes.

See Unpacker for options.

wni.datautil.write_ppi(f, ppi, file_format)[source]
wni.datautil.write_scan_settings(f, scan_settings, file_format)[source]

wni.derpy

This module provides a NanoClient and NanoServer that can be used for testing.

class wni.derpy.Derp(*args, **kwargs)[source]

Bases: object

prop
raises_attribute_error()[source]
raises_name_error()[source]
class wni.derpy.DerpClient(addr='tcp://192.168.1.1:2121')[source]

Bases: wni.util.NanoClient

wni.derpy.serve(addr='tcp://192.168.1.1:2121')[source]

wni.gpio

class wni.gpio.GPIO(number, force_export=False)[source]

Bases: object

Provides an interface to /sys/class/gpio

PATH = '/sys/class/gpio'
close()[source]
direction
value

wni.nanolog

A Nanomsg log handler for Python. NanomsgHandler publishes JSON logs to the specified address. It’s up to something else to display those logs.

class wni.nanolog.NanomsgHandler(connect_addr=None)[source]

Bases: logging.Handler

A handler class which logs messages to a specified nanomsg host and port.

Publishes the log message as utf-8 encoded JSON string. To receive the message, the receiver decode the message:

# assuming you already have a suitable socket bytes = sock.recv() json_str = bytes.decode(‘utf-8’) dict = json.loads(json_str) # do whatever you want with the dict
create_socket()[source]
emit(record)[source]

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.

format_record(record)[source]

Return formatted bytes ready to be sent.

send(msg)[source]

Publish the message.

msg (bytes): the formatted log message to send.

wni.nanolog.color_print(msg, level, file=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)[source]
wni.nanolog.fixup_exc_info(d)[source]

Put exception information into the msg of the dict.

wni.nanolog.nanologger(sub_addr=None, pub_addr=None, log_filename='/home/pier3595/log/wni_log.json')[source]

Receives logs and prints the message as a dict to stderr and to the specified file.

To do something else with the received dict, you can subclass this class and override the handle_messsage method.

wni.processes

Contains all (maybe all) of the processes that are controlled with nanomsg sockets.

class wni.processes.NanoProcessManager(addr, recv_timeout=1000, force=False, check_interval=10)[source]

Bases: wni.processes.ProcessManager

A ProcessManager that can be controlled and queried via nanomsg.

COMMANDS = ['pid', 'start', 'stop', 'restart', 'status', 'cmd', 'cmdline']
close()[source]

Close the socket associated with the process manager

handle_meta_command(cmd, args)[source]

Responds to a command that changes stuff about this class

obey(data)[source]

Respond to incoming commands.

recv()[source]

listen for incoming commands.

recv_and_resp()[source]
send(msg)[source]

Send response.

serve()[source]
class wni.processes.NanoProcessManagerClient(addr, recv_timeout=1000)[source]

Bases: object

Provides an interface to a NanoProcessManager in Python.

is_managed(name)[source]

Return True if the process with given name is managed by the ProcesssManager.

manager_is_running

Detect if the process manager is running.

process_is_running(process)[source]
processes

Return a list of strings of the names of all the running processes

restart(process)[source]

Restart a process.

If the process is unknown, an UnknownProcessError is raised.

start(process)[source]

Start a previously-stopped process.

If the process is already running, a ProcessStateError is raised.

status(process)[source]
stop(process)[source]

Stop a running process.

If the process is uknown to the ProcessManager, UnknownProcessError is raised.

If the process was already stopped, ProcessStateError is raised.

exception wni.processes.OutOfSyncError[source]

Bases: Exception

This gets raised if ProcessManager gets out of sync with the processes it should know about.

exception wni.processes.ProcManError[source]

Bases: Exception

This gets raised if ProcessManager is told to start a process without giving enough information it should know about.

class wni.processes.ProcessManager(check_interval=10)[source]

Bases: object

This class starts and manages processes that are started by the script wni_process.

This class is modeled after Linux services, and knows how to respond to the commands “start”, “stop”, “restart”, and “status”.

close()[source]
is_running(name)[source]
list_processes(verbose=False)[source]
manage(processes)[source]

Adds processes to the list of running processes. processes is a list of processes to start; each element of processes is a list of arguments passed to psutil.Popen

process_iter()[source]

Yield processes that are managed by this class.

restart(name)[source]
restart_all()[source]

Restart all managed processes

revive_dead_children()[source]

Called from serve() approximately every check_interval seconds. Revive processes that died and need to be revived.

start(name, *args)[source]

Start a process and make sure it keeps running.

A ProcessManager will not allow multiple processes with identical name`s to be started. The `name is just a string, and is not necessarily related to the actual command line arguments.

status(name)[source]

Return a string indicating the status of the process associated with name

stop(name)[source]
stop_all()[source]
watch_processes_forever()[source]

Checks on its child processes eternally, polling them every self.check_interval seconds.

exception wni.processes.ProcessStateError[source]

Bases: Exception

exception wni.processes.UnknownProcessError[source]

Bases: Exception

wni.scan_config

Set up scan configuration. The hierarchy presented here reflects the hierarchy in the FPGA and the kernel driver.

  • ScanConf: This is the top-level object. Configures and reads top-level settings from the FPGA. Contains reference to the two channels of the AD9361 as the list ch, or individually as ch1 and ch2.
  • ChannelConfig: Per-channel configuration settings. Each channel contains a list of ScanConfSet called sets. Many settings can be set on the sets, but a few are per-channel: fir_enable
class wni.scan_config.BoolAttr(path)[source]

Bases: wni.scan_config.IntAttr

Sysfs file that represents a Boolean as a 0 or 1

class wni.scan_config.ChannelConfig(scan_conf, channel_id)[source]

Bases: object

Holder of all channel configuration settings.

sets

ScanSets instance

scan_start_set

Index (within sets) of the first scan set.

fir_len

The number of taps of the FIR filter in the FPGA

fir_enable

Whether the Rx data goes through the decimating FIR filter in the FPGA

fir_aresetn

Active low reset to the FIR filters in the FPGA.

num_sets

number of scan sets possible

num_used_sets

number of scan sets used by the current scan setup

bytes_read

Number of rx data bytes read by userspace

bytes_written

Number of rx data bytes written by the FPGA

overflow

Number of CPIs lost because userspace couldn’t keep up

scan_settings_errormessage

a string describing errors with the current scan setup

scheduler_mode

the mode of operation of the scheduler. The scheduler can either do its sequence N times (‘run_n’) or it can run as long as the input trigger is high (‘run_while_enabled’)

enable_packet_tagging

whether packet tagging is enabled. Don’t change this

fir_normalizer

the scaler applied to the Rx data in the FPGA

nloops

Number of times to run the sequence, if scheduler_mode == ‘run_n’

samples_per_dma_transfer

Number of bytes transferred per DMA transfer

apply_dict_config(config)[source]

Apply configuration to a channel from a dict.

The keys are channel attributes, and the values are the values they will be set to. The keys matching the regular expression “setd+” are special. They configure scan settings. The key set* applies scan settings to all scan sets. Individual scan set settings will override set* settings.

available_scheduler_modes
bytes_per_loop

Get/set sysfs attribute as an int

bytes_read

Get/set sysfs attribute as an int

bytes_written

Get/set sysfs attribute as an int

enable_packet_tagging

Sysfs file that represents a Boolean as a 0 or 1

fir_aresetn

Get/set sysfs attribute as an int

fir_enable

Get/set sysfs attribute as an int

fir_get_coef(normalized=True)[source]

Return the complex array of FPGA coefficients

Parameters:normalized – If True (the default), the coefficients are scaled between [-1, 1]. Otherwise, the Int16 representation is returned
fir_len

Get/set sysfs attribute as an int

fir_normalizer

Get/set sysfs attribute as an int

fir_set_bpf(numtaps=120, start=0, stop=3, window='hamming', nyq=15)[source]

Set the FPGA FIR filter to use a bandpass filter. If start == 0, a low-pass filter is created. If stop == 0, a high-pass filter is created. Otherwise, a band-pass filter is created.

fir_set_coef(coef, normalized=None)[source]

Set the FPGA FIR coefficients

Parameters:
  • coef – the complex FIR coefficients.
  • normalized – If None (the default), the function will determine if coef is normalized or not based on the values of the array. If True, the values must be in the range of [-1, 1]. If False, all the numbers must be between [-INT16_MAX, INT_16_MAX].
fir_set_matched_filter(tx_wfm)[source]

Based on the tx waveform, set the FPGA matched filter coefficients.

Parameters:tx_wfm – complex numpy array of the transmit waveform.
nloops

Get/set sysfs attribute as an int

num_sets

Get/set sysfs attribute as an int

num_used_sets

Get/set sysfs attribute as an int

overflow

Get/set sysfs attribute as an int

rx_only()[source]
samples_per_dma_transfer

Get/set sysfs attribute as an int

scan_settings_errormessage

Decodes / encodes sysfs files as utf-8

scan_start_set

Get/set sysfs attribute as an int

scheduler_mode

Decodes / encodes sysfs files as utf-8

settings_as_dict()[source]

Return channel settings as a dict

class wni.scan_config.Channels(scan_conf)[source]

Bases: object

Broadcasts settings between channels

my_attrs = ['_ch1', '_ch2', '_ch', 'scan_conf']
sets

Return a _BroadcastList which passes through all get/set operations

class wni.scan_config.IntAttr(path)[source]

Bases: wni.scan_config.SysfsBinAttr

Get/set sysfs attribute as an int

class wni.scan_config.ScanConf(sysfs_dir=None)[source]

Bases: object

Holds scan configuration and can initiate a software trigger

Uses the sysfs interface. Writing the registers directly will make the kernel module become out of sync with the hardware, which would be bad. Do not write the registers directly.

ch

list of channels

ch1

first channel (equivalent to ch[0])

ch2

second channel (equivalent to ch[1])

fir_config_mode

Whether the FPGA is configuring the FIR filters

busy

Whether the system is busy. Usually indicates a running scan

revision

version of FPGA code

clk_counter

64-bit timestamp from the FPGA

apply_dict_config(config)[source]

Apply scan settings from a dict.

The keys are attributes on scan_conf. The values are the values to set to. There are three special keys: ch1, ch2, and ch*. The value for each of these keys is a set. ch1 configures channel 1. ch2 configures channel 2. ch* configures both channels.

busy

Sysfs file that represents a Boolean as a 0 or 1

clear_buffer()[source]
clk_counter

Get/set sysfs attribute as an int

fir_config_mode

Sysfs file that represents a Boolean as a 0 or 1

interrupt()[source]
jitter_mask

Get/set sysfs attribute as an int

jitter_min

Get and set sysfs files in a microseconds way

jitter_min_clk

Get/set sysfs attribute as an int

max_duty_cycle

Get/set sysfs attribute as an int

max_tx_amp_length

Get and set sysfs files in a microseconds way

max_tx_amp_length_clk

Get/set sysfs attribute as an int

min_prt

Get and set sysfs files in a microseconds way

min_prt_clk

Get/set sysfs attribute as an int

revision

Get/set sysfs attribute as an int

rx_only()[source]
settings_as_dict()[source]

Return the scan settings as a nested dict. The returned dict will have the following form:

returned_d = {
    'clk_counter': 3131414141,
    'fir_config_mode': False,
    'busy': False,
    'ch1': {
        'fir_enable': True,
        'scheduler_mode': "run_while_enabled",
        'set0': {
            'rx_length': 3,
            'waveform': {
                'iq': [[0.01 + 0.1j], [0.02 + 0.02j], ...],
            ...: ...
        },
        'set1': {...: ...},
    },
    'ch2': {...: ...},
}
trigger()[source]

Start the software trigger; run for self._scan_time ms. This function immediately returns.

wait_for_scan_completion()[source]

Wait for the scan to complete.

class wni.scan_config.ScanConfigSet(channel, set_id)[source]

Bases: object

A single set of scan settings for a single channel.

The following attributes have units of microseconds. The scan config set also has attributes with the same name but with the suffix _clk, which has units of clock cycles.

* delay

the delay at the start of the cpi

* tx_amp_delay

microseconds to delay transmit amplifier trigger after prt trigger goes high

* tx_amp_length

number of microseconds that the amplfiier trigger is high

* tx_delay

microseconds to delay tx waveform trigger after prt trigger goes high

* tx_length

number of microseconds the waveform trigger is high

* rx_delay

microseconds to delay receive window after prt trigger goes high

* rx_length

number of microseconds to count rx data as valid

* prt

the pulse repetition time, in microseconds

The following attributes are also on each scan set:

Attributes

  • pulses: number of pulses in the scan set
  • add_jitter: Whether to add a random delay at the end of a CPI
  • last: whether this is the last scan configuration option in the set
  • next_set: the index of the next scan set
  • wfm_addr: waveform start address in block RAM in FPGA
  • pc_fir_taps: Filter taps to be applied on the processing computer for this scan set’s rx data.
add_jitter

Sysfs file that represents a Boolean as a 0 or 1

apply_dict_config(config)[source]

Apply configuration settings from a dict.

delay

Get and set sysfs files in a microseconds way

delay_clk

Get/set sysfs attribute as an int

last

Sysfs file that represents a Boolean as a 0 or 1

next_set

Get/set sysfs attribute as an int

pc_fir_taps

Set the FIR taps that will be used by the data processor to do additional filtering of the data.

prt

Get and set sysfs files in a microseconds way

prt_clk

Get/set sysfs attribute as an int

pulses

Get/set sysfs attribute as an int

rx_delay

Get and set sysfs files in a microseconds way

rx_delay_clk

Get/set sysfs attribute as an int

rx_length

Get and set sysfs files in a microseconds way

rx_length_clk

Get/set sysfs attribute as an int

rx_samples

Return and set the number of samples to receive. This is the number of I/Q samples that will get DMA’d from the FPGA to the processor.

scan_set_id

Get/set sysfs attribute as an int

settings_as_dict()[source]

Return scan settings as a dict

tx_amp_delay

Get and set sysfs files in a microseconds way

tx_amp_delay_clk

Get/set sysfs attribute as an int

tx_amp_length

Get and set sysfs files in a microseconds way

tx_amp_length_clk

Get/set sysfs attribute as an int

tx_delay

Get and set sysfs files in a microseconds way

tx_delay_clk

Get/set sysfs attribute as an int

tx_length

Get and set sysfs files in a microseconds way

tx_length_clk

Get/set sysfs attribute as an int

wfm
wfm_addr

Get/set sysfs attribute as an int

wfm_chirp(nsamples, center, bw, window='boxcar', scale=1, clk_freq=30.0)[source]
wfm_get_iq(normalized=True)[source]
wfm_no_tx()[source]
wfm_set_iq(iq, normalized=None)[source]
class wni.scan_config.ScanSets(channel, slices=())[source]

Bases: object

Getting/setting scan sets. Since the user can change the number of scan settings out from under us, we need to dynamically create them as needed.

my_attrs = ['channel', 'slices', 'chan_dir']
class wni.scan_config.StrAttr(path)[source]

Bases: wni.scan_config.SysfsBinAttr

Decodes / encodes sysfs files as utf-8

class wni.scan_config.SysfsBinAttr(path)[source]

Bases: object

Return Return a binary sysfs attribute. gets/sets with raw bytes.

get_path(obj)[source]

Build the sysfs path

class wni.scan_config.UsAttr(path, offset=0, clamp=True)[source]

Bases: wni.scan_config.IntAttr

Get and set sysfs files in a microseconds way

wni.scan_config.clk_to_us(cycles, clock_freq=120000000)[source]

Convert clk cycles to number of microseconds.

wni.scan_config.iq_to_tx_amp_len(iq)[source]
wni.scan_config.p(fname, root=None)[source]

Return the path to a WNI sysfs file

wni.scan_config.us_to_clk(us, clock_freq=120000000)[source]

Convert microseconds to number of clock cycles.

wni.scan_timer

wni.scan_timer.flush_sock(socket)[source]

Flushes all recv() data in socket.

wni.scan_timer.notify_respondents(surveyor_sock, pubsock)[source]

Notify TIMER_START_TOPIC whenever context is entered, and TIMER_FINISHED_TOPIC when context is exited

wni.system_config

System configuration settings

class wni.system_config.SystemConfig[source]

Bases: object

Contains (certain) highlevel system configuration, including options for where data gets sent and radar calibration details.

apply_dict_config(config)[source]

Apply configuration specified in a dict. Keys are attribute names, and values are the values that will be assigned.

settings_as_dict()[source]

Return current settings as a dict.

wni.txrx_spi

Set the LO on the radar frontend board to the correct frequency. The LO is controlled via SPI.

class wni.txrx_spi.HMC807(bus, device)[source]

Bases: object

REG_ANALOG_EN = 8
REG_CP = 9
REG_CP_OPAMP = 10
REG_FREQ_FRAC = 4
REG_FREQ_INT = 3
REG_GPO = 13
REG_ID = 0
REG_LD_STATE = 15
REG_LKD_CSP = 7
REG_PFD = 11
REG_REFDIV = 2
REG_RST = 1
REG_SD_CFG = 6
REG_SEED = 5
REG_VCO = 12
frequency_mult

Get and set the reference frequency multiplier.

classmethod from_chardev(path)[source]

Instantiate an HMC807 device given a path to SPI device in /dev

id
init_registers()[source]

Initialize the registers with values we like.

read_reg(reg)[source]
reset()[source]

Reset all registers to default values.

write_reg(reg, data)[source]

wni.util

Random utilities

class wni.util.AD9361Client(addr=None, **kwargs)[source]

Bases: wni.util.NanoClient

class wni.util.ActuatorClient(addr=None, **kwargs)[source]

Bases: wni.util.NanoClient

class wni.util.DpramReaderClient(addr=None, **kwargs)[source]

Bases: wni.util.NanoClient

class wni.util.EXT_TYPES[source]

Bases: object

chirp = 4
complex = 0
exception = 2
ndarray = 1
proxy = 3
slice = 5
class wni.util.MotorClient(addr=None, **kwargs)[source]

Bases: wni.util.NanoClient

class wni.util.NanoClient(connect_addr, class_=None, recv_timeout=10000)[source]

Bases: object

A client for a NanoServer.

Messages between client and server are encoded and decoded via msgpack.

close_nanoclient()[source]

Closes the underlying socket.

This isn’t just called close() because it is too likely that we’re proxying an object with a close() method.

getattr(name)[source]

Return the attribute from the server

kill_the_server()[source]
class wni.util.NanoServer(obj, bind_addr, capture_stdstreams=True)[source]

Bases: object

Manage an object remotely via nanomsg.

Messages between client and server are encoded and decoded via msgpack.

close()[source]
handle_request(d)[source]

Handle a client’s request

serve()[source]
class wni.util.Notifier(surveyor_addr=None)[source]

Bases: object

Connects to the timer surveyor.

bg()[source]
close()[source]
flush_queue()[source]
sync_start()[source]

Should be called after getting the surveyor message if you want to know exactly when a scan starts.

wait_for_msg(timeout=None)[source]
class wni.util.PositionIndicatorClient(addr=None, **kwargs)[source]

Bases: wni.util.NanoClient

class wni.util.ScanConfClient(addr=None, **kwargs)[source]

Bases: wni.util.NanoClient

class wni.util.ScanTimerClient(addr=None, **kwargs)[source]

Bases: wni.util.NanoClient

exception wni.util.ServerException(exc_info)[source]

Bases: Exception

Whenever a NanoServer would have raised an exception, the client receives this exception and raises it.

class wni.util.SystemConfigClient(addr=None, **kwargs)[source]

Bases: wni.util.NanoClient

wni.util.actuator_server(bind_addr=None)[source]

This is meant to be run as its own process, and should be the only thing on the machine that communicates directly with the actuator.

This process provides a request/respoonse nanomsg way of working with the actuator, to provide for faster responses for the position.

The message is either of the form “get[value]” or “set[value##]”; if the value is set, an “ok” response is sent back.

wni.util.ad9361_server(bind_addr=None, spidev_name=None)[source]
wni.util.data_client1()[source]

Return a data client for channel 1 that can only be used from the processing computer or Microzed

wni.util.data_client2()[source]

Return a data client for channel 2 that can only be used from the processing computer or Microzed

wni.util.default_pack(o)[source]

Serialize (certain) Python objects using msgpack.

Currently supported objects:

  1. complex numbers
  2. Exceptions (limited support)
  3. _ProxyInfo objects, for use in NanoClient/NanoServer communication
  4. numpy.ndarray
wni.util.ext_hook_unpack(code, data)[source]

Unpack the Python objects that are known to default_pack()

wni.util.is_leaf(obj)[source]
wni.util.nested_dict_merged(d1, d2)[source]

Return a dict representing a merged dict of d1 and d2.

The values in d2 supersede the values in d1, but there is an important case here: if the value is another dict, this method will be called again for the two dicts.

class wni.util.nullcontext(enter_result=None)[source]

Bases: object

wni.util.packb(o, **kwargs)

Pack object o and return packed bytes

See Packer for options.

wni.util.pcpm_client(timeout=1000)[source]

Return a client for the processing computer process manager that can only be used from the processing computer or Microzed.

wni.util.r356_server(bind_addr=None, use_fake=False)[source]

This is meant to be run as its own process, and should be the only thing on the machine that communicates directly with the motor.

This process provides a request/response nanomsg way of working with the motor, to provide for faster responses for the position.

Usage:

p = multiprocessing.Process(target=R356NanomsgServer) p.start()
wni.util.radar_server(bind_addr='tcp://0.0.0.0:25010')[source]
wni.util.scan_conf_server(bind_addr=None)[source]
wni.util.scan_timer_server(bind_addr=None)[source]
wni.util.send_obj(sock, data, stdout='', stderr='')[source]
wni.util.setter(fset)[source]
wni.util.system_config_server(bind_addr=None)[source]
wni.util.unpackb(packed, object_hook=None, list_hook=None, bool use_list=True, bool raw=True, encoding=None, unicode_errors=None, object_pairs_hook=None, ext_hook=ExtType, Py_ssize_t max_str_len=2147483647, Py_ssize_t max_bin_len=2147483647, Py_ssize_t max_array_len=2147483647, Py_ssize_t max_map_len=2147483647, Py_ssize_t max_ext_len=2147483647)

Unpack packed_bytes to object. Returns an unpacked object.

Raises ValueError when packed contains extra bytes.

See Unpacker for options.

wni.util.uzpm_client(timeout=1000)[source]

Return a client for the Microzed process manager that can only be used from the processing computer or Microzed.

wni.util.wait_for_connection(sock, timeout)[source]

Busy-wait for connection on sock. If it does not happen within specified time, assert False is raised.

wni.waveforms

Provides an interface to the waveforms stuff exposed by the kernel

class wni.waveforms.Chirp(nsamples, center, bw, window='boxcar', scale=1, clk_freq=30.0, phase=0)[source]

Bases: object

A bag of values representing a linear frequency modulated (LFM) chirp.

class wni.waveforms.Waveform(path)[source]

Bases: object

Get and set Tx waveforms.

apply_dict_config(config)[source]

Apply waveform configuration from a dict. Returns the I/Q samples that were set.

Configuration keys:

  • “type”: either “chirp” or “arb”. If “type” is not specified, “arb” is assumed. If “chirp” is given, the following keys are required:

    • “nsamples”: Number of samples in the chirp.
    • “center”: Center frequency in the chirp, in MHz. The value can be between +/-15.
    • “bandwidth”: The bandwidth of the chirp, in MHz. The value can be between +/-15.

The following keys are optional for chirp:

  • “window”: The name of the amplitude window to apply. Defaults to “hanning”.
  • “phase”: The phase offset of the waveform.

The following keys are required if the type is “arb”

  • “iq”: A complex numpy array of samples. If all values fall between -1 and 1, they are scaled appropriately to fit in the 12-bit DAC. Otherwise, the values are used directly as is.
  • “scale”: Scale to apply to the digital waveform. Defaults to 0.25. The DAC performs better whenever this value is not near 1.
chirp(nsamples, center, bw, window='boxcar', scale=1, clk_freq=30.0, phase=0)[source]

Set the current waveform to the specified LFM chirp.

Parameters:
  • nsamples – Number of samples in the chirp.
  • center – The center frequency in MHz
  • bw – The bandwidth in MHz
  • window – The amplitude window
  • scale – A number between [0, 1]. Scale the Tx waveform by this factor.
  • clk_freq – Clock frequency in MHz.
get_iq(normalized=True)[source]
get_iq_int12()[source]
get_iq_normalized()[source]
no_tx()[source]
set_iq(iq, normalized=None)[source]

Write I/Q samples to the waveform file.

Parameters:
  • iq – complex numpy array of I/Q samples
  • normalized – whether the I/Q samples are normalized (all samples between [-1, 1]) or not (all samples fit in 12-bit ints)
set_iq_int12(iq)[source]

Write I/Q samples to the waveform file.

Parameters:iq – complex numpy array of I/Q samples, which all fit in 12-bit ints
set_iq_normalized(iq)[source]

Writes normalized I/Q samples to the waveorm file.

Parameters:
  • iq – complex numpy array of I/Q samples. All samples must be
  • [-1, 1] inclusive. (between) –
settings_as_dict()[source]

Return the current waveform information serialized as a dict

waveform

Return the scaled waveform, so that max amplitude corresponds to [-1, 1].

wni.waveforms.calc_tx_length(nsamples)[source]

Calculate the tx time in microseconds.

wni.waveforms.to_int12(iq)[source]

Set iq scaled from [-1, 1] to int12 values.

Module contents