Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.jobs.runners.util.cli.job.slurm
# A simple CLI runner for slurm that can be used when running Galaxy from a
# non-submit host and using a Slurm cluster.
from logging import getLogger
try:
from galaxy.model import Job
job_states = Job.states
except ImportError:
# Not in Galaxy, map Galaxy job states to Pulsar ones.
from pulsar.util import enum
job_states = enum(RUNNING='running', OK='complete', QUEUED='queued', ERROR="failed")
from ..job import BaseJobExec
log = getLogger(__name__)
argmap = {
'time': '-t',
'ncpus': '-c',
'partition': '-p'
}
[docs]class Slurm(BaseJobExec):
[docs] def __init__(self, **params):
self.params = {}
for k, v in params.items():
self.params[k] = v
[docs] def job_script_kwargs(self, ofile, efile, job_name):
scriptargs = {'-o': ofile,
'-e': efile,
'-J': job_name}
# Map arguments using argmap.
for k, v in self.params.items():
if k == 'plugin':
continue
try:
if not k.startswith('-'):
k = argmap[k]
scriptargs[k] = v
except Exception:
log.warning('Unrecognized long argument passed to Slurm CLI plugin: %s' % k)
# Generated template.
template_scriptargs = ''
for k, v in scriptargs.items():
template_scriptargs += '#SBATCH {} {}\n'.format(k, v)
return dict(headers=template_scriptargs)
[docs] def parse_status(self, status, job_ids):
# Get status for each job, skipping header.
rval = {}
for line in status.splitlines()[1:]:
id, state = line.split()
if id in job_ids:
# map job states to Galaxy job states.
rval[id] = self._get_job_state(state)
return rval
[docs] def parse_single_status(self, status, job_id):
status = status.splitlines()
if len(status) > 1:
# Job still on cluster and has state.
id, state = status[1].split()
return self._get_job_state(state)
# else line like "slurm_load_jobs error: Invalid job id specified"
return job_states.OK
def _get_job_state(self, state):
try:
return {
'F': job_states.ERROR,
'R': job_states.RUNNING,
'CG': job_states.RUNNING,
'PD': job_states.QUEUED,
'CD': job_states.OK
}.get(state)
except KeyError:
raise KeyError("Failed to map slurm status code [%s] to job state." % state)
__all__ = ('Slurm',)