Source code for galaxy.job_metrics.instrumenters.env

"""The module describes the ``env`` job metrics plugin."""

import logging
import re
from typing import (
    List,
    Optional,
)

from . import InstrumentPlugin
from ..formatting import JobMetricFormatter
from ..safety import Safety

log = logging.getLogger(__name__)


class EnvFormatter(JobMetricFormatter):
    pass


[docs]class EnvPlugin(InstrumentPlugin): """Instrumentation plugin capable of recording all or specific environment variables for a job at runtime. """ plugin_type = "env" formatter = EnvFormatter() variables: Optional[List[str]] default_safety = Safety.UNSAFE
[docs] def __init__(self, **kwargs): variables_str = kwargs.get("variables", None) if isinstance(variables_str, list): self.variables = variables_str elif variables_str: self.variables = [v.strip() for v in variables_str.split(",")] else: self.variables = None
[docs] def pre_execute_instrument(self, job_directory: str): """Use env to dump all environment variables to a file.""" return f"env > '{self.__env_file(job_directory)}'"
[docs] def post_execute_instrument(self, job_directory): return None
[docs] def job_properties(self, job_id, job_directory): """Recover environment variables dumped out on compute server and filter out specific variables if needed. """ variables = self.variables properties = {} env_string = "".join(open(self.__env_file(job_directory)).readlines()) while env_string: # Check if the next lines contain a shell function. # We use '\n\}\n' as regex termination because shell # functions can be nested. # We use the non-greedy '.+?' because of re.DOTALL . m = re.match(r"([^=]+)=(\(\) \{.+?\n\})\n", env_string, re.DOTALL) if m is None: m = re.match("([^=]+)=(.*)\n", env_string) if m is None: # Some problem recording or reading back env output. message_template = "Problem parsing env metric output for job %s - properties will be incomplete" message = message_template % job_id log.debug(message) break (var, value) = m.groups() if not variables or var in variables: properties[var] = value env_string = env_string[m.end() :] return properties
def __env_file(self, job_directory): return self._instrument_file_path(job_directory, "vars")
__all__ = ("EnvPlugin",)