# Warning
# This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
# Source code for galaxy.workflow.run
import logging
import uuid
from typing import (
List,
Union,
)
from galaxy import model
from galaxy.util import ExecutionTimer
from galaxy.workflow import modules
from galaxy.workflow.run_request import (
workflow_request_to_run_config,
workflow_run_config_to_request,
WorkflowRunConfig,
)
# Module-level logger, named after this module.
log = logging.getLogger(name=__name__)
# Entry point for core workflow scheduler.
def schedule(trans, workflow, workflow_run_config, workflow_invocation):
return __invoke(trans, workflow, workflow_run_config, workflow_invocation)
def __invoke(trans, workflow, workflow_run_config, workflow_invocation=None, populate_state=False):
    """Run the supplied workflow in the supplied target_history.

    Builds a ``WorkflowInvoker`` for ``workflow`` and runs a single scheduling pass.

    :param workflow_invocation: invocation to schedule, or ``None`` to let the
        invoker construct a new transient one.
    :param populate_state: when true, populate step modules and their state from
        ``workflow_run_config.param_map`` before invoking.
    :returns: ``(outputs, invocation)``. ``outputs`` is what the invoker returned,
        or an empty list if evaluation was cancelled or (with a
        ``workflow_invocation``) failed.
    :raises Exception: any scheduling error is re-raised when no
        ``workflow_invocation`` was supplied.
    """
    if populate_state:
        modules.populate_module_and_state(
            trans,
            workflow,
            workflow_run_config.param_map,
            allow_tool_state_corrections=workflow_run_config.allow_tool_state_corrections,
        )

    invoker = WorkflowInvoker(trans, workflow, workflow_run_config, workflow_invocation=workflow_invocation)

    try:
        outputs = invoker.invoke()
    except modules.CancelWorkflowEvaluation:
        # Evaluation was cancelled; stage the invocation only when cancel() returns truthy.
        if workflow_invocation and workflow_invocation.cancel():
            trans.sa_session.add(workflow_invocation)
        outputs = []
    except Exception:
        log.exception("Failed to execute scheduled workflow.")
        if not workflow_invocation:
            # Transient invocation from the legacy controller action:
            # propagate the exception to the caller.
            raise
        # Background scheduling of a persistent invocation: record the failure instead.
        workflow_invocation.fail()
        trans.sa_session.add(workflow_invocation)
        outputs = []

    if workflow_invocation:
        # Make sure any state change of the invocation is written out.
        trans.sa_session.flush()
    return outputs, invoker.workflow_invocation
def queue_invoke(trans, workflow, workflow_run_config, request_params=None, populate_state=True, flush=True):
    """Hand ``workflow`` to the workflow scheduling manager's queue instead of invoking it directly.

    Converts ``workflow_run_config`` into a ``WorkflowInvocation`` request via
    ``workflow_run_config_to_request`` and passes it to
    ``trans.app.workflow_scheduling_manager.queue``.

    :param request_params: extra request parameters for the scheduling manager;
        any falsy value (including ``None``) is replaced with a new empty dict.
    :param populate_state: when true, populate step modules and their state from
        ``workflow_run_config.param_map`` before building the request.
    :param flush: forwarded unchanged to ``workflow_scheduling_manager.queue``.
    :returns: whatever ``workflow_scheduling_manager.queue`` returns.
    """
    request_params = request_params or {}
    if populate_state:
        modules.populate_module_and_state(
            trans,
            workflow,
            workflow_run_config.param_map,
            allow_tool_state_corrections=workflow_run_config.allow_tool_state_corrections,
        )
    workflow_invocation = workflow_run_config_to_request(trans, workflow_run_config, workflow)
    workflow_invocation.workflow = workflow
    return trans.app.workflow_scheduling_manager.queue(workflow_invocation, request_params, flush=flush)
class WorkflowInvoker:
    """Run scheduling passes over the steps of one workflow invocation.

    Each :meth:`invoke` call walks the steps the ``WorkflowProgress`` object
    reports as remaining. It creates ``WorkflowInvocationStep`` records as
    needed and executes each step's module. Steps that cannot be scheduled yet
    are marked delayed, and the invocation is left ``READY`` so a later pass
    can continue.
    """

    def __init__(self, trans, workflow, workflow_run_config, workflow_invocation=None, progress=None):
        """Bind the invoker to an invocation and its progress tracker.

        :param workflow_invocation: invocation to schedule. When ``None`` and no
            ``progress`` is given, a new ``model.WorkflowInvocation`` is
            constructed for ``workflow_run_config.target_history``.
        :param progress: an existing ``WorkflowProgress`` to reuse (as
            ``WorkflowProgress.subworkflow_invoker`` does). Must not be combined
            with ``workflow_invocation``; the progress object's invocation is
            used instead.
        """
        self.trans = trans
        self.workflow = workflow
        if progress is not None:
            assert workflow_invocation is None
            workflow_invocation = progress.workflow_invocation
        if workflow_invocation is None:
            invocation_uuid = uuid.uuid1()
            workflow_invocation = model.WorkflowInvocation()
            workflow_invocation.workflow = self.workflow
            # In one way or another, following attributes will become persistent
            # so they are available during delayed/revisited workflow scheduling.
            workflow_invocation.uuid = invocation_uuid
            workflow_invocation.history = workflow_run_config.target_history
        self.workflow_invocation = workflow_invocation
        self.workflow_invocation.copy_inputs_to_history = workflow_run_config.copy_inputs_to_history
        self.workflow_invocation.use_cached_job = workflow_run_config.use_cached_job
        self.workflow_invocation.replacement_dict = workflow_run_config.replacement_dict
        if progress is None:
            # The module injector is only needed when we build our own progress tracker.
            module_injector = modules.WorkflowModuleInjector(trans)
            progress = WorkflowProgress(
                self.workflow_invocation,
                workflow_run_config.inputs,
                module_injector,
                param_map=workflow_run_config.param_map,
                # -1 (no per-iteration cap) when the option is not configured.
                jobs_per_scheduling_iteration=getattr(
                    trans.app.config, "maximum_workflow_jobs_per_scheduling_iteration", -1
                ),
            )
        self.progress = progress

    def invoke(self):
        """Run one scheduling pass over the remaining steps of the invocation.

        The invocation state ends up as:

        - ``FAILED`` if it has exceeded ``maximum_workflow_invocation_duration``;
        - ``READY`` if any step was delayed or the per-iteration job cap was hit;
        - ``SCHEDULED`` otherwise.

        The invocation is added to the session but not flushed.

        :returns: ``self.progress.outputs``.
        :raises modules.CancelWorkflowEvaluation: if the target history is
            deleted or a step's evaluation is cancelled.
        """
        workflow_invocation = self.workflow_invocation
        config = self.trans.app.config
        maximum_duration = getattr(config, "maximum_workflow_invocation_duration", -1)
        if maximum_duration > 0 and workflow_invocation.seconds_since_created > maximum_duration:
            log.debug(
                "Workflow invocation [%s] exceeded maximum number of seconds allowed for scheduling [%s], failing.",
                workflow_invocation.id,
                maximum_duration,
            )
            workflow_invocation.state = model.WorkflowInvocation.states.FAILED
            # The invocation timed out; stage its FAILED state. Not flushing in
            # here, because web controller may create multiple invocations.
            self.trans.sa_session.add(workflow_invocation)
            return self.progress.outputs

        if workflow_invocation.history.deleted:
            log.info("Cancelled workflow evaluation due to deleted history")
            raise modules.CancelWorkflowEvaluation()

        remaining_steps = self.progress.remaining_steps()
        delayed_steps = False
        max_jobs_per_iteration_reached = False
        for step, workflow_invocation_step in remaining_steps:
            # Stop once this pass has used up its job budget; the invocation stays READY.
            max_jobs_to_schedule = self.progress.maximum_jobs_to_schedule_or_none
            if max_jobs_to_schedule is not None and max_jobs_to_schedule <= 0:
                max_jobs_per_iteration_reached = True
                break
            step_delayed = False
            step_timer = ExecutionTimer()
            try:
                self.__check_implicitly_dependent_steps(step)
                if not workflow_invocation_step:
                    workflow_invocation_step = model.WorkflowInvocationStep()
                    workflow_invocation_step.workflow_invocation = workflow_invocation
                    workflow_invocation_step.workflow_step = step
                    workflow_invocation_step.state = "new"
                    workflow_invocation.steps.append(workflow_invocation_step)
                incomplete_or_none = self._invoke_step(workflow_invocation_step)
                # An explicit False from the module means not all jobs could be scheduled.
                if incomplete_or_none is False:
                    step_delayed = delayed_steps = True
                    workflow_invocation_step.state = "ready"
                    self.progress.mark_step_outputs_delayed(step, why="Not all jobs scheduled for state.")
                else:
                    workflow_invocation_step.state = "scheduled"
            except modules.DelayedWorkflowEvaluation as de:
                step_delayed = delayed_steps = True
                self.progress.mark_step_outputs_delayed(step, why=de.why)
            except Exception:
                log.exception(
                    "Failed to schedule %s, problem occurred on %s.",
                    self.workflow_invocation.workflow.log_str(),
                    step.log_str(),
                )
                raise
            if not step_delayed:
                log.debug(f"Workflow step {step.id} of invocation {workflow_invocation.id} invoked {step_timer}")

        if delayed_steps or max_jobs_per_iteration_reached:
            state = model.WorkflowInvocation.states.READY
        else:
            state = model.WorkflowInvocation.states.SCHEDULED
        workflow_invocation.state = state
        # All jobs ran successfully, so we can save now
        self.trans.sa_session.add(workflow_invocation)
        # Not flushing in here, because web controller may create multiple
        # invocations.
        return self.progress.outputs

    def __check_implicitly_dependent_steps(self, step):
        """Method will delay the workflow evaluation if implicitly dependent
        steps (steps dependent but not through an input->output way) are not
        yet complete.
        """
        for input_connection in step.input_connections:
            if input_connection.non_data_connection:
                output_id = input_connection.output_step.id
                self.__check_implicitly_dependent_step(output_id)

    def __check_implicitly_dependent_step(self, output_id):
        """Check that the step with id ``output_id`` is scheduled and all of its jobs finished OK.

        :raises modules.DelayedWorkflowEvaluation: if the step is not invoked or
            not scheduled yet, or if one of its jobs has not finished.
        :raises modules.CancelWorkflowEvaluation: if a finished job is not in the OK state.
        """
        step_invocation = self.workflow_invocation.step_invocation_for_step_id(output_id)
        # No steps created yet - have to delay evaluation.
        if not step_invocation:
            delayed_why = f"depends on step [{output_id}] but that step has not been invoked yet"
            raise modules.DelayedWorkflowEvaluation(why=delayed_why)
        if step_invocation.state != "scheduled":
            delayed_why = f"depends on step [{output_id}] job has not finished scheduling yet"
            raise modules.DelayedWorkflowEvaluation(why=delayed_why)
        # TODO: Handle implicit dependency on stuff like pause steps.
        for job in step_invocation.jobs:
            # At least one job is incomplete.
            if not job.finished:
                delayed_why = (
                    f"depends on step [{output_id}] but one or more jobs created from that step have not finished yet"
                )
                raise modules.DelayedWorkflowEvaluation(why=delayed_why)
            if job.state != job.states.OK:
                raise modules.CancelWorkflowEvaluation()

    def _invoke_step(self, invocation_step):
        """Execute the step's module for ``invocation_step`` and return its result.

        A result of ``False`` is treated by :meth:`invoke` as "not all jobs scheduled".
        """
        incomplete_or_none = invocation_step.workflow_step.module.execute(
            self.trans, self.progress, invocation_step, use_cached_job=self.workflow_invocation.use_cached_job
        )
        return incomplete_or_none
# Sentinel stored in ``WorkflowProgress.outputs`` in place of a step's output
# dict when that step could not be scheduled yet; always compared by identity.
STEP_OUTPUT_DELAYED = object()


class WorkflowProgress:
    """Track step outputs and job counts while scheduling one workflow invocation.

    ``outputs`` maps workflow step ids either to a dict of output name -> output
    object, or to ``STEP_OUTPUT_DELAYED`` for steps that could not be scheduled
    in the current pass.
    """

    def __init__(
        self, workflow_invocation, inputs_by_step_id, module_injector, param_map, jobs_per_scheduling_iteration=-1
    ):
        self.workflow_invocation = workflow_invocation
        self.inputs_by_step_id = inputs_by_step_id
        self.module_injector = module_injector
        self.param_map = param_map
        # A non-positive limit disables the per-iteration job cap.
        self.jobs_per_scheduling_iteration = jobs_per_scheduling_iteration
        self.jobs_scheduled_this_iteration = 0
        self.outputs = {}

    @property
    def maximum_jobs_to_schedule_or_none(self):
        """Jobs still allowed in this iteration, or ``None`` when no cap is configured."""
        if self.jobs_per_scheduling_iteration <= 0:
            return None
        return self.jobs_per_scheduling_iteration - self.jobs_scheduled_this_iteration

    def record_executed_job_count(self, job_count):
        """Count ``job_count`` more jobs against this iteration's budget."""
        self.jobs_scheduled_this_iteration += job_count

    def remaining_steps(self):
        """Return ``(step, invocation_step_or_None)`` pairs for steps still to be scheduled.

        For every workflow step this injects a module if it is missing, and
        decodes the persisted runtime state onto ``step.state``. Steps whose
        invocation step is already ``"scheduled"`` have their outputs recovered
        instead of being returned.

        :raises Exception: if a step has no persisted step state.
        """
        invocation = self.workflow_invocation
        # Previously computed and persisted step states.
        persisted_states = invocation.step_states_by_step_id()
        workflow_steps = invocation.workflow.steps
        invocation_steps = invocation.step_invocations_by_step_id()
        # TODO: Wouldn't a generator be much better here so we don't have to reason about
        # steps we are no where near ready to schedule?
        pending = []
        for step in workflow_steps:
            step_id = step.id
            if not hasattr(step, "module"):
                self.module_injector.inject(step, step_args=self.param_map.get(step_id, {}))
            if step_id not in persisted_states:
                template = "Workflow invocation [%s] has no step state for step id [%s]. States ids are %s."
                raise Exception(template % (invocation.id, step_id, list(persisted_states.keys())))
            step.state = step.module.decode_runtime_state(persisted_states[step_id].value)
            invocation_step = invocation_steps.get(step_id, None)
            if invocation_step and invocation_step.state == "scheduled":
                self._recover_mapping(invocation_step)
                continue
            pending.append((step, invocation_step))
        return pending

    def replacement_for_input(self, step, input_dict):
        """Resolve the value connected to the tool input described by ``input_dict``.

        ``input_dict`` is read for its ``"name"``, ``"multiple"`` and
        ``"input_type"`` keys. Returns ``modules.NO_REPLACEMENT`` when nothing is
        connected to the input. A ``multiple`` dataset input yields a list of
        replacements, except that a single connected
        ``HistoryDatasetCollectionAssociation`` is returned on its own, so the
        tool framework can extract its datasets.
        """
        replacement: Union[
            modules.NoReplacement,
            model.DatasetCollectionInstance,
            List[model.DatasetCollectionInstance],
        ] = modules.NO_REPLACEMENT
        prefixed_name = input_dict["name"]
        multiple = input_dict["multiple"]
        connections_by_name = step.input_connections_by_name
        if prefixed_name not in connections_by_name:
            return replacement
        connections = connections_by_name[prefixed_name]
        input_type = input_dict["input_type"]
        if input_type == "dataset" and multiple:
            resolved = [self.replacement_for_connection(c) for c in connections]
            if len(resolved) == 1 and isinstance(resolved[0], model.HistoryDatasetCollectionAssociation):
                return resolved[0]
            return resolved
        is_data = input_type in ["dataset", "dataset_collection"]
        return self.replacement_for_connection(connections[0], is_data=is_data)

    def replacement_for_connection(self, connection, is_data=True):
        """Resolve the object produced by the upstream output of ``connection``.

        :param is_data: when false and the replacement is an HDA or HDCA, its
            datasets must also be ready and OK before it is returned.
        :raises Exception: if the upstream step has no recorded outputs, or has
            no output with the connected name.
        :raises modules.DelayedWorkflowEvaluation: if the upstream step was
            delayed, or its data is not ready yet.
        :raises modules.CancelWorkflowEvaluation: if upstream data is in a
            non-OK state, or a collection will never be populated.
        :raises NotImplementedError: for a bare ``model.DatasetCollection``.
        """
        source_step_id = connection.output_step.id
        if source_step_id not in self.outputs:
            raise Exception(f"No outputs found for step id {source_step_id}, outputs are {self.outputs}")
        source_outputs = self.outputs[source_step_id]
        if source_outputs is STEP_OUTPUT_DELAYED:
            raise modules.DelayedWorkflowEvaluation(
                why=f"dependent step [{source_step_id}] delayed, so this step must be delayed"
            )

        output_name = connection.output_name
        try:
            replacement = source_outputs[output_name]
        except KeyError:
            # Must resolve.
            template = "Workflow evaluation problem - failed to find output_name %s in step_outputs %s"
            raise Exception(template % (output_name, source_outputs))

        is_hdca = isinstance(replacement, model.HistoryDatasetCollectionAssociation)
        if is_hdca and not replacement.collection.populated:
            if not replacement.waiting_for_elements:
                # Not waiting for elements means creating the collection failed,
                # so it will never be populated.
                # TODO: consider distinguish between cancelled and failed?
                raise modules.CancelWorkflowEvaluation()
            raise modules.DelayedWorkflowEvaluation(
                why=f"dependent collection [{replacement.id}] not yet populated with datasets"
            )
        if isinstance(replacement, model.DatasetCollection):
            raise NotImplementedError
        if is_data:
            return replacement

        # Non-data consumers need the actual content, so the data must be ready.
        if isinstance(replacement, model.HistoryDatasetAssociation):
            if replacement.is_pending:
                raise modules.DelayedWorkflowEvaluation()
            if not replacement.is_ok:
                raise modules.CancelWorkflowEvaluation()
        elif is_hdca:
            # Defensive re-check; the populated check above normally guarantees this.
            if not replacement.collection.populated:
                raise modules.DelayedWorkflowEvaluation()
            any_pending = False
            for instance in replacement.dataset_instances:
                if instance.is_pending:
                    any_pending = True
                elif not instance.is_ok:
                    raise modules.CancelWorkflowEvaluation()
            if any_pending:
                raise modules.DelayedWorkflowEvaluation()
        return replacement

    def get_replacement_workflow_output(self, workflow_output):
        """Return the object recorded for ``workflow_output``.

        :raises modules.DelayedWorkflowEvaluation: if the owning step was delayed.
        """
        step = workflow_output.workflow_step
        output_name = workflow_output.output_name
        step_outputs = self.outputs[step.id]
        if step_outputs is not STEP_OUTPUT_DELAYED:
            return step_outputs[output_name]
        raise modules.DelayedWorkflowEvaluation(
            why=f"depends on workflow output [{output_name}] but that output has not been created yet"
        )

    def set_outputs_for_input(self, invocation_step, outputs=None, already_persisted=False):
        """Record the ``"output"`` of an input step, then delegate to :meth:`set_step_outputs`.

        A value supplied in ``inputs_by_step_id`` always wins. Otherwise an
        ``"output"`` already present in ``outputs`` is kept. Failing that, the
        step's default value is used if it is truthy or the input is optional.

        :raises ValueError: if none of these provides a value.
        """
        step = invocation_step.workflow_step
        outputs = {} if outputs is None else outputs
        provided_inputs = self.inputs_by_step_id
        if provided_inputs:
            step_id = step.id
            if step_id in provided_inputs:
                outputs["output"] = provided_inputs[step_id]
            elif "output" not in outputs:
                default_value = step.input_default_value
                # NOTE(review): a falsy default (0, False, "") on a required input
                # is treated as "no default" here -- confirm this is intended.
                if not (default_value or step.input_optional):
                    raise ValueError(f"{step.log_str()} not found in inputs_step_id {self.inputs_by_step_id}")
                outputs["output"] = default_value
        self.set_step_outputs(invocation_step, outputs, already_persisted=already_persisted)

    def set_step_outputs(self, invocation_step, outputs, already_persisted=False):
        """Store ``outputs`` for the step and record its workflow outputs on the invocation.

        Unless ``already_persisted`` is set, this also attaches dataset-like
        outputs to ``invocation_step``. It adds ``WorkflowOutput`` entries for
        other outputs (except ``NO_REPLACEMENT``) that have none yet.
        """
        step = invocation_step.workflow_step
        output_value = invocation_step.output_value
        if output_value:
            outputs[output_value.workflow_output.output_name] = output_value.value
        self.outputs[step.id] = outputs

        if not already_persisted:
            known_outputs = {wo.output_name: wo for wo in step.workflow_outputs}
            for name, obj in outputs.items():
                if hasattr(obj, "history_content_type"):
                    invocation_step.add_output(name, obj)
                elif not known_outputs.get(name) and not obj == modules.NO_REPLACEMENT:
                    # Non-data output: register it as a workflow output so it can be
                    # recovered in the next scheduling iteration. This should be
                    # replaced with a WorkflowInvocationStepOutputValue ASAP.
                    step.workflow_outputs.append(model.WorkflowOutput(step, output_name=name))

        for workflow_output in step.workflow_outputs:
            name = workflow_output.output_name
            if name not in outputs:
                # No way to surface this to the user yet (see
                # https://github.com/galaxyproject/galaxy/issues/5142), so only log it.
                log.debug(f"Failed to find expected workflow output [{name}] in step outputs [{outputs}]")
                continue
            self._record_workflow_output(step, workflow_output, output=outputs[name])

    def _record_workflow_output(self, step, workflow_output, output):
        """Attach ``output`` to the invocation as the value of ``workflow_output``."""
        self.workflow_invocation.add_output(workflow_output, step, output)

    def mark_step_outputs_delayed(self, step, why=None):
        """Replace the step's outputs with ``STEP_OUTPUT_DELAYED``, logging ``why`` if given."""
        if why:
            log.debug(f"Marking step {step.id} outputs of invocation {self.workflow_invocation.id} delayed ({why})")
        self.outputs[step.id] = STEP_OUTPUT_DELAYED

    def _subworkflow_invocation(self, step):
        """Return the persisted subworkflow invocation for ``step``, or raise if there is none."""
        found = self.workflow_invocation.get_subworkflow_invocation_for_step(step)
        if found is not None:
            return found
        raise Exception(f"Failed to find persisted workflow invocation for step [{step.id}]")

    def subworkflow_invoker(self, trans, step, use_cached_job=False):
        """Build a ``WorkflowInvoker`` for the subworkflow invocation attached to ``step``.

        ``use_cached_job`` is accepted but not used here.
        """
        subworkflow_invocation = self._subworkflow_invocation(step)
        run_config = workflow_request_to_run_config(trans, subworkflow_invocation)
        progress = self.subworkflow_progress(subworkflow_invocation, step, run_config.param_map)
        return WorkflowInvoker(
            trans,
            workflow=progress.workflow_invocation.workflow,
            workflow_run_config=run_config,
            progress=progress,
        )

    def subworkflow_progress(self, subworkflow_invocation, step, param_map):
        """Build a ``WorkflowProgress`` for a subworkflow.

        The subworkflow's input steps are fed from the parent step's connections.

        :raises Exception: if a non-optional subworkflow input has no connection.
        """
        subworkflow_inputs = {}
        for input_step in subworkflow_invocation.workflow.input_steps:
            input_step_id = input_step.id
            for connection in step.input_connections:
                if connection.input_subworkflow_step_id != input_step_id:
                    continue
                subworkflow_inputs[input_step_id] = self.replacement_for_connection(
                    connection,
                    is_data=connection.output_step.type != "parameter_input",
                )
                break
            else:
                if not input_step.input_optional:
                    raise Exception("Could not find connections for all subworkflow inputs.")
        return WorkflowProgress(subworkflow_invocation, subworkflow_inputs, self.module_injector, param_map=param_map)

    def _recover_mapping(self, step_invocation):
        """Let the step's module restore outputs of an already-scheduled step.

        If the module raises a delay, the step's outputs are marked delayed.
        """
        workflow_step = step_invocation.workflow_step
        try:
            workflow_step.module.recover_mapping(step_invocation, self)
        except modules.DelayedWorkflowEvaluation as de:
            self.mark_step_outputs_delayed(workflow_step, de.why)
# Public names exported by ``from galaxy.workflow.run import *``.
__all__ = (
    "queue_invoke",
    "WorkflowRunConfig",
)