Source code for galaxy.jobs.runners.chronos

import functools
import logging
import os

from galaxy import model
from galaxy.jobs.runners import (
    AsynchronousJobRunner,
    AsynchronousJobState,
)
from galaxy.util import unicodify

CHRONOS_IMPORT_MSG = (
    "The Python 'chronos' package is required to use "
    "this feature, please install it or correct the "
    "following error:\nImportError {msg!s}"
)

try:
    import chronos

    chronos_exceptions = (
        chronos.ChronosAPIError,
        chronos.UnauthorizedError,
        chronos.MissingFieldError,
        chronos.OneOfViolationError,
    )
except ImportError as e:
    chronos = None
    CHRONOS_IMPORT_MSG = CHRONOS_IMPORT_MSG.format(msg=unicodify(e))


__all__ = ("ChronosJobRunner",)
LOGGER = logging.getLogger(__name__)


class ChronosRunnerException(Exception):
    pass


def handle_exception_call(func):
    # Catch chronos exceptions. chronos-python does not provide a common
    # base class for its exceptions, so they are caught as a tuple. Caught
    # exceptions are logged and the wrapped call returns None.

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except chronos_exceptions as e:
            LOGGER.error(unicodify(e))

    return wrapper


def to_dict(segments, v):
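    """Recursively nest ``v`` under the given key segments.

    Illustrative example (mirrors the "container/volumes" map_name used in
    ChronosJobRunner.DESTINATION_PARAMS_SPEC below):

    >>> to_dict(["container", "volumes"], [])
    {'container': {'volumes': []}}
    """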
    if len(segments) == 0:
        return v
    return {segments[0]: to_dict(segments[1:], v)}


def _write_logfile(logfile, msg):
    with open(logfile, "w") as fil:
        fil.write(msg)


def _parse_job_volumes_list(li):
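    """Parse a comma-separated list of "host:container:mode" volume strings.

    Illustrative example (the paths are assumptions, not from the original
    source):

    >>> _parse_job_volumes_list("/data:/mnt/data:RW")
    [{'hostPath': '/data', 'containerPath': '/mnt/data', 'mode': 'RW'}]
    """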
    # Convert the comma-separated string to a list
    volume_list = li.split(",")
    # Build the list of mount points with the right permissions
    mountpoint_list = []
    # Convert each element to the expected format
    for i in volume_list:
        hpath, cpath, mode = i.split(":")
        mountpoint_list.append({"hostPath": hpath, "containerPath": cpath, "mode": mode})
    return mountpoint_list


def _add_galaxy_environment_variables(cpus, memory):
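    """Build the environment variable list for the Chronos job spec.

    Illustrative example (the values match the defaults in
    ChronosJobRunner.DESTINATION_PARAMS_SPEC below):

    >>> _add_galaxy_environment_variables(0.1, 128)
    [{'name': 'GALAXY_SLOTS', 'value': 0.1}, {'name': 'GALAXY_MEMORY_MB', 'value': 128}]
    """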
    # Set GALAXY_SLOTS to docker_cpu and GALAXY_MEMORY_MB to docker_memory.
    return [{"name": "GALAXY_SLOTS", "value": cpus}, {"name": "GALAXY_MEMORY_MB", "value": memory}]


class ChronosJobRunner(AsynchronousJobRunner):
    runner_name = "ChronosRunner"

    RUNNER_PARAM_SPEC_KEY = "runner_param_specs"
    JOB_NAME_PREFIX = "galaxy-chronos-"

    RUNNER_PARAM_SPEC = {
        "chronos": {
            "map": str,
        },
        "insecure": {
            "map": lambda x: x in ["true", "True", "TRUE"],
            "default": True,
        },
        "username": {
            "map": str,
        },
        "password": {
            "map": str,
        },
        "owner": {"map": str},
    }

    DESTINATION_PARAMS_SPEC = {
        "docker_cpu": {
            "default": 0.1,
            "map_name": "cpus",
            "map": float,
        },
        "docker_memory": {
            "default": 128,
            "map_name": "mem",
            "map": int,
        },
        "docker_disk": {
            "default": 256,
            "map_name": "disk",
            "map": int,
        },
        "volumes": {
            "default": None,
            "map_name": "container/volumes",
            "map": (lambda x: _parse_job_volumes_list(x) if x is not None else []),
        },
        "max_retries": {
            "default": 2,
            "map_name": "retries",
            "map": int,
        },
    }

    def __init__(self, app, nworkers, **kwargs):
        """Initialize this job runner and start the monitor thread"""
        assert chronos, CHRONOS_IMPORT_MSG
        if self.RUNNER_PARAM_SPEC_KEY not in kwargs:
            kwargs[self.RUNNER_PARAM_SPEC_KEY] = {}
        kwargs[self.RUNNER_PARAM_SPEC_KEY].update(self.RUNNER_PARAM_SPEC)
        super().__init__(app, nworkers, **kwargs)
        protocol = "http" if self.runner_params.get("insecure", True) else "https"
        self._chronos_client = chronos.connect(
            self.runner_params["chronos"],
            username=self.runner_params.get("username"),
            password=self.runner_params.get("password"),
            proto=protocol,
        )

    @handle_exception_call
    def queue_job(self, job_wrapper):
        LOGGER.debug(f"Starting queue_job for job {job_wrapper.get_id_tag()}")
        if not self.prepare_job(job_wrapper, include_metadata=False, modify_command_for_container=False):
            LOGGER.debug(f"Not ready {job_wrapper.get_id_tag()}")
            return
        job_destination = job_wrapper.job_destination
        chronos_job_spec = self._get_job_spec(job_wrapper)
        job_name = chronos_job_spec["name"]
        self._chronos_client.add(chronos_job_spec)
        ajs = AsynchronousJobState(
            files_dir=job_wrapper.working_directory,
            job_wrapper=job_wrapper,
            job_id=job_name,
            job_destination=job_destination,
        )
        self.monitor_queue.put(ajs)

    @handle_exception_call
    def stop_job(self, job_wrapper):
        job_id = job_wrapper.get_id_tag()
        job_name = self.JOB_NAME_PREFIX + job_id
        job = self._retrieve_job(job_name)
        if job:
            msg = "Job {name!r} has been terminated"
            self._chronos_client.delete(job_name)
            LOGGER.debug(msg.format(name=job_name))
        else:
            msg = "Job {name!r} not found. It cannot be terminated."
            LOGGER.error(msg.format(name=job_name))

    def recover(self, job, job_wrapper):
        msg = "({name!r}/{runner!r}) is still in {state!s} state, adding to the runner monitor queue"
        job_id = job.get_job_runner_external_id()
        ajs = AsynchronousJobState(
            files_dir=job_wrapper.working_directory,
            job_wrapper=job_wrapper,
            job_id=self.JOB_NAME_PREFIX + str(job_id),
            job_destination=job_wrapper.job_destination,
        )
        ajs.command_line = job.command_line
        if job.state in (model.Job.states.RUNNING, model.Job.states.STOPPED):
            LOGGER.debug(msg.format(name=job.id, runner=job.job_runner_external_id, state=job.state))
            ajs.old_state = model.Job.states.RUNNING
            ajs.running = True
            self.monitor_queue.put(ajs)
        elif job.state == model.Job.states.QUEUED:
            LOGGER.debug(msg.format(name=job.id, runner=job.job_runner_external_id, state="queued"))
            ajs.old_state = model.Job.states.QUEUED
            ajs.running = False
            self.monitor_queue.put(ajs)

    @handle_exception_call
    def check_watched_item(self, job_state):
        job_name = job_state.job_id
        # TODO: how can stopped GxIT jobs be handled here?
        if job := self._retrieve_job(job_name):
            succeeded = job["successCount"]
            errors = job["errorCount"]
            if succeeded > 0:
                return self._mark_as_successful(job_state)
            elif not succeeded and not errors:
                return self._mark_as_active(job_state)
            elif errors:
                max_retries = job["retries"]
                if max_retries == 0:
                    msg = "Job {name!r} failed. No retries performed."
                else:
                    msg = "Job {name!r} failed more than {retries!s} times."
                reason = msg.format(name=job_name, retries=str(max_retries))
                return self._mark_as_failed(job_state, reason)
        reason = f"Job {job_name!r} not found"
        return self._mark_as_failed(job_state, reason)

    def _mark_as_successful(self, job_state):
        msg = "Job {name!r} finished successfully"
        _write_logfile(job_state.output_file, msg.format(name=job_state.job_id))
        _write_logfile(job_state.error_file, "")
        job_state.running = False
        job_state.job_wrapper.change_state(model.Job.states.OK)
        self.mark_as_finished(job_state)
        return None

    def _mark_as_active(self, job_state):
        job_state.running = True
        job_state.job_wrapper.change_state(model.Job.states.RUNNING)
        return job_state

    def _mark_as_failed(self, job_state, reason):
        _write_logfile(job_state.error_file, reason)
        job_state.running = False
        job_state.stop_job = True
        job_state.job_wrapper.change_state(model.Job.states.ERROR)
        job_state.fail_message = reason
        self.mark_as_failed(job_state)
        return None

    @handle_exception_call
    def finish_job(self, job_state):
        super().finish_job(job_state)
        self._chronos_client.delete(job_state.job_id)

    def parse_destination_params(self, params):
        parsed_params = {}
        for k, spec in self.DESTINATION_PARAMS_SPEC.items():
            value = params.get(k, spec.get("default"))
            map_to = spec.get("map_name")
            mapper = spec.get("map")
            segments = map_to.split("/")
            parsed_params.update(to_dict(segments, mapper(value)))
        return parsed_params

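    # Illustrative example (parameter values are assumptions, not from the
    # original source): a destination supplying
    #     {"docker_cpu": "0.5", "volumes": "/data:/data:RO"}
    # and relying on defaults for the rest parses to
    #     {"cpus": 0.5, "mem": 128, "disk": 256, "retries": 2,
    #      "container": {"volumes": [{"hostPath": "/data",
    #                                 "containerPath": "/data", "mode": "RO"}]}}
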
    def write_command(self, job_wrapper):
        # Write the command to a script file instead of passing it directly
        # to the container, to avoid character-quoting problems.
        if not os.path.exists(job_wrapper.working_directory):
            LOGGER.error("No working directory found")
        path = f"{job_wrapper.working_directory}/chronos_{job_wrapper.get_id_tag()}.sh"
        mode = 0o755
        with open(path, "w", encoding="utf-8") as f:
            f.write("#!/bin/bash\n")
            f.write(job_wrapper.runner_command_line)
        os.chmod(path, mode)
        return path

    def _get_job_spec(self, job_wrapper):
        job_name = self.JOB_NAME_PREFIX + job_wrapper.get_id_tag()
        job_destination = job_wrapper.job_destination
        command_script_path = self.write_command(job_wrapper)
        template = {
            "async": False,
            # 'command': job_wrapper.runner_command_line,
            "command": f"$SHELL {command_script_path}",
            "owner": self.runner_params["owner"],
            "disabled": False,
            "schedule": "R1//PT1S",
            "name": job_name,
            # Add Galaxy environment variables to the job spec
            "environmentVariables": _add_galaxy_environment_variables(
                job_destination.params.get("docker_cpu"), job_destination.params.get("docker_memory")
            ),
        }
        if not job_destination.params.get("docker_enabled"):
            raise ChronosRunnerException("ChronosJobRunner needs 'docker_enabled' to be set to True")
        destination_params = self.parse_destination_params(job_destination.params)
        template.update(destination_params)
        template["container"]["type"] = "DOCKER"
        template["container"]["image"] = self._find_container(job_wrapper).container_id
        # Fix the working directory inside the container
        template["container"]["parameters"] = [{"key": "workdir", "value": job_wrapper.working_directory}]
        return template

    def _retrieve_job(self, job_id):
        jobs = self._chronos_client.list()
        job = [x for x in jobs if x["name"] == job_id]
        if len(job) > 1:
            msg = f"Multiple jobs found with name {job_id!r}"
            LOGGER.error(msg)
            raise ChronosRunnerException(msg)
        return job[0] if job else None
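
# Illustrative sketch (field values are assumptions, not from the original
# source): for a destination with docker_enabled=true and the defaults above,
# _get_job_spec() yields a Chronos job spec shaped roughly like:
#
#     {
#         "name": "galaxy-chronos-42",
#         "command": "$SHELL <working_dir>/chronos_42.sh",
#         "owner": "<runner owner>",
#         "schedule": "R1//PT1S",  # ISO 8601 repeating interval: run once
#         "async": False,
#         "disabled": False,
#         "environmentVariables": [
#             {"name": "GALAXY_SLOTS", "value": 0.1},
#             {"name": "GALAXY_MEMORY_MB", "value": 128},
#         ],
#         "cpus": 0.1, "mem": 128, "disk": 256, "retries": 2,
#         "container": {
#             "type": "DOCKER",
#             "image": "<resolved container id>",
#             "volumes": [],
#             "parameters": [{"key": "workdir", "value": "<working_dir>"}],
#         },
#     }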