Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.util.custom_logging.fluent_log
"""
Provides a `TraceLogger` implementation that logs to a fluentd collector
"""
import json
import threading
import time
try:
    from fluent.sender import FluentSender
except ImportError:
    FluentSender = None
# Error text reported when the optional ``fluent`` dependency failed to import
# (``FluentSender`` is then ``None``) and a fluent-backed logger is requested.
FLUENT_IMPORT_MESSAGE = (
    'The Python fluent package is required to use this feature, '
    'please install it'
)
class FluentTraceLogger:
    """Trace logger that forwards events to a fluentd collector.

    Key/value pairs registered with :meth:`context_set` are kept in
    thread-local storage and merged into every event that the same thread
    later emits through :meth:`log`.
    """

    def __init__(self, name, host='localhost', port=24224):
        """Create the underlying ``FluentSender``.

        :param name: stored as ``self.name`` and passed to ``FluentSender``
            as its first positional argument (presumably the fluent tag
            prefix -- confirm against the fluent-logger documentation).
        :param host: forwarded to ``FluentSender`` as ``host``.
        :param port: forwarded to ``FluentSender`` as ``port``.
        :raises AssertionError: if the ``fluent`` package could not be
            imported.
        """
        # Raised explicitly (rather than via ``assert``) so the check is not
        # stripped under ``python -O``; the exception type is kept so existing
        # callers that catch AssertionError continue to work.
        if FluentSender is None:
            raise AssertionError(FLUENT_IMPORT_MESSAGE)
        self.lock = threading.Lock()
        self.thread_local = threading.local()
        self.name = name
        self.sender = FluentSender(self.name, host=host, port=port)

    def context_set(self, key, value):
        """Attach ``key``/``value`` to the calling thread's logging context.

        The per-thread context dict is created on first use.
        """
        with self.lock:
            if not hasattr(self.thread_local, 'context'):
                self.thread_local.context = {}
            self.thread_local.context[key] = value

    def context_remove(self, key):
        """Remove ``key`` from the calling thread's logging context.

        :raises AttributeError: if no context was ever set on this thread.
        :raises KeyError: if ``key`` is not present in the context.
        """
        # The ``with`` block guarantees the lock is released even when the
        # lookup below raises; a bare acquire()/release() pair would leave the
        # lock held forever and deadlock every subsequent call.
        with self.lock:
            del self.thread_local.context[key]

    def log(self, label, event_time=None, **kwargs):
        """Emit one event to the fluent sender.

        :param label: passed to ``emit_with_time`` as the event label.
        :param event_time: timestamp in seconds; truncated to ``int`` before
            sending. When ``None`` the current ``time.time()`` is used.
        :param kwargs: event fields. Entries from the calling thread's
            context are merged in and override fields with the same key.

        The merged fields are serialised with ``json.dumps(..., default=str)``,
        so values that are not JSON-serialisable are sent as their ``str()``.
        """
        with self.lock:
            context = getattr(self.thread_local, 'context', None)
            if context is not None:
                kwargs.update(context)
        # Compare against None so an explicit timestamp of 0 is honoured
        # instead of being silently replaced by the current time.
        if event_time is None:
            event_time = time.time()
        self.sender.emit_with_time(label, int(event_time), json.dumps(kwargs, default=str))