Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.workflow.run

import logging
import uuid
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Tuple,
    TYPE_CHECKING,
    Union,
)

from typing_extensions import Protocol

from galaxy import model
from galaxy.exceptions import MessageException
from galaxy.model import (
    WorkflowInvocation,
    WorkflowInvocationStep,
)
from galaxy.model.base import (
    ensure_object_added_to_session,
    transaction,
)
from galaxy.schema.invocation import (
    CancelReason,
    FailureReason,
    InvocationCancellationHistoryDeleted,
    InvocationFailureCollectionFailed,
    InvocationFailureDatasetFailed,
    InvocationFailureJobFailed,
    InvocationFailureOutputNotFound,
    InvocationUnexpectedFailure,
    InvocationWarningWorkflowOutputNotFound,
    WarningReason,
)
from galaxy.tools.parameters.basic import raw_to_galaxy
from galaxy.util import ExecutionTimer
from galaxy.workflow import modules
from galaxy.workflow.run_request import (
    workflow_request_to_run_config,
    workflow_run_config_to_request,
    WorkflowRunConfig,
)

if TYPE_CHECKING:
    from galaxy.model import (
        HistoryItem,
        Workflow,
        WorkflowOutput,
        WorkflowStep,
        WorkflowStepConnection,
    )
    from galaxy.webapps.base.webapp import GalaxyWebTransaction
    from galaxy.work.context import WorkRequestContext

log = logging.getLogger(__name__)

WorkflowOutputsType = Dict[int, Any]


# Entry point for core workflow scheduler.
def schedule(
    trans: "WorkRequestContext",
    workflow: "Workflow",
    workflow_run_config: WorkflowRunConfig,
    workflow_invocation: WorkflowInvocation,
) -> Tuple[WorkflowOutputsType, WorkflowInvocation]:
    """Schedule an existing workflow invocation.

    Delegates directly to ``__invoke`` and hands back its result unchanged:
    a tuple of the outputs produced so far and the workflow invocation.
    """
    scheduled = __invoke(
        trans,
        workflow,
        workflow_run_config,
        workflow_invocation=workflow_invocation,
    )
    return scheduled


def __invoke(
    trans: "WorkRequestContext",
    workflow: "Workflow",
    workflow_run_config: WorkflowRunConfig,
    workflow_invocation: Optional[WorkflowInvocation] = None,
    populate_state: bool = False,
) -> Tuple[WorkflowOutputsType, WorkflowInvocation]:
    """Invoke ``workflow`` and persist the resulting invocation state.

    A ``WorkflowInvoker`` is built for the request and invoked. Exceptions
    raised during invocation are translated into a cancelled or failed
    invocation with an attached message rather than being propagated. The
    invocation is then added to the session and committed.

    Returns a tuple of the outputs (empty when invocation raised) and the
    workflow invocation used by the invoker.
    """
    if populate_state:
        step_params = workflow_run_config.param_map
        corrections_allowed = workflow_run_config.allow_tool_state_corrections
        modules.populate_module_and_state(
            trans,
            workflow,
            step_params,
            allow_tool_state_corrections=corrections_allowed,
        )

    invoker = WorkflowInvoker(trans, workflow, workflow_run_config, workflow_invocation=workflow_invocation)
    invocation = invoker.workflow_invocation

    step_outputs = {}
    try:
        step_outputs = invoker.invoke()
    except modules.CancelWorkflowEvaluation as cancelled:
        # Only record the reason when cancel() reports a truthy result.
        cancel_applied = invocation.cancel()
        if cancel_applied:
            invocation.add_message(cancelled.why)
    except modules.FailWorkflowEvaluation as failed:
        invocation.fail()
        invocation.add_message(failed.why)
    except MessageException as safe_error:
        # By convention the text of a MessageException is safe to show to users.
        invocation.fail()
        invocation.add_message(
            InvocationUnexpectedFailure(reason=FailureReason.unexpected_failure, details=str(safe_error))
        )
    except Exception:
        # The exception text could be large and/or contain raw ids or other
        # secrets, so it goes to the log only and no details are attached.
        log.exception("Failed to execute scheduled workflow.")
        # The invocation runs in the background; just mark the persistent
        # workflow invocation as failed.
        unexpected = InvocationUnexpectedFailure(reason=FailureReason.unexpected_failure)
        invocation.fail()
        invocation.add_message(unexpected)

    # Make sure the updated invocation state is written out.
    session = trans.sa_session
    session.add(invocation)
    with transaction(session):
        session.commit()

    return step_outputs, invocation


def queue_invoke(
    trans: "GalaxyWebTransaction",
    workflow: "Workflow",
    workflow_run_config: WorkflowRunConfig,
    request_params: Optional[Dict[str, Any]] = None,
    populate_state: bool = True,
    flush: bool = True,
) -> WorkflowInvocation:
    """Create a workflow invocation request and queue it for scheduling.

    :param trans: transaction providing ``app.workflow_scheduling_manager``.
    :param workflow: the workflow to invoke; it is assigned to the new invocation.
    :param workflow_run_config: run configuration converted into the invocation
        via ``workflow_run_config_to_request``.
    :param request_params: extra parameters forwarded to the scheduling
        manager; ``None`` is treated as an empty dict.
    :param populate_state: when true, ``modules.populate_module_and_state`` is
        called for the workflow before the invocation is created.
    :param flush: forwarded unchanged to the scheduling manager's ``queue``.
    :returns: whatever the scheduling manager's ``queue`` call returns.
    """
    request_params = request_params or {}
    if populate_state:
        modules.populate_module_and_state(
            trans,
            workflow,
            workflow_run_config.param_map,
            allow_tool_state_corrections=workflow_run_config.allow_tool_state_corrections,
        )
    workflow_invocation = workflow_run_config_to_request(trans, workflow_run_config, workflow)
    workflow_invocation.workflow = workflow
    return trans.app.workflow_scheduling_manager.queue(workflow_invocation, request_params, flush=flush)


class WorkflowInvoker:
    """Schedule the remaining steps of one workflow invocation.

    The invoker owns a ``WorkflowInvocation`` and a ``WorkflowProgress``.
    Either can be supplied by the caller; when absent they are created here.
    """

    progress: "WorkflowProgress"

    def __init__(
        self,
        trans: "WorkRequestContext",
        workflow: "Workflow",
        workflow_run_config: WorkflowRunConfig,
        workflow_invocation: Optional[WorkflowInvocation] = None,
        progress: Optional["WorkflowProgress"] = None,
    ) -> None:
        """Set up the invocation and progress tracker.

        ``progress`` and ``workflow_invocation`` are mutually exclusive: when
        ``progress`` is given its invocation is used. When neither is given a
        new ``WorkflowInvocation`` is created for ``workflow``.
        """
        self.trans = trans
        self.workflow = workflow
        self.workflow_invocation: WorkflowInvocation
        if progress is not None:
            assert workflow_invocation is None
            workflow_invocation = progress.workflow_invocation

        if workflow_invocation is None:
            invocation_uuid = uuid.uuid1()

            workflow_invocation = WorkflowInvocation()
            workflow_invocation.workflow = self.workflow

            # In one way or another, following attributes will become persistent
            # so they are available during delayed/revisited workflow scheduling.
            workflow_invocation.uuid = invocation_uuid
            workflow_invocation.history = workflow_run_config.target_history

        self.workflow_invocation = workflow_invocation

        module_injector = modules.WorkflowModuleInjector(trans)
        if progress is None:
            progress = WorkflowProgress(
                self.workflow_invocation,
                workflow_run_config.inputs,
                module_injector,
                param_map=workflow_run_config.param_map,
                jobs_per_scheduling_iteration=getattr(
                    trans.app.config, "maximum_workflow_jobs_per_scheduling_iteration", -1
                ),
                copy_inputs_to_history=workflow_run_config.copy_inputs_to_history,
                use_cached_job=workflow_run_config.use_cached_job,
                replacement_dict=workflow_run_config.replacement_dict,
            )
        self.progress = progress

    def invoke(self) -> Dict[int, Any]:
        """Attempt to schedule every remaining step and return the outputs.

        The invocation ends in state FAILED when the configured maximum
        duration is exceeded, READY when any step was delayed or the per
        iteration job limit was reached, and SCHEDULED otherwise.

        :raises modules.CancelWorkflowEvaluation: the target history is deleted.
        :raises modules.FailWorkflowEvaluation: a step raised a
            ``MessageException``; the step id is attached to the failure.
        """
        workflow_invocation = self.workflow_invocation
        config = self.trans.app.config
        maximum_duration = getattr(config, "maximum_workflow_invocation_duration", -1)
        if maximum_duration > 0 and workflow_invocation.seconds_since_created > maximum_duration:
            log.debug(
                f"Workflow invocation [{workflow_invocation.id}] exceeded maximum number of seconds allowed for scheduling [{maximum_duration}], failing."
            )
            workflow_invocation.set_state(model.WorkflowInvocation.states.FAILED)
            # Scheduling timed out; record the failed state on the session.
            self.trans.sa_session.add(workflow_invocation)

            # Not flushing in here, because web controller may create multiple
            # invocations.
            return self.progress.outputs

        if workflow_invocation.history.deleted:
            raise modules.CancelWorkflowEvaluation(
                why=InvocationCancellationHistoryDeleted(
                    reason=CancelReason.history_deleted, history_id=workflow_invocation.history_id
                )
            )

        remaining_steps = self.progress.remaining_steps()
        delayed_steps = False
        max_jobs_per_iteration_reached = False
        for step, workflow_invocation_step in remaining_steps:
            max_jobs_to_schedule = self.progress.maximum_jobs_to_schedule_or_none
            if max_jobs_to_schedule is not None and max_jobs_to_schedule <= 0:
                # Job budget for this scheduling iteration is used up.
                max_jobs_per_iteration_reached = True
                break
            step_delayed = False
            step_timer = ExecutionTimer()
            try:
                self.__check_implicitly_dependent_steps(step)

                if not workflow_invocation_step:
                    workflow_invocation_step = WorkflowInvocationStep()
                    assert workflow_invocation_step
                    workflow_invocation_step.workflow_invocation = workflow_invocation
                    ensure_object_added_to_session(workflow_invocation_step, object_in_session=workflow_invocation)
                    workflow_invocation_step.workflow_step = step
                    workflow_invocation_step.state = "new"

                    workflow_invocation.steps.append(workflow_invocation_step)

                assert workflow_invocation_step
                incomplete_or_none = self._invoke_step(workflow_invocation_step)
                # Only an explicit False means "not everything was scheduled";
                # None is treated as complete.
                if incomplete_or_none is False:
                    step_delayed = delayed_steps = True
                    workflow_invocation_step.state = "ready"
                    self.progress.mark_step_outputs_delayed(step, why="Not all jobs scheduled for state.")
                else:
                    workflow_invocation_step.state = "scheduled"
            except modules.DelayedWorkflowEvaluation as de:
                step_delayed = delayed_steps = True
                self.progress.mark_step_outputs_delayed(step, why=de.why)
            except Exception as e:
                log.exception(
                    "Failed to schedule %s, problem occurred on %s.",
                    self.workflow_invocation.workflow.log_str(),
                    step.log_str(),
                )
                if isinstance(e, MessageException):
                    # This is the highest level at which we can inject the step id
                    # to provide some more context to the exception.
                    raise modules.FailWorkflowEvaluation(
                        why=InvocationUnexpectedFailure(
                            reason=FailureReason.unexpected_failure, details=str(e), workflow_step_id=step.id
                        )
                    )
                raise

            if not step_delayed:
                log.debug(f"Workflow step {step.id} of invocation {workflow_invocation.id} invoked {step_timer}")

        if delayed_steps or max_jobs_per_iteration_reached:
            state = model.WorkflowInvocation.states.READY
        else:
            state = model.WorkflowInvocation.states.SCHEDULED
        workflow_invocation.set_state(state)

        # Record the new invocation state on the session.
        self.trans.sa_session.add(workflow_invocation)

        # Not flushing in here, because web controller may create multiple
        # invocations.
        return self.progress.outputs

    def __check_implicitly_dependent_steps(self, step):
        """Method will delay the workflow evaluation if implicitly dependent
        steps (steps dependent but not through an input->output way) are not
        yet complete.
        """
        for input_connection in step.input_connections:
            if input_connection.non_data_connection:
                output_id = input_connection.output_step.id
                self.__check_implicitly_dependent_step(output_id, step.id)

    def __check_implicitly_dependent_step(self, output_id: int, step_id: int):
        """Check the step ``step_id`` implicitly depends on (``output_id``).

        :raises modules.DelayedWorkflowEvaluation: the dependency has not been
            invoked, is not in state "scheduled", or has an unfinished job.
        :raises modules.FailWorkflowEvaluation: a finished job of the
            dependency is not in the OK state.
        """
        step_invocation = self.workflow_invocation.step_invocation_for_step_id(output_id)

        # No steps created yet - have to delay evaluation.
        if not step_invocation:
            delayed_why = f"depends on step [{output_id}] but that step has not been invoked yet"
            raise modules.DelayedWorkflowEvaluation(why=delayed_why)

        if step_invocation.state != "scheduled":
            delayed_why = f"depends on step [{output_id}] job has not finished scheduling yet"
            raise modules.DelayedWorkflowEvaluation(why=delayed_why)

        # TODO: Handle implicit dependency on stuff like pause steps.
        for job in step_invocation.jobs:
            # At least one job is incomplete.
            if not job.finished:
                delayed_why = (
                    f"depends on step [{output_id}] but one or more jobs created from that step have not finished yet"
                )
                raise modules.DelayedWorkflowEvaluation(why=delayed_why)

            if job.state != job.states.OK:
                raise modules.FailWorkflowEvaluation(
                    why=InvocationFailureJobFailed(
                        reason=FailureReason.job_failed,
                        job_id=job.id,
                        workflow_step_id=step_id,
                        dependent_workflow_step_id=output_id,
                    )
                )

    def _invoke_step(self, invocation_step: WorkflowInvocationStep) -> Optional[bool]:
        """Execute the module of ``invocation_step`` and return its result.

        ``invoke`` treats a return value of exactly ``False`` as "step
        incomplete"; any other value marks the step as scheduled.
        """
        incomplete_or_none = invocation_step.workflow_step.module.execute(
            self.trans,
            self.progress,
            invocation_step,
            use_cached_job=self.progress.use_cached_job,
        )
        return incomplete_or_none


# Sentinel stored in WorkflowProgress.outputs for steps whose outputs are not
# available yet; always compared by identity.
STEP_OUTPUT_DELAYED = object()


class ModuleInjector(Protocol):
    """Structural interface WorkflowProgress expects from a module injector."""

    trans: "WorkRequestContext"

    def inject(self, step, step_args=None, steps=None, **kwargs):
        pass

    def inject_all(self, workflow: "Workflow", param_map=None, ignore_tool_missing_exception=True, **kwargs):
        pass

    def compute_runtime_state(self, step, step_args=None):
        pass


class WorkflowProgress:
    """Track step outputs and scheduling bookkeeping for one invocation.

    ``outputs`` maps a workflow step id to that step's outputs dict, or to
    ``STEP_OUTPUT_DELAYED`` when the step has been marked delayed.
    """

    def __init__(
        self,
        workflow_invocation: WorkflowInvocation,
        inputs_by_step_id: Dict[int, Any],
        module_injector: ModuleInjector,
        param_map: Dict[int, Dict[str, Any]],
        jobs_per_scheduling_iteration: int = -1,
        copy_inputs_to_history: bool = False,
        use_cached_job: bool = False,
        replacement_dict: Optional[Dict[str, str]] = None,
        subworkflow_collection_info=None,
        when_values=None,
    ) -> None:
        self.outputs: Dict[int, Any] = {}
        self.module_injector = module_injector
        self.workflow_invocation = workflow_invocation
        self.inputs_by_step_id = inputs_by_step_id
        self.param_map = param_map
        # Values <= 0 mean "no per-iteration limit"
        # (see maximum_jobs_to_schedule_or_none).
        self.jobs_per_scheduling_iteration = jobs_per_scheduling_iteration
        self.jobs_scheduled_this_iteration = 0
        self.copy_inputs_to_history = copy_inputs_to_history
        self.use_cached_job = use_cached_job
        self.replacement_dict = replacement_dict or {}
        # Filled from labelled "parameter_input" steps in set_outputs_for_input.
        self.runtime_replacements: Dict[str, str] = {}
        self.subworkflow_collection_info = subworkflow_collection_info
        self.subworkflow_structure = subworkflow_collection_info.structure if subworkflow_collection_info else None
        self.when_values = when_values

    @property
    def maximum_jobs_to_schedule_or_none(self) -> Optional[int]:
        """Jobs still allowed this iteration, or ``None`` when unlimited."""
        if self.jobs_per_scheduling_iteration > 0:
            return self.jobs_per_scheduling_iteration - self.jobs_scheduled_this_iteration
        else:
            return None

    def record_executed_job_count(self, job_count: int) -> None:
        """Add ``job_count`` to the number of jobs scheduled this iteration."""
        self.jobs_scheduled_this_iteration += job_count

    def remaining_steps(
        self,
    ) -> List[Tuple["WorkflowStep", Optional[WorkflowInvocationStep]]]:
        """Return ``(step, invocation_step)`` pairs that still need scheduling.

        Steps whose invocation step is already "scheduled" are not returned;
        their outputs are recovered via ``_recover_mapping`` instead.

        :raises MessageException: the invocation has no persisted state for a
            step of the workflow.
        """
        # Previously computed and persisted step states.
        step_states = self.workflow_invocation.step_states_by_step_id()
        steps = self.workflow_invocation.workflow.steps
        # TODO: Wouldn't a generator be much better here so we don't have to reason about
        # steps we are no where near ready to schedule?
        remaining_steps = []
        step_invocations_by_id = self.workflow_invocation.step_invocations_by_step_id()
        self.module_injector.inject_all(self.workflow_invocation.workflow, param_map=self.param_map)
        for step in steps:
            step_id = step.id
            step_args = self.param_map.get(step_id, {})
            self.module_injector.compute_runtime_state(step, step_args=step_args)
            if step_id not in step_states:
                # Can this ever happen?
                public_message = f"Workflow invocation has no step state for step {step.order_index + 1}"
                log.error(f"{public_message}. State is known for these step ids: {list(step_states.keys())}.")
                raise MessageException(public_message)
            runtime_state = step_states[step_id].value
            assert step.module
            step.state = step.module.decode_runtime_state(step, runtime_state)

            invocation_step = step_invocations_by_id.get(step_id, None)
            if invocation_step and invocation_step.state == "scheduled":
                self._recover_mapping(invocation_step)
            else:
                remaining_steps.append((step, invocation_step))
        return remaining_steps

    def replacement_for_input(self, trans, step: "WorkflowStep", input_dict: Dict[str, Any]):
        """Resolve the value that should replace the input described by ``input_dict``.

        ``input_dict`` must provide the keys ``name``, ``multiple`` and
        ``input_type``. Returns ``modules.NO_REPLACEMENT`` when the input is
        neither connected nor covered by a data default on the step.
        """
        replacement: Union[
            modules.NoReplacement,
            model.DatasetCollectionInstance,
            List[model.DatasetCollectionInstance],
            "HistoryItem",
        ] = modules.NO_REPLACEMENT
        prefixed_name = input_dict["name"]
        multiple = input_dict["multiple"]
        is_data = input_dict["input_type"] in ["dataset", "dataset_collection"]
        if prefixed_name in step.input_connections_by_name:
            connection = step.input_connections_by_name[prefixed_name]
            if input_dict["input_type"] == "dataset" and multiple:
                temp = [self.replacement_for_connection(c) for c in connection]
                # If replacement is just one dataset collection, replace tool
                # input_dict with dataset collection - tool framework will extract
                # datasets properly.
                if len(temp) == 1:
                    if isinstance(temp[0], model.HistoryDatasetCollectionAssociation):
                        replacement = temp[0]
                    else:
                        replacement = temp
                else:
                    replacement = temp
            else:
                replacement = self.replacement_for_connection(connection[0], is_data=is_data)
        else:
            for step_input in step.inputs:
                if step_input.name == prefixed_name and step_input.default_value_set:
                    if is_data:
                        replacement = raw_to_galaxy(trans.app, trans.history, step_input.default_value)
        return replacement

    def replacement_for_connection(self, connection: "WorkflowStepConnection", is_data: bool = True):
        """Return the upstream output that ``connection`` refers to.

        :param is_data: when false and the output is a dataset or collection,
            the datasets must also be finished and OK before it is returned.
        :raises modules.FailWorkflowEvaluation: the output is missing, the
            collection failed to populate, or a required dataset is not OK.
        :raises modules.DelayedWorkflowEvaluation: the upstream step is
            delayed, or the collection/dataset is not ready yet.
        """
        output_step_id = connection.output_step.id
        output_name = connection.output_name
        if output_step_id not in self.outputs:
            raise modules.FailWorkflowEvaluation(
                why=InvocationFailureOutputNotFound(
                    reason=FailureReason.output_not_found,
                    workflow_step_id=connection.input_step_id,
                    output_name=output_name,
                    dependent_workflow_step_id=output_step_id,
                )
            )
        step_outputs = self.outputs[output_step_id]
        if step_outputs is STEP_OUTPUT_DELAYED:
            delayed_why = f"dependent step [{output_step_id}] delayed, so this step must be delayed"
            raise modules.DelayedWorkflowEvaluation(why=delayed_why)
        try:
            replacement = step_outputs[output_name]
        except KeyError:
            raise modules.FailWorkflowEvaluation(
                why=InvocationFailureOutputNotFound(
                    reason=FailureReason.output_not_found,
                    workflow_step_id=connection.input_step_id,
                    output_name=output_name,
                    dependent_workflow_step_id=output_step_id,
                )
            )
        if isinstance(replacement, model.HistoryDatasetCollectionAssociation):
            if not replacement.collection.populated:
                if not replacement.waiting_for_elements:
                    # If we are not waiting for elements, there was some
                    # problem creating the collection. Collection will never
                    # be populated.
                    raise modules.FailWorkflowEvaluation(
                        why=InvocationFailureCollectionFailed(
                            reason=FailureReason.collection_failed,
                            hdca_id=replacement.id,
                            workflow_step_id=connection.input_step_id,
                            dependent_workflow_step_id=output_step_id,
                        )
                    )

                delayed_why = f"dependent collection [{replacement.id}] not yet populated with datasets"
                raise modules.DelayedWorkflowEvaluation(why=delayed_why)

        if isinstance(replacement, model.DatasetCollection):
            raise NotImplementedError

        if not is_data and isinstance(
            replacement, (model.HistoryDatasetAssociation, model.HistoryDatasetCollectionAssociation)
        ):
            if isinstance(replacement, model.HistoryDatasetAssociation):
                if replacement.is_pending:
                    raise modules.DelayedWorkflowEvaluation()
                if not replacement.is_ok:
                    raise modules.FailWorkflowEvaluation(
                        why=InvocationFailureDatasetFailed(
                            reason=FailureReason.dataset_failed,
                            hda_id=replacement.id,
                            workflow_step_id=connection.input_step_id,
                            dependent_workflow_step_id=output_step_id,
                        )
                    )
            else:
                if not replacement.collection.populated:
                    raise modules.DelayedWorkflowEvaluation()
                pending = False
                for dataset_instance in replacement.dataset_instances:
                    if dataset_instance.is_pending:
                        pending = True
                    elif not dataset_instance.is_ok:
                        raise modules.FailWorkflowEvaluation(
                            why=InvocationFailureDatasetFailed(
                                reason=FailureReason.dataset_failed,
                                # Report the element that failed, not the id of
                                # the collection that contains it.
                                hda_id=dataset_instance.id,
                                workflow_step_id=connection.input_step_id,
                                dependent_workflow_step_id=output_step_id,
                            )
                        )
                if pending:
                    raise modules.DelayedWorkflowEvaluation()

        return replacement

    def get_replacement_workflow_output(self, workflow_output: "WorkflowOutput"):
        """Return the recorded value for ``workflow_output``.

        :raises modules.DelayedWorkflowEvaluation: the producing step is
            currently marked as delayed.
        """
        step = workflow_output.workflow_step
        output_name = workflow_output.output_name
        step_outputs = self.outputs[step.id]
        if step_outputs is STEP_OUTPUT_DELAYED:
            delayed_why = f"depends on workflow output [{output_name}] but that output has not been created yet"
            raise modules.DelayedWorkflowEvaluation(why=delayed_why)
        else:
            return step_outputs[output_name]

    def set_outputs_for_input(
        self,
        invocation_step: WorkflowInvocationStep,
        outputs: Optional[Dict[str, Any]] = None,
        already_persisted: bool = False,
    ) -> None:
        """Record the outputs of an input step, filling in ``"output"`` if needed.

        When ``inputs_by_step_id`` is non-empty, the ``"output"`` entry comes
        from the supplied inputs, or from the step default when the step was
        not supplied and no ``"output"`` was passed in.

        :raises modules.FailWorkflowEvaluation: no input was supplied for the
            step, it has no truthy default and it is not optional.
        """
        step = invocation_step.workflow_step

        if outputs is None:
            outputs = {}

        if self.inputs_by_step_id:
            step_id = step.id
            if step_id not in self.inputs_by_step_id and "output" not in outputs:
                default_value = step.input_default_value
                if default_value or step.input_optional:
                    outputs["output"] = default_value
                else:
                    log.error(f"{step.log_str()} not found in inputs_step_id {self.inputs_by_step_id}")
                    raise modules.FailWorkflowEvaluation(
                        why=InvocationFailureOutputNotFound(
                            reason=FailureReason.output_not_found,
                            workflow_step_id=invocation_step.workflow_step_id,
                            output_name="output",
                            dependent_workflow_step_id=invocation_step.workflow_step_id,
                        )
                    )
            elif step_id in self.inputs_by_step_id:
                outputs["output"] = self.inputs_by_step_id[step_id]

        if step.label and step.type == "parameter_input" and "output" in outputs:
            self.runtime_replacements[step.label] = str(outputs["output"])
        self.set_step_outputs(invocation_step, outputs, already_persisted=already_persisted)

    def effective_replacement_dict(self):
        """Merge replacements; ``replacement_dict`` wins over runtime values."""
        replacement_dict = {}
        for key, value in self.replacement_dict.items():
            replacement_dict[key] = value
        for key, value in self.runtime_replacements.items():
            if key not in replacement_dict:
                replacement_dict[key] = value
        return replacement_dict

    def set_step_outputs(
        self, invocation_step: WorkflowInvocationStep, outputs: Dict[str, Any], already_persisted: bool = False
    ) -> None:
        """Store ``outputs`` for the step and, unless already persisted, record them.

        Note that ``outputs`` is mutated in place when the invocation step
        carries an ``output_value``.
        """
        step = invocation_step.workflow_step
        if invocation_step.output_value:
            outputs[invocation_step.output_value.workflow_output.output_name] = invocation_step.output_value.value
        self.outputs[step.id] = outputs
        if not already_persisted:
            workflow_outputs_by_name = {wo.output_name: wo for wo in step.workflow_outputs}
            for output_name, output_object in outputs.items():
                if hasattr(output_object, "history_content_type"):
                    invocation_step.add_output(output_name, output_object)
                else:
                    # Add this non-data, non workflow-output output to the workflow outputs.
                    # This is required for recovering the output in the next scheduling iteration,
                    # and should be replaced with a WorkflowInvocationStepOutputValue ASAP.
                    if not workflow_outputs_by_name.get(output_name) and not output_object == modules.NO_REPLACEMENT:
                        workflow_output = model.WorkflowOutput(step, output_name=output_name)
                        step.workflow_outputs.append(workflow_output)
            for workflow_output in step.workflow_outputs:
                assert workflow_output.output_name
                output_name = workflow_output.output_name
                if output_name not in outputs:
                    # A declared workflow output is missing: warn and move on.
                    invocation_step.workflow_invocation.add_message(
                        InvocationWarningWorkflowOutputNotFound(
                            reason=WarningReason.workflow_output_not_found,
                            workflow_step_id=step.id,
                            output_name=output_name,
                        )
                    )
                    message = f"Failed to find expected workflow output [{output_name}] in step outputs [{outputs}]"
                    log.debug(message)
                    continue
                output = outputs[output_name]
                self._record_workflow_output(
                    step,
                    workflow_output,
                    output=output,
                )

    def _record_workflow_output(self, step: "WorkflowStep", workflow_output: "WorkflowOutput", output: Any) -> None:
        """Attach ``output`` to the invocation as the value of ``workflow_output``."""
        self.workflow_invocation.add_output(workflow_output, step, output)

    def mark_step_outputs_delayed(self, step: "WorkflowStep", why: Optional[str] = None) -> None:
        """Mark the outputs of ``step`` as delayed, logging ``why`` when given."""
        if why:
            message = f"Marking step {step.id} outputs of invocation {self.workflow_invocation.id} delayed ({why})"
            log.debug(message)
        self.outputs[step.id] = STEP_OUTPUT_DELAYED

    def _subworkflow_invocation(self, step: "WorkflowStep") -> WorkflowInvocation:
        """Return the persisted subworkflow invocation for ``step``.

        :raises MessageException: no subworkflow invocation exists for the step.
        """
        workflow_invocation = self.workflow_invocation
        subworkflow_invocation = workflow_invocation.get_subworkflow_invocation_for_step(step)
        if subworkflow_invocation is None:
            # order_index 0 is a valid position, so only reject a missing index.
            assert step.order_index is not None
            raise MessageException(f"Failed to find persisted subworkflow invocation for step [{step.order_index + 1}]")
        return subworkflow_invocation

    def subworkflow_invoker(
        self,
        trans: "WorkRequestContext",
        step: "WorkflowStep",
        use_cached_job: bool = False,
        subworkflow_collection_info=None,
        when_values=None,
    ) -> WorkflowInvoker:
        """Build a ``WorkflowInvoker`` for the subworkflow behind ``step``.

        The subworkflow invocation inherits ``handler`` and ``scheduler`` from
        this progress object's invocation.
        """
        subworkflow_invocation = self._subworkflow_invocation(step)
        subworkflow_invocation.handler = self.workflow_invocation.handler
        subworkflow_invocation.scheduler = self.workflow_invocation.scheduler
        workflow_run_config = workflow_request_to_run_config(subworkflow_invocation, use_cached_job)
        subworkflow_progress = self.subworkflow_progress(
            subworkflow_invocation,
            step,
            workflow_run_config.param_map,
            subworkflow_collection_info=subworkflow_collection_info,
            when_values=when_values,
        )
        subworkflow_invocation = subworkflow_progress.workflow_invocation
        return WorkflowInvoker(
            trans,
            workflow=subworkflow_invocation.workflow,
            workflow_run_config=workflow_run_config,
            progress=subworkflow_progress,
        )

    def subworkflow_progress(
        self,
        subworkflow_invocation: WorkflowInvocation,
        step: "WorkflowStep",
        param_map: Dict,
        subworkflow_collection_info=None,
        when_values=None,
    ) -> "WorkflowProgress":
        """Create the progress tracker for a subworkflow invocation.

        Each input step of the subworkflow is matched against the input
        connections of ``step`` and its value resolved from this tracker.

        :raises modules.FailWorkflowEvaluation: a non-optional subworkflow
            input has no matching connection.
        """
        subworkflow = subworkflow_invocation.workflow
        subworkflow_inputs = {}
        for input_subworkflow_step in subworkflow.input_steps:
            connection_found = False
            subworkflow_step_id = input_subworkflow_step.id
            # Initialised so the failure path below never reads an unbound
            # name when the step has no input connections at all.
            input_connection = None
            for input_connection in step.input_connections:
                if input_connection.input_subworkflow_step_id == subworkflow_step_id:
                    is_data = input_connection.output_step.type != "parameter_input"
                    replacement = self.replacement_for_connection(
                        input_connection,
                        is_data=is_data,
                    )
                    subworkflow_inputs[subworkflow_step_id] = replacement
                    connection_found = True
                    break

            if not connection_found and not input_subworkflow_step.input_optional:
                if input_connection is not None:
                    # NOTE(review): this is simply the last connection
                    # inspected, not one tied to the missing input — confirm
                    # whether a more specific reference is wanted.
                    missing_output_name = input_connection.output_name
                    dependent_step_id = input_connection.output_step.id
                else:
                    # No connections at all; mirror set_outputs_for_input and
                    # point the failure at the step itself.
                    missing_output_name = "output"
                    dependent_step_id = step.id
                raise modules.FailWorkflowEvaluation(
                    InvocationFailureOutputNotFound(
                        reason=FailureReason.output_not_found,
                        workflow_step_id=step.id,
                        output_name=missing_output_name,
                        dependent_workflow_step_id=dependent_step_id,
                    )
                )

        return WorkflowProgress(
            subworkflow_invocation,
            subworkflow_inputs,
            self.module_injector,
            param_map=param_map,
            use_cached_job=self.use_cached_job,
            replacement_dict=self.replacement_dict,
            subworkflow_collection_info=subworkflow_collection_info,
            when_values=when_values,
        )

    def raw_to_galaxy(self, value: dict):
        """Call the module-level ``raw_to_galaxy`` with the injector's app and history."""
        return raw_to_galaxy(self.module_injector.trans.app, self.module_injector.trans.history, value)

    def _recover_mapping(self, step_invocation: WorkflowInvocationStep) -> None:
        """Ask the step's module to recover its outputs into this tracker.

        A ``DelayedWorkflowEvaluation`` from the module marks the step's
        outputs as delayed instead of propagating.
        """
        try:
            step_invocation.workflow_step.module.recover_mapping(step_invocation, self)
        except modules.DelayedWorkflowEvaluation as de:
            self.mark_step_outputs_delayed(step_invocation.workflow_step, de.why)


__all__ = ("queue_invoke", "WorkflowRunConfig")