Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.jobs.runners.chronos
import functools
import logging
import os
from galaxy import model
from galaxy.jobs.runners import (
AsynchronousJobRunner,
AsynchronousJobState,
)
from galaxy.util import unicodify
# Template for the message shown when the optional 'chronos' dependency is
# missing; it is filled in with the actual ImportError below.
CHRONOS_IMPORT_MSG = (
    "The Python 'chronos' package is required to use "
    "this feature, please install it or correct the "
    "following error:\nImportError {msg!s}"
)

try:
    import chronos

    # Exception types raised by the chronos client that
    # ``handle_exception_call`` intercepts.
    chronos_exceptions = (
        chronos.ChronosAPIError,
        chronos.UnauthorizedError,
        chronos.MissingFieldError,
        chronos.OneOfViolationError,
    )
except ImportError as e:
    chronos = None
    # Keep the name defined even without the package: an empty tuple is a
    # valid ``except`` target that simply matches nothing.
    chronos_exceptions = ()
    # str.format() returns a new string, so the result has to be rebound;
    # otherwise the message keeps its literal "{msg!s}" placeholder.
    CHRONOS_IMPORT_MSG = CHRONOS_IMPORT_MSG.format(msg=unicodify(e))

__all__ = ("ChronosJobRunner",)

LOGGER = logging.getLogger(__name__)
class ChronosRunnerException(Exception):
    """Error raised by the Chronos job runner itself (as opposed to the chronos client)."""
def handle_exception_call(func):
    """Decorate ``func`` so that chronos client errors are logged instead of raised.

    Only the exception types listed in the module-level ``chronos_exceptions``
    tuple are intercepted. Their message is sent to ``LOGGER.error`` and the
    decorated call then evaluates to ``None``. Every other exception
    propagates unchanged.
    """

    @functools.wraps(func)
    def guarded(*args, **kwargs):
        try:
            outcome = func(*args, **kwargs)
        except chronos_exceptions as err:
            LOGGER.error(unicodify(err))
            return None
        return outcome

    return guarded
def to_dict(segments, v):
    """Nest ``v`` under the keys in ``segments``, outermost key first.

    ``to_dict(["a", "b"], 1)`` yields ``{"a": {"b": 1}}``. With no segments
    ``v`` itself is returned.
    """
    nested = v
    # Wrap from the innermost key outwards.
    for key in reversed(segments):
        nested = {key: nested}
    return nested
def _write_logfile(logfile, msg):
with open(logfile, "w") as fil:
fil.write(msg)
def _parse_job_volumes_list(li):
# Convert comma separated string to list
volume_list = list(li.split(","))
# Create the list with right mountpoint and permissions
mountpoint_list = []
# Convert each element to right format
for i in volume_list:
hpath, cpath, mode = i.split(":")
mountpoint_list.append({"hostPath": hpath, "containerPath": cpath, "mode": mode})
return mountpoint_list
def _add_galaxy_environment_variables(cpus, memory):
# Set:
# GALAXY_SLOTS: to docker_cpu
# GALAXY_MEMORY_MB to docker_memory
return [{"name": "GALAXY_SLOTS", "value": cpus}, {"name": "GALAXY_MEMORY_MB", "value": memory}]
class ChronosJobRunner(AsynchronousJobRunner):
    """Asynchronous job runner that submits Galaxy jobs to Chronos as Docker containers.

    Jobs are registered with the chronos client under the name
    ``JOB_NAME_PREFIX + <job id tag>`` and then polled through
    ``check_watched_item`` until Chronos reports a success or an error.
    """

    runner_name = "ChronosRunner"
    RUNNER_PARAM_SPEC_KEY = "runner_param_specs"
    JOB_NAME_PREFIX = "galaxy-chronos-"
    # Runner-level parameters merged into the ``runner_param_specs`` kwarg
    # handed to the base class in __init__.
    RUNNER_PARAM_SPEC = {
        "chronos": {
            "map": str,
        },
        "insecure": {
            "map": lambda x: x in ["true", "True", "TRUE"],
            "default": True,
        },
        "username": {
            "map": str,
        },
        "password": {
            "map": str,
        },
        "owner": {"map": str},
    }
    # Destination parameters copied into the Chronos job spec. ``map_name`` is
    # a "/"-separated path into the spec and ``map`` converts the raw value.
    DESTINATION_PARAMS_SPEC = {
        "docker_cpu": {
            "default": 0.1,
            "map_name": "cpus",
            "map": float,
        },
        "docker_memory": {
            "default": 128,
            "map_name": "mem",
            "map": int,
        },
        "docker_disk": {
            "default": 256,
            "map_name": "disk",
            "map": int,
        },
        "volumes": {
            "default": None,
            "map_name": "container/volumes",
            "map": (lambda x: _parse_job_volumes_list(x) if x is not None else []),
        },
        "max_retries": {
            "default": 2,
            "map_name": "retries",
            "map": int,
        },
    }

    def __init__(self, app, nworkers, **kwargs):
        """Initialize this job runner and start the monitor thread"""
        # NOTE(review): an assert is skipped under ``python -O``; kept as is
        # so the exception type seen by callers does not change.
        assert chronos, CHRONOS_IMPORT_MSG
        if self.RUNNER_PARAM_SPEC_KEY not in kwargs:
            kwargs[self.RUNNER_PARAM_SPEC_KEY] = {}
        kwargs[self.RUNNER_PARAM_SPEC_KEY].update(self.RUNNER_PARAM_SPEC)
        super().__init__(app, nworkers, **kwargs)
        # "insecure" selects plain HTTP; anything else talks HTTPS.
        protocol = "http" if self.runner_params.get("insecure", True) else "https"
        self._chronos_client = chronos.connect(
            self.runner_params["chronos"],
            username=self.runner_params.get("username"),
            password=self.runner_params.get("password"),
            proto=protocol,
        )

    @handle_exception_call
    def queue_job(self, job_wrapper):
        """Prepare ``job_wrapper``, register it with Chronos and start monitoring it.

        Returns early without submitting when ``prepare_job`` reports that the
        job is not ready. Chronos client errors are logged by the decorator.
        """
        LOGGER.debug(f"Starting queue_job for job {job_wrapper.get_id_tag()}")
        if not self.prepare_job(job_wrapper, include_metadata=False, modify_command_for_container=False):
            LOGGER.debug(f"Not ready {job_wrapper.get_id_tag()}")
            return
        job_destination = job_wrapper.job_destination
        chronos_job_spec = self._get_job_spec(job_wrapper)
        job_name = chronos_job_spec["name"]
        self._chronos_client.add(chronos_job_spec)
        ajs = AsynchronousJobState(
            files_dir=job_wrapper.working_directory,
            job_wrapper=job_wrapper,
            job_id=job_name,
            job_destination=job_destination,
        )
        self.monitor_queue.put(ajs)

    @handle_exception_call
    def stop_job(self, job_wrapper):
        """Delete the Chronos job belonging to ``job_wrapper`` if it still exists."""
        job_id = job_wrapper.get_id_tag()
        job_name = self.JOB_NAME_PREFIX + job_id
        job = self._retrieve_job(job_name)
        if job:
            msg = "Job {name!r} is terminated"
            self._chronos_client.delete(job_name)
            LOGGER.debug(msg.format(name=job_name))
        else:
            msg = "Job {name!r} not found. It cannot be terminated."
            LOGGER.error(msg.format(name=job_name))

    def recover(self, job, job_wrapper):
        """Put a running, stopped or queued ``job`` back on the monitor queue.

        Jobs in any other state are ignored.
        """
        # The placeholders need braces, otherwise str.format() leaves the
        # literal text "name!r/runner!r" in the log line.
        msg = "({name!r}/{runner!r}) is still in {state!s} state, adding to the runner monitor queue"
        # NOTE(review): queue_job() monitors jobs under
        # JOB_NAME_PREFIX + get_id_tag(); confirm that the external id read
        # here produces the same Chronos job name.
        job_id = job.get_job_runner_external_id()
        ajs = AsynchronousJobState(
            files_dir=job_wrapper.working_directory,
            job_wrapper=job_wrapper,
            job_id=self.JOB_NAME_PREFIX + str(job_id),
            job_destination=job_wrapper.job_destination,
        )
        ajs.command_line = job.command_line
        if job.state in (model.Job.states.RUNNING, model.Job.states.STOPPED):
            LOGGER.debug(msg.format(name=job.id, runner=job.job_runner_external_id, state=job.state))
            ajs.old_state = model.Job.states.RUNNING
            ajs.running = True
            self.monitor_queue.put(ajs)
        elif job.state == model.Job.states.QUEUED:
            LOGGER.debug(msg.format(name=job.id, runner=job.job_runner_external_id, state="queued"))
            ajs.old_state = model.Job.states.QUEUED
            ajs.running = False
            self.monitor_queue.put(ajs)

    @handle_exception_call
    def check_watched_item(self, job_state):
        """Poll Chronos for ``job_state`` and update the Galaxy job accordingly.

        Returns ``job_state`` while the job has neither succeeded nor failed,
        and ``None`` once it was marked as successful or failed (including
        when Chronos no longer knows the job).
        """
        job_name = job_state.job_id
        # TODO: how can stopped GxIT jobs be handled here?
        if job := self._retrieve_job(job_name):
            succeeded = job["successCount"]
            errors = job["errorCount"]
            if succeeded > 0:
                return self._mark_as_successful(job_state)
            elif not succeeded and not errors:
                return self._mark_as_active(job_state)
            elif errors:
                max_retries = job["retries"]
                if max_retries == 0:
                    msg = "Job {name!r} failed. No retries performed."
                else:
                    msg = "Job {name!r} failed more than {retries!s} times."
                reason = msg.format(name=job_name, retries=str(max_retries))
                return self._mark_as_failed(job_state, reason)
        reason = f"Job {job_name!r} not found"
        return self._mark_as_failed(job_state, reason)

    def _mark_as_successful(self, job_state):
        """Write the output/error files, set the job to OK and hand it to ``mark_as_finished``."""
        msg = "Job {name!r} finished successfully"
        _write_logfile(job_state.output_file, msg.format(name=job_state.job_id))
        _write_logfile(job_state.error_file, "")
        job_state.running = False
        job_state.job_wrapper.change_state(model.Job.states.OK)
        self.mark_as_finished(job_state)
        return None

    def _mark_as_active(self, job_state):
        """Flag the job as RUNNING and return ``job_state`` so it stays watched."""
        job_state.running = True
        job_state.job_wrapper.change_state(model.Job.states.RUNNING)
        return job_state

    def _mark_as_failed(self, job_state, reason):
        """Record ``reason`` in the error file, set the job to ERROR and hand it to ``mark_as_failed``."""
        _write_logfile(job_state.error_file, reason)
        job_state.running = False
        job_state.stop_job = True
        job_state.job_wrapper.change_state(model.Job.states.ERROR)
        job_state.fail_message = reason
        self.mark_as_failed(job_state)
        return None

    @handle_exception_call
    def finish_job(self, job_state):
        """Run the base class ``finish_job`` and then delete the job from Chronos."""
        super().finish_job(job_state)
        self._chronos_client.delete(job_state.job_id)

    def parse_destination_params(self, params):
        """Translate destination ``params`` into Chronos job spec entries.

        Every entry of ``DESTINATION_PARAMS_SPEC`` is looked up in ``params``
        (falling back to its default), converted with its ``map`` callable and
        stored under the possibly nested key described by ``map_name``.
        """
        parsed_params = {}
        for k, spec in self.DESTINATION_PARAMS_SPEC.items():
            value = params.get(k, spec.get("default"))
            map_to = spec.get("map_name")
            mapper = spec.get("map")
            segments = map_to.split("/")
            parsed_params.update(to_dict(segments, mapper(value)))
        return parsed_params

    def write_command(self, job_wrapper):
        """Write the job command line to an executable script and return its path."""
        # Create command script instead passing it in the container
        # preventing wrong characters parsing.
        if not os.path.exists(job_wrapper.working_directory):
            # Only logged: the open() below still raises if the directory is missing.
            LOGGER.error("No working directory found")
        path = f"{job_wrapper.working_directory}/chronos_{job_wrapper.get_id_tag()}.sh"
        mode = 0o755
        with open(path, "w", encoding="utf-8") as f:
            f.write("#!/bin/bash\n")
            f.write(job_wrapper.runner_command_line)
        os.chmod(path, mode)
        return path

    def _get_job_spec(self, job_wrapper):
        """Build the Chronos job specification dict for ``job_wrapper``.

        :raises ChronosRunnerException: if the destination does not set ``docker_enabled``.
        """
        job_destination = job_wrapper.job_destination
        # Validate before writing the command script so that a misconfigured
        # destination fails without leaving a script behind.
        if not job_destination.params.get("docker_enabled"):
            raise ChronosRunnerException("ChronosJobRunner needs 'docker_enabled' to be set as True")
        job_name = self.JOB_NAME_PREFIX + job_wrapper.get_id_tag()
        command_script_path = self.write_command(job_wrapper)
        template = {
            "async": False,
            "command": f"$SHELL {command_script_path}",
            "owner": self.runner_params["owner"],
            "disabled": False,
            "schedule": "R1//PT1S",
            "name": job_name,
            # Add Galaxy environment variables to json.
            # NOTE(review): these are the raw destination values, so they are
            # None when docker_cpu / docker_memory are unset even though
            # DESTINATION_PARAMS_SPEC supplies defaults for cpus / mem.
            "environmentVariables": _add_galaxy_environment_variables(
                job_destination.params.get("docker_cpu"), job_destination.params.get("docker_memory")
            ),
        }
        destination_params = self.parse_destination_params(job_destination.params)
        template.update(destination_params)
        # The "container" key exists because the "volumes" spec maps to
        # "container/volumes" and always has a default.
        template["container"]["type"] = "DOCKER"
        template["container"]["image"] = self._find_container(job_wrapper).container_id
        # Fix the working directory inside the container
        template["container"]["parameters"] = [{"key": "workdir", "value": job_wrapper.working_directory}]
        return template

    def _retrieve_job(self, job_id):
        """Return the Chronos job named ``job_id`` or ``None`` if there is none.

        :raises ChronosRunnerException: if more than one job carries that name.
        """
        jobs = self._chronos_client.list()
        job = [x for x in jobs if x["name"] == job_id]
        if len(job) > 1:
            msg = f"Multiple jobs found with name {job_id!r}"
            LOGGER.error(msg)
            raise ChronosRunnerException(msg)
        return job[0] if job else None