Source code for galaxy.jobs.runners.util.cli.job.slurm

# A simple CLI runner for slurm that can be used when running Galaxy from a
# non-submit host and using a Slurm cluster.
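
# A minimal, hypothetical job_conf.xml destination for this plugin might look
# like the sketch below. It assumes Galaxy's CLI runner (ShellJobRunner) with a
# shell plugin that can reach the submit host; the exact parameter names
# (shell_plugin, job_plugin, shell_hostname, job_*) are assumptions for
# illustration, not defined in this module:
#
#   <plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner"/>
#   <destination id="slurm_cluster" runner="cli">
#       <param id="shell_plugin">SecureShell</param>
#       <param id="job_plugin">Slurm</param>
#       <param id="shell_hostname">login.cluster.example.org</param>
#       <param id="job_partition">main</param>
#   </destination>
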
from logging import getLogger

from . import (
    BaseJobExec,
    job_states,
)
from ... import runner_states

log = getLogger(__name__)

argmap = {"time": "-t", "ncpus": "-c", "partition": "-p"}


class Slurm(BaseJobExec):
    slurm_longjobstate_to_shortjobstate = {
        "BOOT_FAIL": "BF",
        "CANCELLED": "CA",
        "COMPLETED": "CD",
        "DEADLINE": "DL",
        "FAILED": "F",
        "NODE_FAIL": "NF",
        "OUT_OF_MEMORY": "OOM",
        "PENDING": "PD",
        "PREEMPTED": "PR",
        "RUNNING": "R",
        "REQUEUED": "RQ",
        "RESIZING": "RS",
        "REVOKED": "RV",
        "SUSPENDED": "S",
        "TIMEOUT": "TO",
        "UNKNOWN": "UN",  # Custom code used in case one isn't available here
    }

    slurmstate_runnerstate_map = {
        "OOM": runner_states.MEMORY_LIMIT_REACHED,
        "TO": runner_states.WALLTIME_REACHED,
        "UN": runner_states.UNKNOWN_ERROR,
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.sacct_available = True

    def job_script_kwargs(self, ofile, efile, job_name):
        scriptargs = {"-o": ofile, "-e": efile, "-J": job_name}

        # Map arguments using argmap.
        for k, v in self.params.items():
            if k == "plugin":
                continue
            try:
                if not k.startswith("-"):
                    k = argmap[k]
                scriptargs[k] = v
            except Exception:
                log.warning(f"Unrecognized long argument passed to Slurm CLI plugin: {k}")

        # Generated template.
        template_scriptargs = ""
        for k, v in scriptargs.items():
            template_scriptargs += f"#SBATCH {k} {v}\n"
        return dict(headers=template_scriptargs)
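
    # Illustrative sketch (not part of the original module): given destination
    # params such as {"ncpus": 4, "partition": "main", "time": "00:30:00"},
    # job_script_kwargs() above would produce headers along the lines of:
    #
    #   #SBATCH -o /path/to/job.o
    #   #SBATCH -e /path/to/job.e
    #   #SBATCH -J galaxy_job_name
    #   #SBATCH -c 4
    #   #SBATCH -p main
    #   #SBATCH -t 00:30:00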

    def submit(self, script_file):
        return f"sbatch {script_file}"

    def delete(self, job_id):
        return f"scancel {job_id}"

    def get_status(self, job_ids=None):
        return "squeue -a -o '%A %t'"

    def get_single_status(self, job_id):
        return f"squeue -a -o '%A %t' -j {job_id}"

    def parse_status(self, status, job_ids):
        # Get status for each job, skipping header.
        rval = {}
        for line in status.splitlines()[1:]:
            id, state = line.split()
            if id in job_ids:
                # Map job states to Galaxy job states.
                rval[id] = self._get_job_state(state)
        return rval
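
    # Illustrative sketch (not part of the original module): `squeue -a -o '%A %t'`
    # prints a header line followed by "<job id> <compact state>" pairs, e.g.
    #
    #   JOBID ST
    #   12345 R
    #   12346 PD
    #
    # for which parse_status() above would return
    # {"12345": job_states.RUNNING, "12346": job_states.QUEUED}
    # (assuming both ids are in job_ids).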

    def parse_single_status(self, status, job_id, shell):
        status = status.splitlines()
        if len(status) > 1:
            # Job still on cluster and has state.
            id, state = status[1].split()
            return self._get_job_state(state)
        elif self.sacct_available and len(status) <= 1:
            log.debug(f"For job '{job_id}', relying on 'sacct' method to determine job state")
            # Job no longer on cluster, retrieve state
            pdata = shell.execute(f"sacct -o JobIDRaw,State -P -n -j {job_id}")
            if "Slurm accounting storage is disabled" in pdata.stderr:
                log.warning(f"({job_id}) Slurm accounting storage is disabled")
                self.sacct_available = False
                # Technically we don't know what the state is, but chances are the job just completed very quickly.
                return job_states.OK
            job_data = pdata.stdout.splitlines()
            if len(job_data) == 0:
                log.debug(f"Job '{job_id}' cannot be found. Returning error for job.")
                return self._get_job_state("F")
            state = "CD"
            for jobline in job_data:
                # Ignore the '.batch' and '.extern' steps in the output.
                if ".batch" in jobline or ".extern" in jobline:
                    continue
                splitjobdata = jobline.split("|")
                if len(splitjobdata) >= 2:
                    (s_jobid, s_jobstate) = splitjobdata
                    # States like "CANCELLED by 123" carry extra detail after a space.
                    if " " in s_jobstate:
                        s_jobstate, s_jobotherinfo = s_jobstate.split(" ", 1)
                    state = self.slurm_longjobstate_to_shortjobstate.get(s_jobstate, "UN")
            return self._get_job_state(state)
        # else line like "slurm_load_jobs error: Invalid job id specified"
        return job_states.OK
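
    # Illustrative sketch (not part of the original module): once a job has left
    # the queue, `sacct -o JobIDRaw,State -P -n` prints pipe-separated lines such as
    #
    #   12345|TIMEOUT
    #   12345.batch|CANCELLED
    #   12345.extern|COMPLETED
    #
    # parse_single_status() above skips the '.batch'/'.extern' steps, strips any
    # trailing detail after a space (e.g. "CANCELLED by 123"), and maps the long
    # state name ("TIMEOUT" -> "TO") before converting it to a Galaxy job state.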

    def _get_job_state(self, state: str) -> str:
        try:
            return {
                "BF": job_states.ERROR,
                "CA": job_states.ERROR,
                "CD": job_states.OK,
                "CG": job_states.RUNNING,
                "DL": job_states.ERROR,
                "F": job_states.ERROR,
                "NF": job_states.ERROR,
                "OOM": job_states.ERROR,
                "PD": job_states.QUEUED,
                "PR": job_states.RUNNING,
                "R": job_states.RUNNING,
                "RQ": job_states.RUNNING,
                "RS": job_states.RUNNING,
                "RV": job_states.ERROR,
                "S": job_states.RUNNING,
                "TO": job_states.ERROR,
                "UN": job_states.ERROR,  # Custom code for unknown
            }[state]
        except KeyError:
            raise KeyError(f"Failed to map slurm status code [{state}] to job state.")

    def get_failure_reason(self, job_id):
        return f"sacct -o JobIDRaw,State -P -n -j {job_id}"

    def parse_failure_reason(self, reason, job_id):
        state = "CD"
        for line in reason.splitlines():
            if ".batch" in line or ".extern" in line:
                continue
            splitjobdata = line.split("|")
            log.debug(f"State split line: {len(splitjobdata)}")
            if len(splitjobdata) >= 2:
                (s_jobid, s_jobstate) = splitjobdata
                if " " in s_jobstate:
                    s_jobstate, s_jobotherinfo = s_jobstate.split(" ", 1)
                    log.debug(f"Found space in jobstate, split into: {s_jobstate} - {s_jobotherinfo}")
                log.debug(f"State job data: {s_jobid} - {s_jobstate}")
                state = self.slurm_longjobstate_to_shortjobstate.get(s_jobstate, "UN")
        if state in self.slurmstate_runnerstate_map:
            return self.slurmstate_runnerstate_map.get(state, runner_states.UNKNOWN_ERROR)
        return None
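

# Illustrative sketch (not part of the original module): parse_failure_reason()
# feeds the same sacct output back through slurm_longjobstate_to_shortjobstate,
# so a line like "12345|OUT_OF_MEMORY" maps to "OOM" and then to
# runner_states.MEMORY_LIMIT_REACHED, while "12345|TIMEOUT" maps to "TO" and
# runner_states.WALLTIME_REACHED; states with no entry in
# slurmstate_runnerstate_map return None.
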
__all__ = ("Slurm",)