Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.jobs.runners.util.condor
"""
Condor helper utilities.
"""
from subprocess import (
CalledProcessError,
check_call,
PIPE,
Popen,
STDOUT
)
from ..external import parse_external_id
DEFAULT_QUERY_CLASSAD = dict(
universe='vanilla',
getenv='true',
notification='NEVER',
)
PROBLEM_RUNNING_CONDOR_SUBMIT = \
"Problem encountered while running condor_submit."
PROBLEM_PARSING_EXTERNAL_ID = \
"Failed to find job id from condor_submit"
SUBMIT_PARAM_PREFIX = "submit_"
[docs]def submission_params(prefix=SUBMIT_PARAM_PREFIX, **kwds):
submission_params = {}
for key in kwds:
value = kwds[key]
key = key.lower()
if key.startswith(prefix):
condor_key = key[len(prefix):]
submission_params[condor_key] = value
return submission_params
[docs]def build_submit_description(executable, output, error, user_log, query_params):
"""
Build up the contents of a condor submit description file.
>>> submit_args = dict(executable='/path/to/script', output='o', error='e', user_log='ul')
>>> submit_args['query_params'] = dict()
>>> default_description = build_submit_description(**submit_args)
>>> assert 'executable = /path/to/script' in default_description
>>> assert 'output = o' in default_description
>>> assert 'error = e' in default_description
>>> assert 'queue' in default_description
>>> assert 'universe = vanilla' in default_description
>>> assert 'universe = standard' not in default_description
>>> submit_args['query_params'] = dict(universe='standard')
>>> std_description = build_submit_description(**submit_args)
>>> assert 'universe = vanilla' not in std_description
>>> assert 'universe = standard' in std_description
"""
all_query_params = DEFAULT_QUERY_CLASSAD.copy()
all_query_params.update(query_params)
submit_description = []
for key, value in all_query_params.items():
submit_description.append('%s = %s' % (key, value))
submit_description.append('executable = ' + executable)
submit_description.append('output = ' + output)
submit_description.append('error = ' + error)
submit_description.append('log = ' + user_log)
submit_description.append('queue')
return '\n'.join(submit_description)
[docs]def condor_submit(submit_file):
"""
Submit a condor job described by the given file. Parse an external id for
the submission or return None and a reason for the failure.
"""
external_id = None
try:
submit = Popen(('condor_submit', submit_file), stdout=PIPE, stderr=STDOUT)
message, _ = submit.communicate()
if submit.returncode == 0:
external_id = parse_external_id(message, type='condor')
else:
message = PROBLEM_PARSING_EXTERNAL_ID
except Exception as e:
message = str(e)
return external_id, message
[docs]def condor_stop(external_id):
"""
Stop running condor job and return a failure_message if this
fails.
"""
failure_message = None
try:
check_call(('condor_rm', external_id))
except CalledProcessError:
failure_message = "condor_rm failed"
except Exception as e:
"error encountered calling condor_rm: %s" % e
return failure_message
[docs]def summarize_condor_log(log_file, external_id):
"""
"""
log_job_id = external_id.zfill(3)
s1 = s4 = s7 = s5 = s9 = False
with open(log_file, 'r') as log_handle:
for line in log_handle:
if '001 (' + log_job_id + '.' in line:
s1 = True
if '004 (' + log_job_id + '.' in line:
s4 = True
if '007 (' + log_job_id + '.' in line:
s7 = True
if '005 (' + log_job_id + '.' in line:
s5 = True
if '009 (' + log_job_id + '.' in line:
s9 = True
file_size = log_handle.tell()
return s1, s4, s7, s5, s9, file_size