Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.metrics.instrumenters.core

"""The module describes the ``core`` job metrics plugin."""
import logging
import time

from ..instrumenters import InstrumentPlugin
from ...metrics import formatting

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Metric keys. ``CorePlugin.job_properties`` stores collected values under
# these names and ``CorePluginFormatter.format`` dispatches on them.
# Integer read back from the file the job wrote from ``$GALAXY_SLOTS``.
GALAXY_SLOTS_KEY = "galaxy_slots"
# Integer read back from the file the job wrote from ``$GALAXY_MEMORY_MB``.
GALAXY_MEMORY_MB_KEY = "galaxy_memory_mb"
# Output of ``date +"%s"`` recorded by the pre-execute command.
START_EPOCH_KEY = "start_epoch"
# Output of ``date +"%s"`` recorded by the post-execute command.
END_EPOCH_KEY = "end_epoch"
# Computed as end epoch minus start epoch.
RUNTIME_SECONDS_KEY = "runtime_seconds"


class CorePluginFormatter(formatting.JobMetricFormatter):
    """Render raw ``core`` metric values as ``(title, text)`` display pairs."""

    def format(self, key, value):
        """Return a ``(title, formatted value)`` tuple for one core metric.

        ``value`` is coerced with ``int()`` before any formatting. Slots and
        memory are shown as plain integers and the runtime goes through
        ``formatting.seconds_to_str``. Every other key is rendered as a
        local-time timestamp, titled "Job Start Time" for ``START_EPOCH_KEY``
        and "Job End Time" otherwise.
        """
        number = int(value)

        if key == GALAXY_SLOTS_KEY:
            return ("Cores Allocated", "%d" % number)
        if key == GALAXY_MEMORY_MB_KEY:
            return ("Memory Allocated (MB)", "%d" % number)
        if key == RUNTIME_SECONDS_KEY:
            return ("Job Runtime (Wall Clock)", formatting.seconds_to_str(number))

        # Remaining keys are epoch timestamps.
        # TODO: Use localized version of this from galaxy.ini
        if key == START_EPOCH_KEY:
            title = "Job Start Time"
        else:
            title = "Job End Time"
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(number))
        return (title, timestamp)


class CorePlugin(InstrumentPlugin):
    """Simple plugin that collects data without external dependencies.

    It records the values of ``$GALAXY_SLOTS`` and ``$GALAXY_MEMORY_MB`` as
    seen by the job, plus the epoch seconds at which the job started and
    ended, by emitting shell commands that write each value to a file in the
    job directory. ``job_properties`` later reads those files back.
    """
    plugin_type = "core"
    formatter = CorePluginFormatter()

    def __init__(self, **kwargs):
        """Accept and ignore any keyword options; the plugin has no settings."""
        pass

    def pre_execute_instrument(self, job_directory):
        """Return the shell commands to run before the job.

        The commands record the slot count, the memory allocation and the
        start time (seconds since the epoch) into files under
        ``job_directory``.
        """
        return [
            self.__record_galaxy_slots_command(job_directory),
            self.__record_galaxy_memory_mb_command(job_directory),
            self.__record_seconds_since_epoch_to_file(job_directory, "start"),
        ]

    def post_execute_instrument(self, job_directory):
        """Return the shell commands to run after the job.

        The single command records the end time (seconds since the epoch).
        """
        return [
            self.__record_seconds_since_epoch_to_file(job_directory, "end"),
        ]

    def job_properties(self, job_id, job_directory):
        """Read the recorded values back and return them as a dict.

        The slots and memory keys are always present; their value is ``None``
        when the corresponding file could not be read as an integer. The
        start, end and runtime keys are only added when both epoch files were
        read successfully. ``job_id`` is not used by this plugin.
        """
        properties = {
            GALAXY_SLOTS_KEY: self.__read_integer(
                self.__galaxy_slots_file(job_directory)
            ),
            GALAXY_MEMORY_MB_KEY: self.__read_integer(
                self.__galaxy_memory_mb_file(job_directory)
            ),
        }
        start = self.__read_seconds_since_epoch(job_directory, "start")
        end = self.__read_seconds_since_epoch(job_directory, "end")
        if start is not None and end is not None:
            properties[START_EPOCH_KEY] = start
            properties[END_EPOCH_KEY] = end
            properties[RUNTIME_SECONDS_KEY] = end - start
        return properties

    def __record_galaxy_slots_command(self, job_directory):
        """Build the command that writes ``$GALAXY_SLOTS`` to its file."""
        galaxy_slots_file = self.__galaxy_slots_file(job_directory)
        return '''echo "$GALAXY_SLOTS" > '%s' ''' % galaxy_slots_file

    def __record_galaxy_memory_mb_command(self, job_directory):
        """Build the command that writes ``$GALAXY_MEMORY_MB`` to its file."""
        galaxy_memory_mb_file = self.__galaxy_memory_mb_file(job_directory)
        return '''echo "$GALAXY_MEMORY_MB" > '%s' ''' % galaxy_memory_mb_file

    def __record_seconds_since_epoch_to_file(self, job_directory, name):
        """Build the command that writes ``date +"%s"`` to ``epoch_<name>``."""
        path = self._instrument_file_path(job_directory, "epoch_%s" % name)
        # NOTE(review): unlike the echo commands above, the path is not
        # quoted here; left unchanged to keep the emitted command identical.
        return 'date +"%s" > ' + path

    def __read_seconds_since_epoch(self, job_directory, name):
        """Read the integer stored in ``epoch_<name>``, or ``None``."""
        path = self._instrument_file_path(job_directory, "epoch_%s" % name)
        return self.__read_integer(path)

    def __galaxy_slots_file(self, job_directory):
        """Return the path of the file holding the recorded slot count."""
        return self._instrument_file_path(job_directory, "galaxy_slots")

    def __galaxy_memory_mb_file(self, job_directory):
        """Return the path of the file holding the recorded memory value."""
        return self._instrument_file_path(job_directory, "galaxy_memory_mb")

    def __read_integer(self, path):
        """Return the contents of ``path`` parsed as an int, or ``None``.

        Reading is deliberately best-effort: a missing or unreadable file, or
        content that is not an integer (for instance when the environment
        variable was unset and only a newline was written), yields ``None``
        instead of raising.
        """
        value = None
        try:
            # Context manager so the handle is closed even if int() fails.
            with open(path, "r") as handle:
                value = int(handle.read())
        except Exception:
            # Best-effort by design; callers treat None as "not recorded".
            pass
        return value


# Public API of this module: only the plugin class is exported.
__all__ = ('CorePlugin', )