Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.jobs.runners.util.cli.job.torque
from logging import getLogger
from galaxy.util import parse_xml_string
from ..job import BaseJobExec, job_states
# Module-level logger, named after this module.
log = getLogger(__name__)

# %-style template for the warning logged when a plugin parameter is neither
# a literal dash-prefixed flag nor one of the long names in ``argmap``.
ERROR_MESSAGE_UNRECOGNIZED_ARG = 'Unrecognized long argument passed to Torque CLI plugin: %s'

# Long parameter name -> short option flag. Used when rendering the
# ``#PBS <flag> <value>`` header lines of a job script.
argmap = {
    'destination': '-q',
    'Execution_Time': '-a',
    'Account_Name': '-A',
    'Checkpoint': '-c',
    'Error_Path': '-e',
    'Group_List': '-g',
    'Hold_Types': '-h',
    'Join_Paths': '-j',
    'Keep_Files': '-k',
    'Resource_List': '-l',
    'Mail_Points': '-m',
    'Mail_Users': '-M',
    'Job_Name': '-N',
    'Output_Path': '-o',
    'Priority': '-p',
    'Rerunable': '-r',
    'Shell_Path_List': '-S',
    'job_array_request': '-t',
    'User_List': '-u',
    'Variable_List': '-v',
}
class Torque(BaseJobExec):
    """Torque CLI plugin: renders ``#PBS`` job-script headers and parses qstat output."""

    def __init__(self, **params):
        """Store the plugin parameters.

        :param params: arbitrary keyword parameters; each key is either a long
            name listed in ``argmap`` or a literal flag starting with ``-``.
        """
        # Shallow copy so later mutation of self.params cannot affect the caller's dict.
        self.params = dict(params)

    def job_script_kwargs(self, ofile, efile, job_name):
        """Build the ``#PBS`` header block for a job script.

        :param ofile: value emitted for the ``-o`` flag.
        :param efile: value emitted for the ``-e`` flag.
        :param job_name: value emitted for the ``-N`` flag.
        :returns: ``{'headers': <str>}`` with one ``#PBS <flag> <value>`` line
            per option, each terminated by a newline.
        """
        pbsargs = {'-o': ofile, '-e': efile, '-N': job_name}
        for name, value in self.params.items():
            if name == 'plugin':
                # 'plugin' is skipped: it is never rendered as a #PBS option.
                continue
            # Keys that already look like flags pass through unchanged; long
            # names are translated via argmap (values there are never None).
            flag = name if name.startswith('-') else argmap.get(name)
            if flag is None:
                log.warning(ERROR_MESSAGE_UNRECOGNIZED_ARG, name)
                continue
            # User-supplied parameters override the -o/-e/-N defaults above.
            pbsargs[flag] = value
        headers = ''.join(f'#PBS {flag} {value}\n' for flag, value in pbsargs.items())
        return dict(headers=headers)

    def parse_status(self, status, job_ids):
        """Parse ``qstat -x`` XML output for several jobs at once.

        :param status: raw command output; may contain non-XML noise lines.
        :param job_ids: container of job ids whose state should be reported.
        :returns: dict mapping each reported job id found in ``job_ids`` to a
            Galaxy job state, or ``None`` when no line of ``status`` parses as
            an XML document whose root tag is ``Data``.
        :raises KeyError: if a matching job has a missing or unrecognized
            ``job_state``.
        """
        # In case there's noise in the output, find the one line that holds the XML blob.
        tree = None
        for line in status.strip().splitlines():
            try:
                candidate = parse_xml_string(line.strip())
            except Exception:
                # Not XML (or otherwise unparsable): treat as noise and move on.
                continue
            # Explicit check rather than ``assert`` so it still runs under ``python -O``.
            if getattr(candidate, 'tag', None) == 'Data':
                tree = candidate
                break
        if tree is None:
            log.warning('No valid qstat XML return from `qstat -x`, got the following: %s', status)
            return None

        rval = {}
        for job in tree.findall('Job'):
            id_element = job.find('Job_Id')
            if id_element is None:
                # A <Job> without an id cannot match anything in job_ids.
                continue
            job_id = id_element.text
            if job_id is None or job_id not in job_ids:
                continue
            state_element = job.find('job_state')
            state = state_element.text if state_element is not None else None
            # Map PBS job states to Galaxy job states.
            rval[job_id] = self._get_job_state(state)
        return rval

    def parse_single_status(self, status, job_id):
        """Parse ``key = value`` style status output for a single job.

        :param status: raw command output, one ``key = value`` pair per line.
        :param job_id: accepted for interface compatibility; not used here.
        :returns: the Galaxy job state for the first ``job_state`` line, or
            ``job_states.OK`` when no such line is present.
        :raises KeyError: if the reported state code is not recognized.
        """
        for line in status.splitlines():
            fields = line.split(' = ')
            # Require a value field so a bare "job_state" line cannot raise IndexError.
            if len(fields) > 1 and fields[0].strip() == 'job_state':
                return self._get_job_state(fields[1].strip())
        # No state found, job has exited.
        return job_states.OK

    def _get_job_state(self, state):
        """Translate a single-letter Torque state code into a Galaxy job state.

        :param state: state code as reported by qstat.
        :returns: the matching ``job_states`` member.
        :raises KeyError: if ``state`` is not one of the mapped codes.
        """
        state_map = {
            'E': job_states.RUNNING,
            'R': job_states.RUNNING,
            'Q': job_states.QUEUED,
            'C': job_states.OK,
        }
        try:
            # Index (not .get) so unknown codes raise instead of silently yielding None.
            return state_map[state]
        except KeyError:
            raise KeyError(f"Failed to map torque status code [{state}] to job state.") from None
# Names exported by ``from <module> import *``: only the Torque plugin class.
__all__ = ('Torque',)