Source code for wni.nanolog

"""
A Nanomsg log handler for Python.  NanomsgHandler publishes JSON logs to the
specified address.  It's up to something else to display those logs.
"""
from __future__ import print_function

import datetime
import errno
import json
import logging
import os
import sys
import traceback
import re

import nanomsg

from wni.nano.util import subscribe
import wni.config as config


# Terminal styling used when echoing a log record to a stream.
# Keys are logging levels.  Each value is an ``[args, kwargs]`` pair that is
# unpacked into ``termcolor.cprint`` right after the message text.
COLOR_MAP = {
    logging.DEBUG: [
        ['cyan'], dict(),
    ],
    logging.INFO: [
        ['white'], dict(),
    ],
    logging.WARNING: [
        ['yellow'], dict(),
    ],
    logging.ERROR: [
        ['red'], dict(attrs=['bold']),
    ],
    logging.CRITICAL: [
        ['white', 'on_red'], dict(attrs=['bold']),
    ],
}


# Threshold against which the log file's position is compared to decide when
# to rotate it (100 * 10**6).
MAX_LOG_FILESIZE = 100 * 1000 * 1000


def fixup_exc_info(d):
    """Fold the exception traceback of a log-record dict into its message.

    After the call, ``d['msg'] % d['args']`` yields the original message
    followed by the formatted traceback, and ``d['exc_info']`` is ``None`` so
    the dict no longer carries the exception/traceback objects.

    Args:
        d (dict): a ``LogRecord.__dict__``-style mapping.  Must contain
            ``'exc_info'`` (a ``(type, value, traceback)`` tuple) and
            ``'msg'``; ``'args'`` is optional.  The dict is modified in place.
    """
    exc_lines = traceback.format_exception(*d['exc_info'])
    # Drop whitespace-only entries so the traceback stays compact.
    exc_text = ''.join(line for line in exc_lines if line.strip())

    msg = d['msg']
    if not isinstance(msg, str):
        # logging accepts arbitrary objects as the message (commonly the
        # exception instance itself) and stringifies them when formatting.
        msg = str(msg)

    args = d.get('args')
    if args and isinstance(args, tuple):
        # Positional arguments: the traceback is simply one more argument.
        new_args = args + (exc_text,)
    else:
        if args:
            # Mapping-style (or otherwise non-tuple) arguments cannot take an
            # extra positional value, so resolve the message right now.
            try:
                msg = msg % args
            except (TypeError, ValueError, KeyError):
                # Badly formatted call: keep the raw message text.
                pass
        # From here on the traceback is the only argument, so any literal
        # '%' in the text must be escaped to survive '%' formatting.
        msg = msg.replace('%', '%%')
        new_args = (exc_text,)

    d['msg'] = (msg + '\n%s') if msg else '%s'
    d['args'] = new_args
    d['exc_info'] = None
class _JSONForceReprEncoder(json.JSONEncoder):
    """JSON encoder that falls back to a type/repr description.

    Objects the stock encoder cannot serialize are replaced by a JSON
    *string* whose content is itself a JSON document of the form
    ``{"__UNSERIALIZABLE__": {"type": ..., "repr": ...}}``.
    """

    def default(self, o):
        """Return a JSON-encoded description of the unserializable ``o``."""
        description = {'type': str(type(o)), 'repr': repr(o)}
        return self.encode({'__UNSERIALIZABLE__': description})
class NanomsgHandler(logging.Handler):
    """A logging handler that publishes records on a nanomsg PUB socket.

    Every record is sent as a UTF-8 encoded JSON document built from the
    record's attribute dict.  To consume a message on the receiving side::

        # assuming you already have a suitable socket
        raw = sock.recv()
        record = json.loads(raw.decode('utf-8'))
        # do whatever you want with the record dict

    Args:
        connect_addr: nanomsg address the PUB socket connects to.
    """

    # Shared encoder; objects that are not JSON-serializable are described by
    # their type and repr instead of raising.
    _encoder = _JSONForceReprEncoder(indent=4)

    def __init__(self, connect_addr=config.NANOLOG_ADDR):
        super(NanomsgHandler, self).__init__()
        self.addr = connect_addr
        self.sock = None
        self.create_socket()

    def create_socket(self):
        """Create the PUB socket and connect it to ``self.addr``."""
        self.sock = nanomsg.Socket(nanomsg.PUB)
        self.sock.connect(self.addr)

    def send(self, msg):
        """Publish the message.

        The socket is (re)created on demand, e.g. after ``close()``.

        Args:
            msg (bytes): the formatted log message to send.
        """
        if self.sock is None:
            self.create_socket()
        self.sock.send(msg)

    def format_record(self, record):
        """Return the record as UTF-8 encoded JSON bytes ready to be sent.

        Works on a shallow copy of the record's attributes so that folding
        the traceback into the message does not alter the record seen by
        other handlers attached to the same logger.
        """
        d = dict(record.__dict__)
        if d['exc_info'] is not None:
            fixup_exc_info(d)
            assert d['exc_info'] is None
        return self._encoder.encode(d).encode('utf-8')

    def emit(self, record):
        """Format and publish ``record``.

        Failures are routed through ``handleError`` (the standard
        ``logging.Handler`` convention) so that a logging problem does not
        propagate into the code that issued the log call.
        """
        try:
            payload = self.format_record(record)
            self.send(payload)
        except Exception:
            self.handleError(record)

    def close(self):
        """Close the nanomsg socket and unregister the handler."""
        self.acquire()
        try:
            sock = self.sock
            if sock is not None:
                self.sock = None
                sock.close()
        finally:
            self.release()
        super(NanomsgHandler, self).close()
def color_print(msg, level, file=None):
    """Print ``msg`` in the color configured for ``level``.

    Args:
        msg (str): the text to print.
        level (int): logging level used to look up the style in
            ``COLOR_MAP``; unknown levels use the ``logging.INFO`` style.
        file: stream to write to.  ``None`` (the default) means the current
            ``sys.stdout``, looked up at call time so that a replaced or
            redirected ``sys.stdout`` is honored.

    Raises:
        OSError: any OS error from writing except ``EIO``.
    """
    # don't want to require termcolor if it's not needed, so only import it
    # here where it is used.
    from termcolor import cprint

    if file is None:
        file = sys.stdout

    args, kwargs = COLOR_MAP.get(level, COLOR_MAP[logging.INFO])
    try:
        cprint(msg, *args, file=file, **kwargs)
    except OSError as e:
        # EIO happens if stderr/stdout get closed, e.g. if the process gets
        # disowned and the shell session gets closed; ignore exactly that.
        if e.errno != errno.EIO:
            raise
def nanologger(sub_addr=config.NANOLOG_ADDR, pub_addr=config.LOGPUB_ADDR,
               log_filename=config.LOG_FILE):
    """Receive published log records, archive them, and echo them to stderr.

    Binds a SUB socket on ``sub_addr``, re-publishes every raw message on a
    PUB socket bound to ``pub_addr``, appends each decoded record as one JSON
    line to ``log_filename`` and prints a colored, formatted line to stderr.
    When the log file grows beyond ``MAX_LOG_FILESIZE`` it is renamed to
    ``<log_filename>.1`` and a fresh file is started.

    Runs until interrupted with Ctrl-C; the log file and both sockets are
    closed on the way out.

    Args:
        sub_addr: nanomsg address to bind the receiving SUB socket to.
        pub_addr: nanomsg address to bind the re-publishing PUB socket to.
        log_filename: path of the file the records are appended to; its
            directory is created if necessary.
    """
    import colorama
    colorama.init()

    fmt = config.LOG_STDERR_FORMAT

    log_dir = os.path.dirname(os.path.abspath(log_filename))
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    log_file = open(log_filename, 'a')

    def handle_message(msg):
        """Write one record to the log file and echo it to stderr.

        Args:
            msg (dict): a logged message, as a dict.
        """
        ctimestamp = msg['created']
        dt = datetime.datetime.fromtimestamp(ctimestamp)
        msg['ctime'] = dt.strftime(config.STRFTIME_FORMAT)

        # Like LogRecord.getMessage, only apply '%' formatting when there are
        # arguments; a message logged without args may contain literal '%'.
        args = msg.get('args') or ()
        if args:
            if not isinstance(args, dict):
                # JSON arrays arrive as lists; '%' needs a tuple.
                args = tuple(args)
            try:
                msg['msg'] = msg['msg'] % args
            except (TypeError, ValueError, KeyError):
                print('Invalidly formatted log arguments')

        print(json.dumps(msg), file=log_file)
        log_file.flush()
        fmtd = (fmt % msg)
        color_print(fmtd, msg['levelno'], file=sys.stderr)

    def handle_exception(msg):
        """Report ``msg`` plus the active traceback to the file and stderr."""
        print(msg, file=log_file)
        color_print(msg, logging.WARNING, file=sys.stderr)
        traceback.print_exc(file=log_file)
        traceback.print_exc()

    def rotate_logs(current_file):
        """Move the current log aside and return a newly opened log file."""
        current_file.close()
        try:
            # os.replace also overwrites an existing '.1' file on Windows.
            os.replace(log_filename, "{}.1".format(log_filename))
        except OSError:
            # Could not move the file; keep appending to it rather than
            # leaving the logger with a closed file object.
            traceback.print_exc()
        return open(log_filename, 'a')

    sockets = []
    try:
        # listen for logs on this socket.
        sub_sock = nanomsg.Socket(nanomsg.SUB)
        sockets.append(sub_sock)
        sub_sock.bind(sub_addr)
        sub_sock.recv_timeout = 2000
        subscribe(sub_sock, b'')

        # re-publish the logs from this socket.
        pub_sock = nanomsg.Socket(nanomsg.PUB)
        sockets.append(pub_sock)
        pub_sock.bind(pub_addr)

        while True:
            try:
                raw = sub_sock.recv()
            except nanomsg.NanoMsgAPIError:
                # presumably the 2000 ms receive timeout expiring; try again.
                continue
            except Exception:
                handle_exception('something unexpected happened while trying to recv logs')
                continue

            # republish
            pub_sock.send(raw)

            try:
                dictified = json.loads(raw.decode('utf-8'))
            except Exception:
                handle_exception('could not load message as json')
                continue

            try:
                handle_message(dictified)
                if log_file.tell() > MAX_LOG_FILESIZE:
                    log_file = rotate_logs(log_file)
            except Exception:
                handle_exception('could not print log; not enough fields in JSON message')
                continue
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the logger, wherever it arrives.
        pass
    finally:
        for sock in sockets:
            sock.close()
        log_file.close()
if __name__ == "__main__":
    # Manual smoke test: open an interactive shell in which ``logger``
    # publishes everything (DEBUG and up) through a NanomsgHandler.
    import IPython

    logger = logging.getLogger('wni.nanolog')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(NanomsgHandler())

    IPython.embed()