Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.job_metrics
"""This module defines the job metrics collection framework for Galaxy jobs.
The framework consists of two parts - the :class:`JobMetrics` class and
individual :class:`JobInstrumenter` plugins.
A :class:`JobMetrics` object reads any number of plugins from a configuration
source such as an XML file, a YAML file, or a dictionary.
Each :class:`JobInstrumenter` plugin object describes how to inject bits
of shell code into job scripts (before and after tool commands run) and then
collect the output of these from a job directory.
"""
import collections
import logging
import os
from galaxy import util
from galaxy.util import plugin_config
from ..job_metrics import formatting
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Fallback formatter: JobMetrics.format() uses it for any plugin name that is
# not present in its plugin_classes mapping.
DEFAULT_FORMATTER = formatting.JobMetricFormatter()
class JobMetrics(object):
    """Load and store a collection of :class:`JobInstrumenter` objects.

    A default instrumenter is built from ``conf_file``.  Individual job
    destinations can be given their own instrumenter through the
    ``set_destination_*`` methods; any destination without one falls back
    to the default.
    """

    def __init__(self, conf_file=None, **kwargs):
        """Load :class:`JobInstrumenter` objects from specified configuration file.

        :param conf_file: path passed to :meth:`JobInstrumenter.from_file`
            to build the default instrumenter.
        :param kwargs: forwarded unchanged to :meth:`JobInstrumenter.from_file`.
        """
        self.plugin_classes = self.__plugins_dict()
        self.default_job_instrumenter = JobInstrumenter.from_file(self.plugin_classes, conf_file, **kwargs)
        # Looking up a destination id that was never registered yields (and
        # stores) the default instrumenter.
        self.job_instrumenters = collections.defaultdict(lambda: self.default_job_instrumenter)

    def format(self, plugin, key, value):
        """Find :class:`formatting.JobMetricFormatter` corresponding to instrumented plugin value.

        :param plugin: plugin type name used as key into ``self.plugin_classes``.
        :param key: metric name handed to the formatter.
        :param value: metric value handed to the formatter.
        :returns: whatever the selected formatter's ``format(key, value)`` returns.
        """
        if plugin in self.plugin_classes:
            formatter = self.plugin_classes[plugin].formatter
        else:
            # Unknown plugin: fall back to the module-wide formatter.
            formatter = DEFAULT_FORMATTER
        return formatter.format(key, value)

    def set_destination_conf_file(self, destination_id, conf_file):
        """Register an instrumenter for ``destination_id`` built from ``conf_file``."""
        instrumenter = JobInstrumenter.from_file(self.plugin_classes, conf_file)
        self.set_destination_instrumenter(destination_id, instrumenter)

    def set_destination_conf_element(self, destination_id, element):
        """Register an instrumenter for ``destination_id`` built from an ``'xml'`` plugin source wrapping ``element``."""
        plugin_source = plugin_config.PluginConfigSource('xml', element)
        instrumenter = JobInstrumenter(self.plugin_classes, plugin_source)
        self.set_destination_instrumenter(destination_id, instrumenter)

    def set_destination_conf_dicts(self, destination_id, conf_dicts):
        """Register an instrumenter for ``destination_id`` built from a ``'dict'`` plugin source wrapping ``conf_dicts``."""
        plugin_source = plugin_config.PluginConfigSource('dict', conf_dicts)
        instrumenter = JobInstrumenter(self.plugin_classes, plugin_source)
        self.set_destination_instrumenter(destination_id, instrumenter)

    def set_destination_instrumenter(self, destination_id, job_instrumenter=None):
        """Store ``job_instrumenter`` for ``destination_id``.

        Passing ``None`` (the default) stores ``NULL_JOB_INSTRUMENTER``
        instead.
        """
        if job_instrumenter is None:
            job_instrumenter = NULL_JOB_INSTRUMENTER
        self.job_instrumenters[destination_id] = job_instrumenter

    def collect_properties(self, destination_id, job_id, job_directory):
        """Delegate to ``collect_properties`` of the instrumenter stored for ``destination_id``."""
        return self.job_instrumenters[destination_id].collect_properties(job_id, job_directory)

    def __plugins_dict(self):
        """Build the mapping of available plugin classes, keyed by their ``plugin_type``."""
        # Imported here rather than at module top, as in the original code.
        import galaxy.job_metrics.instrumenters
        return plugin_config.plugins_dict(galaxy.job_metrics.instrumenters, 'plugin_type')
class NullJobInstrumenter(object):
    """No-op instrumenter used when no metrics plugins are configured.

    It exposes the same three methods that this module calls on
    :class:`JobInstrumenter` objects, so callers never need to special-case
    a missing configuration.
    """

    def pre_execute_commands(self, job_directory):
        """Return ``None``: there is nothing to inject before the tool runs."""
        return None

    def post_execute_commands(self, job_directory):
        """Return ``None``: there is nothing to inject after the tool runs."""
        return None

    def collect_properties(self, job_id, job_directory):
        """Return an empty dict: no plugins means no collected properties."""
        return {}


# Shared no-op instance handed out whenever no instrumenter is available
# (missing configuration file, or an explicit ``None`` instrumenter).
NULL_JOB_INSTRUMENTER = NullJobInstrumenter()
class JobInstrumenter(object):
    """Hold the instrumentation plugins loaded from one plugin source.

    The object can produce the shell snippets to run before and after a
    job's tool commands, and gather each plugin's collected properties
    afterwards.
    """

    def __init__(self, plugin_classes, plugins_source, **kwargs):
        """Instantiate the plugins described by ``plugins_source``.

        :param plugin_classes: mapping of available plugin classes.
        :param plugins_source: plugin source understood by
            ``plugin_config.load_plugins``.
        :param kwargs: kept as ``self.extra_kwargs`` and handed to
            ``plugin_config.load_plugins`` as its third argument.
        """
        self.extra_kwargs = kwargs
        self.plugin_classes = plugin_classes
        self.plugins = self.__plugins_from_source(plugins_source)

    def pre_execute_commands(self, job_directory):
        """Return the newline-joined commands plugins want run before the tool.

        A plugin that raises is logged and skipped; it never aborts the
        whole collection.
        """
        return self._gather_commands(
            "pre_execute_instrument",
            "Failed to generate pre-execute commands for plugin %s",
            job_directory,
        )

    def post_execute_commands(self, job_directory):
        """Return the newline-joined commands plugins want run after the tool.

        A plugin that raises is logged and skipped; it never aborts the
        whole collection.
        """
        return self._gather_commands(
            "post_execute_instrument",
            "Failed to generate post-execute commands for plugin %s",
            job_directory,
        )

    def _gather_commands(self, hook_name, failure_message, job_directory):
        """Call ``hook_name(job_directory)`` on every plugin and join the results.

        :param hook_name: name of the plugin method to invoke.
        :param failure_message: ``%s``-style log message used when a plugin
            fails; the plugin is supplied as the single argument.
        :param job_directory: passed through to each plugin hook.
        :returns: non-empty commands joined with newlines (``""`` if none).
        """
        commands = []
        for plugin in self.plugins:
            try:
                # Attribute lookup stays inside the try so that a plugin
                # lacking the hook is logged like any other failure.
                plugin_commands = getattr(plugin, hook_name)(job_directory)
                if plugin_commands:
                    commands.extend(util.listify(plugin_commands))
            except Exception:
                log.exception(failure_message, plugin)
        return "\n".join(c for c in commands if c)

    def collect_properties(self, job_id, job_directory):
        """Return ``{plugin_type: properties}`` for every plugin reporting any.

        Plugins whose ``job_properties`` result is falsy are omitted;
        plugins that raise are logged and skipped.
        """
        per_plugin_properties = {}
        for plugin in self.plugins:
            try:
                properties = plugin.job_properties(job_id, job_directory)
                if properties:
                    per_plugin_properties[plugin.plugin_type] = properties
            except Exception:
                log.exception("Failed to collect job properties for plugin %s", plugin)
        return per_plugin_properties

    def __plugins_from_source(self, plugins_source):
        """Load plugin instances from ``plugins_source`` via ``plugin_config.load_plugins``."""
        return plugin_config.load_plugins(self.plugin_classes, plugins_source, self.extra_kwargs)

    @staticmethod
    def from_file(plugin_classes, conf_file, **kwargs):
        """Build an instrumenter from the configuration file at ``conf_file``.

        Returns ``NULL_JOB_INSTRUMENTER`` when ``conf_file`` is empty/``None``
        or does not exist on disk.
        """
        if not conf_file or not os.path.exists(conf_file):
            return NULL_JOB_INSTRUMENTER
        plugins_source = plugin_config.plugin_source_from_path(conf_file)
        return JobInstrumenter(plugin_classes, plugins_source, **kwargs)