Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.runners.util.cli.job.slurm

# A simple CLI runner for slurm that can be used when running Galaxy from a
# non-submit host and using a Slurm cluster.
from logging import getLogger

from ..job import BaseJobExec

try:
    from galaxy.model import Job
    job_states = Job.states
except ImportError:
    # Not in Galaxy, map Galaxy job states to Pulsar ones.
    from galaxy.util import enum
    job_states = enum(RUNNING='running', OK='complete', QUEUED='queued', ERROR="failed")

log = getLogger(__name__)

argmap = {
    'time': '-t',
    'ncpus': '-c',
    'partition': '-p'
}


[docs]class Slurm(BaseJobExec):
[docs] def __init__(self, **params): self.params = {} for k, v in params.items(): self.params[k] = v
[docs] def job_script_kwargs(self, ofile, efile, job_name): scriptargs = {'-o': ofile, '-e': efile, '-J': job_name} # Map arguments using argmap. for k, v in self.params.items(): if k == 'plugin': continue try: if not k.startswith('-'): k = argmap[k] scriptargs[k] = v except: log.warning('Unrecognized long argument passed to Slurm CLI plugin: %s' % k) # Generated template. template_scriptargs = '' for k, v in scriptargs.items(): template_scriptargs += '#SBATCH %s %s\n' % (k, v) return dict(headers=template_scriptargs)
[docs] def submit(self, script_file): return 'sbatch %s' % script_file
[docs] def delete(self, job_id): return 'scancel %s' % job_id
[docs] def get_status(self, job_ids=None): return "squeue -a -o '%A %t'"
[docs] def get_single_status(self, job_id): return "squeue -a -o '%A %t' -j " + job_id
[docs] def parse_status(self, status, job_ids): # Get status for each job, skipping header. rval = {} for line in status.splitlines()[1:]: id, state = line.split() if id in job_ids: # map job states to Galaxy job states. rval[id] = self._get_job_state(state) return rval
[docs] def parse_single_status(self, status, job_id): status = status.splitlines() if len(status) > 1: # Job still on cluster and has state. id, state = status[1].split() return self._get_job_state(state) # else line like "slurm_load_jobs error: Invalid job id specified" return job_states.OK
def _get_job_state(self, state): try: return { 'F': job_states.ERROR, 'R': job_states.RUNNING, 'CG': job_states.RUNNING, 'PD': job_states.QUEUED, 'CD': job_states.OK }.get(state) except KeyError: raise KeyError("Failed to map slurm status code [%s] to job state." % state)
__all__ = ('Slurm',)