Source code for wni.scan_timer

import contextlib
import itertools
import logging
import os
import time
from threading import Thread

import nanomsg

import wni.config as config
import wni.radar
from wni.nano import get_current_connections

logger = logging.getLogger(__name__)


[docs]@contextlib.contextmanager def notify_respondents(surveyor_sock, pubsock): """ Notify TIMER_START_TOPIC whenever context is entered, and TIMER_FINISHED_TOPIC when context is exited """ surveyor_sock.send(config.TIMER_START_TOPIC) _wait_for_responses(surveyor_sock) pubsock.send(b'started') try: yield finally: surveyor_sock.send(config.TIMER_FINISHED_TOPIC) _wait_for_responses(surveyor_sock)
[docs]def flush_sock(socket): """Flushes all recv() data in socket.""" orig_timeout = socket.recv_timeout socket.recv_timeout = 0 try: while True: socket.recv() except nanomsg.NanoMsgAPIError: # this is what we wanted actually pass finally: socket.recv_timeout = orig_timeout
def _wait_for_responses(socket): how_many = get_current_connections(socket) try: for i in range(how_many): socket.recv() except nanomsg.NanoMsgAPIError: msg = 'Waited for responses from {} peers, only got {}' logger.error(msg.format(how_many, i - 1)) raise class _ScanTimer(): """ Holds scan timer information, and publishes a signal when a scan begins and ends. """ def __init__(self): self.trig_file = os.path.join(config.WNI_SYSFS, 'trigger') self.ch1_err = os.path.join(config.WNI_SYSFS, 'ch1/scan_settings_errormessage') self.ch2_err = os.path.join(config.WNI_SYSFS, 'ch2/scan_settings_errormessage') self.surveyor = nanomsg.Socket(nanomsg.SURVEYOR) self.surveyor.bind(config.TIMER_SURVEY_ADDR) # set the deadline to 4 seconds self.surveyor.set_int_option(nanomsg.SURVEYOR, nanomsg.SURVEYOR_DEADLINE, 4000) self.pubsock = nanomsg.Socket(nanomsg.PUB) self.pubsock.bind(config.TIMER_SYNC_ADDR) # if this socket receives any data during a can, the scan aborts self.interrupt_sock = nanomsg.Socket(nanomsg.REP) self.interrupt_sock.bind(config.SCAN_INTERRUPT_ADDR) self.radar = wni.radar.Radar() self._thread = None def _write_trigger(self, val): encoded = b'%d\n' % val with open(self.trig_file, 'wb', buffering=False) as f: try: f.write(encoded) except OSError: # Writing a 1 to the trigger sysfs file causes logger.exception('Could not run scan; check the kernel logs.') raise def _read_err_files(self): # checking for errors actually has side effects which cause the # scan settings to be sequenced correctly errs = [] for fname in (self.ch1_err, self.ch2_err): with open(fname, 'rb') as f: errs.append(f.read().strip()) return errs def _do_simple_trigger(self, time_seconds): # since this is called in a thread, we wrap the whole thing in a # try/except so that if something goes wrong we can be notified of it. try: # if the scan lasts too long (not sure how long that is yet!) then # the xilinx DMA core will start to fail. Chunk up the scan so # that it doesn't screw anything up. with notify_respondents(self.surveyor, self.pubsock): self._write_trigger(1) timeout = int(round(time_seconds * 1000)) self.interrupt_sock.recv_timeout = timeout try: self.interrupt_sock.recv() self.interrupt_sock.send(b'ok') except nanomsg.NanoMsgAPIError: # timeout is the normal way to quit a scan. pass finally: self._write_trigger(0) except Exception: logger.exception("trigger thread unexpectedly died") # make the timer process kill itself so that the process manager # will revive it, and then maybe things will go better. os._exit(2) def ensure_ok_settings(self, vcp_settings): # ensure that the VCP settings are ok. if not isinstance(vcp_settings, dict): msg = 'vcp_settings must be a dict, not {}' msg = msg.format(type(vcp_settings)) raise ValueError(msg) if 'type' not in vcp_settings: raise ValueError('"type" field must be present in vcp settings') if vcp_settings['type'] != 'ppi': msg = 'type must be "ppi", not {}'.format(vcp_settings['type']) raise ValueError(msg) if 'value' not in vcp_settings: raise ValueError('"value" field must be present in vcp settings') program = vcp_settings['value'] if not isinstance(program, (list, tuple)): msg = '"value" field must be a list, not {}' msg = msg.format(type(program)) raise ValueError(msg) for i, step in enumerate(program): if not isinstance(step, dict): msg = "The program must be a list of dicts, not {}" msg = msg.format(type(step)) raise ValueError(msg) if i == 0: # ensure that the first one contains fields for rotation speed # and elevation if 'az_speed' not in step or 'el' not in step: msg = ('"az_speed" and "elevation" are required fields of ' 'the first step') raise ValueError(msg) az_speed = step.get('az_speed', 1.0) el = step.get('el', 1.0) if not isinstance(az_speed, (float, int)) or az_speed <= 0: msg = 'az_speed must be a number > 0, not {!r:}' msg = msg.format(az_speed) raise ValueError(msg) if not isinstance(el, (float, int)) or el < -1.5: msg = 'el must be a number > -1.5, not {!r:}' msg = msg.format(el) raise ValueError(msg) def _ppi_step(self, step, end_time, az_speed, el): # returns boolean of whether we need to do more steps or not. az_speed = step.get('az_speed', az_speed) el = step.get('el', el) self.radar.pedestal.degrees_per_second = az_speed self.radar.pedestal.elevation = el endpos, _ = self.radar.current_position() # wait for speed to settle # wait for speed to settle. Wait 2 seconds for _ in range(20): curspeed = self.radar.current_az_speed() if abs(curspeed - az_speed) < 0.2: break time.sleep(0.1) else: logger.warning("Speed did not settle within two seconds.") curpos, _ = self.radar.current_position() left_to_go = 360 - abs(curpos - endpos) time_until_completion = left_to_go / az_speed now = time.monotonic() time_left = end_time - now time_to_wait = min(time_until_completion, time_left) if time_to_wait < 0: return False else: self.interrupt_sock.recv_timeout = round(int(time_to_wait * 1000)) try: self.interrupt_sock.recv() self.interrupt_sock.send(b'ok') # we were interrupted return False except nanomsg.NanoMsgAPIError: return True def do_ppi(self, vcp_settings): steps = vcp_settings['value'] max_time = vcp_settings.get('scan_time', -1) with notify_respondents(self.surveyor, self.pubsock): first = steps[0] az_speed = first['az_speed'] self.radar.pedestal.degrees_per_second = az_speed el = first['el'] if not self.radar.pedestal.is_spinning: self.radar.pedestal.spin() self.radar.pedestal.degrees_per_second = az_speed self.radar.pedestal.elevation = el # give a little time for az and el to stabilize time.sleep(0.8) try: self._write_trigger(1) start_time = time.monotonic() if max_time < 0: # end in 1 year end_time = start_time + 60 * 60 * 365 else: end_time = start_time + max_time for step in itertools.cycle(steps): keep_going = self._ppi_step(step, end_time, az_speed, el) if not keep_going: break finally: self._write_trigger(0) def do_vcp(self, vcp_settings): self.ensure_ok_settings(vcp_settings) self._ensure_can_trigger() self._thread = Thread(name='triggered', target=self.do_ppi, args=[vcp_settings]) self._thread.daemon = True self._thread.start() def _ensure_can_trigger(self): """ Raises RuntimeError if we can't run a trigger now. There is a TOCTOU potential bug here, so don't try to trigger in multiple threads. """ errs = self._read_err_files() if any(errs): raise ValueError('Cannot run scan right now.') if isinstance(self._thread, Thread) and self._thread.is_alive(): # wait 1 second for the previous thread to stop timeout = time.monotonic() + 1 while time.monotonic() < timeout and self._thread.is_alive(): time.sleep(0.005) if self._thread.is_alive(): raise RuntimeError('scan already running') # make sure the interrupt socket is empty before the scan is triggered. # This prevents stale requests from causing the scan to stop # immediately after it is told to start. flush_sock(self.interrupt_sock) def trigger(self, seconds): """ Triggers the radar to run in a background thread Args: seconds: the amount of time to run, in seconds """ self._ensure_can_trigger() self._thread = Thread(name='triggered', target=self._do_simple_trigger, args=[seconds]) self._thread.daemon = True self._thread.start()