Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.jobs.runners.state_handlers.resubmit
import logging
from datetime import datetime
from galaxy import model
from galaxy.jobs.runners import JobState
from ._safe_eval import safe_eval
__all__ = ("failure",)
log = logging.getLogger(__name__)
# Reason fragments, looked up by runner state, that are spliced after
# "because" in the resubmission log line and in the info text attached to the
# resubmitted job.
MESSAGES = {
    "walltime_reached": "it reached the walltime",
    "memory_limit_reached": "it exceeded the amount of allocated memory",
    "unknown_error": "it encountered an unknown error",
    "tool_detected": "it encountered a tool detected error condition",
}
def failure(app, job_runner, job_state):
    """Runner state handler that resubmits a failed job when its destination allows it.

    Returns immediately when the job's destination defines no ``resubmit``
    entries, or when the runner state (treated as ``UNKNOWN_ERROR`` if it is
    unset) is not one of the states listed below. Otherwise the work is handed
    to ``_handle_resubmit_definitions``.

    :param app: application object, passed through unchanged.
    :param job_runner: job runner reporting the failure, passed through unchanged.
    :param job_state: runner job state; ``job_destination`` and the optional
        ``runner_state`` attribute are read here.
    """
    definitions = job_state.job_destination.get("resubmit")
    if not definitions:
        # Nothing configured for this destination - leave quickly.
        return

    states = JobState.runner_states
    current_state = getattr(job_state, "runner_state", None) or states.UNKNOWN_ERROR
    resubmittable_states = (
        states.WALLTIME_REACHED,
        states.MEMORY_LIMIT_REACHED,
        states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER,
        states.TOOL_DETECT_ERROR,
        states.UNKNOWN_ERROR,
    )
    # Any other runner state is not something this handler deals with.
    if current_state in resubmittable_states:
        _handle_resubmit_definitions(definitions, app, job_runner, job_state)
def _handle_resubmit_definitions(resubmit_definitions, app, job_runner, job_state):
    """Resubmit the job using the first resubmit definition whose condition holds.

    Definitions are tried in order. A definition without a ``condition``
    always matches; otherwise the condition is evaluated through
    ``_ExpressionContext``. For the first match the job's working directory
    and external metadata are reset, the job is optionally reassigned to
    another handler, an optional delay is recorded on the new destination,
    and the runner is asked to mark the job as resubmitted. When no definition
    matches the function returns without touching the job.

    :param resubmit_definitions: iterable of dict-like definitions; the keys
        read here are ``condition``, ``environment``, ``destination``,
        ``handler`` and ``delay``.
    :param app: application object; only ``app.job_config.get_destination`` is used.
    :param job_runner: runner owning the job; supplies ``sa_session`` and
        ``mark_as_resubmitted``.
    :param job_state: runner job state of the failed job.
    """
    runner_state = getattr(job_state, "runner_state", None) or JobState.runner_states.UNKNOWN_ERROR
    # Not every runner state accepted by failure() has its own MESSAGES entry;
    # fall back to the generic wording instead of raising KeyError, which
    # would abort the resubmission.
    reason = MESSAGES.get(runner_state, MESSAGES["unknown_error"])

    # Setup environment for evaluating resubmission conditions and related expression.
    expression_context = _ExpressionContext(job_state)

    for resubmit in resubmit_definitions:
        condition = resubmit.get("condition", None)
        if condition and not expression_context.safe_eval(condition):
            # There is a resubmit defined for the destination but
            # its condition is not for the encountered state
            continue

        external_id = getattr(job_state, "job_id", None)
        if external_id:
            job_log_prefix = f"({job_state.job_wrapper.job_id}/{job_state.job_id})"
        else:
            job_log_prefix = f"({job_state.job_wrapper.job_id})"

        # Is destination needed here, might these be serialized to the database?
        destination = resubmit.get("environment") or resubmit.get("destination")
        log.info(
            "%s Job will be resubmitted to '%s' because %s at the '%s' destination",
            job_log_prefix,
            destination,
            reason,
            job_state.job_wrapper.job_destination.id,
        )

        # Fetch JobDestination for the id or tag; with none configured the job
        # goes back to the destination it is already on.
        if destination:
            new_destination = app.job_config.get_destination(destination)
        else:
            new_destination = job_state.job_destination

        # Resolve dynamic if necessary
        new_destination = job_state.job_wrapper.job_runner_mapper.cache_job_destination(new_destination)

        # Reset job state
        job_state.job_wrapper.clear_working_directory()
        job_state.job_wrapper.invalidate_external_metadata()
        job = job_state.job_wrapper.get_job()

        handler = resubmit.get("handler", None)
        if handler:
            log.debug("%s Job reassigned to handler %s", job_log_prefix, handler)
            job.set_handler(handler)
            job_runner.sa_session.add(job)
            # Is this safe to do here?
            job_runner.sa_session.flush()

        # Cache the destination to prevent rerunning dynamic after
        # resubmit
        job_state.job_wrapper.job_runner_mapper.cached_job_destination = new_destination

        # Handle delaying before resubmission if needed.
        raw_delay = resubmit.get("delay")
        if raw_delay:
            delay = str(expression_context.safe_eval(str(raw_delay)))
            try:
                # Ensure result acts like a number when persisted.
                float(delay)
            except ValueError:
                log.warning("Cannot delay job with delay [%s], does not appear to be a number.", delay)
            else:
                new_destination.params["__resubmit_delay_seconds"] = delay

        job_state.job_wrapper.set_job_destination(new_destination)
        # Clear external ID (state change below flushes the change)
        job.job_runner_external_id = None
        # Allow the UI to query for resubmitted state
        if job.params is None:
            job.params = {}
        job_state.runner_state_handled = True
        info = f"This job was resubmitted to the queue because {reason} on its compute resource."
        job_runner.mark_as_resubmitted(job_state, info=info)
        # Only the first matching definition is applied.
        return
class _ExpressionContext:
    """Evaluates resubmit expressions (conditions, delays) for one job state.

    The variables offered to expressions are derived from the job's runner
    state and state history the first time a non-numeric expression is
    evaluated, and are reused for later evaluations on the same instance.
    """

    def __init__(self, job_state):
        self._job_state = job_state
        # Filled in on first use by safe_eval().
        self._lazy_context = None

    def safe_eval(self, condition):
        """Return the value of the expression string ``condition``.

        A string consisting only of digits is returned as an ``int`` without
        building the variable context. A string that is exactly the name of a
        context variable returns that variable's value. Anything else is
        passed, together with the context, to the imported ``safe_eval``.
        """
        if condition.isdigit():
            return int(condition)

        if self._lazy_context is None:
            self._lazy_context = self._build_context()

        context = self._lazy_context
        if condition not in context:
            return safe_eval(condition, context)
        # Small optimization to eliminate the need to parse AST and eval for simple variables.
        return context[condition]

    def _build_context(self):
        """Build the variable mapping exposed to resubmit expressions."""
        runner_state = getattr(self._job_state, "runner_state", None) or JobState.runner_states.UNKNOWN_ERROR
        attempts = 1
        now = datetime.utcnow()

        # Remember the most recent RUNNING and QUEUED entries and count one
        # extra attempt for every RESUBMITTED entry in the history.
        latest_running = None
        latest_queued = None
        for entry in self._job_state.job_wrapper.get_job().state_history:
            entry_state = entry.state
            if entry_state == model.Job.states.RUNNING:
                latest_running = entry
            elif entry_state == model.Job.states.QUEUED:
                latest_queued = entry
            elif entry_state == model.Job.states.RESUBMITTED:
                attempts += 1

        def seconds_since(entry):
            # 0 when the job never entered the corresponding state.
            return (now - entry.create_time).total_seconds() if entry else 0

        runner_states = JobState.runner_states
        return {
            "walltime_reached": runner_state == runner_states.WALLTIME_REACHED,
            "memory_limit_reached": runner_state == runner_states.MEMORY_LIMIT_REACHED,
            "unknown_error": runner_state == runner_states.UNKNOWN_ERROR,
            "tool_detected_failure": runner_state == runner_states.TOOL_DETECT_ERROR,
            "any_failure": True,
            # Add a hook here - later on allow tools to describe things that are definitely input problems.
            "any_potential_job_failure": True,
            "attempt": attempts,
            "seconds_running": seconds_since(latest_running),
            "seconds_since_queued": seconds_since(latest_queued),
        }