This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.error_reports.plugins.influxdb

"""The module describes the ``influxdb`` error plugin plugin."""

import datetime
import logging

    import influxdb
except ImportError:
    # This middleware will never be used without influxdb.
    influxdb = None

from galaxy.util import unicodify
from . import ErrorPlugin

log = logging.getLogger(__name__)

[docs]class InfluxDBPlugin(ErrorPlugin): """Send error report to InfluxDB """ plugin_type = "influxdb"
[docs] def __init__(self, **kwargs): if not influxdb: raise ImportError("Could not find InfluxDB Client, this error reporter will be disabled; please pip install influxdb") self.app = kwargs['app'] # Neither of these matter self.verbose = False self.user_submission = False # Anything with influxdb_ gets sent to the client initialization influx_args = { k[len('influxdb_'):]: v for (k, v) in kwargs.items() if k.startswith('influxdb_') } print(influx_args) self.client = influxdb.InfluxDBClient(**influx_args)
[docs] def submit_report(self, dataset, job, tool, **kwargs): """Submit the error report to sentry """ self.client.write_points([{ 'measurement': 'galaxy_tool_error', 'time': datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'), 'fields': { 'value': 1 }, 'tags': { 'exit_code': job.exit_code, 'tool_id': unicodify(job.tool_id), 'tool_version': unicodify(job.tool_version), 'tool_xml': unicodify(tool.config_file) if tool else None, 'destination_id': unicodify(job.destination_id), 'handler': unicodify(job.handler), } }]) return ('Submitted to InfluxDB', 'success')
__all__ = ('InfluxDBPlugin', )