Source code for wni.processes

"""
Contains most (perhaps all) of the process-management code that is controlled
via nanomsg sockets.
"""
from __future__ import absolute_import, division, print_function

import datetime
import logging
import os

import shutil
import time

import nanomsg
import psutil


logger = logging.getLogger(__name__)


class OutOfSyncError(Exception):
    """Raised when ProcessManager's record of which processes should be
    running disagrees with the live processes it actually finds."""
class ProcManError(Exception):
    """Raised when ProcessManager is asked to start a process it does not
    manage without being given a command line to run it with."""
# The help text is defined at module level so that its indentation does not
# clutter the class body.  It is returned verbatim by the "procman help" meta
# command (and for an empty request).
_procman_help_text = """\
ProcessManager: manages processes, accepting commands via NanoMessage.

Generally, a command can be given via e.g. nanocat with the following format:

    nanocat --req --connect {address_of_procman} --raw -D "PROCESS_NAME COMMAND [ARGS...]" --raw

Available commands:
    start [/PATH/TO/PROGRAM ARGS...]: start the process; a command line is
        only required if the process is not already managed
    stop: stop the managed process
    restart: restarts the managed process with the same cmdline it was
        initially given
    status: show process information (the default if no COMMAND is given)
    cmdline (or cmd): show the command line arguments used for the process
    pid: show the pid of the process

if PROCESS_NAME == procman, the process manager can execute certain commands
which affect itself and/or all child processes.

Available meta commands:
    help: show this help
    restart: restart all managed processes
    stop: stop all managed processes
    list (or status): list all processes managed by the process manager
    disown NAME: stop managing the process NAME (the process is not stopped)
    die: kill all children and stop running
    pid: show the pid of the process manager
    start NAME /PATH/TO/PROGRAM [ARGS...]: if the process identified by
        ``name`` is already managed by the process manager and currently
        running, does nothing.  If it is managed but has been stopped
        previously, it is restarted with its original commandline.  If it is
        not a managed process, the specified process will be started with the
        provided cmdline.
"""
class ProcessManager(object):
    """
    Start processes and keep them running.

    This class starts and manages processes that are started by the script
    wni_process.  It is modeled after Linux services, and knows how to respond
    to the commands "start", "stop", "restart", and "status".  Each process is
    identified by a free-form ``name`` chosen by the caller.
    """

    def __init__(self, check_interval=10):
        # Seconds between checks for (and revival of) dead children.
        self.check_interval = check_interval
        # name -> argument sequence the process was (last) started with
        self.cmdlines = {}
        # name -> psutil.Popen of the most recently started instance
        self.name_to_process = {}
        # psutil.Popen -> name (reverse of name_to_process)
        self._process_to_name = {}
        # names of the processes that are supposed to be running right now
        self._running_processes = set()

    def manage(self, processes):
        """Start every process in ``processes`` and keep it running.

        ``processes`` is an iterable of ``(name, cmdline)`` pairs, where
        ``cmdline`` is the argument list passed to psutil.Popen.
        """
        for proc in processes:
            name = proc[0]
            cmdline = proc[1]
            self.start(name, *cmdline)

    def revive_dead_children(self):
        """Restart managed processes that died without being stopped.

        Called from serve() approximately every ``check_interval`` seconds.
        A process whose restart fails is logged and left marked as "should be
        running", so the restart is retried on the next check.

        Raises:
            OutOfSyncError: if a live process is found whose name is not in
                the set of processes that should be running.
        """
        processes = list(self.process_iter())
        pnames = {self._process_to_name[proc] for proc in processes}
        out_of_sync = pnames - self._running_processes
        dead_processes = self._running_processes - pnames
        if out_of_sync:
            msg = 'self._running_processes got out of sync with managed processes. '
            msg += 'Unmanaged process: "%s"' % out_of_sync
            logger.error("ProcessManager internal error: processes out of sync now")
            raise OutOfSyncError(msg)
        for procname in dead_processes:
            logger.warning("Restarting process %s", procname)
            try:
                self.start(procname)
            except (ValueError, OSError, psutil.Error):
                # One unrevivable process (e.g. its executable was removed)
                # should not take the manager and every other child down.
                logger.exception("Failed to restart process %s", procname)

    def list_processes(self, verbose=False):
        """Return a text table (pid, name, status) of all managed processes.

        Processes that are not running are shown with a "-" pid.  The table
        ends with a newline.  ``verbose`` is accepted for interface
        compatibility but currently has no effect.
        """
        fmt = '{:>5} {: <20s} {}'.format
        msg = [fmt('pid', 'name', 'status')]
        for name, process in self.name_to_process.items():
            status = None
            if name in self._running_processes:
                try:
                    status = process.status()
                except psutil.NoSuchProcess:
                    # died (and was reaped) since the last check; show it as
                    # not running instead of failing the whole listing
                    status = None
            if status is None:
                msg.append(fmt("-", name, '--------------'))
            else:
                msg.append(fmt(process.pid, name, status))
        # a trailing empty element makes the joined text end with a newline
        msg.append('')
        return '\n'.join(msg)

    def stop_all(self):
        """Stop every process that is currently supposed to be running."""
        # iterate over a copy: stop() removes names from the set
        for proc in list(self._running_processes):
            self.stop(proc)

    def restart_all(self):
        """Restart all managed processes that are supposed to be running."""
        # iterate over a copy: restart() removes and re-adds names, and
        # mutating a set while iterating it is undefined/raises RuntimeError
        for proc in list(self._running_processes):
            self.restart(proc)

    def status(self, name):
        """Return a human-readable description of the process ``name``.

        Returns an "Unknown process" message if ``name`` is not managed, and
        a "not running" message if it is not (or no longer) running.
        """
        not_running = 'The process "{}" is not running'.format(name)
        not_managed = 'Unknown process "{}"'.format(name)
        try:
            p = self.name_to_process[name]
        except KeyError:
            return not_managed
        if not p.is_running():
            return not_running
        try:
            with p.oneshot():
                cmdline = p.cmdline()
                cpu_times = p.cpu_times()
                cpu_percent = p.cpu_percent()
                pid = p.pid
                ctime = p.create_time()
                status = p.status()
                threads = p.num_threads()
        except psutil.NoSuchProcess:
            # exited (or became a zombie) after is_running() was checked;
            # psutil.ZombieProcess is a subclass of NoSuchProcess
            return not_running
        ctime = datetime.datetime.fromtimestamp(ctime)
        formatted = ctime.strftime('%Y%m%d-%H:%M:%S')
        x = []
        x.append('{: <12} {}'.format("cmdline", " ".join(cmdline)))
        x.append('{: <12} {}'.format("cpu_time", cpu_times))
        x.append('{: <12} {}'.format("cpu_percent", cpu_percent))
        x.append('{: <12} {}'.format("pid", pid))
        x.append('{: <12} {}'.format("time_created", formatted))
        x.append('{: <12} {}'.format("status", status))
        x.append('{: <12} {}\n'.format("threads", threads))
        return '\n'.join(x)

    def is_running(self, name):
        """Return True if the process ``name`` is managed and alive.

        A zombie process is reaped (waited on) and reported as not running.
        """
        try:
            proc = self.name_to_process[name]
        except KeyError:
            return False
        try:
            status = proc.status()
        except psutil.NoSuchProcess:
            return False
        if status == 'zombie':
            proc.wait()
            return False
        else:
            return proc.is_running()

    def start(self, name, *args):
        """
        Start a process and make sure it keeps running.

        A ProcessManager will not allow multiple processes with identical
        `name`s to be started.  The `name` is just a string, and is not
        necessarily related to the actual command line arguments.  If no
        ``args`` are given, the command line the process was last started
        with is reused.

        Returns the new psutil.Popen.

        Raises:
            ProcManError: if no ``args`` are given and ``name`` is unknown.
            ValueError: if the executable cannot be found, or a process named
                ``name`` is already running.
        """
        try:
            exe = args[0]
            arguments = args
        except IndexError:
            try:
                # if the process is already managed, it can be started without
                # needing to provide all the arguments again.
                arguments = self.cmdlines[name]
                exe = arguments[0]
            except KeyError:
                msg = "Process {} is not managed by this ProcessManager, and no command line was given"
                msg = msg.format(name)
                logger.error(msg)
                raise ProcManError(msg)
        exepath = shutil.which(exe)
        if not exepath:
            raise ValueError('The file "{}" does not exist'.format(exe))
        if self.is_running(name):
            msg = 'A process with the name "{}" is already running'.format(name)
            raise ValueError(msg)
        logger.info('starting %s: %s', name, ' '.join(arguments))
        p = psutil.Popen(arguments)
        # forget the previous (dead) instance so _process_to_name does not
        # accumulate one stale Popen per restart
        old = self.name_to_process.get(name)
        if old is not None:
            self._process_to_name.pop(old, None)
        self.cmdlines[name] = arguments
        self.name_to_process[name] = p
        self._process_to_name[p] = name
        self._running_processes.add(name)
        return p

    def stop(self, name):
        """Terminate the process ``name`` and all of its descendants.

        Returns True if the process was running, False if it was already
        stopped.  Either way it is no longer revived automatically.

        Raises:
            KeyError: if ``name`` has never been started by this manager.
        """
        def _kill_leafs(proc):
            """Terminate a process after recursively terminating its children."""
            try:
                children = proc.children()
            except psutil.NoSuchProcess:
                children = []
            for child in children:
                _kill_leafs(child)
            try:
                cmdline = ' '.join(proc.cmdline())
            except psutil.Error:
                # zombies / vanished processes have no readable cmdline
                cmdline = '<pid {}>'.format(proc.pid)
            logger.info('killing proc %s', cmdline)
            try:
                proc.terminate()
            except psutil.NoSuchProcess:
                pass
            # still wait: reaps the process if it is our zombie child
            proc.wait()

        p = self.name_to_process[name]
        # it _should_ be alive, but sometimes it isn't.
        if self.is_running(name):
            _kill_leafs(p)
            ret = True
        else:
            ret = False
        self._running_processes.discard(name)
        return ret

    def restart(self, name):
        """Stop the process ``name`` (if running) and start it again with its
        previous command line."""
        self.stop(name)
        self.start(name)

    def process_iter(self):
        """Yield the live processes that are supposed to be running.

        Zombie processes encountered along the way are reaped and skipped.
        """
        for name, proc in self.name_to_process.items():
            if name not in self._running_processes:
                continue
            try:
                status = proc.status()
            except psutil.NoSuchProcess:
                continue
            if status == 'zombie':
                # reap the dead child so it stops lingering as a zombie
                proc.wait()
            elif proc.is_running():
                yield proc

    def watch_processes_forever(self):
        """
        Check on the child processes forever, polling them every
        ``self.check_interval`` seconds and reviving any that died.

        Whatever exception ends the loop (including KeyboardInterrupt), all
        managed processes are stopped before it is re-raised.
        """
        try:
            while True:
                time.sleep(self.check_interval)
                self.revive_dead_children()
        except KeyboardInterrupt:
            self.stop_all()
            raise
        except BaseException:
            self.stop_all()
            logger.exception("ProcessManager is dying and killing all its children")
            raise

    def close(self):
        """Stop all managed processes."""
        self.stop_all()
class NanoProcessManager(ProcessManager):
    """A ProcessManager that can be controlled and queried via nanomsg.

    Requests are plain text of the form ``PROCESS_NAME [COMMAND [ARGS...]]``
    received on a nanomsg REP socket bound to ``addr``; every request gets
    exactly one text reply.
    """

    COMMANDS = [
        "pid",
        "start",
        "stop",
        "restart",
        "status",
        "cmd",
        "cmdline",
    ]

    def __init__(self, addr, recv_timeout=1000, force=False, check_interval=10):
        """
        Bind a REP socket to ``addr``.

        If force==True, ProcessManager will kill another existing
        ProcessManager instance using the same addr.  ``recv_timeout`` is
        assigned to the socket's ``recv_timeout``; it bounds how long serve()
        blocks waiting for a request before checking on its children.
        """
        super().__init__(check_interval)
        self.addr = addr
        self.sock = nanomsg.Socket(nanomsg.REP)
        try:
            self._bind(addr, force)
        except BaseException:
            # don't leak the socket if this instance cannot be set up
            self.sock.close()
            raise
        self.sock.recv_timeout = recv_timeout

    def _bind(self, addr, force):
        """Bind self.sock to addr, evicting another procman first if forced."""
        try:
            self.sock.bind(addr)
        except nanomsg.NanoMsgAPIError as e:
            if (e.errno == nanomsg.EADDRINUSE and force
                    and self.__kill_other_procman()):
                self.sock.bind(addr)
            else:
                raise

    def __kill_other_procman(self):
        """
        Kill a running ProcessManager that is bound to the same address as
        this one, and wait for it to exit.

        Returns True if the other instance acknowledged the "die" request.
        """
        logger.info('Killing the ProcessManager already bound to %s', self.addr)
        client = nanomsg.Socket(nanomsg.REQ)
        try:
            # should be connecting to another procman instance
            client.connect(self.addr)
            # get the PID so we can wait on the process
            client.send(b'procman pid')
            pid = int(client.recv())
            proc = psutil.Process(pid)
            client.send(b'procman die')
            dying = client.recv()
        finally:
            client.close()
        # replies are newline-terminated ("dying\n"), so compare stripped
        if dying.strip() != b'dying':
            return False
        proc.wait()
        return True

    def recv(self):
        """Listen for an incoming command and return the raw request bytes."""
        return self.sock.recv()

    def send(self, msg):
        """Send response."""
        self.sock.send(msg)

    def recv_and_resp(self):
        """Receive one request, execute it, and send back the reply.

        Errors raised while executing the request are logged and reported to
        the requester instead of propagating, so a malformed or failing
        command cannot take the manager (and all its children) down.

        Returns True if the request was "procman die", in which case the
        caller is required to exit.
        """
        req = self.sock.recv()
        try:
            resp = self.obey(req)
        except Exception as e:
            logger.exception('Error while handling request %r', req)
            resp = 'error: {}\n'.format(e)
        self.send(resp.encode())
        return resp.strip() == 'dying'

    def serve(self):
        """Serve requests forever, reviving dead children every
        ``check_interval`` seconds.

        Returns after a "procman die" request, having stopped all children and
        closed the socket.  Any other exception also stops all children before
        it is re-raised.
        """
        try:
            last_checked = time.monotonic()
            while True:
                try:
                    should_die = self.recv_and_resp()
                except nanomsg.NanoMsgAPIError:
                    # usually the recv timeout expiring with no request
                    should_die = False
                if should_die:
                    self.stop_all()
                    self.sock.close()
                    return
                # checked after every request as well as after timeouts, so a
                # steady stream of requests cannot starve the revival check
                if time.monotonic() - last_checked > self.check_interval:
                    self.revive_dead_children()
                    last_checked = time.monotonic()
        except KeyboardInterrupt:
            self.stop_all()
            raise
        except BaseException:
            self.stop_all()
            logger.exception("ProcessManager is dying and killing all its children")
            raise

    def obey(self, data):
        """Respond to an incoming request and return the reply text.

        ``data`` is the raw request ``b"PROCESS_NAME [COMMAND [ARGS...]]"``.
        An empty request returns the help text; a request with only a process
        name defaults to the "status" command.  Requests addressed to
        ``procman`` are handled by handle_meta_command.
        """
        parts = data.decode().split()
        if not parts:
            return self.handle_meta_command('help', [])
        if len(parts) < 2:
            # by default have the command be "status"
            parts.append('status')
        # name of the process, what to do with it, and any extra arguments
        # (used when the process is being started)
        pname, cmd, args = parts[0], parts[1], parts[2:]
        # handle procman commands differently, because it is THIS process
        if pname == "procman":
            return self.handle_meta_command(cmd, args)
        if cmd not in self.COMMANDS:
            return 'unknown command "{}"\n'.format(cmd)
        return self._do_command(cmd, pname, args)

    def handle_meta_command(self, cmd, args):
        """Respond to a command addressed to the process manager itself.

        ``cmd`` is the meta command and ``args`` the remaining words of the
        request.  Returns the reply text, always newline-terminated.
        """
        if cmd == "disown":
            if not args:
                msg = 'Not enough arguments to command "%s"' % cmd
            else:
                name = args[0]
                if name not in self.cmdlines:
                    msg = 'I do not know a process with the name "%s"' % name
                else:
                    del self.cmdlines[name]
                    proc = self.name_to_process.pop(name, None)
                    if proc is not None:
                        self._process_to_name.pop(proc, None)
                    self._running_processes.discard(name)
                    msg = 'unmanaged %s' % name
        elif cmd == 'start':
            if not args:
                msg = 'Not enough arguments to command "%s"' % cmd
            else:
                try:
                    self.start(*args)
                    msg = 'starting process {}'.format(args[0])
                except (ValueError, ProcManError) as e:
                    msg = str(e)
        elif cmd == 'restart':
            self.restart_all()
            msg = 'restarted all processes'
        elif cmd == 'stop':
            self.stop_all()
            msg = 'stopped all managed processes'
        elif cmd == 'list' or cmd == 'status':
            msg = self.list_processes(verbose='-v' in args)
        elif cmd == 'die':
            msg = 'dying'
        elif cmd == 'pid':
            msg = str(os.getpid())
        elif cmd == 'help':
            msg = _procman_help_text
        else:
            msg = "unrecognized command %s" % cmd
        msg += '\n'
        return msg

    def _do_command(self, cmd, pname, args):
        """Execute the per-process command ``cmd`` on process ``pname``.

        Returns the newline-terminated reply text.

        Raises:
            ValueError: if ``cmd`` is not a known command.
        """
        if cmd == 'start':
            try:
                self.start(pname, *args)
                msg = 'process "{}" started'.format(pname)
            except ValueError as e:
                if self.is_running(pname):
                    msg = 'process "{}" already running; not started'.format(pname)
                else:
                    # e.g. the executable could not be found
                    msg = str(e)
            except ProcManError as e:
                msg = str(e)
        elif cmd == 'stop':
            try:
                worked = self.stop(pname)
                if worked:
                    msg = 'process "{}" stopped'.format(pname)
                else:
                    msg = 'process "{}" was already stopped'.format(pname)
            except (KeyError, ValueError):
                # stop() raises KeyError for a name it has never started
                msg = 'Unknown process "{}"'.format(pname)
        elif cmd == 'restart':
            if pname not in self.name_to_process:
                msg = 'Unknown process "{}"'.format(pname)
            else:
                try:
                    self.restart(pname)
                    msg = 'process "{}" restarted'.format(pname)
                except ValueError as e:
                    msg = str(e)
        elif cmd == 'status':
            try:
                msg = self.status(pname)
            except ValueError:
                msg = 'No such process "{}"'.format(pname)
        elif cmd == 'cmdline' or cmd == 'cmd':
            try:
                p = self.name_to_process[pname]
            except KeyError:
                msg = 'No such process "{}"'.format(pname)
            else:
                try:
                    msg = ' '.join(p.cmdline())
                except psutil.NoSuchProcess:
                    # not running any more: report the stored command line
                    msg = ' '.join(self.cmdlines[pname])
        elif cmd == 'pid':
            try:
                p = self.name_to_process[pname]
                msg = str(p.pid)
            except KeyError:
                msg = 'No such process "{}"'.format(pname)
        else:
            raise ValueError('Unknown command "{}"'.format(cmd))
        msg += '\n'
        return msg

    def close(self):
        """Stop all managed processes and close the socket."""
        super().close()
        self.sock.close()
class UnknownProcessError(Exception):
    """Raised by NanoProcessManagerClient when the process manager does not
    know the requested process."""
class ProcessStateError(Exception):
    """Raised by NanoProcessManagerClient when a process is already in the
    state a request asked for (e.g. starting a process that is running)."""
class NanoProcessManagerClient:
    """
    Provides an interface to a NanoProcessManager in Python.

    Each method sends one text request over a nanomsg REQ socket connected to
    ``addr`` and interprets the text reply.  ``recv_timeout`` is assigned to
    the socket's ``recv_timeout``; a reply that does not arrive in time raises
    nanomsg.NanoMsgAPIError.
    """

    def __init__(self, addr, recv_timeout=1000):
        self.addr = addr
        self.sock = nanomsg.Socket(nanomsg.REQ)
        self.sock.connect(addr)
        self.sock.recv_timeout = recv_timeout

    def close(self):
        """Close the socket.  Safe to call more than once."""
        # getattr: __init__ may have failed before self.sock was assigned
        sock = getattr(self, 'sock', None)
        if sock is not None:
            sock.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def __del__(self):
        self.close()

    @property
    def processes(self):
        """
        Return a list of the names of all processes managed by the
        ProcessManager, whether or not they are currently running.
        """
        msg = self._send_and_recv('procman list')
        # skip the header row and blank lines; the name is the 2nd column
        lines = msg.split('\n')[1:]
        return [line.split()[1] for line in lines if line.strip()]

    def is_managed(self, name):
        """
        Return True if the process with given ``name`` is managed by the
        ProcessManager.
        """
        ret = self._send_and_recv(name)
        return not ret.startswith('Unknown process')

    def stop(self, process):
        """
        Stop a running process.

        If the process is unknown to the ProcessManager, UnknownProcessError
        is raised.  If the process was already stopped, ProcessStateError is
        raised.
        """
        ret = self._send_and_recv('{} stop'.format(process))
        if ret.startswith('Unknown'):
            raise UnknownProcessError(ret)
        elif 'already stopped' in ret:
            raise ProcessStateError(ret)

    def start(self, process):
        """
        Start a previously-stopped process.

        If the process is not managed, UnknownProcessError is raised.  If the
        process is already running, ProcessStateError is raised.
        """
        ret = self._send_and_recv('{} start'.format(process))
        if 'is not managed' in ret:
            raise UnknownProcessError(ret)
        if 'already running' in ret:
            raise ProcessStateError(ret)

    def restart(self, process):
        """
        Restart a process.

        If the process is unknown, an UnknownProcessError is raised.
        """
        ret = self._send_and_recv('{} restart'.format(process))
        if ret.startswith('Unknown'):
            raise UnknownProcessError(ret)

    def status(self, process):
        """Return the status text of ``process``.

        Raises UnknownProcessError if the process is unknown.
        """
        ret = self._send_and_recv('{} status'.format(process))
        if ret.startswith('Unknown process'):
            raise UnknownProcessError(ret)
        return ret

    def process_is_running(self, process):
        """Return True unless the manager reports ``process`` as not running."""
        return 'is not running' not in self.status(process)

    @property
    def manager_is_running(self):
        """
        Detect if the process manager is running (i.e. answers a request
        within the receive timeout).
        """
        try:
            self.processes
        except nanomsg.NanoMsgAPIError:
            return False
        return True

    def _send_and_recv_raw(self, data):
        """Send ``data`` and return the raw reply bytes."""
        self.sock.send(data)
        return self.sock.recv()

    def _send_and_recv(self, data):
        """Send ``data`` and return the reply decoded as text."""
        return self._send_and_recv_raw(data).decode()