Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.jobs.runners.cli
"""
Job control via a command line interface (e.g. qsub/qstat), possibly over a remote connection (e.g. ssh).
"""
import logging
import time
from galaxy import model
from galaxy.jobs import JobDestination
from galaxy.jobs.runners import (
AsynchronousJobRunner,
AsynchronousJobState,
JobState,
)
from galaxy.util import asbool
from .util.cli import (
CliInterface,
split_params,
)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Public API of this module: only the runner class is exported.
__all__ = ("ShellJobRunner",)
# Fallback used when a job destination does not set the "embed_metadata_in_job" parameter.
DEFAULT_EMBED_METADATA_IN_JOB = True
# Default number of times a failed submission is retried (default of `retry` in ShellJobRunner.submit).
MAX_SUBMIT_RETRY = 3
class ShellJobRunner(AsynchronousJobRunner):
    """
    Job runner that controls jobs through a command line interface.

    Jobs are submitted, polled and deleted by executing the commands produced by
    a job plugin (e.g. qsub/qstat) through a shell plugin, possibly over a remote
    connection (e.g. ssh). Both plugins are obtained from ``CliInterface``.
    """

    runner_name = "ShellRunner"

    def __init__(self, app, nworkers):
        """Start the job runner and create the CLI plugin interface."""
        super().__init__(app, nworkers)
        self.cli_interface = CliInterface()

    def get_cli_plugins(self, shell_params, job_params):
        """
        Return the plugins for the given parameters.

        Callers in this class unpack the result as ``(shell, job_interface)``.
        """
        return self.cli_interface.get_plugins(shell_params, job_params)

    @staticmethod
    def _parse_url_params(prefix, raw_params):
        """Turn ``'foo=bar&baz=quux'`` into ``{'<prefix>_foo': 'bar', '<prefix>_baz': 'quux'}``."""
        parsed = {}
        for pair in raw_params.split("&"):
            if not pair:
                # Empty segment (empty URL component, trailing or doubled '&'):
                # there is nothing to record, and unpacking it would raise.
                continue
            # A non-empty segment without '=' is malformed and still raises ValueError.
            key, value = pair.split("=", 1)
            parsed[f"{prefix}_{key}"] = value
        return parsed

    def url_to_destination(self, url):
        """
        Convert a runner URL into a dynamic ``JobDestination``.

        The third and fourth ``/``-separated components of ``url`` are read as the
        shell and job parameters; each is a ``&``-separated list of ``key=value``
        pairs whose keys are prefixed with ``shell_`` / ``job_`` respectively.

        :raises ValueError: if the URL lacks one of the two components or a
            non-empty segment contains no ``=``.
        """
        params = {}
        shell_params, job_params = url.split("/")[2:4]
        params.update(self._parse_url_params("shell", shell_params))
        params.update(self._parse_url_params("job", job_params))
        log.debug(f"Converted URL '{url}' to destination runner=cli, params={params}")
        # Create a dynamic JobDestination
        return JobDestination(runner="cli", params=params)

    def queue_job(self, job_wrapper):
        """Create job script and submit it to the DRM"""
        # prepare the job
        include_metadata = asbool(
            job_wrapper.job_destination.params.get("embed_metadata_in_job", DEFAULT_EMBED_METADATA_IN_JOB)
        )
        if not self.prepare_job(job_wrapper, include_metadata=include_metadata):
            return

        # Get shell and job execution interface
        job_destination = job_wrapper.job_destination
        shell_params, job_params = self.parse_destination_params(job_destination.params)
        shell, job_interface = self.get_cli_plugins(shell_params, job_params)

        # wrapper.get_id_tag() instead of job_id for compatibility with TaskWrappers.
        galaxy_id_tag = job_wrapper.get_id_tag()

        # define job attributes
        ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper)

        job_file_kwargs = job_interface.job_script_kwargs(ajs.output_file, ajs.error_file, ajs.job_name)
        script = self.get_job_file(
            job_wrapper, exit_code_path=ajs.exit_code_file, shell=job_wrapper.shell, **job_file_kwargs
        )

        try:
            self.write_executable_script(ajs.job_file, script, job_io=job_wrapper.job_io)
        except Exception:
            log.exception(f"({galaxy_id_tag}) failure writing job script")
            job_wrapper.fail("failure preparing job script", exception=True)
            return

        # job was deleted while we were preparing it
        if job_wrapper.get_state() in (model.Job.states.DELETED, model.Job.states.STOPPED):
            log.debug("(%s) Job deleted/stopped by user before it entered the queue", galaxy_id_tag)
            if job_wrapper.cleanup_job in ("always", "onsuccess"):
                job_wrapper.cleanup()
            return

        log.debug(f"({galaxy_id_tag}) submitting file: {ajs.job_file}")

        returncode, stdout = self.submit(shell, job_interface, ajs.job_file, galaxy_id_tag, retry=MAX_SUBMIT_RETRY)
        if returncode != 0:
            job_wrapper.fail("failure submitting job")
            return

        # Some job runners return something like 'Submitted batch job XXXX'
        # Strip and split to get job ID.
        submit_stdout = stdout.strip()
        external_job_id = submit_stdout and submit_stdout.split()[-1]
        if not external_job_id:
            log.error(f"({galaxy_id_tag}) submission did not return a job identifier, failing job")
            job_wrapper.fail("failure submitting job")
            return

        log.info(f"({galaxy_id_tag}) queued with identifier: {external_job_id}")

        # store runner information for tracking if Galaxy restarts
        job_wrapper.set_external_id(external_job_id)

        # Store state information for job
        ajs.job_id = external_job_id
        ajs.old_state = "new"
        ajs.job_destination = job_destination

        # Add to our 'queue' of jobs to monitor
        self.monitor_queue.put(ajs)

    def submit(self, shell, job_interface, job_file, galaxy_id_tag, retry=MAX_SUBMIT_RETRY, timeout=10):
        """
        Handles actual job script submission.

        If submission fails it is retried up to `retry` more times, sleeping
        `timeout` seconds before each new attempt.

        Returns the returncode of the last submission attempt and its stdout,
        which contains the external job_id on success.
        """
        attempts_left = retry
        while True:
            cmd_out = shell.execute(job_interface.submit(job_file))
            if cmd_out.returncode == 0:
                return cmd_out.returncode, cmd_out.stdout
            stdout = f"({galaxy_id_tag}) submission failed (stdout): {cmd_out.stdout}"
            stderr = f"({galaxy_id_tag}) submission failed (stderr): {cmd_out.stderr}"
            if attempts_left <= 0:
                # Out of retries: report the failure of the final attempt to the caller.
                log.error(stdout)
                log.error(stderr)
                return cmd_out.returncode, cmd_out.stdout
            log.debug("%s, retrying in %s seconds", stdout, timeout)
            log.debug("%s, retrying in %s seconds", stderr, timeout)
            time.sleep(timeout)
            attempts_left -= 1

    def check_watched_items(self):
        """
        Called by the monitor thread to look at each watched job and deal
        with state changes.
        """
        new_watched = []

        job_states = self.__get_job_states()

        for ajs in self.watched:
            external_job_id = ajs.job_id
            id_tag = ajs.job_wrapper.get_id_tag()
            old_state = ajs.old_state
            state = job_states.get(external_job_id)
            if state is None:
                if ajs.job_wrapper.get_state() == model.Job.states.DELETED:
                    # Deleted jobs are simply dropped from the watch list.
                    continue

                log.debug(f"({id_tag}/{external_job_id}) job not found in batch state check")
                shell_params, job_params = self.parse_destination_params(ajs.job_destination.params)
                shell, job_interface = self.get_cli_plugins(shell_params, job_params)
                # NOTE(review): the return code of this command is not checked; what
                # parse_single_status yields for the output of a failed command depends
                # on the job plugin - confirm a failed query cannot be read as "finished".
                cmd_out = shell.execute(job_interface.get_single_status(external_job_id))
                state = job_interface.parse_single_status(cmd_out.stdout, external_job_id)
                if state != model.Job.states.OK:
                    log.warning(
                        f"({id_tag}/{external_job_id}) job not found in batch state check, but found in individual state check"
                    )
            job_state = ajs.job_wrapper.get_state()
            if state != old_state:
                log.debug(f"({id_tag}/{external_job_id}) state change: from {old_state} to {state}")
                if state == model.Job.states.ERROR and job_state != model.Job.states.STOPPED:
                    # Try to find out the reason for exiting - this needs to happen before change_state
                    # otherwise jobs depending on resubmission outputs see that job as failed and pause.
                    self.__handle_out_of_memory(ajs, external_job_id)
                    self.work_queue.put((self.mark_as_failed, ajs))
                    # Don't add the job to the watched items once it fails, deals with https://github.com/galaxyproject/galaxy/issues/7820
                    continue
                if state != model.Job.states.OK:
                    # No need to change_state when the state is OK, this will be handled by `self.finish_job`
                    ajs.job_wrapper.change_state(state)
            if state == model.Job.states.RUNNING and not ajs.running:
                ajs.running = True
            ajs.old_state = state
            if state == model.Job.states.OK or job_state == model.Job.states.STOPPED:
                external_metadata = not asbool(
                    ajs.job_wrapper.job_destination.params.get("embed_metadata_in_job", DEFAULT_EMBED_METADATA_IN_JOB)
                )
                if external_metadata:
                    self.work_queue.put((self.handle_metadata_externally, ajs))
                log.debug(f"({id_tag}/{external_job_id}) job execution finished, running job wrapper finish method")
                self.work_queue.put((self.finish_job, ajs))
            else:
                new_watched.append(ajs)
        # Replace the watch list with the updated version
        self.watched = new_watched

    def handle_metadata_externally(self, ajs):
        """Run external metadata handling for the job wrapped by `ajs`."""
        self._handle_metadata_externally(ajs.job_wrapper, resolve_requirements=True)

    def __handle_out_of_memory(self, ajs, external_job_id):
        """
        Flag `ajs` as having hit the memory limit when the job plugin reports so.

        Runs the plugin's failure-reason command and, if the parsed reason is
        ``MEMORY_LIMIT_REACHED``, records that runner state and a fail message on `ajs`.
        """
        shell_params, job_params = self.parse_destination_params(ajs.job_destination.params)
        shell, job_interface = self.get_cli_plugins(shell_params, job_params)
        cmd_out = shell.execute(job_interface.get_failure_reason(external_job_id))
        if cmd_out is None:
            return
        failure_reason = job_interface.parse_failure_reason(cmd_out.stdout, external_job_id)
        if failure_reason == JobState.runner_states.MEMORY_LIMIT_REACHED:
            ajs.runner_state = JobState.runner_states.MEMORY_LIMIT_REACHED
            ajs.fail_message = "Tool failed due to insufficient memory. Try with more memory."

    def __get_job_states(self):
        """
        Query the state of all watched jobs, one status command per destination id.

        Returns a dict that is updated with whatever the job plugin's
        ``parse_status`` returns; callers look states up by external job id.

        :raises AssertionError: if a status command exits with a non-zero return
            code; the exception message is the command's stderr.
        """
        job_destinations = {}
        job_states = {}
        # unique the list of destinations
        for ajs in self.watched:
            entry = job_destinations.setdefault(
                ajs.job_destination.id, dict(job_destination=ajs.job_destination, job_ids=[])
            )
            entry["job_ids"].append(ajs.job_id)
        # check each destination for the listed job ids
        for v in job_destinations.values():
            job_destination = v["job_destination"]
            job_ids = v["job_ids"]
            shell_params, job_params = self.parse_destination_params(job_destination.params)
            shell, job_interface = self.get_cli_plugins(shell_params, job_params)
            cmd_out = shell.execute(job_interface.get_status(job_ids))
            if cmd_out.returncode != 0:
                # Raised explicitly (not via `assert`) so the check survives `python -O`;
                # the exception type and message match the previous assertion.
                raise AssertionError(cmd_out.stderr)
            job_states.update(job_interface.parse_status(cmd_out.stdout, job_ids))
        return job_states

    def stop_job(self, job_wrapper):
        """
        Attempts to delete a dispatched job

        Best effort: any error while deleting is logged and not propagated.
        """
        job = job_wrapper.get_job()
        try:
            shell_params, job_params = self.parse_destination_params(job.destination_params)
            shell, job_interface = self.get_cli_plugins(shell_params, job_params)
            cmd_out = shell.execute(job_interface.delete(job.job_runner_external_id))
            if cmd_out.returncode != 0:
                # Raised explicitly (not via `assert`) so a failed delete is still
                # reported when Python runs with -O.
                raise AssertionError(cmd_out.stderr)
            log.debug(f"({job.id}/{job.job_runner_external_id}) Terminated at user's request")
        except Exception as e:
            log.debug(
                f"({job.id}/{job.job_runner_external_id}) User killed running job, but error encountered during termination: {e}"
            )

    def recover(self, job, job_wrapper):
        """Recovers jobs stuck in the queued/running state when Galaxy started"""
        job_id = job.get_job_runner_external_id()
        if job_id is None:
            # No external id was recorded for this job, so hand it back to `put`.
            self.put(job_wrapper)
            return
        ajs = AsynchronousJobState(
            files_dir=job_wrapper.working_directory,
            job_wrapper=job_wrapper,
            job_id=job_id,
            job_destination=job_wrapper.job_destination,
        )
        ajs.command_line = job.command_line
        if job.state in (model.Job.states.RUNNING, model.Job.states.STOPPED):
            log.debug(
                f"({job.id}/{job.job_runner_external_id}) is still in {job.state} state, adding to the runner monitor queue"
            )
            ajs.old_state = model.Job.states.RUNNING
            ajs.running = True
            self.monitor_queue.put(ajs)
        elif job.state == model.Job.states.QUEUED:
            log.debug(
                f"({job.id}/{job.job_runner_external_id}) is still in queued state, adding to the runner monitor queue"
            )
            ajs.old_state = model.Job.states.QUEUED
            ajs.running = False
            self.monitor_queue.put(ajs)
        # Jobs in any other state are not re-added to the monitor queue.