Source code for galaxy.jobs.runners.cli

Job control via a command line interface (e.g. qsub/qstat), possibly over a remote connection (e.g. ssh).

import logging
import time

from galaxy import model
from galaxy.jobs import JobDestination
from galaxy.jobs.runners import (
from galaxy.util import asbool
from .util.cli import CliInterface, split_params

# Module-level logger named after this module.
log = logging.getLogger(name=__name__)

# Public API of this module: only the runner class is exported.
__all__ = ("ShellJobRunner",)


class ShellJobRunner(AsynchronousJobRunner):
    """
    Asynchronous job runner driven through command line tools.

    Submission, status polling and deletion are delegated to a
    ``(shell, job_interface)`` plugin pair obtained from ``CliInterface``
    for each job destination.
    """
    runner_name = "ShellRunner"

    def __init__(self, app, nworkers):
        """Start the job runner, its monitor thread and its worker threads."""
        super().__init__(app, nworkers)

        self.cli_interface = CliInterface()

        self._init_monitor_thread()
        self._init_worker_threads()

    def get_cli_plugins(self, shell_params, job_params):
        """Return the ``(shell, job_interface)`` plugins for the given parameter sets."""
        return self.cli_interface.get_plugins(shell_params, job_params)

    def _get_shell_and_job_interface(self, destination_params):
        """Resolve raw destination params into a ``(shell, job_interface)`` pair."""
        shell_params, job_params = self.parse_destination_params(destination_params)
        return self.get_cli_plugins(shell_params, job_params)

    def url_to_destination(self, url):
        """
        Convert a ``cli://<shell params>/<job params>`` URL into a JobDestination.

        Each params section is an ``&``-separated list of ``key=value`` pairs,
        e.g. ``foo=bar&baz=quux``. Shell keys are stored with a ``shell_``
        prefix and job keys with a ``job_`` prefix. Empty pairs (for example
        from a trailing ``&``) are ignored.
        """
        # 'cli://a=b/c=d'.split('/') -> ['cli:', '', 'a=b', 'c=d']
        shell_params, job_params = url.split('/')[2:4]
        params = {}
        params.update(self._parse_url_params('shell_', shell_params))
        params.update(self._parse_url_params('job_', job_params))
        log.debug("Converted URL '%s' to destination runner=cli, params=%s", url, params)
        # Create a dynamic JobDestination
        return JobDestination(runner='cli', params=params)

    @staticmethod
    def _parse_url_params(prefix, query):
        """Split 'foo=bar&baz=quux' into {prefix + 'foo': 'bar', prefix + 'baz': 'quux'}."""
        pairs = (kv.split('=', 1) for kv in query.split('&') if kv)
        return {prefix + key: value for key, value in pairs}

    def parse_destination_params(self, params):
        """Split destination params into ``(shell_params, job_params)`` via ``split_params``."""
        return split_params(params)

    def queue_job(self, job_wrapper):
        """Create job script and submit it to the DRM"""
        # prepare the job
        include_metadata = asbool(job_wrapper.job_destination.params.get("embed_metadata_in_job", DEFAULT_EMBED_METADATA_IN_JOB))
        if not self.prepare_job(job_wrapper, include_metadata=include_metadata):
            return

        # Get shell and job execution interface
        job_destination = job_wrapper.job_destination
        shell, job_interface = self._get_shell_and_job_interface(job_destination.params)

        # wrapper.get_id_tag() instead of job_id for compatibility with TaskWrappers.
        galaxy_id_tag = job_wrapper.get_id_tag()

        # define job attributes
        ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper)

        job_file_kwargs = job_interface.job_script_kwargs(ajs.output_file, ajs.error_file, ajs.job_name)
        script = self.get_job_file(
            job_wrapper,
            exit_code_path=ajs.exit_code_file,
            shell=job_wrapper.shell,
            **job_file_kwargs
        )

        try:
            self.write_executable_script(ajs.job_file, script)
        except Exception:
            log.exception("(%s) failure writing job script", galaxy_id_tag)
            job_wrapper.fail("failure preparing job script", exception=True)
            return

        # job was deleted while we were preparing it
        if job_wrapper.get_state() == model.Job.states.DELETED:
            log.info("(%s) Job deleted by user before it entered the queue", galaxy_id_tag)
            if job_wrapper.cleanup_job in ("always", "onsuccess"):
                job_wrapper.cleanup()
            return

        log.debug("(%s) submitting file: %s", galaxy_id_tag, ajs.job_file)

        returncode, stdout = self.submit(shell, job_interface, ajs.job_file, galaxy_id_tag, retry=MAX_SUBMIT_RETRY)
        if returncode != 0:
            job_wrapper.fail("failure submitting job")
            return

        # Parsed defensively: an empty stdout used to raise IndexError here
        # instead of reaching the "no job identifier" failure path below.
        external_job_id = self._parse_external_job_id(stdout)
        if not external_job_id:
            log.error('(%s) submission did not return a job identifier, failing job', galaxy_id_tag)
            job_wrapper.fail("failure submitting job")
            return

        log.info("(%s) queued with identifier: %s", galaxy_id_tag, external_job_id)

        # store runner information for tracking if Galaxy restarts
        job_wrapper.set_external_id(external_job_id)

        # Store state information for job
        ajs.job_id = external_job_id
        ajs.old_state = 'new'
        ajs.job_destination = job_destination

        # Add to our 'queue' of jobs to monitor
        self.monitor_queue.put(ajs)

    @staticmethod
    def _parse_external_job_id(stdout):
        """
        Return the last whitespace-separated token of submission stdout, or None.

        Some job runners return something like 'Submitted batch job XXXX', so
        the identifier is taken to be the final token.
        """
        tokens = (stdout or '').strip().split()
        return tokens[-1] if tokens else None

    def submit(self, shell, job_interface, job_file, galaxy_id_tag, retry=MAX_SUBMIT_RETRY, timeout=10):
        """
        Handles actual job script submission.

        If submission fails it is retried up to ``retry`` more times, sleeping
        ``timeout`` seconds before each retry.

        Returns ``(returncode, stdout)`` of the last submission attempt; on
        success stdout contains the external job id.
        """
        while True:
            cmd_out = shell.execute(job_interface.submit(job_file))
            if cmd_out.returncode == 0:
                return cmd_out.returncode, cmd_out.stdout
            stdout_msg = '({}) submission failed (stdout): {}'.format(galaxy_id_tag, cmd_out.stdout)
            stderr_msg = '({}) submission failed (stderr): {}'.format(galaxy_id_tag, cmd_out.stderr)
            if retry <= 0:
                log.error(stdout_msg)
                log.error(stderr_msg)
                return cmd_out.returncode, cmd_out.stdout
            log.debug("%s, retrying in %s seconds", stdout_msg, timeout)
            log.debug("%s, retrying in %s seconds", stderr_msg, timeout)
            time.sleep(timeout)
            retry -= 1

    def check_watched_items(self):
        """
        Called by the monitor thread to look at each watched job and deal
        with state changes.

        Jobs that reach ERROR are queued for ``mark_as_failed``. Jobs that reach
        OK are queued for (optional external metadata and) ``finish_job``. Jobs
        missing from the batch check and already DELETED are dropped. All other
        jobs stay on the watch list.
        """
        new_watched = []

        job_states = self.__get_job_states()

        for ajs in self.watched:
            external_job_id = ajs.job_id
            id_tag = ajs.job_wrapper.get_id_tag()
            old_state = ajs.old_state
            state = job_states.get(external_job_id, None)
            if state is None:
                if ajs.job_wrapper.get_state() == model.Job.states.DELETED:
                    continue

                log.debug("(%s/%s) job not found in batch state check", id_tag, external_job_id)
                # Fall back to querying this single job directly.
                shell, job_interface = self._get_shell_and_job_interface(ajs.job_destination.params)
                cmd_out = shell.execute(job_interface.get_single_status(external_job_id))
                state = job_interface.parse_single_status(cmd_out.stdout, external_job_id)
                if state != model.Job.states.OK:
                    log.warning('(%s/%s) job not found in batch state check, but found in individual state check', id_tag, external_job_id)
            if state != old_state:
                log.debug("(%s/%s) state change: from %s to %s", id_tag, external_job_id, old_state, state)
                if state != model.Job.states.OK:
                    # No need to change_state when the state is OK, this will be handled by `self.finish_job`
                    ajs.job_wrapper.change_state(state)
                if state == model.Job.states.ERROR:
                    # Try to find out the reason for exiting
                    self.__handle_out_of_memory(ajs, external_job_id)
                    self.work_queue.put((self.mark_as_failed, ajs))
                    # Don't add the job to the watched items once it fails, deals with https://github.com/galaxyproject/galaxy/issues/7820
                    continue
            if state == model.Job.states.RUNNING and not ajs.running:
                ajs.running = True
            ajs.old_state = state
            if state == model.Job.states.OK:
                external_metadata = not asbool(ajs.job_wrapper.job_destination.params.get("embed_metadata_in_job", DEFAULT_EMBED_METADATA_IN_JOB))
                if external_metadata:
                    self.work_queue.put((self.handle_metadata_externally, ajs))
                log.debug('(%s/%s) job execution finished, running job wrapper finish method', id_tag, external_job_id)
                self.work_queue.put((self.finish_job, ajs))
            else:
                new_watched.append(ajs)
        # Replace the watch list with the updated version
        self.watched = new_watched

    def handle_metadata_externally(self, ajs):
        """Set job metadata outside the job script, resolving tool requirements."""
        self._handle_metadata_externally(ajs.job_wrapper, resolve_requirements=True)

    def __handle_out_of_memory(self, ajs, external_job_id):
        """Mark ``ajs`` as MEMORY_LIMIT_REACHED if the DRM reports that as the failure reason."""
        shell, job_interface = self._get_shell_and_job_interface(ajs.job_destination.params)
        cmd_out = shell.execute(job_interface.get_failure_reason(external_job_id))
        if cmd_out is None:
            return
        reason = job_interface.parse_failure_reason(cmd_out.stdout, external_job_id)
        if reason == JobState.runner_states.MEMORY_LIMIT_REACHED:
            ajs.runner_state = JobState.runner_states.MEMORY_LIMIT_REACHED
            ajs.fail_message = "Tool failed due to insufficient memory. Try with more memory."

    def __get_job_states(self):
        """
        Return a mapping of external job id -> state for all watched jobs.

        Jobs are grouped by destination id so each destination is queried once.

        :raises RuntimeError: if a status command exits non-zero (message is its stderr).
        """
        job_destinations = {}
        # unique the list of destinations
        for ajs in self.watched:
            destination_id = ajs.job_destination.id
            if destination_id not in job_destinations:
                job_destinations[destination_id] = {'job_destination': ajs.job_destination, 'job_ids': []}
            job_destinations[destination_id]['job_ids'].append(ajs.job_id)

        job_states = {}
        # check each destination for the listed job ids
        for entry in job_destinations.values():
            job_ids = entry['job_ids']
            shell, job_interface = self._get_shell_and_job_interface(entry['job_destination'].params)
            cmd_out = shell.execute(job_interface.get_status(job_ids))
            # Explicit check rather than `assert`: asserts are stripped under
            # `python -O`, which would hand error output to parse_status.
            if cmd_out.returncode != 0:
                raise RuntimeError(cmd_out.stderr)
            job_states.update(job_interface.parse_status(cmd_out.stdout, job_ids))
        return job_states

    def stop_job(self, job_wrapper):
        """Attempts to delete a dispatched job; failures are logged, not raised."""
        job = job_wrapper.get_job()
        try:
            shell, job_interface = self._get_shell_and_job_interface(job.destination_params)
            cmd_out = shell.execute(job_interface.delete(job.job_runner_external_id))
            # Explicit check rather than `assert`, which `python -O` would strip
            # and so report a failed deletion as a successful termination.
            if cmd_out.returncode != 0:
                raise RuntimeError(cmd_out.stderr)
            log.debug("(%s/%s) Terminated at user's request", job.id, job.job_runner_external_id)
        except Exception as e:
            log.debug("(%s/%s) User killed running job, but error encountered during termination: %s", job.id, job.job_runner_external_id, e)

    def recover(self, job, job_wrapper):
        """
        Recovers jobs stuck in the queued/running state when Galaxy started.

        Jobs without an external id are re-queued via ``self.put``. Jobs in the
        RUNNING or QUEUED state are put back on the monitor queue. Jobs in any
        other state are not re-queued here.
        """
        job_id = job.get_job_runner_external_id()
        if job_id is None:
            self.put(job_wrapper)
            return
        ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper)
        ajs.job_id = str(job_id)
        ajs.command_line = job.command_line
        ajs.job_wrapper = job_wrapper
        ajs.job_destination = job_wrapper.job_destination
        if job.state == model.Job.states.RUNNING:
            log.debug("(%s/%s) is still in running state, adding to the runner monitor queue", job.id, job.job_runner_external_id)
            ajs.old_state = model.Job.states.RUNNING
            ajs.running = True
            self.monitor_queue.put(ajs)
        elif job.state == model.Job.states.QUEUED:
            log.debug("(%s/%s) is still in queued state, adding to the runner monitor queue", job.id, job.job_runner_external_id)
            ajs.old_state = model.Job.states.QUEUED
            ajs.running = False
            self.monitor_queue.put(ajs)