Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.job_metrics

"""This module defines the job metrics collection framework for Galaxy jobs.

The framework consists of two parts - the :class:`JobMetrics` class and
individual :class:`JobInstrumenter` plugins.

A :class:`JobMetrics` object reads any number of plugins from a configuration
source such as an XML file, a YAML file, or a dictionary.

Each :class:`JobInstrumenter` plugin object describes how to inject a bits
of shell code into a job scripts (before and after tool commands run) and then
collect the output of these from a job directory.
"""
import collections
import logging
import os

from galaxy import util
from galaxy.util import plugin_config
from ..job_metrics import formatting

log = logging.getLogger(__name__)


DEFAULT_FORMATTER = formatting.JobMetricFormatter()


[docs]class JobMetrics: """Load and store a collection of :class:`JobInstrumenter` objects."""
[docs] def __init__(self, conf_file=None, **kwargs): """Load :class:`JobInstrumenter` objects from specified configuration file.""" self.plugin_classes = self.__plugins_dict() self.default_job_instrumenter = JobInstrumenter.from_file(self.plugin_classes, conf_file, **kwargs) self.job_instrumenters = collections.defaultdict(lambda: self.default_job_instrumenter)
[docs] def format(self, plugin, key, value): """Find :class:`formatting.JobMetricFormatter` corresponding to instrumented plugin value.""" if plugin in self.plugin_classes: plugin_class = self.plugin_classes[plugin] formatter = plugin_class.formatter else: formatter = DEFAULT_FORMATTER return formatter.format(key, value)
[docs] def set_destination_conf_file(self, destination_id, conf_file): instrumenter = JobInstrumenter.from_file(self.plugin_classes, conf_file) self.set_destination_instrumenter(destination_id, instrumenter)
[docs] def set_destination_conf_element(self, destination_id, element): plugin_source = plugin_config.PluginConfigSource('xml', element) instrumenter = JobInstrumenter(self.plugin_classes, plugin_source) self.set_destination_instrumenter(destination_id, instrumenter)
[docs] def set_destination_conf_dicts(self, destination_id, conf_dicts): plugin_source = plugin_config.PluginConfigSource('dict', conf_dicts) instrumenter = JobInstrumenter(self.plugin_classes, plugin_source) self.set_destination_instrumenter(destination_id, instrumenter)
[docs] def set_destination_instrumenter(self, destination_id, job_instrumenter=None): if job_instrumenter is None: job_instrumenter = NULL_JOB_INSTRUMENTER self.job_instrumenters[destination_id] = job_instrumenter
[docs] def collect_properties(self, destination_id, job_id, job_directory): return self.job_instrumenters[destination_id].collect_properties(job_id, job_directory)
def __plugins_dict(self): import galaxy.job_metrics.instrumenters return plugin_config.plugins_dict(galaxy.job_metrics.instrumenters, 'plugin_type')
[docs]class NullJobInstrumenter:
[docs] def pre_execute_commands(self, job_directory): return None
[docs] def post_execute_commands(self, job_directory): return None
[docs] def collect_properties(self, job_id, job_directory): return {}
NULL_JOB_INSTRUMENTER = NullJobInstrumenter()
[docs]class JobInstrumenter:
[docs] def __init__(self, plugin_classes, plugins_source, **kwargs): self.extra_kwargs = kwargs self.plugin_classes = plugin_classes self.plugins = self.__plugins_from_source(plugins_source)
[docs] def pre_execute_commands(self, job_directory): commands = [] for plugin in self.plugins: try: plugin_commands = plugin.pre_execute_instrument(job_directory) if plugin_commands: commands.extend(util.listify(plugin_commands)) except Exception: log.exception("Failed to generate pre-execute commands for plugin %s", plugin) return "\n".join(c for c in commands if c)
[docs] def post_execute_commands(self, job_directory): commands = [] for plugin in self.plugins: try: plugin_commands = plugin.post_execute_instrument(job_directory) if plugin_commands: commands.extend(util.listify(plugin_commands)) except Exception: log.exception("Failed to generate post-execute commands for plugin %s", plugin) return "\n".join(c for c in commands if c)
[docs] def collect_properties(self, job_id, job_directory): per_plugin_properties = {} for plugin in self.plugins: try: properties = plugin.job_properties(job_id, job_directory) if properties: per_plugin_properties[plugin.plugin_type] = properties except Exception: log.exception("Failed to collect job properties for plugin %s", plugin) return per_plugin_properties
def __plugins_from_source(self, plugins_source): return plugin_config.load_plugins(self.plugin_classes, plugins_source, self.extra_kwargs)
[docs] @staticmethod def from_file(plugin_classes, conf_file, **kwargs): if not conf_file or not os.path.exists(conf_file): return NULL_JOB_INSTRUMENTER plugins_source = plugin_config.plugin_source_from_path(conf_file) return JobInstrumenter(plugin_classes, plugins_source, **kwargs)