Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.runners.util.cli.job.lsf

# A simple CLI runner for slurm that can be used when running Galaxy from a
# non-submit host and using a Slurm cluster.
from logging import getLogger

from galaxy.jobs import JobState
try:
    from galaxy.model import Job
    job_states = Job.states
except ImportError:
    # Not in Galaxy, map Galaxy job states to Pulsar ones.
    from pulsar.util import enum
    job_states = enum(RUNNING='running', OK='complete', QUEUED='queued', ERROR="failed")
from ..job import BaseJobExec

log = getLogger(__name__)

argmap = {
    'memory': '-M',  # There is code in job_script_kwargs relying on this name's setting
    'cores': '-n',
    'queue': '-q',
    'working_dir': '-cwd',
    'project': '-P'
}


[docs]class LSF(BaseJobExec):
[docs] def __init__(self, **params): self.params = {} for k, v in params.items(): self.params[k] = v
[docs] def job_script_kwargs(self, ofile, efile, job_name): scriptargs = {'-o': ofile, '-e': efile, '-J': job_name} # Map arguments using argmap. for k, v in self.params.items(): if k == 'plugin': continue try: if k == 'memory': # Memory requires both -m and -R rusage[mem=v] request scriptargs['-R'] = "\"rusage[mem=%s]\"" % v if not k.startswith('-'): k = argmap[k] scriptargs[k] = v except Exception: log.warning('Unrecognized long argument passed to LSF CLI plugin: %s' % k) # Generated template. template_scriptargs = '' for k, v in scriptargs.items(): template_scriptargs += '#BSUB %s %s\n' % (k, v) return dict(headers=template_scriptargs)
[docs] def submit(self, script_file): # bsub returns Job <9147983> is submitted to default queue <research-rh7>. # This should be really handled outside with something like # parse_external. Currently CLI runner expect this to just send it in the last position # of the string. return "bsub <%s | awk '{ print $2}' | sed 's/[<>]//g'" % script_file
[docs] def delete(self, job_id): return 'bkill %s' % job_id
[docs] def get_status(self, job_ids=None): return "bjobs -a -o \"id stat\" -noheader" # check this
[docs] def get_single_status(self, job_id): return "bjobs -o stat -noheader " + job_id
[docs] def parse_status(self, status, job_ids): # Get status for each job, skipping header. rval = {} for line in status.splitlines(): job_id, state = line.split() if job_id in job_ids: # map job states to Galaxy job states. rval[job_id] = self._get_job_state(state) return rval
[docs] def parse_single_status(self, status, job_id): if not status: # Job not found in LSF, most probably finished and forgotten. # lsf outputs: Job <num> is not found -- but that is on the stderr # Note: a very old failed job job will not be shown here either, # which would be badly handled here. So this only works well when Galaxy # is constantly monitoring the jobs. The logic here is that DONE jobs get forgotten # faster than failed jobs. log.warning("Job id '%s' not found LSF status check" % job_id) return job_states.OK return self._get_job_state(status)
[docs] def get_failure_reason(self, job_id): return "bjobs -l " + job_id
[docs] def parse_failure_reason(self, reason, job_id): # LSF will produce the following in the job output file: # TERM_MEMLIMIT: job killed after reaching LSF memory usage limit. # Exited with exit code 143. for line in reason.splitlines(): if "TERM_MEMLIMIT" in line: return JobState.runner_states.MEMORY_LIMIT_REACHED return None
def _get_job_state(self, state): # based on: # https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.3/lsf_admin/job_state_lsf.html # https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.2/lsf_command_ref/bjobs.1.html try: return { 'EXIT': job_states.ERROR, 'RUN': job_states.RUNNING, 'PEND': job_states.QUEUED, 'DONE': job_states.OK, 'PSUSP': job_states.ERROR, 'USUSP': job_states.ERROR, 'SSUSP': job_states.ERROR, 'UNKWN': job_states.ERROR, 'WAIT': job_states.QUEUED, 'ZOMBI': job_states.ERROR }.get(state) except KeyError: raise KeyError("Failed to map LSF status code [%s] to job state." % state)
__all__ = ('LSF',)