Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.runners.util.condor

"""
Condor helper utilities.
"""

from subprocess import (
    CalledProcessError,
    check_call,
)

from galaxy.util import (
    commands,
    unicodify,
)
from ..external import parse_external_id

# Baseline submit-description settings. build_submit_description() copies this
# mapping and lets the caller-supplied query_params override any entry.
DEFAULT_QUERY_CLASSAD = dict(
    universe="vanilla",
    getenv="true",
    notification="NEVER",
)

# Leading text of the failure message condor_submit() reports when the job id
# cannot be parsed out of the condor_submit command output.
PROBLEM_PARSING_EXTERNAL_ID = "Failed to find job id from condor_submit"

SUBMIT_PARAM_PREFIX = "submit_"


def submission_params(prefix=SUBMIT_PARAM_PREFIX, **kwds):
    """
    Pick the condor submission parameters out of arbitrary keyword arguments.

    Each keyword name is lower-cased; names that then start with ``prefix``
    are kept, with the prefix stripped off, and mapped to their original
    value. All other keywords are ignored.

    :param prefix: marker identifying submission keywords
                   (defaults to ``SUBMIT_PARAM_PREFIX``).
    :param kwds: candidate keyword arguments.
    :returns: dict of ``{name_without_prefix: value}``.
    """
    offset = len(prefix)
    return {
        name.lower()[offset:]: value
        for name, value in kwds.items()
        if name.lower().startswith(prefix)
    }
def build_submit_description(executable, output, error, user_log, query_params):
    """
    Render the text of a condor submit description file.

    The entries of ``query_params`` are layered on top of
    ``DEFAULT_QUERY_CLASSAD`` and written first as ``key = value`` lines. They
    are followed by the ``executable``, ``output``, ``error`` and ``log``
    lines and a closing ``queue`` statement, all joined with newlines.

    >>> submit_args = dict(executable='/path/to/script', output='o', error='e', user_log='ul')
    >>> submit_args['query_params'] = dict()
    >>> default_description = build_submit_description(**submit_args)
    >>> assert 'executable = /path/to/script' in default_description
    >>> assert 'output = o' in default_description
    >>> assert 'error = e' in default_description
    >>> assert 'queue' in default_description
    >>> assert 'universe = vanilla' in default_description
    >>> assert 'universe = standard' not in default_description
    >>> submit_args['query_params'] = dict(universe='standard')
    >>> std_description = build_submit_description(**submit_args)
    >>> assert 'universe = vanilla' not in std_description
    >>> assert 'universe = standard' in std_description
    """
    merged_params = dict(DEFAULT_QUERY_CLASSAD)
    merged_params.update(query_params)

    lines = [f"{name} = {setting}" for name, setting in merged_params.items()]
    lines += [
        f"executable = {executable}",
        f"output = {output}",
        f"error = {error}",
        f"log = {user_log}",
        "queue",
    ]
    return "\n".join(lines)
def condor_submit(submit_file):
    """
    Submit a condor job described by the given file.

    Runs ``condor_submit`` on ``submit_file`` and parses the external job id
    out of the command's output.

    :param submit_file: submit description file handed to ``condor_submit``.
    :returns: ``(external_id, failure_message)``. On success
              ``failure_message`` is ``None``; on any failure ``external_id``
              is ``None`` and ``failure_message`` describes what went wrong.
    """
    try:
        condor_message = commands.execute(("condor_submit", submit_file))
    except commands.CommandLineException as e:
        return None, unicodify(e)

    try:
        external_id = parse_external_id(condor_message, type="condor")
    except Exception:
        # Parsing is best effort: any parser error is reported as a missing id.
        external_id = None

    if external_id is None:
        # Also covers a parser that signals "no id found" by returning None
        # instead of raising, so a missing id always comes with a reason.
        return None, f"{PROBLEM_PARSING_EXTERNAL_ID}: {condor_message}"
    return external_id, None
def condor_stop(external_id):
    """
    Ask condor to remove a job via ``condor_rm``.

    :param external_id: job id passed as the single argument to ``condor_rm``.
    :returns: ``None`` when ``condor_rm`` exits successfully, otherwise a
              string describing the failure. No exception is propagated for
              ``Exception`` subclasses.
    """
    try:
        check_call(("condor_rm", external_id))
    except CalledProcessError:
        # condor_rm ran but exited with a non-zero status.
        return "condor_rm failed"
    except Exception as exc:
        # Anything else that went wrong while trying to run the command.
        return f"error encountered calling condor_rm: {unicodify(exc)}"
    return None
def summarize_condor_log(log_file, external_id):
    """
    Report which job events a condor log file contains for one job.

    The log is read line by line looking for the markers
    ``"<code> (<job id>."`` where the job id is ``external_id`` left-padded
    with zeros to at least three characters.

    NOTE(review): per the HTCondor job event log code table these codes are
    001 = job executing, 004 = job evicted, 007 = shadow exception,
    005 = job terminated, 009 = job aborted - confirm against the HTCondor
    version in use.

    :param log_file: path of the condor log file to read.
    :param external_id: condor job id; any value is accepted and converted
                        with ``str()`` before padding.
    :returns: tuple ``(s1, s4, s7, s5, s9, file_size)`` - one boolean per
              event code 001, 004, 007, 005, 009 (in that order) telling
              whether the marker was seen, followed by the file position
              reached after reading the whole log.
    """
    # str() lets integer ids work too; string ids are unaffected.
    log_job_id = str(external_id).zfill(3)
    # Order matters: it defines the order of the returned flags.
    markers = [f"{code} ({log_job_id}." for code in ("001", "004", "007", "005", "009")]
    seen = [False] * len(markers)

    with open(log_file) as log_handle:
        for line in log_handle:
            for index, marker in enumerate(markers):
                if not seen[index] and marker in line:
                    seen[index] = True
        # Position after consuming every line, i.e. how much of the log was read.
        file_size = log_handle.tell()

    s1, s4, s7, s5, s9 = seen
    return s1, s4, s7, s5, s9, file_size