"""
A Nanomsg log handler for Python. NanomsgHandler publishes JSON logs to the
specified address. It's up to something else to display those logs.
"""
from __future__ import print_function
import datetime
import errno
import json
import logging
import os
import sys
import traceback
import re
import nanomsg
from wni.nano.util import subscribe
import wni.config as config
# colors for printing log messages to stdout.
# key is log level, value will get passed to termcolor.cprint.
# val is list of [args, kwargs].
COLOR_MAP = {
logging.DEBUG: [['cyan'], {}],
logging.INFO: [['white'], {}],
logging.WARNING: [['yellow'], {}],
logging.ERROR: [['red'], {'attrs': ['bold']}],
logging.CRITICAL: [['white', 'on_red'], {'attrs': ['bold']}],
}
MAX_LOG_FILESIZE = int(100e6)
[docs]def fixup_exc_info(d):
"""Put exception information into the msg of the dict."""
exc_info = d['exc_info']
orig_msg = d['msg']
exc = traceback.format_exception(*exc_info)
# Remove blank lines
exc = filter(lambda x: not re.match(r'^\s*$', x), exc)
exc = ''.join(exc)
new_msg = orig_msg + '\n%s' if orig_msg else '%s'
d['args'] = d['args'] + (exc,)
d['exc_info'] = None
d['msg'] = new_msg
class _JSONForceReprEncoder(json.JSONEncoder):
"""A JSON encoder that encodes more things"""
def default(self, o):
""""""
d = {
'__UNSERIALIZABLE__': {
'type': str(type(o)),
'repr': repr(o)
}
}
msg = self.encode(d)
return msg
[docs]class NanomsgHandler(logging.Handler):
"""
A handler class which logs messages to a specified nanomsg host and port.
Publishes the log message as utf-8 encoded JSON string. To receive the
message, the receiver decode the message:
# assuming you already have a suitable socket
bytes = sock.recv()
json_str = bytes.decode('utf-8')
dict = json.loads(json_str)
# do whatever you want with the dict
"""
_encoder = _JSONForceReprEncoder(indent=4)
def __init__(self, connect_addr=config.NANOLOG_ADDR):
super(NanomsgHandler, self).__init__()
self.addr = connect_addr
self.sock = None
self.create_socket()
[docs] def create_socket(self):
self.sock = nanomsg.Socket(nanomsg.PUB)
self.sock.connect(self.addr)
[docs] def send(self, msg):
"""Publish the message.
msg (bytes): the formatted log message to send.
"""
if self.sock is None:
self.create_socket()
self.sock.send(msg)
[docs] def emit(self, record):
bytes = self.format_record(record)
self.send(bytes)
[docs]def color_print(msg, level, file=sys.stdout):
# don't want to require termcolor if it's not needed, so only import it
# here where it is used.
from termcolor import cprint
args, kwargs = COLOR_MAP.get(level, COLOR_MAP[logging.INFO])
try:
cprint(msg, *args, file=file, **kwargs)
except OSError as e:
if e.errno == errno.EIO:
# this happens if stderr/stdout get closed, e.g. if the process
# gets disowned and the shell session gets closed this branch will
# be taken.
pass
else:
raise
[docs]def nanologger(sub_addr=config.NANOLOG_ADDR, pub_addr=config.LOGPUB_ADDR, log_filename=config.LOG_FILE):
"""Receives logs and prints the message as a dict to stderr and to the
specified file.
To do something else with the received dict, you can subclass this class
and override the handle_messsage method.
"""
import colorama
colorama.init()
fmt = config.LOG_STDERR_FORMAT
log_dir = os.path.dirname(os.path.abspath(log_filename))
if log_dir:
os.makedirs(log_dir, exist_ok=True)
log_file = open(log_filename, 'a')
def handle_message(msg):
"""
Args:
msg (dict): a logged message, as a dict.
"""
ctimestamp = msg['created']
dt = datetime.datetime.fromtimestamp(ctimestamp)
ctime = dt.strftime(config.STRFTIME_FORMAT)
msg['ctime'] = ctime
try:
msg['msg'] = msg['msg'] % tuple(msg.get('args', ()))
except TypeError:
print('Invalidly formatted log arguments')
print(json.dumps(msg), file=log_file)
log_file.flush()
fmtd = (fmt % msg)
color_print(fmtd, msg['levelno'], file=sys.stderr)
# listen for logs on this socket.
sub_sock = nanomsg.Socket(nanomsg.SUB)
sub_sock.bind(sub_addr)
sub_sock.recv_timeout = 2000
subscribe(sub_sock, b'')
# re-publish the logs from this socket.
pub_sock = nanomsg.Socket(nanomsg.PUB)
pub_sock.bind(pub_addr)
def handle_exception(msg):
print(msg, file=log_file)
color_print(msg, logging.WARNING, file=sys.stderr)
traceback.print_exc(file=log_file)
traceback.print_exc()
def rotate_logs(current_file):
current_file.close()
os.rename(log_filename, "{}.1".format(log_filename))
log_file = open(log_filename, 'a')
return log_file
while True:
try:
msg = sub_sock.recv()
except nanomsg.NanoMsgAPIError:
continue
except KeyboardInterrupt:
break
except:
handle_exception('something unexpected happened while trying to recv logs')
continue
# republish
pub_sock.send(msg)
msg = msg.decode('utf-8')
try:
dictified = json.loads(msg)
except:
handle_exception('could not load message as json')
continue
try:
handle_message(dictified)
if log_file.tell() > MAX_LOG_FILESIZE:
log_file = rotate_logs(log_file)
except:
msg = 'could not print log; not enough fields in JSON message'
handle_exception(msg)
continue
if __name__ == "__main__":
import IPython
logger = logging.getLogger('wni.nanolog')
logger.addHandler(NanomsgHandler())
logger.setLevel(logging.DEBUG)
IPython.embed()