Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.job_metrics

"""This module defines the job metrics collection framework for Galaxy jobs.

The framework consists of two parts - the :class:`JobMetrics` class and
individual :class:`JobInstrumenter` plugins.

A :class:`JobMetrics` object reads any number of plugins from a configuration
source such as an XML file, a YAML file, or a dictionary.

Each :class:`JobInstrumenter` plugin object describes how to inject a bits
of shell code into a job scripts (before and after tool commands run) and then
collect the output of these from a job directory.
"""
import collections
import logging
import os
from abc import (
    ABCMeta,
    abstractmethod,
)
from typing import (
    Any,
    Dict,
    List,
    NamedTuple,
    Optional,
)

from galaxy import util
from galaxy.util import plugin_config
from .safety import (
    DEFAULT_SAFETY,
    Safety,
)
from ..job_metrics import formatting

log = logging.getLogger(__name__)


DEFAULT_FORMATTER = formatting.JobMetricFormatter()


class DictifiableMetric(NamedTuple):
    """The full context of a metric that is ready to be exposed to an external client."""

    title: str
    value: str
    raw_value: str
    name: str
    plugin: str
    safety: Safety = Safety.POTENTIALLY_SENSITVE

    def dict(self) -> Dict[str, str]:
        return dict(
            title=self.title,
            value=self.value,
            plugin=self.plugin,
            name=self.name,
            raw_value=self.raw_value,
        )


class RawMetric(NamedTuple):
    metric_name: str
    metric_value: Any
    metric_plugin: str


class JobMetrics:
    """Load and store a collection of :class:`JobInstrumenter` objects."""

    def __init__(self, conf_file=None, **kwargs):
        """Load :class:`JobInstrumenter` objects from specified configuration file."""
        self.plugin_classes = self.__plugins_dict()
        self.default_job_instrumenter = JobInstrumenter.from_file(self.plugin_classes, conf_file, **kwargs)
        self.job_instrumenters = collections.defaultdict(lambda: self.default_job_instrumenter)

    def format(self, plugin: str, key: str, value: Any) -> formatting.FormattedMetric:
        """Find :class:`formatting.JobMetricFormatter` corresponding to instrumented plugin value."""
        if plugin in self.plugin_classes:
            plugin_class = self.plugin_classes[plugin]
            formatter = plugin_class.formatter
        else:
            formatter = DEFAULT_FORMATTER
        return formatter.format(key, value)

    def dictifiable_metrics(self, raw_metrics: List[RawMetric], allowed_safety: Safety) -> List[DictifiableMetric]:
        def raw_to_dictifiable(raw_metric: RawMetric) -> DictifiableMetric:
            metric_name, metric_value, metric_plugin = raw_metric
            title, value = self.format(metric_plugin, metric_name, metric_value)
            configured_plugin = self.default_job_instrumenter.get_configured_plugin(metric_plugin)
            if configured_plugin is not None:
                safety = configured_plugin.safety(metric_name)
            elif metric_plugin in self.plugin_classes:
                plugin_class = self.plugin_classes[metric_plugin]
                safety = plugin_class.default_safety
            else:
                safety = DEFAULT_SAFETY
            return DictifiableMetric(
                title,
                value,
                str(metric_value),
                metric_name,
                metric_plugin,
                safety,
            )

        metrics = map(raw_to_dictifiable, raw_metrics)
        return [m for m in metrics if m.safety.value >= allowed_safety.value]

    def set_destination_conf_file(self, destination_id, conf_file):
        instrumenter = JobInstrumenter.from_file(self.plugin_classes, conf_file)
        self.set_destination_instrumenter(destination_id, instrumenter)

    def set_destination_conf_element(self, destination_id, element):
        plugin_source = plugin_config.PluginConfigSource("xml", element)
        instrumenter = JobInstrumenter(self.plugin_classes, plugin_source)
        self.set_destination_instrumenter(destination_id, instrumenter)

    def set_destination_conf_dicts(self, destination_id, conf_dicts):
        plugin_source = plugin_config.PluginConfigSource("dict", conf_dicts)
        instrumenter = JobInstrumenter(self.plugin_classes, plugin_source)
        self.set_destination_instrumenter(destination_id, instrumenter)

    def set_destination_instrumenter(self, destination_id, job_instrumenter=None):
        if job_instrumenter is None:
            job_instrumenter = NULL_JOB_INSTRUMENTER
        self.job_instrumenters[destination_id] = job_instrumenter

    def collect_properties(self, destination_id, job_id, job_directory):
        return self.job_instrumenters[destination_id].collect_properties(job_id, job_directory)

    def __plugins_dict(self):
        import galaxy.job_metrics.instrumenters

        return plugin_config.plugins_dict(galaxy.job_metrics.instrumenters, "plugin_type")


class JobInstrumenterI(metaclass=ABCMeta):
    @abstractmethod
    def pre_execute_commands(self, job_directory: str) -> Optional[str]:
        return None

    @abstractmethod
    def post_execute_commands(self, job_directory: str) -> Optional[str]:
        return None

    @abstractmethod
    def collect_properties(self, job_id, job_directory: str) -> Dict[str, Any]:
        return {}

    @abstractmethod
    def get_configured_plugin(self, plugin_type: str):
        return None


class NullJobInstrumenter(JobInstrumenterI):
    def pre_execute_commands(self, job_directory):
        return None

    def post_execute_commands(self, job_directory):
        return None

    def collect_properties(self, job_id, job_directory):
        return {}

    def get_configured_plugin(self, plugin_type: str):
        return None


NULL_JOB_INSTRUMENTER = NullJobInstrumenter()


[docs]class JobInstrumenter(JobInstrumenterI):
[docs] def __init__(self, plugin_classes, plugins_source, **kwargs): self.extra_kwargs = kwargs self.plugin_classes = plugin_classes self.plugins = self.__plugins_from_source(plugins_source)
[docs] def get_configured_plugin(self, plugin_type: str): for plugin in self.plugins: if plugin.plugin_type == plugin_type: return plugin return None
[docs] def pre_execute_commands(self, job_directory): commands = [] for plugin in self.plugins: try: plugin_commands = plugin.pre_execute_instrument(job_directory) if plugin_commands: commands.extend(util.listify(plugin_commands)) except Exception: log.exception("Failed to generate pre-execute commands for plugin %s", plugin) return "\n".join(c for c in commands if c)
[docs] def post_execute_commands(self, job_directory): commands = [] for plugin in self.plugins: try: plugin_commands = plugin.post_execute_instrument(job_directory) if plugin_commands: commands.extend(util.listify(plugin_commands)) except Exception: log.exception("Failed to generate post-execute commands for plugin %s", plugin) return "\n".join(c for c in commands if c)
[docs] def collect_properties(self, job_id, job_directory): per_plugin_properties = {} for plugin in self.plugins: try: properties = plugin.job_properties(job_id, job_directory) if properties: per_plugin_properties[plugin.plugin_type] = properties except Exception: log.exception("Failed to collect job properties for plugin %s", plugin) return per_plugin_properties
def __plugins_from_source(self, plugins_source): return plugin_config.load_plugins(self.plugin_classes, plugins_source, self.extra_kwargs)
[docs] @staticmethod def from_file(plugin_classes, conf_file, **kwargs) -> "JobInstrumenterI": if not conf_file or not os.path.exists(conf_file): return NULL_JOB_INSTRUMENTER plugins_source = plugin_config.plugin_source_from_path(conf_file) return JobInstrumenter(plugin_classes, plugins_source, **kwargs)
__all__ = ( "JobInstrumenter", "Safety", )