# A simple CLI runner for Slurm that can be used when Galaxy runs on a
# non-submit host and submits jobs to a Slurm cluster.
from logging import getLogger

from . import (
    BaseJobExec,
    job_states,
)

log = getLogger(__name__)

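# Long-form parameter names the plugin accepts, mapped to their sbatch flags;
# e.g. a "time" param becomes an "#SBATCH -t <value>" header line.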
argmap = {"time": "-t", "ncpus": "-c", "partition": "-p"}


class Slurm(BaseJobExec):
    def job_script_kwargs(self, ofile, efile, job_name):
        scriptargs = {"-o": ofile, "-e": efile, "-J": job_name}
        # Map long-form arguments to sbatch flags using argmap; keys that
        # already look like flags ("-...") are passed through unchanged.
        for k, v in self.params.items():
            if k == "plugin":
                continue
            try:
                if not k.startswith("-"):
                    k = argmap[k]
                scriptargs[k] = v
            except KeyError:
                log.warning(f"Unrecognized long argument passed to Slurm CLI plugin: {k}")
        # Render the collected arguments as #SBATCH job script headers.
        template_scriptargs = ""
        for k, v in scriptargs.items():
            template_scriptargs += f"#SBATCH {k} {v}\n"
        return dict(headers=template_scriptargs)
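
    # A sketch of the headers job_script_kwargs renders, assuming hypothetical
    # params {"time": "24:00:00", "ncpus": "4"}:
    #
    #   #SBATCH -o <ofile>
    #   #SBATCH -e <efile>
    #   #SBATCH -J <job_name>
    #   #SBATCH -t 24:00:00
    #   #SBATCH -c 4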

    def submit(self, script_file):
        return f"sbatch {script_file}"

    def delete(self, job_id):
        return f"scancel {job_id}"

    def get_status(self, job_ids=None):
        # %A prints the job id and %t the compact state code; -a includes
        # jobs in all partitions.
        return "squeue -a -o '%A %t'"

    def get_single_status(self, job_id):
        return f"squeue -a -o '%A %t' -j {job_id}"

    def parse_status(self, status, job_ids):
        # Parse squeue output, skipping the header line.
        rval = {}
        for line in status.splitlines()[1:]:
            id, state = line.split()
            if id in job_ids:
                # Map Slurm job states to Galaxy job states.
                rval[id] = self._get_job_state(state)
        return rval
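
    # Illustrative `squeue -a -o '%A %t'` output that parse_status consumes
    # (job ids and states are made up):
    #
    #   JOBID ST
    #   123456 R
    #   123457 PD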

    def parse_single_status(self, status, job_id):
        status = status.splitlines()
        if len(status) > 1:
            # Job is still known to the cluster and has a state.
            id, state = status[1].split()
            return self._get_job_state(state)
        # Otherwise the output is a line like
        # "slurm_load_jobs error: Invalid job id specified", meaning the job
        # has left the queue; treat it as finished.
        return job_states.OK

    def _get_job_state(self, state: str) -> str:
        # Translate Slurm compact state codes (F=FAILED, R=RUNNING,
        # CG=COMPLETING, PD=PENDING, CD=COMPLETED) to Galaxy job states.
        try:
            return {
                "F": job_states.ERROR,
                "R": job_states.RUNNING,
                "CG": job_states.RUNNING,
                "PD": job_states.QUEUED,
                "CD": job_states.OK,
            }[state]
        except KeyError:
            raise KeyError(f"Failed to map slurm status code [{state}] to job state.")


__all__ = ("Slurm",)
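
# A minimal usage sketch (hypothetical values; in Galaxy this class is
# constructed by the CLI job runner framework rather than directly, and
# BaseJobExec is assumed to stash keyword arguments on self.params):
#
#   slurm = Slurm(plugin="Slurm", time="24:00:00", partition="main")
#   slurm.submit("/path/to/script.sh")  # -> "sbatch /path/to/script.sh"
#   slurm.get_single_status("123456")   # -> "squeue -a -o '%A %t' -j 123456"
#   slurm.parse_single_status("JOBID ST\n123456 R\n", "123456")  # -> job_states.RUNNING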