Source code for galaxy.jobs.runners.drmaa

"""
Job control via the DRMAA API.
"""

import json
import logging
import os
import shlex
import string
import time
from typing import (
    TYPE_CHECKING,
    TypeAlias,
    Union,
)

from galaxy import model
from galaxy.jobs.handler import DEFAULT_JOB_RUNNER_FAILURE_MESSAGE
from galaxy.jobs.job_destination import JobDestination
from galaxy.jobs.runners import (
    AsynchronousJobRunner,
    AsynchronousJobState,
)
from galaxy.util import (
    asbool,
    commands,
    unicodify,
)

if TYPE_CHECKING:
    from galaxy.jobs import MinimalJobWrapper

    # Type alias for drmaa.JobState, since drmaa import is delayed
    drmaa_JobState: TypeAlias = str

drmaa = None

log = logging.getLogger(__name__)

__all__ = ("DRMAAJobRunner",)

RETRY_EXCEPTIONS_LOWER = frozenset({"invalidjobexception", "internalexception"})


class DRMAAJobState(AsynchronousJobState):
    old_state: Union["drmaa_JobState", None]  # type: ignore[assignment]


[docs] class DRMAAJobRunner(AsynchronousJobRunner[DRMAAJobState]): """ Job runner backed by a finite pool of worker threads. FIFO scheduling """ runner_name = "DRMAARunner" restrict_job_name_length = 15
[docs] def __init__(self, app, nworkers, **kwargs): """Start the job runner""" global drmaa runner_param_specs = {"drmaa_library_path": dict(map=str, default=os.environ.get("DRMAA_LIBRARY_PATH", None))} for retry_exception in RETRY_EXCEPTIONS_LOWER: runner_param_specs[f"{retry_exception}_state"] = dict( map=str, valid=lambda x: x in (model.Job.states.OK, model.Job.states.ERROR), default=model.Job.states.OK ) runner_param_specs[f"{retry_exception}_retries"] = dict(map=int, valid=lambda x: int(x) >= 0, default=0) if "runner_param_specs" not in kwargs: kwargs["runner_param_specs"] = {} kwargs["runner_param_specs"].update(runner_param_specs) super().__init__(app, nworkers, **kwargs) # This allows multiple drmaa runners (although only one per handler) in the same job config file if "drmaa_library_path" in kwargs: log.info( "Overriding DRMAA_LIBRARY_PATH due to runner plugin parameter: %s", self.runner_params.drmaa_library_path, ) os.environ["DRMAA_LIBRARY_PATH"] = self.runner_params.drmaa_library_path # Import is delayed until runner initialization to allow for the # drmaa_library_path plugin param to override $DRMAA_LIBRARY_PATH try: drmaa = __import__("drmaa") except (ImportError, RuntimeError) as exc: raise exc.__class__( f"The Python drmaa package is required to use this feature, please install it or correct the following error:\n{exc.__class__.__name__}: {str(exc)}" ) from pulsar.managers.util.drmaa import DrmaaSessionFactory # make the drmaa library also available to subclasses self.drmaa = drmaa # Subclasses may need access to state constants self.drmaa_job_states = drmaa.JobState # Descriptive state strings pulled from the drmaa lib itself self.drmaa_job_state_strings = { drmaa.JobState.UNDETERMINED: "process status cannot be determined", drmaa.JobState.QUEUED_ACTIVE: "job is queued and active", drmaa.JobState.SYSTEM_ON_HOLD: "job is queued and in system hold", drmaa.JobState.USER_ON_HOLD: "job is queued and in user hold", drmaa.JobState.USER_SYSTEM_ON_HOLD: "job is queued and in user and system hold", drmaa.JobState.RUNNING: "job is running", drmaa.JobState.SYSTEM_SUSPENDED: "job is system suspended", drmaa.JobState.USER_SUSPENDED: "job is user suspended", drmaa.JobState.DONE: "job finished normally", drmaa.JobState.FAILED: "job finished, but failed", } # Ensure a DRMAA session exists and is initialized self.ds = DrmaaSessionFactory().get() self.userid = None self.redact_email_in_job_name = self.app.config.redact_email_in_job_name
[docs] def url_to_destination(self, url: str) -> JobDestination: """Convert a legacy URL to a job destination""" if native_spec := url.split("/")[2]: params = dict(nativeSpecification=native_spec) log.debug(f"Converted URL '{url}' to destination runner=drmaa, params={params}") return JobDestination(runner="drmaa", params=params) else: log.debug(f"Converted URL '{url}' to destination runner=drmaa") return JobDestination(runner="drmaa")
[docs] def get_native_spec(self, url): """Get any native DRM arguments specified by the site configuration""" try: return url.split("/")[2] or None except Exception: return None
[docs] def queue_job(self, job_wrapper: "MinimalJobWrapper") -> None: """Create job script and submit it to the DRM""" assert drmaa is not None # prepare the job # external_runJob_script can be None, in which case it's not used. external_runjob_script = job_wrapper.get_destination_configuration("drmaa_external_runjob_script", None) include_metadata = asbool(job_wrapper.job_destination.params.get("embed_metadata_in_job", True)) if not self.prepare_job(job_wrapper, include_metadata=include_metadata): return # get configured job destination job_destination = job_wrapper.job_destination # wrapper.get_id_tag() instead of job_id for compatibility with TaskWrappers. galaxy_id_tag = job_wrapper.get_id_tag() job_name = self._job_name(job_wrapper) ajs = DRMAAJobState( job_wrapper=job_wrapper, job_destination=job_destination, files_dir=job_wrapper.working_directory, job_name=job_name, ) # set up the drmaa job template jt = dict( remoteCommand=ajs.job_file, jobName=ajs.job_name, workingDirectory=job_wrapper.working_directory, outputPath=f":{ajs.output_file}", errorPath=f":{ajs.error_file}", ) # Avoid a jt.exitCodePath for now - it's only used when finishing. native_spec = job_destination.params.get("nativeSpecification", None) if native_spec is None: native_spec = job_destination.params.get("native_specification", None) if native_spec is not None: jt["nativeSpecification"] = native_spec # fill in the DRM's job run template script = self.get_job_file(job_wrapper, exit_code_path=ajs.exit_code_file, shell=job_wrapper.shell) try: self.write_executable_script(ajs.job_file, script, job_io=job_wrapper.job_io) except Exception: job_wrapper.fail("failure preparing job script", exception=True) log.exception(f"({galaxy_id_tag}) failure writing job script") return # job was deleted while we were preparing it if job_wrapper.get_state() in (model.Job.states.DELETED, model.Job.states.STOPPED): log.debug("(%s) Job deleted/stopped by user before it entered the queue", galaxy_id_tag) if job_wrapper.cleanup_job in ("always", "onsuccess"): job_wrapper.cleanup() return log.debug("(%s) submitting file %s", galaxy_id_tag, ajs.job_file) if native_spec: log.debug("(%s) native specification is: %s", galaxy_id_tag, native_spec) # runJob will raise if there's a submit problem if external_runjob_script is None: # TODO: create a queue for retrying submission indefinitely # TODO: configurable max tries and sleep trynum = 0 external_job_id = None fail_msg = None while external_job_id is None and trynum < 5: try: external_job_id = self.ds.run_job(**jt) break except (drmaa.InternalException, drmaa.DeniedByDrmException) as e: trynum += 1 log.warning("(%s) drmaa.Session.runJob() failed, will retry: %s", galaxy_id_tag, e) fail_msg = "Unable to run this job due to a cluster error, please retry it later" time.sleep(5) except Exception: log.exception("(%s) drmaa.Session.runJob() failed unconditionally", galaxy_id_tag) trynum = 5 else: log.error(f"({galaxy_id_tag}) All attempts to submit job failed") if not fail_msg: fail_msg = DEFAULT_JOB_RUNNER_FAILURE_MESSAGE job_wrapper.fail(fail_msg) return else: filename = self.store_jobtemplate(job_wrapper, jt) job_wrapper.change_ownership_for_run() # if user credentials are not available, use galaxy credentials (if permitted) allow_guests = asbool(job_wrapper.job_destination.params.get("allow_guests", False)) pwent = job_wrapper.user_system_pwent if pwent is None: if not allow_guests: fail_msg = ( f"User {job_wrapper.user} is not mapped to any real user, and not permitted to start jobs." ) job_wrapper.fail(fail_msg) return pwent = job_wrapper.galaxy_system_pwent log.debug(f"({galaxy_id_tag}) submitting with credentials: {pwent[0]} [uid: {pwent[2]}]") self.userid = pwent[2] external_job_id = self.external_runjob(external_runjob_script, filename, pwent[2]) if external_job_id is None: job_wrapper.fail(f"({galaxy_id_tag}) could not queue job") return log.info(f"({galaxy_id_tag}) queued as {external_job_id}") # store runner information for tracking if Galaxy restarts job_wrapper.set_external_id(external_job_id) # Store DRM related state information for job ajs.job_id = external_job_id ajs.old_state = "new" # Add to our 'queue' of jobs to monitor self.monitor_queue.put(ajs)
def _complete_terminal_job(self, ajs: DRMAAJobState, drmaa_state: str, **kwargs) -> Union[bool, None]: """ Handle a job upon its termination in the DRM. This method is meant to be overridden by subclasses to improve post-mortem and reporting of failures. Returns True if job was not actually terminal, None otherwise. (Note: This function always returns None. Hence this function actually does not determine if a job was terminal, but the implementation in the subclasses is supposed to do this.) """ assert drmaa is not None job_state = ajs.job_wrapper.get_state() if drmaa_state == drmaa.JobState.FAILED and job_state != model.Job.states.STOPPED: if job_state != model.Job.states.DELETED: ajs.stop_job = False ajs.fail_message = "The cluster DRM system terminated this job" self.work_queue.put((self.fail_job, ajs)) elif drmaa_state == drmaa.JobState.DONE or job_state == model.Job.states.STOPPED: # External metadata processing for external runjobs external_metadata = not asbool(ajs.job_wrapper.job_destination.params.get("embed_metadata_in_job", True)) if external_metadata: self._handle_metadata_externally(ajs.job_wrapper, resolve_requirements=True) if job_state != model.Job.states.DELETED: self.work_queue.put((self.finish_job, ajs)) return None
[docs] def check_watched_item_drmaa(self, ajs: DRMAAJobState, new_watched: list[DRMAAJobState]) -> Union[str, None]: """ look at a single watched job, determine its state, and deal with errors that could happen in this process. to be called from check_watched_items() returns the state or None if exceptions occurred in the latter case the job is appended to new_watched if a 1 drmaa.InternalException, 2 drmaa.InvalidJobExceptionnot, or 3 drmaa.DrmCommunicationException occurred (which causes the job to be tested again in the next iteration of check_watched_items) - the job is finished as errored if any other exception occurs - the job is finished OK or errored after the maximum number of retries depending on the exception Note that None is returned in all cases where the loop in check_watched_items is to be continued """ assert drmaa is not None external_job_id = ajs.job_id galaxy_id_tag = ajs.job_wrapper.get_id_tag() state = None try: assert external_job_id not in (None, "None"), f"({galaxy_id_tag}/{external_job_id}) Invalid job id" state = self.ds.job_status(external_job_id) # Reset exception retries for retry_exception in RETRY_EXCEPTIONS_LOWER: setattr(ajs, f"{retry_exception}_retries", 0) except (drmaa.InternalException, drmaa.InvalidJobException) as e: ecn = type(e).__name__ retry_param = f"{ecn.lower()}_retries" state_param = f"{ecn.lower()}_state" retries = getattr(ajs, retry_param, 0) log.warning( "(%s/%s) unable to check job status because of %s exception for %d consecutive tries: %s", galaxy_id_tag, external_job_id, ecn, retries + 1, e, ) if self.runner_params[retry_param] > 0: if retries < self.runner_params[retry_param]: # will retry check on next iteration setattr(ajs, retry_param, retries + 1) new_watched.append(ajs) return None if self.runner_params[state_param] == model.Job.states.OK: log.warning("(%s/%s) job will now be finished OK", galaxy_id_tag, external_job_id) self.work_queue.put((self.finish_job, ajs)) elif self.runner_params[state_param] == model.Job.states.ERROR: log.warning("(%s/%s) job will now be errored", galaxy_id_tag, external_job_id) self.work_queue.put((self.fail_job, ajs)) else: raise Exception( "%s is set to an invalid value (%s), this should not be possible. See galaxy.jobs.drmaa.__init__()", state_param, self.runner_params[state_param], ) return None except drmaa.DrmCommunicationException as e: log.warning("(%s/%s) unable to communicate with DRM: %s", galaxy_id_tag, external_job_id, e) new_watched.append(ajs) return None except Exception: # so we don't kill the monitor thread log.exception(f"({galaxy_id_tag}/{external_job_id}) unable to check job status") log.warning(f"({galaxy_id_tag}/{external_job_id}) job will now be errored") ajs.fail_message = "Cluster could not complete job" self.work_queue.put((self.fail_job, ajs)) return None return state
[docs] def check_watched_items(self) -> None: """ Called by the monitor thread to look at each watched job and deal with state changes. """ assert drmaa is not None new_watched: list[DRMAAJobState] = [] for ajs in self.watched: external_job_id = ajs.job_id galaxy_id_tag = ajs.job_wrapper.get_id_tag() old_state = ajs.old_state state = self.check_watched_item_drmaa(ajs, new_watched) if state is None: continue if state != old_state: log.debug(f"({galaxy_id_tag}/{external_job_id}) state change: {self.drmaa_job_state_strings[state]}") if state == drmaa.JobState.RUNNING and not ajs.running: ajs.running = True ajs.job_wrapper.change_state(model.Job.states.RUNNING) if state in (drmaa.JobState.FAILED, drmaa.JobState.DONE): if self._complete_terminal_job(ajs, drmaa_state=state) is not None: # job was not actually terminal state = ajs.old_state else: continue if ajs.running: # TODO: stop checking at some point ajs.job_wrapper.check_for_entry_points() if ajs.check_limits(): self.work_queue.put((self.fail_job, ajs)) continue ajs.old_state = state new_watched.append(ajs) # Replace the watch list with the updated version self.watched = new_watched
[docs] def stop_job(self, job_wrapper): """Attempts to delete a job from the DRM queue""" job = job_wrapper.get_job() try: ext_id = job.get_job_runner_external_id() assert ext_id not in (None, "None"), "External job id is None" kill_script = job_wrapper.get_destination_configuration("drmaa_external_killjob_script") if kill_script is None: self.ds.kill(ext_id) else: cmd = shlex.split(kill_script) cmd.extend([str(ext_id), str(self.userid)]) commands.execute(cmd) log.info(f"({job.id}/{ext_id}) Removed from DRM queue at user's request") except drmaa.InvalidJobException: log.warning(f"({job.id}/{ext_id}) User killed running job, but it was already dead") except drmaa.InternalException as e: if "already completing or completed" in str(e): log.warning(f"({job.id}/{ext_id}) User killed running job, but job already terminal in DRM queue") else: log.exception( f"({job.id}/{ext_id}) User killed running job, but error encountered removing from DRM queue" ) except commands.CommandLineException as e: log.error(f"({job.id}/{ext_id}) User killed running job, but command execution failed: {unicodify(e)}") except Exception: log.exception(f"({job.id}/{ext_id}) User killed running job, but error encountered removing from DRM queue")
[docs] def recover(self, job: model.Job, job_wrapper: "MinimalJobWrapper") -> None: """Recovers jobs stuck in the queued/running state when Galaxy started""" assert drmaa is not None job_id = job.get_job_runner_external_id() if job_id is None: self.put(job_wrapper) return ajs = DRMAAJobState( job_wrapper=job_wrapper, job_destination=job_wrapper.job_destination, files_dir=job_wrapper.working_directory, job_id=job_id, ) if job.state in (model.Job.states.RUNNING, model.Job.states.STOPPED): log.debug( f"({job.id}/{job.get_job_runner_external_id()}) is still in {job.state} state, adding to the DRM queue" ) ajs.old_state = drmaa.JobState.RUNNING ajs.running = True self.monitor_queue.put(ajs) elif job.get_state() == model.Job.states.QUEUED: log.debug( f"({job.id}/{job.get_job_runner_external_id()}) is still in DRM queued state, adding to the DRM queue" ) ajs.old_state = drmaa.JobState.QUEUED_ACTIVE ajs.running = False self.monitor_queue.put(ajs)
[docs] def store_jobtemplate(self, job_wrapper, jt): """Stores the content of a DRMAA JobTemplate object in a file as a JSON string.""" filename = os.path.join(job_wrapper.working_directory, f"{job_wrapper.get_id_tag()}.jt_json") with open(filename, "w") as fp: json.dump(jt, fp) log.debug(f"({job_wrapper.job_id}) Job script for external submission is: {filename}") return filename
[docs] def external_runjob(self, external_runjob_script, jobtemplate_filename, username): """runs an external script that will QSUB a new job. The external script needs to be run with sudo, and will setuid() to the specified user. Effectively, will QSUB as a different user (than the one used by Galaxy). """ cmd = shlex.split(external_runjob_script) cmd.extend([str(username), jobtemplate_filename]) log.info(f"Running command: {' '.join(cmd)}") try: stdoutdata = commands.execute(cmd).strip() except commands.CommandLineException: log.exception("External_runjob failed") return None # The expected output is a single line containing a single numeric value: # the DRMAA job-ID. If not the case, will throw an error. if not stdoutdata: log.exception("External_runjob did not returned nothing instead of the job id") return None return stdoutdata
def _job_name(self, job_wrapper): external_runjob_script = job_wrapper.get_destination_configuration("drmaa_external_runjob_script", None) galaxy_id_tag = job_wrapper.get_id_tag() # define job attributes job_name = f"g{galaxy_id_tag}" if job_wrapper.tool.old_id: job_name += f"_{job_wrapper.tool.old_id}" if not self.redact_email_in_job_name and external_runjob_script is None: job_name += f"_{job_wrapper.user}" job_name = "".join(x if x in (f"{string.ascii_letters + string.digits}_") else "_" for x in job_name) if self.restrict_job_name_length: job_name = job_name[: self.restrict_job_name_length] return job_name