Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.util.custom_logging.fluent_log

"""
Provides a `TraceLogger` implementation that logs to a fluentd collector
"""

import json
import threading
import time

try:
    from fluent.sender import FluentSender
except ImportError:
    FluentSender = None


# Assertion message used when the optional ``fluent`` package could not be
# imported (i.e. ``FluentSender`` is ``None``).
FLUENT_IMPORT_MESSAGE = (
    "The Python fluent package is required to use this feature, "
    "please install it"
)


class FluentTraceLogger:
    """Trace logger that forwards events to a fluentd collector.

    Events are emitted through a ``FluentSender``. Key/value context can be
    attached with :meth:`context_set`; it is stored on a ``threading.local``
    object, so each thread only sees the context it set itself, and it is
    merged into every event logged from that thread.
    """

    def __init__(self, name, host='localhost', port=24224):
        """Create the logger and its underlying ``FluentSender``.

        :param name: tag passed to ``FluentSender`` as its first argument.
        :param host: host of the collector (default ``'localhost'``).
        :param port: port of the collector (default ``24224``).
        :raises AssertionError: if the ``fluent`` package is not installed.
        """
        assert FluentSender is not None, FLUENT_IMPORT_MESSAGE
        self.lock = threading.Lock()
        self.thread_local = threading.local()
        self.name = name
        self.sender = FluentSender(self.name, host=host, port=port)

    def context_set(self, key, value):
        """Store ``value`` under ``key`` in the calling thread's context."""
        # ``with`` guarantees the lock is released even if the assignment
        # raises (e.g. an unhashable key), so later calls cannot deadlock.
        with self.lock:
            if not hasattr(self.thread_local, 'context'):
                self.thread_local.context = {}
            self.thread_local.context[key] = value

    def context_remove(self, key):
        """Remove ``key`` from the calling thread's context.

        :raises KeyError: if ``key`` is not present in the context.
        :raises AttributeError: if no context was ever set on this thread.
        """
        # The errors above still propagate to the caller, but the lock is
        # released first instead of being left held forever.
        with self.lock:
            del self.thread_local.context[key]

    def log(self, label, event_time=None, **kwargs):
        """Emit one event to the collector.

        :param label: event label passed to ``emit_with_time``.
        :param event_time: timestamp in seconds; a falsy value (``None`` or
            ``0``) means "use the current time". It is truncated to an int.
        :param kwargs: event payload. The calling thread's context is merged
            on top of it, so context entries win on key collisions.
        """
        with self.lock:
            if hasattr(self.thread_local, 'context'):
                kwargs.update(self.thread_local.context)
        event_time = event_time or time.time()
        # The payload is sent as a JSON string; values json cannot encode
        # natively are stringified via ``default=str``.
        self.sender.emit_with_time(label, int(event_time), json.dumps(kwargs, default=str))