Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.runners.state_handlers.resubmit

import logging
from datetime import datetime

from galaxy import model
from galaxy.jobs.runners import JobState
from ._safe_eval import safe_eval

__all__ = ('failure', )

log = logging.getLogger(__name__)

MESSAGES = dict(
    walltime_reached='it reached the walltime',
    memory_limit_reached='it exceeded the amount of allocated memory',
    unknown_error='it encountered an unknown error',
    tool_detected='it encountered a tool detected error condition',
)


def eval_condition(condition, job_state):
    runner_state = getattr(job_state, 'runner_state', None) or JobState.runner_states.UNKNOWN_ERROR

    attempt = 1
    now = datetime.utcnow()
    last_running_state = None
    last_queued_state = None
    for state in job_state.job_wrapper.get_job().state_history:
        if state.state == model.Job.states.RUNNING:
            last_running_state = state
        elif state.state == model.Job.states.QUEUED:
            last_queued_state = state
        elif state.state == model.Job.states.RESUBMITTED:
            attempt = attempt + 1

    seconds_running = 0
    seconds_since_queued = 0
    if last_running_state:
        seconds_running = (now - last_running_state.create_time).total_seconds()
    if last_queued_state:
        seconds_since_queued = (now - last_queued_state.create_time).total_seconds()

    condition_locals = {
        "walltime_reached": runner_state == JobState.runner_states.WALLTIME_REACHED,
        "memory_limit_reached": runner_state == JobState.runner_states.MEMORY_LIMIT_REACHED,
        "tool_detected_failure": runner_state == JobState.runner_states.TOOL_DETECT_ERROR,
        "unknown_error": JobState.runner_states.UNKNOWN_ERROR,
        "any_failure": True,
        "any_potential_job_failure": True,  # Add a hook here - later on allow tools to describe things that are definitely input problems.
        "attempt": attempt,
        "seconds_running": seconds_running,
        "seconds_since_queued": seconds_since_queued,
    }

    # Small optimization to eliminate the need to parse AST and eval for simple variables.
    if condition in condition_locals:
        return condition_locals[condition]
    else:
        return safe_eval(condition, condition_locals)


[docs]def failure(app, job_runner, job_state): # Leave handler quickly if no resubmit conditions specified or if the runner state doesn't allow resubmission. resubmit_definitions = job_state.job_destination.get('resubmit') if not resubmit_definitions: return runner_state = getattr(job_state, 'runner_state', None) or JobState.runner_states.UNKNOWN_ERROR if (runner_state not in (JobState.runner_states.WALLTIME_REACHED, JobState.runner_states.MEMORY_LIMIT_REACHED, JobState.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER, JobState.runner_states.TOOL_DETECT_ERROR, JobState.runner_states.UNKNOWN_ERROR)): # not set or not a handleable runner state return _handle_resubmit_definitions(resubmit_definitions, app, job_runner, job_state)
def _handle_resubmit_definitions(resubmit_definitions, app, job_runner, job_state): runner_state = getattr(job_state, 'runner_state', None) or JobState.runner_states.UNKNOWN_ERROR # Setup environment for evaluating resubmission conditions and related expression. expression_context = _ExpressionContext(job_state) # Intercept jobs that hit the walltime and have a walltime or # nonspecific resubmit destination configured for resubmit in resubmit_definitions: condition = resubmit.get('condition', None) if condition and not expression_context.safe_eval(condition): # There is a resubmit defined for the destination but # its condition is not for the encountered state continue external_id = getattr(job_state, "job_id", None) if external_id: job_log_prefix = "(%s/%s)" % (job_state.job_wrapper.job_id, job_state.job_id) else: job_log_prefix = "(%s)" % (job_state.job_wrapper.job_id) destination = resubmit['destination'] log.info("%s Job will be resubmitted to '%s' because %s at " "the '%s' destination", job_log_prefix, destination, MESSAGES[runner_state], job_state.job_wrapper.job_destination.id) # fetch JobDestination for the id or tag if destination: new_destination = app.job_config.get_destination(destination) else: new_destination = job_state.job_destination # Resolve dynamic if necessary new_destination = (job_state.job_wrapper.job_runner_mapper .cache_job_destination(new_destination)) # Reset job state job_state.job_wrapper.clear_working_directory() job_state.job_wrapper.invalidate_external_metadata() job = job_state.job_wrapper.get_job() if resubmit.get('handler', None): log.debug('%s Job reassigned to handler %s', job_log_prefix, resubmit['handler']) job.set_handler(resubmit['handler']) job_runner.sa_session.add(job) # Is this safe to do here? job_runner.sa_session.flush() # Cache the destination to prevent rerunning dynamic after # resubmit job_state.job_wrapper.job_runner_mapper \ .cached_job_destination = new_destination # Handle delaying before resubmission if needed. raw_delay = resubmit.get('delay') if raw_delay: delay = str(expression_context.safe_eval(str(raw_delay))) try: # ensure result acts like a number when persisted. float(delay) new_destination.params['__resubmit_delay_seconds'] = str(delay) except ValueError: log.warning("Cannot delay job with delay [%s], does not appear to be a number." % delay) job_state.job_wrapper.set_job_destination(new_destination) # Clear external ID (state change below flushes the change) job.job_runner_external_id = None # Allow the UI to query for resubmitted state if job.params is None: job.params = {} job_state.runner_state_handled = True info = "This job was resubmitted to the queue because %s on its " \ "compute resource." % MESSAGES[runner_state] job_runner.mark_as_resubmitted(job_state, info=info) return class _ExpressionContext(object): def __init__(self, job_state): self._job_state = job_state self._lazy_context = None def safe_eval(self, condition): if condition.isdigit(): return int(condition) if self._lazy_context is None: runner_state = getattr(self._job_state, 'runner_state', None) or JobState.runner_states.UNKNOWN_ERROR attempt = 1 now = datetime.utcnow() last_running_state = None last_queued_state = None for state in self._job_state.job_wrapper.get_job().state_history: if state.state == model.Job.states.RUNNING: last_running_state = state elif state.state == model.Job.states.QUEUED: last_queued_state = state elif state.state == model.Job.states.RESUBMITTED: attempt = attempt + 1 seconds_running = 0 seconds_since_queued = 0 if last_running_state: seconds_running = (now - last_running_state.create_time).total_seconds() if last_queued_state: seconds_since_queued = (now - last_queued_state.create_time).total_seconds() self._lazy_context = { "walltime_reached": runner_state == JobState.runner_states.WALLTIME_REACHED, "memory_limit_reached": runner_state == JobState.runner_states.MEMORY_LIMIT_REACHED, "unknown_error": JobState.runner_states.UNKNOWN_ERROR, "tool_detected_failure": runner_state == JobState.runner_states.TOOL_DETECT_ERROR, "any_failure": True, "any_potential_job_failure": True, # Add a hook here - later on allow tools to describe things that are definitely input problems. "attempt": attempt, "seconds_running": seconds_running, "seconds_since_queued": seconds_since_queued, } # Small optimization to eliminate the need to parse AST and eval for simple variables. if condition in self._lazy_context: return self._lazy_context[condition] else: return safe_eval(condition, self._lazy_context)