Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.util.custom_logging.fluent_log
"""
Provides a `TraceLogger` implementation that logs to a fluentd collector
"""
import json
import threading
import time
try:
from fluent.sender import FluentSender
except ImportError:
FluentSender = None
# Assertion message used when ``FluentSender`` could not be imported.
FLUENT_IMPORT_MESSAGE = (
    "The Python fluent package is required to use this feature, "
    "please install it"
)
class FluentTraceLogger:
    """Trace logger that emits JSON-encoded events through a ``FluentSender``.

    Context values registered with :meth:`context_set` are stored in a
    ``threading.local``, so each thread has its own context dictionary.
    That dictionary is merged into every event logged from the same thread.
    """

    def __init__(self, name, host="localhost", port=24224):
        """Create the logger and its underlying sender.

        :param name: stored as ``self.name`` and passed to ``FluentSender``
            as its first positional argument.
        :param host: forwarded to ``FluentSender`` as ``host``.
        :param port: forwarded to ``FluentSender`` as ``port``.
        :raises AssertionError: if ``FluentSender`` is ``None`` because the
            fluent package could not be imported.
        """
        # NOTE: kept as an ``assert`` so the exception type seen by callers is
        # unchanged; be aware that asserts are skipped when Python runs with -O.
        assert FluentSender is not None, FLUENT_IMPORT_MESSAGE
        self.lock = threading.Lock()
        self.thread_local = threading.local()
        self.name = name
        self.sender = FluentSender(self.name, host=host, port=port)

    def context_set(self, key, value):
        """Set ``key`` to ``value`` in the calling thread's context.

        The per-thread context dictionary is created on first use.
        """
        with self.lock:
            if not hasattr(self.thread_local, "context"):
                self.thread_local.context = {}
            self.thread_local.context[key] = value

    def context_remove(self, key):
        """Delete ``key`` from the calling thread's context.

        :raises AttributeError: if this thread has never set a context value.
        :raises KeyError: if ``key`` is not present in the context.
        """
        # ``with`` guarantees the lock is released even when the deletion
        # raises; a bare acquire/release pair would leave the lock held
        # forever and block every later call on this logger.
        with self.lock:
            del self.thread_local.context[key]

    def log(self, label, event_time=None, **kwargs):
        """Emit one event through the sender.

        :param label: passed to ``sender.emit_with_time`` as the event label.
        :param event_time: event timestamp in seconds; a falsy value
            (including the default ``None``) means ``time.time()``. The value
            is truncated with ``int()`` before it is sent.
        :param kwargs: event fields. Entries from the calling thread's
            context are merged in and win over same-named keyword arguments.

        The merged fields are serialised with ``json.dumps(..., default=str)``,
        so values that are not JSON-serialisable are sent as ``str(value)``.
        """
        with self.lock:
            if hasattr(self.thread_local, "context"):
                kwargs.update(self.thread_local.context)
        # The lock is only held while merging the context, not while sending.
        event_time = event_time or time.time()
        self.sender.emit_with_time(label, int(event_time), json.dumps(kwargs, default=str))