"""
Contains (possibly all of) the process-management classes whose child
processes are controlled over nanomsg sockets.
"""
from __future__ import absolute_import, division, print_function
import datetime
import logging
import os
import shutil
import time
import nanomsg
import psutil
# Module-level logger; the process managers below log process starts,
# restarts, kills and internal errors through it.
logger = logging.getLogger(__name__)
class OutOfSyncError(Exception):
    """Raised when ProcessManager's record of which processes should be
    running disagrees with the processes it actually finds alive.
    """
class ProcManError(Exception):
    """Raised when ProcessManager is asked to start a process it does not
    manage without being given a command line to start it with.
    """
# we're defining the help text here so it doesn't make the indentation in the
# class look horrible.
_procman_help_text = """\
ProcessManager: manages processes, accepting commands via NanoMessage.
Generally, a command can be given via e.g. nanocat with the following format:
nanocat --req --connect {address_of_procman} --raw -D "PROCESS_NAME COMMAND [ARGS...]" --raw
Available commands:
stop: stop the managed process
restart: restarts the managed process with the same cmdline it was initially
given
status: show process information
cmdline: show the command line arguments used for the process
if PROCESS_NAME == procman, the process manager can execute certain commands
which affect itself and/or all child processes.
Available meta commands:
help: show this help
restart: restart all managed processes
stop: stop all managed processes
list: list all processes managed by the process mananger
die: kill all children and stop running
pid: show the pid of the process manager
start NAME /PATH/TO/PROGRAM [ARGS...]: if the process identified by
``name`` is already managed by the process manager and currently running,
does nothing. If it is managed but has been stopped previously, it is
restarted with its original commandline. If it is not a managed process, the
specified process will be started with the provided cmdline.
"""
class ProcessManager(object):
    """
    Start processes and keep them running.

    This class starts and manages processes that are started by the script
    wni_process.

    This class is modeled after Linux services, and knows how to respond to
    the commands "start", "stop", "restart", and "status".  Every process is
    identified by a free-form ``name``, which need not be related to its
    command line.
    """

    def __init__(self, check_interval=10):
        # seconds between checks for dead children (watch_processes_forever
        # sleeps this long between calls to revive_dead_children)
        self.check_interval = check_interval
        # name -> argument tuple the process was last started with
        self.cmdlines = {}
        # name -> psutil.Popen of the most recent process started under name
        self.name_to_process = {}
        # reverse of name_to_process; only current processes are kept
        self._process_to_name = {}
        # names that should be running: start() adds, stop() removes
        self._running_processes = set()

    def manage(self, processes):
        """Start each entry of ``processes`` and keep it running.

        ``processes`` is an iterable of ``(name, cmdline)`` pairs: ``name``
        identifies the process and ``cmdline`` is the argument list passed to
        psutil.Popen (program first).
        """
        for proc in processes:
            name, cmdline = proc[0], proc[1]
            self.start(name, *cmdline)

    def revive_dead_children(self):
        """Restart every process that should be running but is not.

        Called approximately every ``check_interval`` seconds.  A process
        that cannot be restarted (e.g. its executable disappeared) is logged
        and retried on the next call instead of bringing down the manager.

        Raises OutOfSyncError if a live managed process is not marked as
        running, which means the internal bookkeeping is corrupted.
        """
        processes = list(self.process_iter())
        pnames = {self._process_to_name[proc] for proc in processes}
        out_of_sync = pnames - self._running_processes
        if out_of_sync:
            msg = 'self._running_processes got out of sync with managed processes. '
            msg += 'Unmanaged process: "%s"' % out_of_sync
            logger.error("ProcessManager internal error: processes out of sync now")
            raise OutOfSyncError(msg)
        # the set difference is a new set, so start() may safely add to
        # self._running_processes while we loop
        for procname in self._running_processes - pnames:
            logger.warning("Restarting process %s", procname)
            try:
                self.start(procname)
            except (ValueError, OSError):
                logger.exception("Could not restart process %s; will retry", procname)

    def list_processes(self, verbose=False):
        """Return a text table with the pid, name and status of every
        managed process.

        Processes that are stopped (or exited since the last check) are shown
        with ``-`` as pid.  ``verbose`` is accepted but currently unused.
        The result ends with a newline.
        """
        fmt = '{:>5} {: <20s} {}'.format
        msg = [fmt('pid', 'name', 'status')]
        for name, process in self.name_to_process.items():
            status = None
            if name in self._running_processes:
                try:
                    status = process.status()
                except psutil.NoSuchProcess:
                    # exited since the last health check; show as stopped
                    status = None
            if status is None:
                msg.append(fmt("-", name, '--------------'))
            else:
                msg.append(fmt(process.pid, name, status))
        # a trailing empty element makes the joined text end with a newline
        msg.append('')
        return '\n'.join(msg)

    def stop_all(self):
        """Stop every process that is currently marked as running."""
        # iterate over a snapshot: stop() removes names from the set
        for proc in list(self._running_processes):
            self.stop(proc)

    def restart_all(self):
        """Restart all managed processes that are marked as running."""
        # iterate over a snapshot: restart() removes and re-adds names, and
        # a set must not change while it is being iterated
        for proc in list(self._running_processes):
            self.restart(proc)

    def status(self, name):
        """Return a string indicating the status of the process associated
        with ``name``.

        Unknown names and processes that are not running (including zombies,
        which get reaped) produce a one-line message instead of the table.
        """
        not_running = 'The process "{}" is not running'.format(name)
        not_managed = 'Unknown process "{}"'.format(name)
        try:
            p = self.name_to_process[name]
        except KeyError:
            return not_managed
        if not self.is_running(name):
            return not_running
        try:
            with p.oneshot():
                cmdline = p.cmdline()
                cpu_times = p.cpu_times()
                cpu_percent = p.cpu_percent()
                pid = p.pid
                ctime = p.create_time()
                status = p.status()
                threads = p.num_threads()
        except psutil.NoSuchProcess:
            # the process exited between the liveness check and the query
            return not_running
        formatted = datetime.datetime.fromtimestamp(ctime).strftime('%Y%m%d-%H:%M:%S')
        rows = [
            ("cmdline", " ".join(cmdline)),
            ("cpu_time", cpu_times),
            ("cpu_percent", cpu_percent),
            ("pid", pid),
            ("time_created", formatted),
            ("status", status),
            ("threads", threads),
        ]
        return '\n'.join('{: <12} {}'.format(key, value) for key, value in rows) + '\n'

    def is_running(self, name):
        """Return True if the process named ``name`` is alive.

        Unknown names and vanished processes give False; a zombie is reaped
        (waited on) and also reported as not running.
        """
        try:
            proc = self.name_to_process[name]
        except KeyError:
            return False
        try:
            status = proc.status()
        except psutil.NoSuchProcess:
            return False
        if status == 'zombie':
            proc.wait()
            return False
        return proc.is_running()

    def start(self, name, *args):
        """
        Start a process and make sure it keeps running.

        A ProcessManager will not allow multiple processes with identical
        `name`s to be started. The `name` is just a string, and is not
        necessarily related to the actual command line arguments.

        ``args`` is the command line (program first).  It may be omitted for
        a name that is already managed, in which case the stored command line
        is reused.  Returns the new psutil.Popen.

        Raises ProcManError if ``name`` is unknown and no command line is
        given, and ValueError if the program cannot be found or a process
        named ``name`` is already running.
        """
        if args:
            arguments = args
        else:
            try:
                # an already-managed process can be started without
                # providing all the arguments again
                arguments = self.cmdlines[name]
            except KeyError:
                msg = "Process {} is not managed by this ProcessManager, and no command line was given"
                msg = msg.format(name)
                logger.error(msg)
                raise ProcManError(msg)
        exe = arguments[0]
        if not shutil.which(exe):
            raise ValueError('The file "{}" does not exist'.format(exe))
        if self.is_running(name):
            msg = 'A process with the name "{}" is already running'.format(name)
            raise ValueError(msg)
        logger.info('starting %s: %s', name, ' '.join(arguments))
        p = psutil.Popen(arguments)
        # forget the previous process under this name so the reverse
        # mapping doesn't grow on every restart
        previous = self.name_to_process.get(name)
        if previous is not None:
            self._process_to_name.pop(previous, None)
        self.cmdlines[name] = arguments
        self.name_to_process[name] = p
        self._process_to_name[p] = name
        self._running_processes.add(name)
        return p

    def stop(self, name):
        """Stop the process ``name`` together with all of its descendants.

        Returns True if the process was running and has been stopped, False
        if it had already exited.  Either way it is no longer restarted
        automatically.  Raises KeyError if ``name`` was never started.
        """
        def _kill_leafs(proc):
            """Terminate ``proc``'s descendants (deepest first), then ``proc``."""
            try:
                children = proc.children()
            except psutil.NoSuchProcess:
                return
            for child in children:
                _kill_leafs(child)
            try:
                cmdline = ' '.join(proc.cmdline())
            except psutil.NoSuchProcess:
                cmdline = 'pid {}'.format(proc.pid)
            logger.info('killing proc %s', cmdline)
            try:
                proc.terminate()
            except psutil.NoSuchProcess:
                # exited on its own after it was enumerated
                return
            proc.wait()

        p = self.name_to_process[name]
        # it _should_ be alive, but sometimes it isn't.
        if self.is_running(name):
            _kill_leafs(p)
            ret = True
        else:
            ret = False
        self._running_processes.discard(name)
        return ret

    def restart(self, name):
        """Stop ``name`` and start it again with its stored command line."""
        self.stop(name)
        self.start(name)

    def process_iter(self):
        """Yield the live processes whose names are marked as running.

        Zombies are reaped (waited on) and not yielded, so callers such as
        revive_dead_children() treat them as dead.
        """
        for name, proc in self.name_to_process.items():
            if name not in self._running_processes:
                continue
            try:
                status = proc.status()
            except psutil.NoSuchProcess:
                continue
            if status == 'zombie':
                # waiting on the exited child is what lets the zombie go away
                proc.wait()
            elif proc.is_running():
                yield proc

    def watch_processes_forever(self):
        """
        Checks on its child processes eternally, polling them every
        ``self.check_interval`` seconds.

        Any exception (including KeyboardInterrupt) stops all children and
        is then re-raised.
        """
        try:
            while True:
                time.sleep(self.check_interval)
                self.revive_dead_children()
        except KeyboardInterrupt:
            self.stop_all()
            raise
        except BaseException:
            self.stop_all()
            logger.exception("ProcessManager is dying and killing all its children")
            raise

    def close(self):
        """Stop all managed processes."""
        self.stop_all()
class NanoProcessManager(ProcessManager):
    """A ProcessManager that can be controlled and queried via nanomsg.

    Requests are whitespace-separated text of the form
    ``PROCESS_NAME COMMAND [ARGS...]``; requests whose PROCESS_NAME is
    ``procman`` are meta commands addressed to the manager itself.
    """

    # per-process commands accepted by obey(); "procman" requests are
    # dispatched to handle_meta_command() instead
    COMMANDS = [
        "pid",
        "start",
        "stop",
        "restart",
        "status",
        "cmd",
        "cmdline",
    ]

    def __init__(self, addr, recv_timeout=1000, force=False, check_interval=10):
        """
        Bind a nanomsg REP socket to ``addr`` and get ready to serve requests.

        If force==True, ProcessManager will kill another existing
        ProcessManager instance using the same addr.  ``recv_timeout`` is set
        on the socket so that serve() regularly regains control to check on
        its children (the unit is whatever nanomsg uses for recv_timeout;
        presumably milliseconds -- confirm).
        """
        super().__init__(check_interval)
        self.addr = addr
        self.sock = nanomsg.Socket(nanomsg.REP)
        try:
            self._bind(force)
        except BaseException:
            # don't leak the socket when binding fails
            self.sock.close()
            raise
        self.sock.recv_timeout = recv_timeout

    def _bind(self, force):
        """Bind self.sock to self.addr, evicting another procman if ``force``."""
        try:
            self.sock.bind(self.addr)
        except nanomsg.NanoMsgAPIError as e:
            if not (force and e.errno == nanomsg.EADDRINUSE):
                raise
            if not self.__kill_other_procman():
                raise
            self.sock.bind(self.addr)

    def __kill_other_procman(self):
        """
        Kill the ProcessManager that is running on the same address as this
        one.

        Returns True if it acknowledged the request with ``dying`` (after
        waiting for its process to exit), False otherwise.
        """
        logger.info('killing the ProcessManager already bound to %s', self.addr)
        client = nanomsg.Socket(nanomsg.REQ)
        try:
            # should be connecting to another procman instance
            client.connect(self.addr)
            # get the PID so we can wait on the process
            client.send(b'procman pid')
            pid = int(client.recv())
            proc = psutil.Process(pid)
            client.send(b'procman die')
            dying = client.recv()
        finally:
            client.close()
        if dying.strip() != b'dying':
            # it refused; waiting for it to exit would block forever
            return False
        proc.wait()
        return True

    def recv(self):
        """Listen for the next incoming command and return it as bytes."""
        return self.sock.recv()

    def send(self, msg):
        """Send response."""
        self.sock.send(msg)

    def recv_and_resp(self):
        """Handle one request/response round trip.

        Returns True if the request told the manager to die, in which case
        the caller is required to shut down.
        """
        req = self.recv()
        resp = self.obey(req)
        self.send(resp.encode())
        return resp.strip() == 'dying'

    def serve(self):
        """Answer requests until told to die, reviving dead children about
        every ``check_interval`` seconds.

        On any exception (including KeyboardInterrupt) all children are
        stopped and the exception is re-raised.
        """
        try:
            last_checked = time.monotonic()
            while True:
                try:
                    if self.recv_and_resp():
                        self.stop_all()
                        self.sock.close()
                        return
                except nanomsg.NanoMsgAPIError:
                    # typically the receive timeout expiring; fall through to
                    # the periodic health check below
                    pass
                # checked after every request too, so a steady stream of
                # requests cannot starve the health check
                if time.monotonic() - last_checked > self.check_interval:
                    self.revive_dead_children()
                    last_checked = time.monotonic()
        except KeyboardInterrupt:
            self.stop_all()
            raise
        except BaseException:
            self.stop_all()
            logger.exception("ProcessManager is dying and killing all its children")
            raise

    def obey(self, data):
        """Respond to an incoming command (``data`` as raw bytes).

        An empty request returns the help text; a request with only a
        process name defaults to the ``status`` command.  Returns the reply
        text.
        """
        # the socket is an external boundary: never crash on undecodable input
        parts = data.decode(errors='replace').split()
        if not parts:
            return self.handle_meta_command('help', [])
        if len(parts) == 1:
            parts.append('status')
        # process name, what to do with it, and any extra arguments (used
        # when it is being started)
        pname, cmd, args = parts[0], parts[1], parts[2:]
        # "procman" commands are handled differently, because it is THIS process
        if pname == "procman":
            return self.handle_meta_command(cmd, args)
        if cmd not in self.COMMANDS:
            return 'unknown command "{}"'.format(cmd)
        return self._do_command(cmd, pname, args)

    def handle_meta_command(self, cmd, args):
        """Execute a command addressed to the process manager itself.

        ``cmd`` is one of the meta commands described in the help text and
        ``args`` the remaining words of the request.  Returns the reply text.
        """
        if cmd == 'help':
            return _procman_help_text
        if cmd == 'list':
            return self.list_processes()
        if cmd == 'pid':
            return '{}\n'.format(os.getpid())
        if cmd == 'die':
            # exactly "dying": recv_and_resp() and __kill_other_procman()
            # of other instances compare against this reply
            return 'dying'
        if cmd == 'stop':
            self.stop_all()
            return 'all processes stopped\n'
        if cmd == 'restart':
            try:
                self.restart_all()
            except (ValueError, OSError) as e:
                return 'could not restart all processes: {}\n'.format(e)
            return 'all processes restarted\n'
        if cmd == 'start':
            if not args:
                return 'usage: procman start NAME /PATH/TO/PROGRAM [ARGS...]\n'
            name, cmdline = args[0], args[1:]
            if name in self.cmdlines:
                # already managed: reuse its original command line
                cmdline = []
            return self._do_command('start', name, cmdline)
        return 'unknown meta command "{}"\n'.format(cmd)

    def _do_command(self, cmd, pname, args):
        """Execute the per-process command ``cmd`` on ``pname``; return the
        reply text (always newline-terminated)."""
        if cmd == 'start':
            try:
                self.start(pname, *args)
                msg = 'process "{}" started'.format(pname)
            except ValueError as e:
                if self.is_running(pname):
                    msg = 'process "{}" already running; not started'.format(pname)
                else:
                    # e.g. the executable could not be found
                    msg = str(e)
            except ProcManError as e:
                msg = str(e)
            except OSError as e:
                msg = 'process "{}" could not be started: {}'.format(pname, e)
        elif cmd == 'stop':
            try:
                worked = self.stop(pname)
            except (KeyError, ValueError):
                # stop() raises KeyError for names it has never started
                msg = 'Unknown process "{}"'.format(pname)
            else:
                if worked:
                    msg = 'process "{}" stopped'.format(pname)
                else:
                    msg = 'process "{}" was already stopped'.format(pname)
        elif cmd == 'restart':
            if pname not in self.name_to_process:
                msg = 'Unknown process "{}"'.format(pname)
            else:
                try:
                    self.restart(pname)
                    msg = 'process "{}" restarted'.format(pname)
                except (ValueError, OSError) as e:
                    msg = 'process "{}" could not be restarted: {}'.format(pname, e)
        elif cmd == 'status':
            try:
                msg = self.status(pname)
            except ValueError:
                msg = 'No such process "{}"'.format(pname)
            except psutil.NoSuchProcess:
                # it exited while its status was being collected
                msg = 'The process "{}" is not running'.format(pname)
        elif cmd == 'cmdline' or cmd == 'cmd':
            try:
                p = self.name_to_process[pname]
            except KeyError:
                msg = 'No such process "{}"'.format(pname)
            else:
                try:
                    msg = ' '.join(p.cmdline())
                except psutil.NoSuchProcess:
                    # the process is gone; report the command line it was
                    # started with
                    msg = ' '.join(self.cmdlines[pname])
        elif cmd == 'pid':
            try:
                p = self.name_to_process[pname]
                msg = str(p.pid)
            except KeyError:
                msg = 'No such process "{}"'.format(pname)
        else:
            raise ValueError('Unknown command "{}"'.format(cmd))
        msg += '\n'
        return msg

    def close(self):
        """Stop all managed processes and close the associated socket."""
        super().close()
        self.sock.close()
class UnknownProcessError(Exception):
    """Raised by NanoProcessManagerClient when the process manager does not
    know the requested process."""
class ProcessStateError(Exception):
    """Raised by NanoProcessManagerClient when a process is already in the
    requested state (starting a running process, stopping a stopped one)."""
class NanoProcessManagerClient:
    """
    Provides an interface to a NanoProcessManager in Python.

    Every method sends one text request over a nanomsg REQ socket and
    interprets the text reply.  The client can be used as a context manager,
    which closes the socket on exit.
    """

    def __init__(self, addr, recv_timeout=1000):
        self.addr = addr
        self.sock = nanomsg.Socket(nanomsg.REQ)
        try:
            self.sock.connect(addr)
            self.sock.recv_timeout = recv_timeout
        except BaseException:
            # don't leak the socket if it cannot be set up
            self.close()
            raise

    def close(self):
        """Close the socket; safe to call more than once."""
        # getattr: __init__ may have failed before self.sock was assigned
        sock = getattr(self, 'sock', None)
        if sock is not None:
            sock.close()
            self.sock = None

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @property
    def processes(self):
        """
        Return a list of the names of all managed processes, whether they
        are currently running or stopped.
        """
        reply = self._send_and_recv('procman list')
        # the first line is the "pid name status" header; the name is the
        # second column of every other non-blank line
        return [line.split()[1] for line in reply.split('\n')[1:] if line.strip()]

    def is_managed(self, name):
        """
        Return True if the process with given ``name`` is managed by the
        ProcessManager.
        """
        # a bare name is answered with its status by the manager
        ret = self._send_and_recv(name)
        return not ret.startswith('Unknown process')

    def stop(self, process):
        """
        Stop a running process.

        If the process is unknown to the ProcessManager, UnknownProcessError
        is raised.  If the process was already stopped, ProcessStateError is
        raised.
        """
        ret = self._send_and_recv('{} stop'.format(process))
        if ret.startswith('Unknown'):
            raise UnknownProcessError(ret)
        if 'already stopped' in ret:
            raise ProcessStateError(ret)

    def start(self, process):
        """
        Start a previously-stopped process.

        If the process is not managed, UnknownProcessError is raised.  If it
        is already running, ProcessStateError is raised.
        """
        ret = self._send_and_recv('{} start'.format(process))
        if 'is not managed' in ret:
            raise UnknownProcessError(ret)
        if 'already running' in ret:
            raise ProcessStateError(ret)

    def restart(self, process):
        """
        Restart a process.

        If the process is unknown, an UnknownProcessError is raised.
        """
        ret = self._send_and_recv('{} restart'.format(process))
        if ret.startswith('Unknown'):
            raise UnknownProcessError(ret)

    def status(self, process):
        """Return the manager's status text for ``process``.

        Raises UnknownProcessError if the process is unknown.
        """
        ret = self._send_and_recv('{} status'.format(process))
        if ret.startswith('Unknown process'):
            raise UnknownProcessError(ret)
        return ret

    def process_is_running(self, process):
        """Return True unless the manager reports ``process`` as not running."""
        return 'is not running' not in self.status(process)

    @property
    def manager_is_running(self):
        """
        Detect if the process manager is running (i.e. answers a request).
        """
        try:
            self.processes
        except nanomsg.NanoMsgAPIError:
            return False
        return True

    def _send_and_recv_raw(self, data):
        """Send ``data`` (bytes) and return the raw reply bytes."""
        self.sock.send(data)
        return self.sock.recv()

    def _send_and_recv(self, data):
        """Send the text command ``data`` and return the decoded reply."""
        # send bytes, as the server side does, rather than relying on the
        # binding to accept str
        if isinstance(data, str):
            data = data.encode()
        return self._send_and_recv_raw(data).decode()