# Source code for galaxy.workflow.run

import logging
import uuid
from typing import (
    List,
    Union,
)

from galaxy import model
from galaxy.util import ExecutionTimer
from galaxy.workflow import modules
from galaxy.workflow.run_request import (
    workflow_request_to_run_config,
    workflow_run_config_to_request,
    WorkflowRunConfig,
)

log = logging.getLogger(__name__)


def schedule(trans, workflow, workflow_run_config, workflow_invocation):
    """Entry point for the core workflow scheduler.

    Run one scheduling pass of ``workflow`` for the supplied
    ``workflow_invocation`` and return the ``(outputs, workflow_invocation)``
    pair produced by ``__invoke``.
    """
    result = __invoke(
        trans,
        workflow,
        workflow_run_config,
        workflow_invocation=workflow_invocation,
    )
    return result


def __invoke(trans, workflow, workflow_run_config, workflow_invocation=None, populate_state=False):
    """Run the supplied workflow in the supplied target history.

    :param trans: work/transaction context; ``trans.sa_session`` is used to
        stage and flush invocation changes.
    :param workflow: the workflow to run.
    :param workflow_run_config: run configuration (``param_map``,
        ``allow_tool_state_corrections``, ...).
    :param workflow_invocation: invocation to continue, or ``None`` to let
        ``WorkflowInvoker`` build a new transient one.
    :param populate_state: if true, populate step modules and state from
        ``workflow_run_config.param_map`` before invoking.
    :returns: ``(outputs, invocation)`` - the invoker's outputs (an empty list
        when evaluation was cancelled or failed) and the invoker's invocation.
    :raises Exception: a scheduling error is re-raised only when no
        ``workflow_invocation`` was supplied.
    """
    if populate_state:
        modules.populate_module_and_state(
            trans,
            workflow,
            workflow_run_config.param_map,
            allow_tool_state_corrections=workflow_run_config.allow_tool_state_corrections,
        )

    invoker = WorkflowInvoker(trans, workflow, workflow_run_config, workflow_invocation=workflow_invocation)

    try:
        outputs = invoker.invoke()
    except modules.CancelWorkflowEvaluation:
        # Cancellation is recorded on the invocation (when there is one), not raised.
        if workflow_invocation and workflow_invocation.cancel():
            trans.sa_session.add(workflow_invocation)
        outputs = []
    except Exception:
        log.exception("Failed to execute scheduled workflow.")
        if not workflow_invocation:
            # New transient invocation from a legacy controller action -
            # propagate the exception to the caller.
            raise
        # Invocation is being scheduled in the background: just mark the
        # persistent invocation as failed.
        workflow_invocation.fail()
        trans.sa_session.add(workflow_invocation)
        outputs = []

    if workflow_invocation:
        # Make sure the updated invocation state is written out.
        trans.sa_session.flush()

    return outputs, invoker.workflow_invocation


def queue_invoke(trans, workflow, workflow_run_config, request_params=None, populate_state=True, flush=True):
    """Turn a run configuration into an invocation request and queue it.

    :param trans: work/transaction context; its ``app.workflow_scheduling_manager``
        receives the request.
    :param workflow: the workflow to invoke.
    :param workflow_run_config: run configuration to convert via
        ``workflow_run_config_to_request``.
    :param request_params: extra request parameters; any falsy value becomes a
        fresh empty dict.
    :param populate_state: if true, populate step modules and state from
        ``workflow_run_config.param_map`` first.
    :param flush: forwarded unchanged to the scheduling manager's ``queue``.
    :returns: whatever ``workflow_scheduling_manager.queue`` returns.
    """
    params = request_params or {}
    if populate_state:
        modules.populate_module_and_state(
            trans,
            workflow,
            workflow_run_config.param_map,
            allow_tool_state_corrections=workflow_run_config.allow_tool_state_corrections,
        )
    invocation = workflow_run_config_to_request(trans, workflow_run_config, workflow)
    invocation.workflow = workflow
    scheduling_manager = trans.app.workflow_scheduling_manager
    return scheduling_manager.queue(invocation, params, flush=flush)
class WorkflowInvoker:
    """Schedule the remaining steps of one workflow invocation.

    The invoker wraps a ``WorkflowProgress`` (built here unless one is
    supplied). Each call to ``invoke`` attempts every step that is not yet
    scheduled and delays steps whose dependencies are not ready.
    """

    def __init__(self, trans, workflow, workflow_run_config, workflow_invocation=None, progress=None):
        """Bind the invoker to an invocation, creating a transient one if needed.

        :param trans: work/transaction context (``trans.app.config`` and
            ``trans.sa_session`` are used).
        :param workflow: the workflow being invoked.
        :param workflow_run_config: supplies ``target_history``, ``inputs``,
            ``param_map``, ``copy_inputs_to_history``, ``use_cached_job`` and
            ``replacement_dict``.
        :param workflow_invocation: existing invocation to continue; must be
            ``None`` when ``progress`` is given.
        :param progress: existing ``WorkflowProgress`` whose invocation is reused
            (used for subworkflows).
        """
        self.trans = trans
        self.workflow = workflow
        if progress is not None:
            # A progress object already carries its invocation; supplying both is a programming error.
            assert workflow_invocation is None
            workflow_invocation = progress.workflow_invocation

        if workflow_invocation is None:
            invocation_uuid = uuid.uuid1()

            workflow_invocation = model.WorkflowInvocation()
            workflow_invocation.workflow = self.workflow

            # In one way or another, the following attributes will become persistent
            # so they are available during delayed/revisited workflow scheduling.
            workflow_invocation.uuid = invocation_uuid
            workflow_invocation.history = workflow_run_config.target_history

            self.workflow_invocation = workflow_invocation
        else:
            self.workflow_invocation = workflow_invocation

        self.workflow_invocation.copy_inputs_to_history = workflow_run_config.copy_inputs_to_history
        self.workflow_invocation.use_cached_job = workflow_run_config.use_cached_job
        self.workflow_invocation.replacement_dict = workflow_run_config.replacement_dict

        module_injector = modules.WorkflowModuleInjector(trans)
        if progress is None:
            progress = WorkflowProgress(
                self.workflow_invocation,
                workflow_run_config.inputs,
                module_injector,
                param_map=workflow_run_config.param_map,
                jobs_per_scheduling_iteration=getattr(
                    trans.app.config, "maximum_workflow_jobs_per_scheduling_iteration", -1
                ),
            )
        self.progress = progress

    def invoke(self):
        """Run one scheduling pass over the invocation's remaining steps.

        The invocation state is set to FAILED when it is older than
        ``maximum_workflow_invocation_duration`` seconds (if that is > 0). It is
        set to READY if any step was delayed or the per-iteration job limit was
        reached, and to SCHEDULED otherwise. The invocation is added to the
        session but not flushed.

        :returns: ``self.progress.outputs`` (step id -> that step's outputs).
        :raises modules.CancelWorkflowEvaluation: if the target history is deleted
            or a step cancels evaluation.
        """
        workflow_invocation = self.workflow_invocation
        config = self.trans.app.config
        maximum_duration = getattr(config, "maximum_workflow_invocation_duration", -1)
        if maximum_duration > 0 and workflow_invocation.seconds_since_created > maximum_duration:
            log.debug(
                f"Workflow invocation [{workflow_invocation.id}] exceeded maximum number of seconds allowed for scheduling [{maximum_duration}], failing."
            )
            workflow_invocation.state = model.WorkflowInvocation.states.FAILED
            # Stage the FAILED state on the session.
            self.trans.sa_session.add(workflow_invocation)

            # Not flushing in here, because web controller may create multiple
            # invocations.
            return self.progress.outputs

        if workflow_invocation.history.deleted:
            log.info("Cancelled workflow evaluation due to deleted history")
            raise modules.CancelWorkflowEvaluation()

        remaining_steps = self.progress.remaining_steps()
        delayed_steps = False
        max_jobs_per_iteration_reached = False
        for (step, workflow_invocation_step) in remaining_steps:
            max_jobs_to_schedule = self.progress.maximum_jobs_to_schedule_or_none
            if max_jobs_to_schedule is not None and max_jobs_to_schedule <= 0:
                # Job budget for this pass is used up; remaining steps wait for the next pass.
                max_jobs_per_iteration_reached = True
                break
            step_delayed = False
            step_timer = ExecutionTimer()
            try:
                self.__check_implicitly_dependent_steps(step)

                if not workflow_invocation_step:
                    workflow_invocation_step = model.WorkflowInvocationStep()
                    workflow_invocation_step.workflow_invocation = workflow_invocation
                    workflow_invocation_step.workflow_step = step
                    workflow_invocation_step.state = "new"

                    workflow_invocation.steps.append(workflow_invocation_step)

                incomplete_or_none = self._invoke_step(workflow_invocation_step)
                # An explicit False (not merely falsy) signals a partially scheduled step.
                if incomplete_or_none is False:
                    step_delayed = delayed_steps = True
                    workflow_invocation_step.state = "ready"
                    self.progress.mark_step_outputs_delayed(step, why="Not all jobs scheduled for state.")
                else:
                    workflow_invocation_step.state = "scheduled"
            except modules.DelayedWorkflowEvaluation as de:
                step_delayed = delayed_steps = True
                self.progress.mark_step_outputs_delayed(step, why=de.why)
            except Exception:
                log.exception(
                    "Failed to schedule %s, problem occurred on %s.",
                    self.workflow_invocation.workflow.log_str(),
                    step.log_str(),
                )
                raise

            if not step_delayed:
                log.debug(f"Workflow step {step.id} of invocation {workflow_invocation.id} invoked {step_timer}")

        if delayed_steps or max_jobs_per_iteration_reached:
            state = model.WorkflowInvocation.states.READY
        else:
            state = model.WorkflowInvocation.states.SCHEDULED
        workflow_invocation.state = state

        # All jobs ran successfully, so we can save now
        self.trans.sa_session.add(workflow_invocation)

        # Not flushing in here, because web controller may create multiple
        # invocations.
        return self.progress.outputs

    def __check_implicitly_dependent_steps(self, step):
        """Delay the workflow evaluation if implicitly dependent steps are not yet complete.

        Implicit dependencies are steps this one depends on through a non-data
        connection (not through an input->output link).
        """
        for input_connection in step.input_connections:
            if input_connection.non_data_connection:
                output_id = input_connection.output_step.id
                self.__check_implicitly_dependent_step(output_id)

    def __check_implicitly_dependent_step(self, output_id):
        """Raise unless the step with id ``output_id`` has been scheduled and all its jobs succeeded.

        :raises modules.DelayedWorkflowEvaluation: the step is not invoked or
            scheduled yet, or one of its jobs has not finished.
        :raises modules.CancelWorkflowEvaluation: a finished job is not in the OK state.
        """
        step_invocation = self.workflow_invocation.step_invocation_for_step_id(output_id)

        # No steps created yet - have to delay evaluation.
        if not step_invocation:
            delayed_why = f"depends on step [{output_id}] but that step has not been invoked yet"
            raise modules.DelayedWorkflowEvaluation(why=delayed_why)

        if step_invocation.state != "scheduled":
            delayed_why = f"depends on step [{output_id}] job has not finished scheduling yet"
            raise modules.DelayedWorkflowEvaluation(why=delayed_why)

        # TODO: Handle implicit dependency on stuff like pause steps.
        for job in step_invocation.jobs:
            # At least one job is incomplete.
            if not job.finished:
                delayed_why = (
                    f"depends on step [{output_id}] but one or more jobs created from that step have not finished yet"
                )
                raise modules.DelayedWorkflowEvaluation(why=delayed_why)

            if job.state != job.states.OK:
                raise modules.CancelWorkflowEvaluation()

    def _invoke_step(self, invocation_step):
        """Execute the step's module and return its result.

        The caller treats an explicit ``False`` as "not all jobs scheduled".
        """
        incomplete_or_none = invocation_step.workflow_step.module.execute(
            self.trans, self.progress, invocation_step, use_cached_job=self.workflow_invocation.use_cached_job
        )
        return incomplete_or_none


# Sentinel stored in ``WorkflowProgress.outputs`` for steps whose outputs are delayed.
STEP_OUTPUT_DELAYED = object()


class WorkflowProgress:
    """Track step outputs and per-pass scheduling state for one workflow invocation.

    ``outputs`` maps a step id to that step's outputs dict, or to
    ``STEP_OUTPUT_DELAYED`` when the step's outputs are not available yet.
    """

    def __init__(
        self, workflow_invocation, inputs_by_step_id, module_injector, param_map, jobs_per_scheduling_iteration=-1
    ):
        """
        :param workflow_invocation: invocation whose steps are being scheduled.
        :param inputs_by_step_id: values supplied for input steps, keyed by step id.
        :param module_injector: attaches modules to steps that do not have one yet.
        :param param_map: per-step parameters keyed by step id.
        :param jobs_per_scheduling_iteration: maximum jobs to schedule per pass;
            values <= 0 mean no limit.
        """
        self.outputs = {}
        self.module_injector = module_injector
        self.workflow_invocation = workflow_invocation
        self.inputs_by_step_id = inputs_by_step_id
        self.param_map = param_map
        self.jobs_per_scheduling_iteration = jobs_per_scheduling_iteration
        self.jobs_scheduled_this_iteration = 0

    @property
    def maximum_jobs_to_schedule_or_none(self):
        """Remaining job budget for this pass (may be <= 0), or ``None`` when unlimited."""
        if self.jobs_per_scheduling_iteration > 0:
            return self.jobs_per_scheduling_iteration - self.jobs_scheduled_this_iteration
        else:
            return None

    def record_executed_job_count(self, job_count):
        """Add ``job_count`` to the number of jobs scheduled in this pass."""
        self.jobs_scheduled_this_iteration += job_count

    def remaining_steps(self):
        """Return ``(step, invocation_step_or_None)`` pairs for steps not yet scheduled.

        Steps without a module get one injected and their persisted runtime
        state decoded. Already-scheduled steps have their outputs recovered into
        ``self.outputs`` instead of being returned.

        :raises Exception: a step that needs its state decoded has no persisted step state.
        """
        # Previously computed and persisted step states.
        step_states = self.workflow_invocation.step_states_by_step_id()
        steps = self.workflow_invocation.workflow.steps
        # TODO: Wouldn't a generator be much better here so we don't have to reason about
        # steps we are nowhere near ready to schedule?
        remaining_steps = []
        step_invocations_by_id = self.workflow_invocation.step_invocations_by_step_id()
        for step in steps:
            step_id = step.id
            if not hasattr(step, "module"):
                self.module_injector.inject(step, step_args=self.param_map.get(step.id, {}))
                if step_id not in step_states:
                    template = "Workflow invocation [%s] has no step state for step id [%s]. States ids are %s."
                    message = template % (self.workflow_invocation.id, step_id, list(step_states.keys()))
                    raise Exception(message)
                runtime_state = step_states[step_id].value
                step.state = step.module.decode_runtime_state(runtime_state)

            invocation_step = step_invocations_by_id.get(step_id, None)
            if invocation_step and invocation_step.state == "scheduled":
                self._recover_mapping(invocation_step)
            else:
                remaining_steps.append((step, invocation_step))
        return remaining_steps

    def replacement_for_input(self, step, input_dict):
        """Resolve the value connected to the tool input described by ``input_dict``.

        Returns ``modules.NO_REPLACEMENT`` when nothing is connected to the input.
        """
        replacement: Union[
            modules.NoReplacement,
            model.DatasetCollectionInstance,
            List[model.DatasetCollectionInstance],
        ] = modules.NO_REPLACEMENT
        prefixed_name = input_dict["name"]
        multiple = input_dict["multiple"]
        if prefixed_name in step.input_connections_by_name:
            connection = step.input_connections_by_name[prefixed_name]
            if input_dict["input_type"] == "dataset" and multiple:
                temp = [self.replacement_for_connection(c) for c in connection]
                # If replacement is just one dataset collection, replace tool
                # input_dict with dataset collection - tool framework will extract
                # datasets properly.
                if len(temp) == 1:
                    if isinstance(temp[0], model.HistoryDatasetCollectionAssociation):
                        replacement = temp[0]
                    else:
                        replacement = temp
                else:
                    replacement = temp
            else:
                is_data = input_dict["input_type"] in ["dataset", "dataset_collection"]
                replacement = self.replacement_for_connection(connection[0], is_data=is_data)
        return replacement

    def replacement_for_connection(self, connection, is_data=True):
        """Return the upstream output that ``connection`` points to.

        :param is_data: when false, an HDA/HDCA replacement must already be in a
            final state (used as a non-data value).
        :raises modules.DelayedWorkflowEvaluation: the upstream step or its data is not ready yet.
        :raises modules.CancelWorkflowEvaluation: upstream data ended up in an error state.
        :raises Exception: no outputs recorded for the upstream step or output name.
        """
        output_step_id = connection.output_step.id
        if output_step_id not in self.outputs:
            message = f"No outputs found for step id {output_step_id}, outputs are {self.outputs}"
            raise Exception(message)
        step_outputs = self.outputs[output_step_id]
        if step_outputs is STEP_OUTPUT_DELAYED:
            delayed_why = f"dependent step [{output_step_id}] delayed, so this step must be delayed"
            raise modules.DelayedWorkflowEvaluation(why=delayed_why)
        output_name = connection.output_name
        try:
            replacement = step_outputs[output_name]
        except KeyError:
            # Must resolve.
            template = "Workflow evaluation problem - failed to find output_name %s in step_outputs %s"
            message = template % (output_name, step_outputs)
            raise Exception(message)
        if isinstance(replacement, model.HistoryDatasetCollectionAssociation):
            if not replacement.collection.populated:
                if not replacement.waiting_for_elements:
                    # If we are not waiting for elements, there was some
                    # problem creating the collection. Collection will never
                    # be populated.
                    # TODO: consider distinguish between cancelled and failed?
                    raise modules.CancelWorkflowEvaluation()

                delayed_why = f"dependent collection [{replacement.id}] not yet populated with datasets"
                raise modules.DelayedWorkflowEvaluation(why=delayed_why)

        if isinstance(replacement, model.DatasetCollection):
            raise NotImplementedError

        if not is_data and isinstance(
            replacement, (model.HistoryDatasetAssociation, model.HistoryDatasetCollectionAssociation)
        ):
            if isinstance(replacement, model.HistoryDatasetAssociation):
                if replacement.is_pending:
                    raise modules.DelayedWorkflowEvaluation()
                if not replacement.is_ok:
                    raise modules.CancelWorkflowEvaluation()
            else:
                if not replacement.collection.populated:
                    raise modules.DelayedWorkflowEvaluation()
                pending = False
                for dataset_instance in replacement.dataset_instances:
                    if dataset_instance.is_pending:
                        pending = True
                    elif not dataset_instance.is_ok:
                        raise modules.CancelWorkflowEvaluation()
                if pending:
                    raise modules.DelayedWorkflowEvaluation()

        return replacement

    def get_replacement_workflow_output(self, workflow_output):
        """Return the value recorded for ``workflow_output``, delaying if its step is delayed."""
        step = workflow_output.workflow_step
        output_name = workflow_output.output_name
        step_outputs = self.outputs[step.id]
        if step_outputs is STEP_OUTPUT_DELAYED:
            delayed_why = f"depends on workflow output [{output_name}] but that output has not been created yet"
            raise modules.DelayedWorkflowEvaluation(why=delayed_why)
        else:
            return step_outputs[output_name]

    def set_outputs_for_input(self, invocation_step, outputs=None, already_persisted=False):
        """Record the outputs of an input step, using the supplied inputs or the step default.

        :raises ValueError: the step has no supplied value, no default and is not optional.
        """
        step = invocation_step.workflow_step

        if outputs is None:
            outputs = {}

        if self.inputs_by_step_id:
            step_id = step.id
            if step_id not in self.inputs_by_step_id and "output" not in outputs:
                default_value = step.input_default_value
                # NOTE(review): a falsy default (e.g. False/0/"") on a non-optional
                # input falls through to the ValueError below - confirm intended.
                if default_value or step.input_optional:
                    outputs["output"] = default_value
                else:
                    raise ValueError(f"{step.log_str()} not found in inputs_step_id {self.inputs_by_step_id}")
            elif step_id in self.inputs_by_step_id:
                outputs["output"] = self.inputs_by_step_id[step_id]

        self.set_step_outputs(invocation_step, outputs, already_persisted=already_persisted)

    def set_step_outputs(self, invocation_step, outputs, already_persisted=False):
        """Store ``outputs`` for the step and, unless ``already_persisted``, attach them to the invocation."""
        step = invocation_step.workflow_step
        if invocation_step.output_value:
            outputs[invocation_step.output_value.workflow_output.output_name] = invocation_step.output_value.value
        self.outputs[step.id] = outputs
        if not already_persisted:
            workflow_outputs_by_name = {wo.output_name: wo for wo in step.workflow_outputs}
            for output_name, output_object in outputs.items():
                if hasattr(output_object, "history_content_type"):
                    invocation_step.add_output(output_name, output_object)
                else:
                    # Add this non-data, non workflow-output output to the workflow outputs.
                    # This is required for recovering the output in the next scheduling iteration,
                    # and should be replaced with a WorkflowInvocationStepOutputValue ASAP.
                    if not workflow_outputs_by_name.get(output_name) and not output_object == modules.NO_REPLACEMENT:
                        workflow_output = model.WorkflowOutput(step, output_name=output_name)
                        step.workflow_outputs.append(workflow_output)
            for workflow_output in step.workflow_outputs:
                output_name = workflow_output.output_name
                if output_name not in outputs:
                    message = f"Failed to find expected workflow output [{output_name}] in step outputs [{outputs}]"
                    # Ideally this would raise. Before 18.01 a missing output was never
                    # detected, and there is still no way to report it to the user, so
                    # for now it is only logged at debug level. Once
                    # https://github.com/galaxyproject/galaxy/issues/5142 is complete the
                    # user could at least be warned.
                    log.debug(message)
                    continue
                output = outputs[output_name]
                self._record_workflow_output(
                    step,
                    workflow_output,
                    output=output,
                )

    def _record_workflow_output(self, step, workflow_output, output):
        """Attach ``output`` to the invocation as the value of ``workflow_output``."""
        self.workflow_invocation.add_output(workflow_output, step, output)

    def mark_step_outputs_delayed(self, step, why=None):
        """Mark the step's outputs as ``STEP_OUTPUT_DELAYED``, logging ``why`` if given."""
        if why:
            message = f"Marking step {step.id} outputs of invocation {self.workflow_invocation.id} delayed ({why})"
            log.debug(message)
        self.outputs[step.id] = STEP_OUTPUT_DELAYED

    def _subworkflow_invocation(self, step):
        """Return the persisted subworkflow invocation for ``step``, raising if there is none."""
        workflow_invocation = self.workflow_invocation
        subworkflow_invocation = workflow_invocation.get_subworkflow_invocation_for_step(step)
        if subworkflow_invocation is None:
            raise Exception(f"Failed to find persisted workflow invocation for step [{step.id}]")
        return subworkflow_invocation

    def subworkflow_invoker(self, trans, step, use_cached_job=False):
        """Build a ``WorkflowInvoker`` for the subworkflow invocation attached to ``step``.

        :param use_cached_job: when true, the subworkflow invoker uses cached jobs
            even if the run config rebuilt from the subworkflow invocation has it off.
        """
        subworkflow_invocation = self._subworkflow_invocation(step)
        workflow_run_config = workflow_request_to_run_config(trans, subworkflow_invocation)
        if use_cached_job:
            # Honour the caller's request; this argument used to be accepted and ignored.
            workflow_run_config.use_cached_job = True
        subworkflow_progress = self.subworkflow_progress(subworkflow_invocation, step, workflow_run_config.param_map)
        subworkflow_invocation = subworkflow_progress.workflow_invocation
        return WorkflowInvoker(
            trans,
            workflow=subworkflow_invocation.workflow,
            workflow_run_config=workflow_run_config,
            progress=subworkflow_progress,
        )

    def subworkflow_progress(self, subworkflow_invocation, step, param_map):
        """Create a ``WorkflowProgress`` for a subworkflow, wiring its inputs from this step's connections.

        :raises Exception: a non-optional subworkflow input has no connection.
        """
        subworkflow = subworkflow_invocation.workflow
        subworkflow_inputs = {}
        for input_subworkflow_step in subworkflow.input_steps:
            connection_found = False
            subworkflow_step_id = input_subworkflow_step.id
            for input_connection in step.input_connections:
                if input_connection.input_subworkflow_step_id == subworkflow_step_id:
                    is_data = input_connection.output_step.type != "parameter_input"
                    replacement = self.replacement_for_connection(
                        input_connection,
                        is_data=is_data,
                    )
                    subworkflow_inputs[subworkflow_step_id] = replacement
                    connection_found = True
                    break

            if not connection_found and not input_subworkflow_step.input_optional:
                raise Exception("Could not find connections for all subworkflow inputs.")

        return WorkflowProgress(subworkflow_invocation, subworkflow_inputs, self.module_injector, param_map=param_map)

    def _recover_mapping(self, step_invocation):
        """Restore outputs of an already-scheduled step, marking them delayed if not recoverable yet."""
        try:
            step_invocation.workflow_step.module.recover_mapping(step_invocation, self)
        except modules.DelayedWorkflowEvaluation as de:
            self.mark_step_outputs_delayed(step_invocation.workflow_step, de.why)


__all__ = ("queue_invoke", "WorkflowRunConfig")