# Source code for galaxy.jobs.runners.util.condor

"""
Condor helper utilities.
"""
from subprocess import (
    CalledProcessError,
    check_call,
)

from galaxy.util import (
    commands,
    unicodify,
)
from ..external import parse_external_id

# Baseline submit-description settings; build_submit_description() copies
# these and lets the caller's query_params override individual entries.
DEFAULT_QUERY_CLASSAD = {
    "universe": "vanilla",
    "getenv": "true",
    "notification": "NEVER",
}

# Leading text of the failure message returned by condor_submit() when no
# job id can be parsed out of the command's output.
PROBLEM_PARSING_EXTERNAL_ID = "Failed to find job id from condor_submit"

# Default key prefix used by submission_params() to recognise keyword
# arguments that should be forwarded as condor submission parameters.
SUBMIT_PARAM_PREFIX = "submit_"


def submission_params(prefix=SUBMIT_PARAM_PREFIX, **kwds):
    """
    Collect the keyword arguments that name condor submission parameters.

    Every keyword name is lower-cased first; names that then start with
    ``prefix`` are kept, with the prefix removed from the resulting key.
    All other keywords are ignored.

    :param prefix: key prefix marking a submission parameter.
    :param kwds: arbitrary keyword arguments to filter.
    :returns: dict mapping the stripped, lower-cased names to their values.
    """
    offset = len(prefix)
    # Lower-case once per keyword so the prefix test and the slice both work
    # on the same normalised name.
    normalised = ((name.lower(), setting) for name, setting in kwds.items())
    return {name[offset:]: setting for name, setting in normalised if name.startswith(prefix)}


def build_submit_description(executable, output, error, user_log, query_params):
    """
    Build up the contents of a condor submit description file.

    The defaults in ``DEFAULT_QUERY_CLASSAD`` are overlaid with
    ``query_params`` and written as ``key = value`` lines, followed by the
    ``executable``, ``output``, ``error`` and ``log`` entries and a final
    ``queue`` statement. The lines are joined with newlines and returned as
    a single string.

    >>> submit_args = dict(executable='/path/to/script', output='o', error='e', user_log='ul')
    >>> submit_args['query_params'] = dict()
    >>> default_description = build_submit_description(**submit_args)
    >>> assert 'executable = /path/to/script' in default_description
    >>> assert 'output = o' in default_description
    >>> assert 'error = e' in default_description
    >>> assert 'queue' in default_description
    >>> assert 'universe = vanilla' in default_description
    >>> assert 'universe = standard' not in default_description
    >>> submit_args['query_params'] = dict(universe='standard')
    >>> std_description = build_submit_description(**submit_args)
    >>> assert 'universe = vanilla' not in std_description
    >>> assert 'universe = standard' in std_description
    """
    # Work on a copy so the module-level defaults are never mutated.
    merged_params = dict(DEFAULT_QUERY_CLASSAD)
    merged_params.update(query_params)

    lines = [f"{name} = {setting}" for name, setting in merged_params.items()]
    lines.extend(
        (
            f"executable = {executable}",
            f"output = {output}",
            f"error = {error}",
            f"log = {user_log}",
            "queue",
        )
    )
    return "\n".join(lines)


def condor_submit(submit_file):
    """
    Submit a condor job described by the given file.

    Runs ``condor_submit`` on ``submit_file`` and parses an external id out
    of its output.

    :returns: ``(external_id, failure_message)``. On success the message is
        ``None``; if the command fails or its output cannot be parsed, the
        id is ``None`` and the message gives the reason.
    """
    try:
        submit_output = commands.execute(("condor_submit", submit_file))
    except commands.CommandLineException as exc:
        return None, unicodify(exc)

    try:
        job_id = parse_external_id(submit_output, type="condor")
    except Exception:
        # Include the raw command output so the caller can see what could
        # not be parsed.
        return None, f"{PROBLEM_PARSING_EXTERNAL_ID}: {submit_output}"
    return job_id, None


def condor_stop(external_id):
    """
    Stop running condor job and return a failure_message if this fails.

    Invokes ``condor_rm`` on ``external_id``.

    :returns: ``None`` when the command succeeds, otherwise a string
        describing the failure.
    """
    try:
        check_call(("condor_rm", external_id))
    except CalledProcessError:
        # The command ran but exited with a non-zero status.
        return "condor_rm failed"
    except Exception as exc:
        # Any other error raised while trying to run the command.
        return f"error encountered calling condor_rm: {unicodify(exc)}"
    return None


def summarize_condor_log(log_file, external_id):
    """
    Scan a condor user log for a fixed set of events belonging to one job.

    A line counts as an event for the job when it contains the text
    ``"<code> (<job id>."``, where the job id is ``external_id`` zero-padded
    to at least three characters. The codes checked are 001, 004, 007, 005
    and 009.
    NOTE(review): these look like HTCondor job event numbers (execute,
    evicted, shadow exception, terminated, aborted) -- confirm against the
    HTCondor documentation.

    :param log_file: path of the condor log to read.
    :param external_id: condor job id; any value is accepted and converted
        with ``str()`` before padding, so an integer id works as well as a
        string.
    :returns: tuple ``(s1, s4, s7, s5, s9, file_size)``. The five booleans
        say whether the 001, 004, 007, 005 and 009 markers were seen, in
        that order; ``file_size`` is the value of ``tell()`` on the log
        handle after the whole file has been read.
    """
    log_job_id = str(external_id).zfill(3)

    # The markers do not depend on the line being scanned, so build them
    # once instead of re-formatting five strings for every log line.
    event_codes = ("001", "004", "007", "005", "009")
    markers = [(code, f"{code} ({log_job_id}.") for code in event_codes]

    seen = set()
    with open(log_file) as log_handle:
        for line in log_handle:
            for code, marker in markers:
                if marker in line:
                    seen.add(code)
        # Must be read while the handle is still open, after the loop has
        # consumed the file.
        file_size = log_handle.tell()

    s1, s4, s7, s5, s9 = (code in seen for code in event_codes)
    return s1, s4, s7, s5, s9, file_size