Source code for galaxy.jobs.runners.util.cli.job.lsf
# A simple CLI runner for LSF that can be used when running Galaxy from a
# non-submit host against an LSF cluster.
from logging import getLogger
from galaxy.jobs import JobState
try:
from galaxy.model import Job
job_states = Job.states
except ImportError:
# Not in Galaxy, map Galaxy job states to Pulsar ones.
from pulsar.util import enum
job_states = enum(RUNNING='running', OK='complete', QUEUED='queued', ERROR="failed")
from ..job import BaseJobExec
log = getLogger(__name__)
argmap = {
    'memory': '-M',  # There is code in job_script_kwargs relying on this name's setting
    'cores': '-n',
    'queue': '-q',
    'working_dir': '-cwd',
    'project': '-P'
}
class LSF(BaseJobExec):

    def __init__(self, **params):
        self.params = {}
        for k, v in params.items():
            self.params[k] = v
    def job_script_kwargs(self, ofile, efile, job_name):
        scriptargs = {'-o': ofile,
                      '-e': efile,
                      '-J': job_name}
        # Map arguments using argmap.
        for k, v in self.params.items():
            if k == 'plugin':
                continue
            try:
                if k == 'memory':
                    # Memory requires both the -M limit and an -R rusage[mem=v] request.
                    scriptargs['-R'] = "\"rusage[mem=%s]\"" % v
                if not k.startswith('-'):
                    k = argmap[k]
                scriptargs[k] = v
            except Exception:
                log.warning('Unrecognized long argument passed to LSF CLI plugin: %s' % k)
        # Generated template.
        template_scriptargs = ''
        for k, v in scriptargs.items():
            template_scriptargs += '#BSUB {} {}\n'.format(k, v)
        return dict(headers=template_scriptargs)
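    # Illustrative example (not part of the original source): with hypothetical
    # params {'memory': '8000', 'cores': '4', 'queue': 'normal'}, the generated
    # headers would look roughly like:
    #
    #   #BSUB -o /path/to/galaxy_1.o
    #   #BSUB -e /path/to/galaxy_1.e
    #   #BSUB -J galaxy_1
    #   #BSUB -R "rusage[mem=8000]"
    #   #BSUB -M 8000
    #   #BSUB -n 4
    #   #BSUB -q normal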
    def submit(self, script_file):
        # bsub prints "Job <9147983> is submitted to default queue <research-rh7>."
        # This should really be handled outside, with something like
        # parse_external. Currently the CLI runner expects the job id to appear
        # in the last position of the command's output.
        return "bsub <%s | awk '{ print $2}' | sed 's/[<>]//g'" % script_file
    def parse_status(self, status, job_ids):
        # Get status for each job, skipping header.
        rval = {}
        for line in status.splitlines():
            job_id, state = line.split()
            if job_id in job_ids:
                # Map job states to Galaxy job states.
                rval[job_id] = self._get_job_state(state)
        return rval
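    # Illustrative example (not part of the original source): given status text
    # such as
    #
    #   9147983 RUN
    #   9147984 PEND
    #
    # and job_ids ['9147983', '9147984'], this returns
    # {'9147983': job_states.RUNNING, '9147984': job_states.QUEUED}.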
    def parse_single_status(self, status, job_id):
        if not status:
            # Job not found in LSF, most probably finished and forgotten.
            # LSF outputs "Job <num> is not found" -- but that goes to stderr.
            # Note: a very old failed job will not be shown here either, which
            # this code would handle badly. So this only works well when Galaxy
            # is constantly monitoring the jobs. The logic here is that DONE jobs
            # get forgotten faster than failed jobs.
            log.warning("Job id '%s' not found in LSF status check" % job_id)
            return job_states.OK
        return self._get_job_state(status)
    def parse_failure_reason(self, reason, job_id):
        # LSF will produce the following in the job output file:
        # TERM_MEMLIMIT: job killed after reaching LSF memory usage limit.
        # Exited with exit code 143.
        for line in reason.splitlines():
            if "TERM_MEMLIMIT" in line:
                return JobState.runner_states.MEMORY_LIMIT_REACHED
        return None
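    # Illustrative example (not part of the original source): if the captured
    # job output contains a line starting with "TERM_MEMLIMIT", this returns
    # JobState.runner_states.MEMORY_LIMIT_REACHED; any other failure reason
    # yields None.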
    def _get_job_state(self, state):
        # Based on:
        # https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.3/lsf_admin/job_state_lsf.html
        # https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.2/lsf_command_ref/bjobs.1.html
        try:
            return {
                'EXIT': job_states.ERROR,
                'RUN': job_states.RUNNING,
                'PEND': job_states.QUEUED,
                'DONE': job_states.OK,
                'PSUSP': job_states.ERROR,
                'USUSP': job_states.ERROR,
                'SSUSP': job_states.ERROR,
                'UNKWN': job_states.ERROR,
                'WAIT': job_states.QUEUED,
                'ZOMBI': job_states.ERROR
            }[state]
        except KeyError:
            raise KeyError("Failed to map LSF status code [%s] to job state." % state)
__all__ = ('LSF',)
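The snippet below is a minimal usage sketch, not part of the module above. It assumes the plugin is instantiated directly with illustrative parameter values for the keys defined in argmap, and shows the #BSUB headers and bsub command the runner would generate.

    from galaxy.jobs.runners.util.cli.job.lsf import LSF

    # Hypothetical destination parameters; only keys present in argmap
    # (plus 'plugin', which is skipped) affect the generated headers.
    lsf = LSF(plugin='LSF', memory='8000', cores='4', queue='normal')

    # Headers prepended to the generated job script.
    print(lsf.job_script_kwargs('/tmp/job.o', '/tmp/job.e', 'galaxy_1')['headers'])

    # Shell command the CLI runner would run to submit the script.
    print(lsf.submit('/tmp/job.sh'))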