Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.job_metrics.instrumenters.core

"""The module describes the ``core`` job metrics plugin."""

import logging
import time
from typing import (
    Any,
    Dict,
    List,
)

from . import InstrumentPlugin
from ..formatting import (
    FormattedMetric,
    JobMetricFormatter,
    seconds_to_str,
)
from ..safety import Safety

log = logging.getLogger(__name__)

GALAXY_SLOTS_KEY = "galaxy_slots"
GALAXY_MEMORY_MB_KEY = "galaxy_memory_mb"
START_EPOCH_KEY = "start_epoch"
END_EPOCH_KEY = "end_epoch"
RUNTIME_SECONDS_KEY = "runtime_seconds"


class CorePluginFormatter(JobMetricFormatter):
    def format(self, key: str, value: Any) -> FormattedMetric:
        value = int(value)
        if key == GALAXY_SLOTS_KEY:
            return FormattedMetric("Cores Allocated", "%d" % value)
        elif key == GALAXY_MEMORY_MB_KEY:
            return FormattedMetric("Memory Allocated (MB)", "%d" % value)
        elif key == RUNTIME_SECONDS_KEY:
            return FormattedMetric("Job Runtime (Wall Clock)", seconds_to_str(value))
        else:
            # TODO: Use localized version of this from galaxy.ini
            title = "Job Start Time" if key == START_EPOCH_KEY else "Job End Time"
            return FormattedMetric(title, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(value)))


[docs]class CorePlugin(InstrumentPlugin): """Simple plugin that collects data without external dependencies. In particular it currently collects value set for Galaxy slots. """ plugin_type = "core" formatter = CorePluginFormatter() default_safety = Safety.SAFE
[docs] def __init__(self, **kwargs): pass
[docs] def pre_execute_instrument(self, job_directory: str) -> List[str]: commands = [] commands.append(self.__record_galaxy_slots_command(job_directory)) commands.append(self.__record_galaxy_memory_mb_command(job_directory)) commands.append(self.__record_seconds_since_epoch_to_file(job_directory, "start")) return commands
[docs] def post_execute_instrument(self, job_directory: str) -> List[str]: commands = [] commands.append(self.__record_seconds_since_epoch_to_file(job_directory, "end")) return commands
[docs] def job_properties(self, job_id, job_directory: str) -> Dict[str, Any]: galaxy_slots_file = self.__galaxy_slots_file(job_directory) galaxy_memory_mb_file = self.__galaxy_memory_mb_file(job_directory) properties = {} properties[GALAXY_SLOTS_KEY] = self.__read_integer(galaxy_slots_file) properties[GALAXY_MEMORY_MB_KEY] = self.__read_integer(galaxy_memory_mb_file) start = self.__read_seconds_since_epoch(job_directory, "start") end = self.__read_seconds_since_epoch(job_directory, "end") if start is not None and end is not None: properties[START_EPOCH_KEY] = start properties[END_EPOCH_KEY] = end properties[RUNTIME_SECONDS_KEY] = end - start return properties
def __record_galaxy_slots_command(self, job_directory): galaxy_slots_file = self.__galaxy_slots_file(job_directory) return f"""echo "$GALAXY_SLOTS" > '{galaxy_slots_file}' """ def __record_galaxy_memory_mb_command(self, job_directory): galaxy_memory_mb_file = self.__galaxy_memory_mb_file(job_directory) return f"""echo "$GALAXY_MEMORY_MB" > '{galaxy_memory_mb_file}' """ def __record_seconds_since_epoch_to_file(self, job_directory, name): path = self._instrument_file_path(job_directory, f"epoch_{name}") return f'date +"%s" > {path}' def __read_seconds_since_epoch(self, job_directory, name): path = self._instrument_file_path(job_directory, f"epoch_{name}") return self.__read_integer(path) def __galaxy_slots_file(self, job_directory): return self._instrument_file_path(job_directory, "galaxy_slots") def __galaxy_memory_mb_file(self, job_directory): return self._instrument_file_path(job_directory, "galaxy_memory_mb") def __read_integer(self, path): value = None try: value = int(open(path).read()) except Exception: pass return value
__all__ = ("CorePlugin",)