import contextlib
import itertools
import logging
import os
import time
from threading import Thread
import nanomsg
import wni.config as config
import wni.radar
from wni.nano import get_current_connections
logger = logging.getLogger(__name__)
[docs]@contextlib.contextmanager
def notify_respondents(surveyor_sock, pubsock):
"""
Notify TIMER_START_TOPIC whenever context is entered, and
TIMER_FINISHED_TOPIC when context is exited
"""
surveyor_sock.send(config.TIMER_START_TOPIC)
_wait_for_responses(surveyor_sock)
pubsock.send(b'started')
try:
yield
finally:
surveyor_sock.send(config.TIMER_FINISHED_TOPIC)
_wait_for_responses(surveyor_sock)
[docs]def flush_sock(socket):
"""Flushes all recv() data in socket."""
orig_timeout = socket.recv_timeout
socket.recv_timeout = 0
try:
while True:
socket.recv()
except nanomsg.NanoMsgAPIError:
# this is what we wanted actually
pass
finally:
socket.recv_timeout = orig_timeout
def _wait_for_responses(socket):
how_many = get_current_connections(socket)
try:
for i in range(how_many):
socket.recv()
except nanomsg.NanoMsgAPIError:
msg = 'Waited for responses from {} peers, only got {}'
logger.error(msg.format(how_many, i - 1))
raise
class _ScanTimer():
"""
Holds scan timer information, and publishes a signal when a scan begins and
ends.
"""
def __init__(self):
self.trig_file = os.path.join(config.WNI_SYSFS, 'trigger')
self.ch1_err = os.path.join(config.WNI_SYSFS,
'ch1/scan_settings_errormessage')
self.ch2_err = os.path.join(config.WNI_SYSFS,
'ch2/scan_settings_errormessage')
self.surveyor = nanomsg.Socket(nanomsg.SURVEYOR)
self.surveyor.bind(config.TIMER_SURVEY_ADDR)
# set the deadline to 4 seconds
self.surveyor.set_int_option(nanomsg.SURVEYOR,
nanomsg.SURVEYOR_DEADLINE,
4000)
self.pubsock = nanomsg.Socket(nanomsg.PUB)
self.pubsock.bind(config.TIMER_SYNC_ADDR)
# if this socket receives any data during a can, the scan aborts
self.interrupt_sock = nanomsg.Socket(nanomsg.REP)
self.interrupt_sock.bind(config.SCAN_INTERRUPT_ADDR)
self.radar = wni.radar.Radar()
self._thread = None
def _write_trigger(self, val):
encoded = b'%d\n' % val
with open(self.trig_file, 'wb', buffering=False) as f:
try:
f.write(encoded)
except OSError:
# Writing a 1 to the trigger sysfs file causes
logger.exception('Could not run scan; check the kernel logs.')
raise
def _read_err_files(self):
# checking for errors actually has side effects which cause the
# scan settings to be sequenced correctly
errs = []
for fname in (self.ch1_err, self.ch2_err):
with open(fname, 'rb') as f:
errs.append(f.read().strip())
return errs
def _do_simple_trigger(self, time_seconds):
# since this is called in a thread, we wrap the whole thing in a
# try/except so that if something goes wrong we can be notified of it.
try:
# if the scan lasts too long (not sure how long that is yet!) then
# the xilinx DMA core will start to fail. Chunk up the scan so
# that it doesn't screw anything up.
with notify_respondents(self.surveyor, self.pubsock):
self._write_trigger(1)
timeout = int(round(time_seconds * 1000))
self.interrupt_sock.recv_timeout = timeout
try:
self.interrupt_sock.recv()
self.interrupt_sock.send(b'ok')
except nanomsg.NanoMsgAPIError:
# timeout is the normal way to quit a scan.
pass
finally:
self._write_trigger(0)
except Exception:
logger.exception("trigger thread unexpectedly died")
# make the timer process kill itself so that the process manager
# will revive it, and then maybe things will go better.
os._exit(2)
def ensure_ok_settings(self, vcp_settings):
# ensure that the VCP settings are ok.
if not isinstance(vcp_settings, dict):
msg = 'vcp_settings must be a dict, not {}'
msg = msg.format(type(vcp_settings))
raise ValueError(msg)
if 'type' not in vcp_settings:
raise ValueError('"type" field must be present in vcp settings')
if vcp_settings['type'] != 'ppi':
msg = 'type must be "ppi", not {}'.format(vcp_settings['type'])
raise ValueError(msg)
if 'value' not in vcp_settings:
raise ValueError('"value" field must be present in vcp settings')
program = vcp_settings['value']
if not isinstance(program, (list, tuple)):
msg = '"value" field must be a list, not {}'
msg = msg.format(type(program))
raise ValueError(msg)
for i, step in enumerate(program):
if not isinstance(step, dict):
msg = "The program must be a list of dicts, not {}"
msg = msg.format(type(step))
raise ValueError(msg)
if i == 0:
# ensure that the first one contains fields for rotation speed
# and elevation
if 'az_speed' not in step or 'el' not in step:
msg = ('"az_speed" and "elevation" are required fields of '
'the first step')
raise ValueError(msg)
az_speed = step.get('az_speed', 1.0)
el = step.get('el', 1.0)
if not isinstance(az_speed, (float, int)) or az_speed <= 0:
msg = 'az_speed must be a number > 0, not {!r:}'
msg = msg.format(az_speed)
raise ValueError(msg)
if not isinstance(el, (float, int)) or el < -1.5:
msg = 'el must be a number > -1.5, not {!r:}'
msg = msg.format(el)
raise ValueError(msg)
def _ppi_step(self, step, end_time, az_speed, el):
# returns boolean of whether we need to do more steps or not.
az_speed = step.get('az_speed', az_speed)
el = step.get('el', el)
self.radar.pedestal.degrees_per_second = az_speed
self.radar.pedestal.elevation = el
endpos, _ = self.radar.current_position()
# wait for speed to settle
# wait for speed to settle. Wait 2 seconds
for _ in range(20):
curspeed = self.radar.current_az_speed()
if abs(curspeed - az_speed) < 0.2:
break
time.sleep(0.1)
else:
logger.warning("Speed did not settle within two seconds.")
curpos, _ = self.radar.current_position()
left_to_go = 360 - abs(curpos - endpos)
time_until_completion = left_to_go / az_speed
now = time.monotonic()
time_left = end_time - now
time_to_wait = min(time_until_completion, time_left)
if time_to_wait < 0:
return False
else:
self.interrupt_sock.recv_timeout = round(int(time_to_wait * 1000))
try:
self.interrupt_sock.recv()
self.interrupt_sock.send(b'ok')
# we were interrupted
return False
except nanomsg.NanoMsgAPIError:
return True
def do_ppi(self, vcp_settings):
steps = vcp_settings['value']
max_time = vcp_settings.get('scan_time', -1)
with notify_respondents(self.surveyor, self.pubsock):
first = steps[0]
az_speed = first['az_speed']
self.radar.pedestal.degrees_per_second = az_speed
el = first['el']
if not self.radar.pedestal.is_spinning:
self.radar.pedestal.spin()
self.radar.pedestal.degrees_per_second = az_speed
self.radar.pedestal.elevation = el
# give a little time for az and el to stabilize
time.sleep(0.8)
try:
self._write_trigger(1)
start_time = time.monotonic()
if max_time < 0:
# end in 1 year
end_time = start_time + 60 * 60 * 365
else:
end_time = start_time + max_time
for step in itertools.cycle(steps):
keep_going = self._ppi_step(step, end_time, az_speed, el)
if not keep_going:
break
finally:
self._write_trigger(0)
def do_vcp(self, vcp_settings):
self.ensure_ok_settings(vcp_settings)
self._ensure_can_trigger()
self._thread = Thread(name='triggered', target=self.do_ppi,
args=[vcp_settings])
self._thread.daemon = True
self._thread.start()
def _ensure_can_trigger(self):
"""
Raises RuntimeError if we can't run a trigger now.
There is a TOCTOU potential bug here, so don't try to trigger in
multiple threads.
"""
errs = self._read_err_files()
if any(errs):
raise ValueError('Cannot run scan right now.')
if isinstance(self._thread, Thread) and self._thread.is_alive():
# wait 1 second for the previous thread to stop
timeout = time.monotonic() + 1
while time.monotonic() < timeout and self._thread.is_alive():
time.sleep(0.005)
if self._thread.is_alive():
raise RuntimeError('scan already running')
# make sure the interrupt socket is empty before the scan is triggered.
# This prevents stale requests from causing the scan to stop
# immediately after it is told to start.
flush_sock(self.interrupt_sock)
def trigger(self, seconds):
"""
Triggers the radar to run in a background thread
Args:
seconds: the amount of time to run, in seconds
"""
self._ensure_can_trigger()
self._thread = Thread(name='triggered', target=self._do_simple_trigger,
args=[seconds])
self._thread.daemon = True
self._thread.start()