Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.runners.drmaa

"""
Job control via the DRMAA API.
"""
from __future__ import absolute_import

import json
import logging
import os
import shlex
import string
import time

from galaxy import model
from galaxy.jobs import JobDestination
from galaxy.jobs.handler import DEFAULT_JOB_PUT_FAILURE_MESSAGE
from galaxy.jobs.runners import (
    AsynchronousJobRunner,
    AsynchronousJobState
)
from galaxy.util import (
    asbool,
    commands,
    unicodify
)

drmaa = None

log = logging.getLogger(__name__)

__all__ = ('DRMAAJobRunner',)

RETRY_EXCEPTIONS_LOWER = frozenset({'invalidjobexception', 'internalexception'})


[docs]class DRMAAJobRunner(AsynchronousJobRunner): """ Job runner backed by a finite pool of worker threads. FIFO scheduling """ runner_name = "DRMAARunner" restrict_job_name_length = 15
[docs] def __init__(self, app, nworkers, **kwargs): """Start the job runner""" global drmaa runner_param_specs = { 'drmaa_library_path': dict(map=str, default=os.environ.get('DRMAA_LIBRARY_PATH', None))} for retry_exception in RETRY_EXCEPTIONS_LOWER: runner_param_specs[retry_exception + '_state'] = dict(map=str, valid=lambda x: x in (model.Job.states.OK, model.Job.states.ERROR), default=model.Job.states.OK) runner_param_specs[retry_exception + '_retries'] = dict(map=int, valid=lambda x: int(x) >= 0, default=0) if 'runner_param_specs' not in kwargs: kwargs['runner_param_specs'] = dict() kwargs['runner_param_specs'].update(runner_param_specs) super(DRMAAJobRunner, self).__init__(app, nworkers, **kwargs) # This allows multiple drmaa runners (although only one per handler) in the same job config file if 'drmaa_library_path' in kwargs: log.info('Overriding DRMAA_LIBRARY_PATH due to runner plugin parameter: %s', self.runner_params.drmaa_library_path) os.environ['DRMAA_LIBRARY_PATH'] = self.runner_params.drmaa_library_path # Import is delayed until runner initialization to allow for the # drmaa_library_path plugin param to override $DRMAA_LIBRARY_PATH try: drmaa = __import__("drmaa") except (ImportError, RuntimeError) as exc: raise exc.__class__('The Python drmaa package is required to use this ' 'feature, please install it or correct the ' 'following error:\n%s: %s' % (exc.__class__.__name__, str(exc))) from pulsar.managers.util.drmaa import DrmaaSessionFactory # make the drmaa library also available to subclasses self.drmaa = drmaa # Subclasses may need access to state constants self.drmaa_job_states = drmaa.JobState # Descriptive state strings pulled from the drmaa lib itself self.drmaa_job_state_strings = { drmaa.JobState.UNDETERMINED: 'process status cannot be determined', drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active', drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold', drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold', drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold', drmaa.JobState.RUNNING: 'job is running', drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended', drmaa.JobState.USER_SUSPENDED: 'job is user suspended', drmaa.JobState.DONE: 'job finished normally', drmaa.JobState.FAILED: 'job finished, but failed', } # Ensure a DRMAA session exists and is initialized self.ds = DrmaaSessionFactory().get() self.userid = None self._init_monitor_thread() self._init_worker_threads() self.redact_email_in_job_name = self.app.config.redact_email_in_job_name
[docs] def url_to_destination(self, url): """Convert a legacy URL to a job destination""" if not url: return native_spec = url.split('/')[2] if native_spec: params = dict(nativeSpecification=native_spec) log.debug("Converted URL '%s' to destination runner=drmaa, params=%s" % (url, params)) return JobDestination(runner='drmaa', params=params) else: log.debug("Converted URL '%s' to destination runner=drmaa" % url) return JobDestination(runner='drmaa')
[docs] def get_native_spec(self, url): """Get any native DRM arguments specified by the site configuration""" try: return url.split('/')[2] or None except Exception: return None
[docs] def queue_job(self, job_wrapper): """Create job script and submit it to the DRM""" # prepare the job # external_runJob_script can be None, in which case it's not used. external_runjob_script = job_wrapper.get_destination_configuration("drmaa_external_runjob_script", None) include_metadata = asbool(job_wrapper.job_destination.params.get("embed_metadata_in_job", True)) if not self.prepare_job(job_wrapper, include_metadata=include_metadata): return # get configured job destination job_destination = job_wrapper.job_destination # wrapper.get_id_tag() instead of job_id for compatibility with TaskWrappers. galaxy_id_tag = job_wrapper.get_id_tag() job_name = self._job_name(job_wrapper) ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper, job_name=job_name) # set up the drmaa job template jt = dict( remoteCommand=ajs.job_file, jobName=ajs.job_name, workingDirectory=job_wrapper.working_directory, outputPath=":%s" % ajs.output_file, errorPath=":%s" % ajs.error_file ) # Avoid a jt.exitCodePath for now - it's only used when finishing. native_spec = job_destination.params.get('nativeSpecification', None) if native_spec is None: native_spec = job_destination.params.get('native_specification', None) if native_spec is not None: jt['nativeSpecification'] = native_spec # fill in the DRM's job run template script = self.get_job_file(job_wrapper, exit_code_path=ajs.exit_code_file, shell=job_wrapper.shell) try: self.write_executable_script(ajs.job_file, script) except Exception: job_wrapper.fail("failure preparing job script", exception=True) log.exception("(%s) failure writing job script" % galaxy_id_tag) return # job was deleted while we were preparing it if job_wrapper.get_state() == model.Job.states.DELETED: log.debug("(%s) Job deleted by user before it entered the queue" % galaxy_id_tag) if job_wrapper.cleanup_job in ("always", "onsuccess"): job_wrapper.cleanup() return log.debug("(%s) submitting file %s", galaxy_id_tag, ajs.job_file) if native_spec: log.debug("(%s) native specification is: %s", galaxy_id_tag, native_spec) # runJob will raise if there's a submit problem if external_runjob_script is None: # TODO: create a queue for retrying submission indefinitely # TODO: configurable max tries and sleep trynum = 0 external_job_id = None fail_msg = None while external_job_id is None and trynum < 5: try: external_job_id = self.ds.run_job(**jt) break except (drmaa.InternalException, drmaa.DeniedByDrmException) as e: trynum += 1 log.warning('(%s) drmaa.Session.runJob() failed, will retry: %s', galaxy_id_tag, e) fail_msg = "Unable to run this job due to a cluster error, please retry it later" time.sleep(5) except Exception: log.exception('(%s) drmaa.Session.runJob() failed unconditionally', galaxy_id_tag) trynum = 5 else: log.error("(%s) All attempts to submit job failed" % galaxy_id_tag) if not fail_msg: fail_msg = DEFAULT_JOB_PUT_FAILURE_MESSAGE job_wrapper.fail(fail_msg) return else: job_wrapper.change_ownership_for_run() # if user credentials are not available, use galaxy credentials (if permitted) allow_guests = asbool(job_wrapper.job_destination.params.get("allow_guests", False)) pwent = job_wrapper.user_system_pwent if pwent is None: if not allow_guests: fail_msg = "User %s is not mapped to any real user, and not permitted to start jobs." % job_wrapper.user job_wrapper.fail(fail_msg) return pwent = job_wrapper.galaxy_system_pwent log.debug('(%s) submitting with credentials: %s [uid: %s]' % (galaxy_id_tag, pwent[0], pwent[2])) filename = self.store_jobtemplate(job_wrapper, jt) self.userid = pwent[2] external_job_id = self.external_runjob(external_runjob_script, filename, pwent[2]) if external_job_id is None: job_wrapper.fail("(%s) could not queue job" % galaxy_id_tag) return log.info("(%s) queued as %s" % (galaxy_id_tag, external_job_id)) # store runner information for tracking if Galaxy restarts job_wrapper.set_external_id(external_job_id) # Store DRM related state information for job ajs.job_id = external_job_id ajs.old_state = 'new' ajs.job_destination = job_destination # Add to our 'queue' of jobs to monitor self.monitor_queue.put(ajs)
def _complete_terminal_job(self, ajs, drmaa_state, **kwargs): """ Handle a job upon its termination in the DRM. This method is meant to be overridden by subclasses to improve post-mortem and reporting of failures. Returns True if job was not actually terminal, None otherwise. (Note: This function always returns None. Hence this function actually does not determine if a job was terminal, but the implementation in the subclasses is supposed to do this.) """ if drmaa_state == drmaa.JobState.FAILED: if ajs.job_wrapper.get_state() != model.Job.states.DELETED: ajs.stop_job = False ajs.fail_message = "The cluster DRM system terminated this job" self.work_queue.put((self.fail_job, ajs)) elif drmaa_state == drmaa.JobState.DONE: # External metadata processing for external runjobs external_metadata = not asbool(ajs.job_wrapper.job_destination.params.get("embed_metadata_in_job", True)) if external_metadata: self._handle_metadata_externally(ajs.job_wrapper, resolve_requirements=True) if ajs.job_wrapper.get_state() != model.Job.states.DELETED: self.work_queue.put((self.finish_job, ajs))
[docs] def check_watched_item(self, ajs, new_watched): """ look at a single watched job, determine its state, and deal with errors that could happen in this process. to be called from check_watched_items() returns the state or None if exceptions occurred in the latter case the job is appended to new_watched if a 1 drmaa.InternalException, 2 drmaa.InvalidJobExceptionnot, or 3 drmaa.DrmCommunicationException occurred (which causes the job to be tested again in the next iteration of check_watched_items) - the job is finished as errored if any other exception occurs - the job is finished OK or errored after the maximum number of retries depending on the exception Note that None is returned in all cases where the loop in check_watched_items is to be continued """ external_job_id = ajs.job_id galaxy_id_tag = ajs.job_wrapper.get_id_tag() state = None try: assert external_job_id not in (None, 'None'), '(%s/%s) Invalid job id' % (galaxy_id_tag, external_job_id) state = self.ds.job_status(external_job_id) # Reset exception retries for retry_exception in RETRY_EXCEPTIONS_LOWER: setattr(ajs, retry_exception + '_retries', 0) except (drmaa.InternalException, drmaa.InvalidJobException) as e: ecn = type(e).__name__ retry_param = ecn.lower() + '_retries' state_param = ecn.lower() + '_state' retries = getattr(ajs, retry_param, 0) log.warning("(%s/%s) unable to check job status because of %s exception for %d consecutive tries: %s", galaxy_id_tag, external_job_id, ecn, retries + 1, e) if self.runner_params[retry_param] > 0: if retries < self.runner_params[retry_param]: # will retry check on next iteration setattr(ajs, retry_param, retries + 1) new_watched.append(ajs) return None if self.runner_params[state_param] == model.Job.states.OK: log.warning("(%s/%s) job will now be finished OK", galaxy_id_tag, external_job_id) self.work_queue.put((self.finish_job, ajs)) elif self.runner_params[state_param] == model.Job.states.ERROR: log.warning("(%s/%s) job will now be errored", galaxy_id_tag, external_job_id) self.work_queue.put((self.fail_job, ajs)) else: raise Exception("%s is set to an invalid value (%s), this should not be possible. See galaxy.jobs.drmaa.__init__()", state_param, self.runner_params[state_param]) return None except drmaa.DrmCommunicationException as e: log.warning("(%s/%s) unable to communicate with DRM: %s", galaxy_id_tag, external_job_id, e) new_watched.append(ajs) return None except Exception: # so we don't kill the monitor thread log.exception("(%s/%s) unable to check job status" % (galaxy_id_tag, external_job_id)) log.warning("(%s/%s) job will now be errored" % (galaxy_id_tag, external_job_id)) ajs.fail_message = "Cluster could not complete job" self.work_queue.put((self.fail_job, ajs)) return None return state
[docs] def check_watched_items(self): """ Called by the monitor thread to look at each watched job and deal with state changes. """ new_watched = [] for ajs in self.watched: external_job_id = ajs.job_id galaxy_id_tag = ajs.job_wrapper.get_id_tag() old_state = ajs.old_state state = self.check_watched_item(ajs, new_watched) if state is None: continue if state != old_state: log.debug("(%s/%s) state change: %s" % (galaxy_id_tag, external_job_id, self.drmaa_job_state_strings[state])) if state == drmaa.JobState.RUNNING and not ajs.running: ajs.running = True ajs.job_wrapper.change_state(model.Job.states.RUNNING) if state in (drmaa.JobState.FAILED, drmaa.JobState.DONE): if self._complete_terminal_job(ajs, drmaa_state=state) is not None: # job was not actually terminal state = ajs.old_state else: continue if ajs.running: # TODO: stop checking at some point ajs.job_wrapper.check_for_entry_points() if ajs.check_limits(): self.work_queue.put((self.fail_job, ajs)) continue ajs.old_state = state new_watched.append(ajs) # Replace the watch list with the updated version self.watched = new_watched
[docs] def stop_job(self, job_wrapper): """Attempts to delete a job from the DRM queue""" job = job_wrapper.get_job() try: ext_id = job.get_job_runner_external_id() assert ext_id not in (None, 'None'), 'External job id is None' kill_script = job_wrapper.get_destination_configuration("drmaa_external_killjob_script") if kill_script is None: self.ds.kill(ext_id) else: cmd = shlex.split(kill_script) cmd.extend([str(ext_id), str(self.userid)]) commands.execute(cmd) log.info("(%s/%s) Removed from DRM queue at user's request" % (job.id, ext_id)) except drmaa.InvalidJobException: log.exception("(%s/%s) User killed running job, but it was already dead" % (job.id, ext_id)) except commands.CommandLineException as e: log.error("(%s/%s) User killed running job, but command execution failed: %s" % (job.id, ext_id, unicodify(e))) except Exception: log.exception("(%s/%s) User killed running job, but error encountered removing from DRM queue" % (job.id, ext_id))
[docs] def recover(self, job, job_wrapper): """Recovers jobs stuck in the queued/running state when Galaxy started""" job_id = job.get_job_runner_external_id() if job_id is None: self.put(job_wrapper) return ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper) ajs.job_id = str(job_id) ajs.command_line = job.get_command_line() ajs.job_wrapper = job_wrapper ajs.job_destination = job_wrapper.job_destination if job.state == model.Job.states.RUNNING: log.debug("(%s/%s) is still in running state, adding to the DRM queue" % (job.id, job.get_job_runner_external_id())) ajs.old_state = drmaa.JobState.RUNNING ajs.running = True self.monitor_queue.put(ajs) elif job.get_state() == model.Job.states.QUEUED: log.debug("(%s/%s) is still in DRM queued state, adding to the DRM queue" % (job.id, job.get_job_runner_external_id())) ajs.old_state = drmaa.JobState.QUEUED_ACTIVE ajs.running = False self.monitor_queue.put(ajs)
[docs] def store_jobtemplate(self, job_wrapper, jt): """ Stores the content of a DRMAA JobTemplate object in a file as a JSON string. Path is hard-coded, but it's no worse than other path in this module. Uses Galaxy's JobID, so file is expected to be unique.""" filename = "%s/%s.jt_json" % (self.app.config.cluster_files_directory, job_wrapper.get_id_tag()) with open(filename, 'w+') as fp: json.dump(jt, fp) log.debug('(%s) Job script for external submission is: %s' % (job_wrapper.job_id, filename)) return filename
[docs] def external_runjob(self, external_runjob_script, jobtemplate_filename, username): """ runs an external script that will QSUB a new job. The external script needs to be run with sudo, and will setuid() to the specified user. Effectively, will QSUB as a different user (than the one used by Galaxy). """ cmd = shlex.split(external_runjob_script) cmd.extend([str(username), jobtemplate_filename]) log.info("Running command %s" % cmd) try: stdoutdata = commands.execute(cmd).strip() except commands.CommandLineException: log.exception("External_runjob failed") return None # The expected output is a single line containing a single numeric value: # the DRMAA job-ID. If not the case, will throw an error. if not stdoutdata: log.exception("External_runjob did not returned nothing instead of the job id") return None return stdoutdata
def _job_name(self, job_wrapper): external_runjob_script = job_wrapper.get_destination_configuration("drmaa_external_runjob_script", None) galaxy_id_tag = job_wrapper.get_id_tag() # define job attributes job_name = 'g%s' % galaxy_id_tag if job_wrapper.tool.old_id: job_name += '_%s' % job_wrapper.tool.old_id if not self.redact_email_in_job_name and external_runjob_script is None: job_name += '_%s' % job_wrapper.user job_name = ''.join(x if x in (string.ascii_letters + string.digits + '_') else '_' for x in job_name) if self.restrict_job_name_length: job_name = job_name[:self.restrict_job_name_length] return job_name