Source code for galaxy.job_metrics

"""This module defines the job metrics collection framework for Galaxy jobs.

The framework consists of two parts - the :class:`JobMetrics` class and
individual :class:`JobInstrumenter` plugins.

A :class:`JobMetrics` object reads any number of plugins from a configuration
source such as an XML file, a YAML file, or a dictionary.

Each :class:`JobInstrumenter` plugin object describes how to inject a bits
of shell code into a job scripts (before and after tool commands run) and then
collect the output of these from a job directory.
"""

import collections
import logging
import os
from abc import (
    ABCMeta,
    abstractmethod,
)
from typing import (
    Any,
    cast,
    Dict,
    List,
    NamedTuple,
    Optional,
    TYPE_CHECKING,
)

from galaxy import util
from galaxy.util import plugin_config
from . import formatting
from .safety import (
    DEFAULT_SAFETY,
    Safety,
)

if TYPE_CHECKING:
    from galaxy.job_metrics.instrumenters import InstrumentPlugin

log = logging.getLogger(__name__)


DEFAULT_FORMATTER = formatting.JobMetricFormatter()
DEFAULT_CONFIG = [{"type": "core"}]


class DictifiableMetric(NamedTuple):
    """The full context of a metric that is ready to be exposed to an external client."""

    title: str
    value: str
    raw_value: str
    name: str
    plugin: str
    safety: Safety = Safety.POTENTIALLY_SENSITVE

    def dict(self) -> Dict[str, str]:
        return dict(
            title=self.title,
            value=self.value,
            plugin=self.plugin,
            name=self.name,
            raw_value=self.raw_value,
        )


class RawMetric(NamedTuple):
    metric_name: str
    metric_value: Any
    metric_plugin: str


class JobMetrics:
    """Load and store a collection of :class:`JobInstrumenter` objects."""

    def __init__(self, conf_file=None, conf_dict=None, **kwargs):
        """Load :class:`JobInstrumenter` objects from specified configuration file."""
        self.plugin_classes = cast(Dict[str, "InstrumentPlugin"], self.__plugins_dict())
        if conf_file and os.path.exists(conf_file):
            self.default_job_instrumenter = JobInstrumenter.from_file(self.plugin_classes, conf_file, **kwargs)
        elif conf_dict or conf_dict is None:
            if conf_dict is None:
                conf_dict = DEFAULT_CONFIG
            self.default_job_instrumenter = JobInstrumenter.from_dict(self.plugin_classes, conf_dict, **kwargs)
        else:
            # allows for setting non-None falsey values to get no metrics config whatsoever
            self.default_job_instrumenter = NULL_JOB_INSTRUMENTER
        self.job_instrumenters = collections.defaultdict(lambda: self.default_job_instrumenter)

    def format(self, plugin: str, key: str, value: Any) -> formatting.FormattedMetric:
        """Find :class:`formatting.JobMetricFormatter` corresponding to instrumented plugin value."""
        if plugin in self.plugin_classes:
            plugin_class = self.plugin_classes[plugin]
            formatter = plugin_class.formatter
        else:
            formatter = DEFAULT_FORMATTER
        return formatter.format(key, value)

    def dictifiable_metrics(self, raw_metrics: List[RawMetric], allowed_safety: Safety) -> List[DictifiableMetric]:
        def raw_to_dictifiable(raw_metric: RawMetric) -> DictifiableMetric:
            metric_name, metric_value, metric_plugin = raw_metric
            title, value = self.format(metric_plugin, metric_name, metric_value)
            configured_plugin = self.default_job_instrumenter.get_configured_plugin(metric_plugin)
            if configured_plugin is not None:
                safety = configured_plugin.safety(metric_name)
            elif metric_plugin in self.plugin_classes:
                plugin_class = self.plugin_classes[metric_plugin]
                safety = plugin_class.default_safety
            else:
                safety = DEFAULT_SAFETY
            return DictifiableMetric(
                title,
                value,
                str(metric_value),
                metric_name,
                metric_plugin,
                safety,
            )

        metrics = map(raw_to_dictifiable, raw_metrics)
        return [m for m in metrics if m.safety.value >= allowed_safety.value]

    def set_destination_conf_file(self, destination_id, conf_file):
        instrumenter = JobInstrumenter.from_file(self.plugin_classes, conf_file)
        self.set_destination_instrumenter(destination_id, instrumenter)

    def set_destination_conf_element(self, destination_id, element):
        plugin_source = plugin_config.PluginConfigSource("xml", element)
        instrumenter = JobInstrumenter(self.plugin_classes, plugin_source)
        self.set_destination_instrumenter(destination_id, instrumenter)

    def set_destination_conf_dicts(self, destination_id, conf_dicts):
        plugin_source = plugin_config.PluginConfigSource("dict", conf_dicts)
        instrumenter = JobInstrumenter(self.plugin_classes, plugin_source)
        self.set_destination_instrumenter(destination_id, instrumenter)

    def set_destination_instrumenter(self, destination_id, job_instrumenter=None):
        if job_instrumenter is None:
            job_instrumenter = NULL_JOB_INSTRUMENTER
        self.job_instrumenters[destination_id] = job_instrumenter

    def collect_properties(self, destination_id, job_id, job_directory):
        return self.job_instrumenters[destination_id].collect_properties(job_id, job_directory)

    def __plugins_dict(self):
        import galaxy.job_metrics.instrumenters

        return plugin_config.plugins_dict(galaxy.job_metrics.instrumenters, "plugin_type")


class JobInstrumenterI(metaclass=ABCMeta):
    @abstractmethod
    def pre_execute_commands(self, job_directory: str) -> Optional[str]:
        return None

    @abstractmethod
    def post_execute_commands(self, job_directory: str) -> Optional[str]:
        return None

    @abstractmethod
    def collect_properties(self, job_id, job_directory: str) -> Dict[str, Any]:
        return {}

    @abstractmethod
    def get_configured_plugin(self, plugin_type: str):
        return None


class NullJobInstrumenter(JobInstrumenterI):
    def pre_execute_commands(self, job_directory):
        return None

    def post_execute_commands(self, job_directory):
        return None

    def collect_properties(self, job_id, job_directory):
        return {}

    def get_configured_plugin(self, plugin_type: str):
        return None


NULL_JOB_INSTRUMENTER = NullJobInstrumenter()


[docs]class JobInstrumenter(JobInstrumenterI):
[docs] def __init__(self, plugin_classes, plugins_source, **kwargs): self.extra_kwargs = kwargs self.plugin_classes = plugin_classes self.plugins = self.__plugins_from_source(plugins_source)
[docs] def get_configured_plugin(self, plugin_type: str): for plugin in self.plugins: if plugin.plugin_type == plugin_type: return plugin return None
[docs] def pre_execute_commands(self, job_directory): commands = [] for plugin in self.plugins: try: plugin_commands = plugin.pre_execute_instrument(job_directory) if plugin_commands: commands.extend(util.listify(plugin_commands)) except Exception: log.exception("Failed to generate pre-execute commands for plugin %s", plugin) return "\n".join(c for c in commands if c)
[docs] def post_execute_commands(self, job_directory): commands = [] for plugin in self.plugins: try: plugin_commands = plugin.post_execute_instrument(job_directory) if plugin_commands: commands.extend(util.listify(plugin_commands)) except Exception: log.exception("Failed to generate post-execute commands for plugin %s", plugin) return "\n".join(c for c in commands if c)
[docs] def collect_properties(self, job_id, job_directory): per_plugin_properties = {} for plugin in self.plugins: try: properties = plugin.job_properties(job_id, job_directory) if properties: per_plugin_properties[plugin.plugin_type] = properties except FileNotFoundError as e: log.warning("Failed to collect job properties for plugin %s: %s", plugin, e) except Exception: log.exception("Failed to collect job properties for plugin %s", plugin) return per_plugin_properties
def __plugins_from_source(self, plugins_source): return plugin_config.load_plugins(self.plugin_classes, plugins_source, self.extra_kwargs)
[docs] @staticmethod def from_file(plugin_classes, conf_file, **kwargs) -> "JobInstrumenterI": if not conf_file or not os.path.exists(conf_file): return NULL_JOB_INSTRUMENTER plugins_source = plugin_config.plugin_source_from_path(conf_file) return JobInstrumenter(plugin_classes, plugins_source, **kwargs)
[docs] @staticmethod def from_dict(plugin_classes, conf_dict, **kwargs) -> "JobInstrumenterI": plugin_source = plugin_config.plugin_source_from_dict(conf_dict) return JobInstrumenter(plugin_classes, plugin_source, **kwargs)
__all__ = ( "JobInstrumenter", "Safety", )