Source code for galaxy.workflow.run

import logging
import uuid
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Tuple,
    TYPE_CHECKING,
    Union,
)

from boltons.iterutils import get_path
from typing_extensions import Protocol

from galaxy import model
from galaxy.exceptions import MessageException
from galaxy.model import (
    WorkflowInvocation,
    WorkflowInvocationStep,
)
from galaxy.model.base import (
    ensure_object_added_to_session,
    transaction,
)
from galaxy.schema.invocation import (
    CancelReason,
    FAILURE_REASONS_EXPECTED,
    FailureReason,
    InvocationCancellationHistoryDeleted,
    InvocationFailureCollectionFailed,
    InvocationFailureDatasetFailed,
    InvocationFailureJobFailed,
    InvocationFailureOutputNotFound,
    InvocationUnexpectedFailure,
    InvocationWarningWorkflowOutputNotFound,
    WarningReason,
)
from galaxy.tools.parameters.basic import raw_to_galaxy
from galaxy.tools.parameters.wrapped import nested_key_to_path
from galaxy.util import ExecutionTimer
from galaxy.workflow import modules
from galaxy.workflow.run_request import (
    workflow_request_to_run_config,
    workflow_run_config_to_request,
    WorkflowRunConfig,
)

if TYPE_CHECKING:
    from galaxy.model import (
        HistoryItem,
        Workflow,
        WorkflowOutput,
        WorkflowStep,
        WorkflowStepConnection,
    )
    from galaxy.webapps.base.webapp import GalaxyWebTransaction
    from galaxy.work.context import WorkRequestContext

log = logging.getLogger(__name__)

WorkflowOutputsType = Dict[int, Any]


# Entry point for core workflow scheduler.
def schedule(
    trans: "WorkRequestContext",
    workflow: "Workflow",
    workflow_run_config: WorkflowRunConfig,
    workflow_invocation: WorkflowInvocation,
) -> Tuple[WorkflowOutputsType, WorkflowInvocation]:
    return __invoke(trans, workflow, workflow_run_config, workflow_invocation)
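

# Hedged sketch of repeated scheduling via the entry point above. Galaxy's
# workflow scheduling manager drives these iterations in production; the bare
# polling loop, the two-argument workflow_request_to_run_config call, and the
# assumption that the invocation is already persisted with a handler assigned
# are illustrative simplifications, not this module's API. invoke() leaves an
# invocation READY while steps are delayed (or a per-iteration job cap is hit)
# and SCHEDULED once everything is scheduled.
def _example_iterate_until_scheduled(
    trans: "WorkRequestContext", invocation: WorkflowInvocation
) -> WorkflowOutputsType:
    outputs: WorkflowOutputsType = {}
    while invocation.state in (
        WorkflowInvocation.states.NEW,  # assumed members of the states enum
        WorkflowInvocation.states.READY,
    ):
        run_config = workflow_request_to_run_config(invocation, False)
        outputs, invocation = schedule(trans, invocation.workflow, run_config, invocation)
    return outputs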


def __invoke(
    trans: "WorkRequestContext",
    workflow: "Workflow",
    workflow_run_config: WorkflowRunConfig,
    workflow_invocation: Optional[WorkflowInvocation] = None,
    populate_state: bool = False,
) -> Tuple[WorkflowOutputsType, WorkflowInvocation]:
    """Run the supplied workflow in the supplied target_history."""
    if populate_state:
        modules.populate_module_and_state(
            trans,
            workflow,
            workflow_run_config.param_map,
            allow_tool_state_corrections=workflow_run_config.allow_tool_state_corrections,
        )

    invoker = WorkflowInvoker(
        trans,
        workflow,
        workflow_run_config,
        workflow_invocation=workflow_invocation,
    )
    workflow_invocation = invoker.workflow_invocation
    outputs = {}
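    # Exception-to-state mapping for the scheduling attempt below:
    # CancelWorkflowEvaluation cancels the invocation, FailWorkflowEvaluation
    # and MessageException mark it failed with a user-safe message, and any
    # other exception marks it failed without details (they could leak ids).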
    try:
        outputs = invoker.invoke()
    except modules.CancelWorkflowEvaluation as e:
        if workflow_invocation.cancel():
            workflow_invocation.add_message(e.why)
    except modules.FailWorkflowEvaluation as e:
        workflow_invocation.fail()
        workflow_invocation.add_message(e.why)
    except MessageException as e:
        # By convention, a MessageException carries a message that is safe to show to users.
        workflow_invocation.fail()
        failure = InvocationUnexpectedFailure(reason=FailureReason.unexpected_failure, details=str(e))
        workflow_invocation.add_message(failure)
    except Exception:
        # The error could be large and/or contain raw ids or other secrets, so don't add details.
        log.exception("Failed to execute scheduled workflow.")
        # Running workflow invocation in background, just mark
        # persistent workflow invocation as failed.
        failure = InvocationUnexpectedFailure(reason=FailureReason.unexpected_failure)
        workflow_invocation.fail()
        workflow_invocation.add_message(failure)

    # Be sure to update state of workflow_invocation.
    trans.sa_session.add(workflow_invocation)
    with transaction(trans.sa_session):
        trans.sa_session.commit()

    return outputs, workflow_invocation


def queue_invoke(
    trans: "GalaxyWebTransaction",
    workflow: "Workflow",
    workflow_run_config: WorkflowRunConfig,
    request_params: Optional[Dict[str, Any]] = None,
    populate_state: bool = True,
    flush: bool = True,
) -> WorkflowInvocation:
    request_params = request_params or {}
    if populate_state:
        modules.populate_module_and_state(
            trans,
            workflow,
            workflow_run_config.param_map,
            allow_tool_state_corrections=workflow_run_config.allow_tool_state_corrections,
        )
    workflow_invocation = workflow_run_config_to_request(trans, workflow_run_config, workflow)
    workflow_invocation.workflow = workflow
    return trans.app.workflow_scheduling_manager.queue(workflow_invocation, request_params, flush=flush)
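

# Hedged usage sketch for queue_invoke above. The WorkflowRunConfig keyword
# arguments shown (and the trans.history default) are illustrative
# assumptions; real callers normally build the run config from an API payload
# via galaxy.workflow.run_request helpers rather than by hand.
def _example_queue(trans: "GalaxyWebTransaction", workflow: "Workflow") -> WorkflowInvocation:
    run_config = WorkflowRunConfig(
        target_history=trans.history,  # assumed: a history attached to the transaction
        replacement_dict={},
        inputs={},
        param_map={},
    )
    return queue_invoke(trans, workflow, run_config)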


class WorkflowInvoker:
    progress: "WorkflowProgress"

    def __init__(
        self,
        trans: "WorkRequestContext",
        workflow: "Workflow",
        workflow_run_config: WorkflowRunConfig,
        workflow_invocation: Optional[WorkflowInvocation] = None,
        progress: Optional["WorkflowProgress"] = None,
    ) -> None:
        self.trans = trans
        self.workflow = workflow
        self.workflow_invocation: WorkflowInvocation
        if progress is not None:
            assert workflow_invocation is None
            workflow_invocation = progress.workflow_invocation

        if workflow_invocation is None:
            invocation_uuid = uuid.uuid1()
            workflow_invocation = WorkflowInvocation()
            workflow_invocation.workflow = self.workflow
            # One way or another, the following attributes will become persistent
            # so they are available during delayed/revisited workflow scheduling.
            workflow_invocation.uuid = invocation_uuid
            workflow_invocation.history = workflow_run_config.target_history

        self.workflow_invocation = workflow_invocation

        module_injector = modules.WorkflowModuleInjector(trans)
        if progress is None:
            progress = WorkflowProgress(
                self.workflow_invocation,
                workflow_run_config.inputs,
                module_injector,
                param_map=workflow_run_config.param_map,
                jobs_per_scheduling_iteration=getattr(
                    trans.app.config, "maximum_workflow_jobs_per_scheduling_iteration", -1
                ),
                copy_inputs_to_history=workflow_run_config.copy_inputs_to_history,
                use_cached_job=workflow_run_config.use_cached_job,
                replacement_dict=workflow_run_config.replacement_dict,
            )
        self.progress = progress

    def invoke(self) -> Dict[int, Any]:
        workflow_invocation = self.workflow_invocation
        config = self.trans.app.config
        maximum_duration = getattr(config, "maximum_workflow_invocation_duration", -1)
        if maximum_duration > 0 and workflow_invocation.seconds_since_created > maximum_duration:
            log.debug(
                f"Workflow invocation [{workflow_invocation.id}] exceeded maximum number of seconds allowed for scheduling [{maximum_duration}], failing."
            )
            workflow_invocation.set_state(model.WorkflowInvocation.states.FAILED)
            # Record the failed state so it is persisted with the session.
            self.trans.sa_session.add(workflow_invocation)

            # Not flushing in here, because web controller may create multiple
            # invocations.
            return self.progress.outputs

        if workflow_invocation.history.deleted:
            raise modules.CancelWorkflowEvaluation(
                why=InvocationCancellationHistoryDeleted(
                    reason=CancelReason.history_deleted, history_id=workflow_invocation.history_id
                )
            )

        remaining_steps = self.progress.remaining_steps()
        delayed_steps = False
        max_jobs_per_iteration_reached = False
        for step, workflow_invocation_step in remaining_steps:
            max_jobs_to_schedule = self.progress.maximum_jobs_to_schedule_or_none
            if max_jobs_to_schedule is not None and max_jobs_to_schedule <= 0:
                max_jobs_per_iteration_reached = True
                break
            step_delayed = False
            step_timer = ExecutionTimer()
            try:
                self.__check_implicitly_dependent_steps(step)

                if not workflow_invocation_step:
                    workflow_invocation_step = WorkflowInvocationStep()
                    assert workflow_invocation_step
                    workflow_invocation_step.workflow_invocation = workflow_invocation
                    ensure_object_added_to_session(workflow_invocation_step, object_in_session=workflow_invocation)
                    workflow_invocation_step.workflow_step = step
                    workflow_invocation_step.state = "new"

                    workflow_invocation.steps.append(workflow_invocation_step)

                assert workflow_invocation_step
                incomplete_or_none = self._invoke_step(workflow_invocation_step)
                if incomplete_or_none is False:
                    step_delayed = delayed_steps = True
                    workflow_invocation_step.state = "ready"
                    self.progress.mark_step_outputs_delayed(step, why="Not all jobs scheduled for state.")
                else:
                    workflow_invocation_step.state = "scheduled"
            except modules.DelayedWorkflowEvaluation as de:
                step_delayed = delayed_steps = True
                self.progress.mark_step_outputs_delayed(step, why=de.why)
            except Exception as e:
                log_function = log.exception
                if isinstance(e, modules.FailWorkflowEvaluation) and e.why.reason in FAILURE_REASONS_EXPECTED:
                    log_function = log.info
                log_function(
                    "Failed to schedule %s for %s, problem occurred on %s.",
                    self.workflow_invocation.log_str(),
                    self.workflow_invocation.workflow.log_str(),
                    step.log_str(),
                )
                if isinstance(e, MessageException):
                    # This is the highest level at which we can inject the step id
                    # to provide some more context to the exception.
                    raise modules.FailWorkflowEvaluation(
                        why=InvocationUnexpectedFailure(
                            reason=FailureReason.unexpected_failure, details=str(e), workflow_step_id=step.id
                        )
                    )
                raise

            if not step_delayed:
                log.debug(f"Workflow step {step.id} of invocation {workflow_invocation.id} invoked {step_timer}")

        if delayed_steps or max_jobs_per_iteration_reached:
            state = model.WorkflowInvocation.states.READY
        else:
            state = model.WorkflowInvocation.states.SCHEDULED
        workflow_invocation.set_state(state)

        # This scheduling iteration is done, so record the invocation state now.
        self.trans.sa_session.add(workflow_invocation)

        # Not flushing in here, because web controller may create multiple
        # invocations.
        return self.progress.outputs

    def __check_implicitly_dependent_steps(self, step):
        """Method will delay the workflow evaluation if implicitly dependent
        steps (steps dependent but not through an input->output way) are not
        yet complete.
        """
        for input_connection in step.input_connections:
            if input_connection.non_data_connection:
                output_id = input_connection.output_step.id
                self.__check_implicitly_dependent_step(output_id, step.id)

    def __check_implicitly_dependent_step(self, output_id: int, step_id: int):
        step_invocation = self.workflow_invocation.step_invocation_for_step_id(output_id)

        # No steps created yet - have to delay evaluation.
        if not step_invocation:
            delayed_why = f"depends on step [{output_id}] but that step has not been invoked yet"
            raise modules.DelayedWorkflowEvaluation(why=delayed_why)

        if step_invocation.state != "scheduled":
            delayed_why = f"depends on step [{output_id}] but that step's job has not finished scheduling yet"
            raise modules.DelayedWorkflowEvaluation(why=delayed_why)

        # TODO: Handle implicit dependency on stuff like pause steps.
        for job in step_invocation.jobs:
            # At least one job is incomplete.
            if not job.finished:
                delayed_why = (
                    f"depends on step [{output_id}] but one or more jobs created from that step have not finished yet"
                )
                raise modules.DelayedWorkflowEvaluation(why=delayed_why)

            if job.state != job.states.OK:
                raise modules.FailWorkflowEvaluation(
                    why=InvocationFailureJobFailed(
                        reason=FailureReason.job_failed,
                        job_id=job.id,
                        workflow_step_id=step_id,
                        dependent_workflow_step_id=output_id,
                    )
                )

    def _invoke_step(self, invocation_step: WorkflowInvocationStep) -> Optional[bool]:
        incomplete_or_none = invocation_step.workflow_step.module.execute(
            self.trans,
            self.progress,
            invocation_step,
            use_cached_job=self.progress.use_cached_job,
        )
        return incomplete_or_none


STEP_OUTPUT_DELAYED = object()


class ModuleInjector(Protocol):
    trans: "WorkRequestContext"

    def inject(self, step, step_args=None, steps=None, **kwargs):
        pass

    def inject_all(self, workflow: "Workflow", param_map=None, ignore_tool_missing_exception=True, **kwargs):
        pass

    def compute_runtime_state(self, step, step_args=None):
        pass
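

# Minimal sketch of structural conformance to the ModuleInjector Protocol
# above: typing_extensions Protocols are satisfied by matching attributes and
# method signatures, with no inheritance required. The no-op bodies are
# illustrative assumptions only; Galaxy's real implementation is
# modules.WorkflowModuleInjector.
class _ExampleNoOpInjector:
    trans: "WorkRequestContext"

    def inject(self, step, step_args=None, steps=None, **kwargs):
        pass

    def inject_all(self, workflow: "Workflow", param_map=None, ignore_tool_missing_exception=True, **kwargs):
        pass

    def compute_runtime_state(self, step, step_args=None):
        pass


_example_injector: ModuleInjector = _ExampleNoOpInjector()  # type-checks structurally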


class WorkflowProgress:
    def __init__(
        self,
        workflow_invocation: WorkflowInvocation,
        inputs_by_step_id: Dict[int, Any],
        module_injector: ModuleInjector,
        param_map: Dict[int, Dict[str, Any]],
        jobs_per_scheduling_iteration: int = -1,
        copy_inputs_to_history: bool = False,
        use_cached_job: bool = False,
        replacement_dict: Optional[Dict[str, str]] = None,
        subworkflow_collection_info=None,
        when_values=None,
    ) -> None:
        self.outputs: Dict[int, Any] = {}
        self.module_injector = module_injector
        self.workflow_invocation = workflow_invocation
        self.inputs_by_step_id = inputs_by_step_id
        self.param_map = param_map
        self.jobs_per_scheduling_iteration = jobs_per_scheduling_iteration
        self.jobs_scheduled_this_iteration = 0
        self.copy_inputs_to_history = copy_inputs_to_history
        self.use_cached_job = use_cached_job
        self.replacement_dict = replacement_dict or {}
        self.runtime_replacements: Dict[str, str] = {}
        self.subworkflow_collection_info = subworkflow_collection_info
        self.subworkflow_structure = subworkflow_collection_info.structure if subworkflow_collection_info else None
        self.when_values = when_values

    @property
    def maximum_jobs_to_schedule_or_none(self) -> Optional[int]:
        if self.jobs_per_scheduling_iteration > 0:
            return self.jobs_per_scheduling_iteration - self.jobs_scheduled_this_iteration
        else:
            return None

    def record_executed_job_count(self, job_count: int) -> None:
        self.jobs_scheduled_this_iteration += job_count

    def remaining_steps(
        self,
    ) -> List[Tuple["WorkflowStep", Optional[WorkflowInvocationStep]]]:
        # Previously computed and persisted step states.
        step_states = self.workflow_invocation.step_states_by_step_id()
        steps = self.workflow_invocation.workflow.steps

        # TODO: Wouldn't a generator be much better here so we don't have to reason about
        # steps we are nowhere near ready to schedule?
        remaining_steps = []
        step_invocations_by_id = self.workflow_invocation.step_invocations_by_step_id()
        self.module_injector.inject_all(self.workflow_invocation.workflow, param_map=self.param_map)
        for step in steps:
            step_id = step.id
            step_args = self.param_map.get(step_id, {})
            self.module_injector.compute_runtime_state(step, step_args=step_args)
            if step_id not in step_states:
                # Can this ever happen?
                public_message = f"Workflow invocation has no step state for step {step.order_index + 1}"
                log.error(f"{public_message}. State is known for these step ids: {list(step_states.keys())}.")
                raise MessageException(public_message)
            runtime_state = step_states[step_id].value
            assert step.module
            step.state = step.module.decode_runtime_state(step, runtime_state)

            invocation_step = step_invocations_by_id.get(step_id, None)
            if invocation_step and invocation_step.state == "scheduled":
                self._recover_mapping(invocation_step)
            else:
                remaining_steps.append((step, invocation_step))
        return remaining_steps

    def replacement_for_input(self, trans, step: "WorkflowStep", input_dict: Dict[str, Any]):
        replacement: Union[
            modules.NoReplacement,
            model.DatasetCollectionInstance,
            List[model.DatasetCollectionInstance],
            "HistoryItem",
        ] = modules.NO_REPLACEMENT
        prefixed_name = input_dict["name"]
        multiple = input_dict["multiple"]
        is_data = input_dict["input_type"] in ["dataset", "dataset_collection"]
        if prefixed_name in step.input_connections_by_name:
            connection = step.input_connections_by_name[prefixed_name]
            if input_dict["input_type"] == "dataset" and multiple:
                temp = [self.replacement_for_connection(c) for c in connection]
                # If the replacement is a single dataset collection, replace the
                # tool input with the collection itself - the tool framework will
                # extract datasets properly.
                if len(temp) == 1 and isinstance(temp[0], model.HistoryDatasetCollectionAssociation):
                    replacement = temp[0]
                else:
                    replacement = temp
            else:
                replacement = self.replacement_for_connection(connection[0], is_data=is_data)
        elif step.state and (state_input := get_path(step.state.inputs, nested_key_to_path(prefixed_name), None)):
            # A workflow submitted with step parameters populates state directly
            # via populate_module_and_state.
            replacement = state_input
        else:
            for step_input in step.inputs:
                if step_input.name == prefixed_name and step_input.default_value_set:
                    if is_data:
                        replacement = raw_to_galaxy(trans.app, trans.history, step_input.default_value)
        return replacement
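
    # replacement_for_connection (below) resolves an upstream step's recorded
    # output into a concrete value: a missing output raises
    # FailWorkflowEvaluation, while a delayed upstream step or a collection
    # still waiting for elements raises DelayedWorkflowEvaluation so the step
    # is retried in a later scheduling iteration.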

    def replacement_for_connection(self, connection: "WorkflowStepConnection", is_data: bool = True):
        output_step_id = connection.output_step.id
        output_name = connection.output_name
        if output_step_id not in self.outputs:
            raise modules.FailWorkflowEvaluation(
                why=InvocationFailureOutputNotFound(
                    reason=FailureReason.output_not_found,
                    workflow_step_id=connection.input_step_id,
                    output_name=output_name,
                    dependent_workflow_step_id=output_step_id,
                )
            )
        step_outputs = self.outputs[output_step_id]
        if step_outputs is STEP_OUTPUT_DELAYED:
            delayed_why = f"dependent step [{output_step_id}] delayed, so this step must be delayed"
            raise modules.DelayedWorkflowEvaluation(why=delayed_why)
        try:
            replacement = step_outputs[output_name]
        except KeyError:
            raise modules.FailWorkflowEvaluation(
                why=InvocationFailureOutputNotFound(
                    reason=FailureReason.output_not_found,
                    workflow_step_id=connection.input_step_id,
                    output_name=output_name,
                    dependent_workflow_step_id=output_step_id,
                )
            )
        if isinstance(replacement, model.HistoryDatasetCollectionAssociation):
            if not replacement.collection.populated:
                if not replacement.waiting_for_elements:
                    # If we are not waiting for elements, there was some
                    # problem creating the collection. The collection will
                    # never be populated.
                    raise modules.FailWorkflowEvaluation(
                        why=InvocationFailureCollectionFailed(
                            reason=FailureReason.collection_failed,
                            hdca_id=replacement.id,
                            workflow_step_id=connection.input_step_id,
                            dependent_workflow_step_id=output_step_id,
                        )
                    )

                delayed_why = f"dependent collection [{replacement.id}] not yet populated with datasets"
                raise modules.DelayedWorkflowEvaluation(why=delayed_why)

        if isinstance(replacement, model.DatasetCollection):
            raise NotImplementedError

        if not is_data and isinstance(
            replacement, (model.HistoryDatasetAssociation, model.HistoryDatasetCollectionAssociation)
        ):
            if isinstance(replacement, model.HistoryDatasetAssociation):
                if replacement.is_pending:
                    raise modules.DelayedWorkflowEvaluation()
                if not replacement.is_ok:
                    raise modules.FailWorkflowEvaluation(
                        why=InvocationFailureDatasetFailed(
                            reason=FailureReason.dataset_failed,
                            hda_id=replacement.id,
                            workflow_step_id=connection.input_step_id,
                            dependent_workflow_step_id=output_step_id,
                        )
                    )
            else:
                if not replacement.collection.populated:
                    raise modules.DelayedWorkflowEvaluation()
                pending = False
                for dataset_instance in replacement.dataset_instances:
                    if dataset_instance.is_pending:
                        pending = True
                    elif not dataset_instance.is_ok:
                        raise modules.FailWorkflowEvaluation(
                            why=InvocationFailureDatasetFailed(
                                reason=FailureReason.dataset_failed,
                                hda_id=replacement.id,
                                workflow_step_id=connection.input_step_id,
                                dependent_workflow_step_id=output_step_id,
                            )
                        )
                if pending:
                    raise modules.DelayedWorkflowEvaluation()

        return replacement

    def get_replacement_workflow_output(self, workflow_output: "WorkflowOutput"):
        step = workflow_output.workflow_step
        output_name = workflow_output.output_name
        step_outputs = self.outputs[step.id]
        if step_outputs is STEP_OUTPUT_DELAYED:
            delayed_why = f"depends on workflow output [{output_name}] but that output has not been created yet"
            raise modules.DelayedWorkflowEvaluation(why=delayed_why)
        else:
            return step_outputs[output_name]

    def set_outputs_for_input(
        self,
        invocation_step: WorkflowInvocationStep,
        outputs: Optional[Dict[str, Any]] = None,
        already_persisted: bool = False,
    ) -> None:
        step = invocation_step.workflow_step

        if outputs is None:
            outputs = {}

        if self.inputs_by_step_id:
            step_id = step.id
            if step_id not in self.inputs_by_step_id and "output" not in outputs:
                default_value = step.input_default_value
                if default_value or step.input_optional:
                    outputs["output"] = default_value
                else:
                    log.error(f"{step.log_str()} not found in inputs_by_step_id {self.inputs_by_step_id}")
                    raise modules.FailWorkflowEvaluation(
                        why=InvocationFailureOutputNotFound(
                            reason=FailureReason.output_not_found,
                            workflow_step_id=invocation_step.workflow_step_id,
                            output_name="output",
                            dependent_workflow_step_id=invocation_step.workflow_step_id,
                        )
                    )
            elif step_id in self.inputs_by_step_id:
                outputs["output"] = self.inputs_by_step_id[step_id]
        if step.label and step.type == "parameter_input" and "output" in outputs:
            self.runtime_replacements[step.label] = str(outputs["output"])
        self.set_step_outputs(invocation_step, outputs, already_persisted=already_persisted)
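
    # effective_replacement_dict (below) merges ${...} replacement sources;
    # explicitly supplied replacement_dict entries win over runtime values
    # captured from labeled parameter_input steps in set_outputs_for_input.
    # For example (assumed values), replacement_dict={"sample": "A"} combined
    # with runtime_replacements={"sample": "B", "lane": "1"} yields
    # {"sample": "A", "lane": "1"}.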

    def effective_replacement_dict(self):
        replacement_dict = dict(self.replacement_dict)
        for key, value in self.runtime_replacements.items():
            if key not in replacement_dict:
                replacement_dict[key] = value
        return replacement_dict

    def set_step_outputs(
        self,
        invocation_step: WorkflowInvocationStep,
        outputs: Dict[str, Any],
        already_persisted: bool = False,
    ) -> None:
        step = invocation_step.workflow_step
        if invocation_step.output_value:
            outputs[invocation_step.output_value.workflow_output.output_name] = invocation_step.output_value.value
        self.outputs[step.id] = outputs
        if not already_persisted:
            workflow_outputs_by_name = {wo.output_name: wo for wo in step.workflow_outputs}
            for output_name, output_object in outputs.items():
                if hasattr(output_object, "history_content_type"):
                    invocation_step.add_output(output_name, output_object)
                else:
                    # Add this non-data, non-workflow-output output to the workflow outputs.
                    # This is required for recovering the output in the next scheduling iteration,
                    # and should be replaced with a WorkflowInvocationStepOutputValue ASAP.
                    if not workflow_outputs_by_name.get(output_name) and output_object != modules.NO_REPLACEMENT:
                        workflow_output = model.WorkflowOutput(step, output_name=output_name)
                        step.workflow_outputs.append(workflow_output)
            for workflow_output in step.workflow_outputs:
                assert workflow_output.output_name
                output_name = workflow_output.output_name
                if output_name not in outputs:
                    invocation_step.workflow_invocation.add_message(
                        InvocationWarningWorkflowOutputNotFound(
                            reason=WarningReason.workflow_output_not_found,
                            workflow_step_id=step.id,
                            output_name=output_name,
                        )
                    )
                    message = f"Failed to find expected workflow output [{output_name}] in step outputs [{outputs}]"
                    log.debug(message)
                    continue
                output = outputs[output_name]
                self._record_workflow_output(
                    step,
                    workflow_output,
                    output=output,
                )

    def _record_workflow_output(self, step: "WorkflowStep", workflow_output: "WorkflowOutput", output: Any) -> None:
        self.workflow_invocation.add_output(workflow_output, step, output)

    def mark_step_outputs_delayed(self, step: "WorkflowStep", why: Optional[str] = None) -> None:
        if why:
            message = f"Marking step {step.id} outputs of invocation {self.workflow_invocation.id} delayed ({why})"
            log.debug(message)
        self.outputs[step.id] = STEP_OUTPUT_DELAYED

    def _subworkflow_invocation(self, step: "WorkflowStep") -> WorkflowInvocation:
        workflow_invocation = self.workflow_invocation
        subworkflow_invocation = workflow_invocation.get_subworkflow_invocation_for_step(step)
        if subworkflow_invocation is None:
            assert step.order_index is not None
            raise MessageException(f"Failed to find persisted subworkflow invocation for step [{step.order_index + 1}]")
        return subworkflow_invocation

    def subworkflow_invoker(
        self,
        trans: "WorkRequestContext",
        step: "WorkflowStep",
        use_cached_job: bool = False,
        subworkflow_collection_info=None,
        when_values=None,
    ) -> WorkflowInvoker:
        subworkflow_invocation = self._subworkflow_invocation(step)
        subworkflow_invocation.handler = self.workflow_invocation.handler
        subworkflow_invocation.scheduler = self.workflow_invocation.scheduler
        workflow_run_config = workflow_request_to_run_config(subworkflow_invocation, use_cached_job)
        subworkflow_progress = self.subworkflow_progress(
            subworkflow_invocation,
            step,
            workflow_run_config.param_map,
            subworkflow_collection_info=subworkflow_collection_info,
            when_values=when_values,
        )
        subworkflow_invocation = subworkflow_progress.workflow_invocation
        return WorkflowInvoker(
            trans,
            workflow=subworkflow_invocation.workflow,
            workflow_run_config=workflow_run_config,
            progress=subworkflow_progress,
        )
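
    # subworkflow_progress (below) builds a child WorkflowProgress whose
    # inputs_by_step_id maps each subworkflow input step to the value resolved
    # from the corresponding outer connection; a missing, non-optional
    # connection fails the evaluation.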

    def subworkflow_progress(
        self,
        subworkflow_invocation: WorkflowInvocation,
        step: "WorkflowStep",
        param_map: Dict,
        subworkflow_collection_info=None,
        when_values=None,
    ) -> "WorkflowProgress":
        subworkflow = subworkflow_invocation.workflow
        subworkflow_inputs = {}
        for input_subworkflow_step in subworkflow.input_steps:
            connection_found = False
            subworkflow_step_id = input_subworkflow_step.id
            for input_connection in step.input_connections:
                if input_connection.input_subworkflow_step_id == subworkflow_step_id:
                    is_data = input_connection.output_step.type != "parameter_input"
                    replacement = self.replacement_for_connection(
                        input_connection,
                        is_data=is_data,
                    )
                    subworkflow_inputs[subworkflow_step_id] = replacement
                    connection_found = True
                    break

            if not connection_found and not input_subworkflow_step.input_optional:
                raise modules.FailWorkflowEvaluation(
                    why=InvocationFailureOutputNotFound(
                        reason=FailureReason.output_not_found,
                        workflow_step_id=step.id,
                        output_name=input_connection.output_name,
                        dependent_workflow_step_id=input_connection.output_step.id,
                    )
                )

        return WorkflowProgress(
            subworkflow_invocation,
            subworkflow_inputs,
            self.module_injector,
            param_map=param_map,
            use_cached_job=self.use_cached_job,
            replacement_dict=self.replacement_dict,
            subworkflow_collection_info=subworkflow_collection_info,
            when_values=when_values,
        )

    def raw_to_galaxy(self, value: dict):
        return raw_to_galaxy(self.module_injector.trans.app, self.module_injector.trans.history, value)

    def _recover_mapping(self, step_invocation: WorkflowInvocationStep) -> None:
        try:
            step_invocation.workflow_step.module.recover_mapping(step_invocation, self)
        except modules.DelayedWorkflowEvaluation as de:
            self.mark_step_outputs_delayed(step_invocation.workflow_step, de.why)


__all__ = ("queue_invoke", "WorkflowRunConfig")
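

# Hedged illustration of the nested-state lookup pattern used by
# replacement_for_input: nested_key_to_path is assumed to split a
# "|"-delimited flat parameter name into a path that boltons' get_path can
# walk, e.g. "cond|input" -> ("cond", "input"). The state dict and values
# here are made up for illustration.
def _example_state_lookup() -> Optional[int]:
    state_inputs = {"cond": {"input": 42}}
    # Returns 42 under the assumption above; None if the path misses.
    return get_path(state_inputs, nested_key_to_path("cond|input"), None)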