Source code for galaxy.workflow.run_request

import json
import logging
import uuid
from typing import (
    Any,
    Dict,
    List,
    Optional,
    TYPE_CHECKING,
)

from galaxy import exceptions
from galaxy.model import (
    EffectiveOutput,
    History,
    HistoryDatasetAssociation,
    HistoryDatasetCollectionAssociation,
    LibraryDataset,
    LibraryDatasetDatasetAssociation,
    WorkflowInvocation,
    WorkflowRequestInputParameter,
    WorkflowRequestStepState,
)
from galaxy.model.base import (
    ensure_object_added_to_session,
    transaction,
)
from galaxy.tools.parameters.meta import expand_workflow_inputs
from galaxy.workflow.resources import get_resource_mapper_function

if TYPE_CHECKING:
    from galaxy.model import (
        Workflow,
        WorkflowStep,
    )
    from galaxy.webapps.base.webapp import GalaxyWebTransaction

# Workflow step types treated as workflow inputs; steps of any other type are
# skipped when request inputs are matched up against workflow steps.
INPUT_STEP_TYPES = ["data_input", "data_collection_input", "parameter_input"]

# Module-level logger named after this module.
log = logging.getLogger(__name__)


class WorkflowRunConfig:
    """Plain value object collecting the settings that parameterize one workflow execution.

    :param target_history: History to execute the workflow in.
    :type target_history: galaxy.model.History
    :param replacement_dict: Workflow level parameters used for renaming post job actions.
                             Stored as an empty dict when omitted or empty.
    :type replacement_dict: dict
    :param inputs: Map from step ids to the input content for those steps.
                   Stored as an empty dict when omitted or empty.
    :type inputs: dict
    :param param_map: Override step parameters - a dict with step id keys and
                      tool param name-value dicts as values. Stored as an empty
                      dict when omitted or empty.
    :type param_map: dict
    :param allow_tool_state_corrections: Stored as given (defaults to False).
    :type allow_tool_state_corrections: bool
    :param copy_inputs_to_history: Should input data parameters be copied to
                                   target_history. (Defaults to False)
    :type copy_inputs_to_history: bool
    :param use_cached_job: Stored as given (defaults to False).
    :type use_cached_job: bool
    :param resource_params: Resource parameters for the run. Stored as an empty
                            dict when omitted or empty.
    :type resource_params: dict
    :param preferred_object_store_id: Stored as given (defaults to None).
    :type preferred_object_store_id: str
    :param preferred_outputs_object_store_id: Stored as given (defaults to None).
    :type preferred_outputs_object_store_id: str
    :param preferred_intermediate_object_store_id: Stored as given (defaults to None).
    :type preferred_intermediate_object_store_id: str
    :param effective_outputs: Stored as given; ``None`` and an empty list are
                              kept distinct.
    :type effective_outputs: list
    """

    def __init__(
        self,
        target_history: "History",
        replacement_dict: Optional[Dict[str, Any]] = None,
        inputs: Optional[Dict[int, Any]] = None,
        param_map: Optional[Dict[int, Any]] = None,
        allow_tool_state_corrections: bool = False,
        copy_inputs_to_history: bool = False,
        use_cached_job: bool = False,
        resource_params: Optional[Dict[int, Any]] = None,
        preferred_object_store_id: Optional[str] = None,
        preferred_outputs_object_store_id: Optional[str] = None,
        preferred_intermediate_object_store_id: Optional[str] = None,
        effective_outputs: Optional[List[EffectiveOutput]] = None,
    ) -> None:
        # Mapping arguments: any falsy value (None or an empty mapping) is
        # swapped for a fresh, instance-private dict.
        self.replacement_dict = replacement_dict if replacement_dict else {}
        self.inputs = inputs if inputs else {}
        self.param_map = param_map if param_map else {}
        self.resource_params = resource_params if resource_params else {}

        # Everything else is stored exactly as supplied.
        self.target_history = target_history
        self.copy_inputs_to_history = copy_inputs_to_history
        self.allow_tool_state_corrections = allow_tool_state_corrections
        self.use_cached_job = use_cached_job
        self.preferred_object_store_id = preferred_object_store_id
        self.preferred_outputs_object_store_id = preferred_outputs_object_store_id
        self.preferred_intermediate_object_store_id = preferred_intermediate_object_store_id
        self.effective_outputs = effective_outputs
def _normalize_inputs( steps: List["WorkflowStep"], inputs: Dict[str, Dict[str, Any]], inputs_by: str ) -> Dict[int, Dict[str, Any]]: normalized_inputs = {} for step in steps: if step.type not in INPUT_STEP_TYPES: continue possible_input_keys = [] for inputs_by_el in inputs_by.split("|"): if inputs_by_el == "step_id": possible_input_keys.append(str(step.id)) elif inputs_by_el == "step_index": possible_input_keys.append(str(step.order_index)) elif inputs_by_el == "step_uuid": possible_input_keys.append(str(step.uuid)) elif inputs_by_el == "name": possible_input_keys.append(step.label or step.tool_inputs.get("name")) # type:ignore[union-attr] else: raise exceptions.MessageException( "Workflow cannot be run because unexpected inputs_by value specified." ) inputs_key = None for possible_input_key in possible_input_keys: if possible_input_key in inputs: inputs_key = possible_input_key default_not_set = object() default_value = step.get_input_default_value(default_not_set) has_default = default_value is not default_not_set optional = step.input_optional # Need to be careful here to make sure 'default' has correct type - not sure how to do that # but asserting 'optional' is definitely a bool and not a String->Bool or something is a good # start to ensure tool state is being preserved and loaded in a type safe way. assert isinstance(has_default, bool) assert isinstance(optional, bool) has_input_value = inputs_key and inputs[inputs_key] is not None if not has_input_value and not has_default and not optional: message = f"Workflow cannot be run because input step '{step.id}' ({step.label}) is not optional and no input provided." 
raise exceptions.MessageException(message) if inputs_key: normalized_inputs[step.id] = inputs[inputs_key] return normalized_inputs def _normalize_step_parameters( steps: List["WorkflowStep"], param_map: Dict, legacy: bool = False, already_normalized: bool = False ) -> Dict: """Take a complex param_map that can reference parameters by step_id in the new flexible way or in the old one-parameter per step fashion or by tool id and normalize the parameters so everything is referenced by a numeric step id. """ normalized_param_map = {} for step in steps: if already_normalized: param_dict = param_map.get(str(step.order_index), {}) else: param_dict = _step_parameters(step, param_map, legacy=legacy) if step.type == "subworkflow" and param_dict: if not already_normalized: raise exceptions.RequestParameterInvalidException( "Specifying subworkflow step parameters requires already_normalized to be specified as true." ) subworkflow_param_dict: Dict[str, Dict[str, str]] = {} for key, value in param_dict.items(): step_index, param_name = key.split("|", 1) if step_index not in subworkflow_param_dict: subworkflow_param_dict[step_index] = {} subworkflow_param_dict[step_index][param_name] = value assert step.subworkflow param_dict = _normalize_step_parameters( step.subworkflow.steps, subworkflow_param_dict, legacy=legacy, already_normalized=already_normalized ) if param_dict: normalized_param_map[step.id] = param_dict return normalized_param_map def _step_parameters(step: "WorkflowStep", param_map: Dict, legacy: bool = False) -> Dict: """ Update ``step`` parameters based on the user-provided ``param_map`` dict. ``param_map`` should be structured as follows:: PARAM_MAP = {STEP_ID_OR_UUID: PARAM_DICT, ...} PARAM_DICT = {NAME: VALUE, ...} For backwards compatibility, the following (deprecated) format is also supported for ``param_map``:: PARAM_MAP = {TOOL_ID: PARAM_DICT, ...} in which case PARAM_DICT affects all steps with the given tool id. 
If both by-tool-id and by-step-id specifications are used, the latter takes precedence. Finally (again, for backwards compatibility), PARAM_DICT can also be specified as:: PARAM_DICT = {'param': NAME, 'value': VALUE} Note that this format allows only one parameter to be set per step. """ param_dict = param_map.get(step.tool_id, {}).copy() if legacy: param_dict.update(param_map.get(str(step.id), {})) else: param_dict.update(param_map.get(str(step.order_index), {})) if step_uuid := step.uuid: uuid_params = param_map.get(str(step_uuid), {}) param_dict.update(uuid_params) if param_dict: if "param" in param_dict and "value" in param_dict: param_dict[param_dict["param"]] = param_dict["value"] del param_dict["param"] del param_dict["value"] # Inputs can be nested dict, but Galaxy tool code wants nesting of keys (e.g. # cond1|moo=4 instead of cond1: {moo: 4} ). new_params = _flatten_step_params(param_dict) return new_params def _flatten_step_params(param_dict: Dict, prefix: str = "") -> Dict: # TODO: Temporary work around until tool code can process nested data # structures. This should really happen in there so the tools API gets # this functionality for free and so that repeats can be handled # properly. Also the tool code walks the tool inputs so it nows what is # a complex value object versus something that maps to child parameters # better than the hack or searching for src and id here. 
new_params = {} for key in list(param_dict.keys()): if prefix: effective_key = f"{prefix}|{key}" else: effective_key = key value = param_dict[key] if isinstance(value, dict) and (not ("src" in value and "id" in value) and key != "__POST_JOB_ACTIONS__"): new_params.update(_flatten_step_params(value, effective_key)) else: new_params[effective_key] = value return new_params def _get_target_history( trans: "GalaxyWebTransaction", workflow: "Workflow", payload: Dict[str, Any], param_keys: Optional[List[List]] = None, index: int = 0, ) -> History: param_keys = param_keys or [] history_name = payload.get("new_history_name", None) history_id = payload.get("history_id", None) history_param = payload.get("history", None) if [history_name, history_id, history_param].count(None) < 2: raise exceptions.RequestParameterInvalidException( "Specified workflow target history multiple ways - at most one of 'history', 'history_id', and 'new_history_name' may be specified." ) if history_param: if history_param.startswith("hist_id="): history_id = history_param[8:] else: history_name = history_param if history_id: history_manager = trans.app.history_manager target_history = history_manager.get_mutable( trans.security.decode_id(history_id), trans.user, current_history=trans.history ) else: if history_name: nh_name = history_name else: nh_name = f"History from {workflow.name} workflow" if len(param_keys) <= index: raise exceptions.MessageException("Incorrect expansion of workflow batch parameters.") ids = param_keys[index] nids = len(ids) if nids == 1: nh_name = f"{nh_name} on {ids[0]}" elif nids > 1: nh_name = f"{nh_name} on {', '.join(ids[0:-1])} and {ids[-1]}" new_history = History(user=trans.user, name=nh_name) trans.sa_session.add(new_history) with transaction(trans.sa_session): trans.sa_session.commit() target_history = new_history return target_history
def build_workflow_run_configs(
    trans: "GalaxyWebTransaction", workflow: "Workflow", payload: Dict[str, Any]
) -> List[WorkflowRunConfig]:
    """Translate a workflow run ``payload`` into one ``WorkflowRunConfig`` per expanded run.

    The step parameters (``parameters``) and inputs (``inputs``, or the legacy
    ``ds_map``) are normalized and passed through ``expand_workflow_inputs``;
    every expanded combination yields its own target history and run
    configuration. Inputs given as ``{"src": ..., "id": ...}`` dicts with a
    source of ``ldda``, ``ld``, ``hda`` or ``hdca`` are resolved to model
    objects and, unless ``no_add_to_history`` is present in the payload, copied
    into the target history when they live elsewhere.

    Other payload keys read here: ``allow_tool_state_corrections``,
    ``use_cached_job``, ``legacy``, ``parameters_normalized``, ``inputs_by``,
    ``resource_params``, ``replacement_params``, ``effective_outputs`` and the
    three ``preferred_*object_store_id`` keys.

    :raises exceptions.MessageException: if the workflow has no steps or has cycles.
    :raises exceptions.RequestParameterInvalidException: for conflicting or
        malformed payload attributes, unknown input sources and invalid
        ``select`` resource parameter values.
    :raises exceptions.ItemAccessibilityException: if an input dataset cannot be
        found or the user may not access it.
    """
    app = trans.app
    allow_tool_state_corrections = payload.get("allow_tool_state_corrections", False)
    use_cached_job = payload.get("use_cached_job", False)

    # Sanity checks.
    if len(workflow.steps) == 0:
        raise exceptions.MessageException("Workflow cannot be run because it does not have any steps")
    if workflow.has_cycles:
        raise exceptions.MessageException("Workflow cannot be run because it contains cycles")

    if "step_parameters" in payload and "parameters" in payload:
        raise exceptions.RequestParameterInvalidException(
            "Cannot specify both legacy parameters and step_parameters attributes."
        )
    if "inputs" in payload and "ds_map" in payload:
        raise exceptions.RequestParameterInvalidException("Cannot specify both legacy ds_map and input attributes.")

    add_to_history = "no_add_to_history" not in payload
    legacy = payload.get("legacy", False)
    already_normalized = payload.get("parameters_normalized", False)
    raw_parameters = payload.get("parameters", {})

    run_configs = []
    unexpanded_param_map = _normalize_step_parameters(
        workflow.steps, raw_parameters, legacy=legacy, already_normalized=already_normalized
    )
    unexpanded_inputs = payload.get("inputs", None)
    # New default is to reference steps by index of workflow step
    # which is intrinsic to the workflow and independent of the state
    # of Galaxy at the time of workflow import.
    default_inputs_by = "step_index|step_uuid"
    if unexpanded_inputs is None:
        # Default to legacy behavior - read ds_map and reference steps
        # by unencoded step id (a raw database id).
        unexpanded_inputs = payload.get("ds_map") or {}
        if legacy:
            default_inputs_by = "step_id|step_uuid"
    else:
        unexpanded_inputs = unexpanded_inputs or {}
    # The default is applied only after the legacy branch has had a chance to
    # replace it; applying it earlier would make the legacy override unreachable.
    inputs_by: str = payload.get("inputs_by", None) or default_inputs_by

    expanded_params, expanded_param_keys, expanded_inputs = expand_workflow_inputs(
        unexpanded_param_map, unexpanded_inputs
    )
    for index, (param_map, inputs) in enumerate(zip(expanded_params, expanded_inputs)):
        history = _get_target_history(trans, workflow, payload, expanded_param_keys, index)
        if inputs or not already_normalized:
            normalized_inputs = _normalize_inputs(workflow.steps, inputs, inputs_by)
        else:
            # Only allow dumping IDs directly into JSON database instead of properly recording the
            # inputs with referential integrity if parameters are already normalized (coming from tool form).
            normalized_inputs = {}

        if param_map:
            # disentangle raw parameter dictionaries into formal request structures if we can
            # to setup proper WorkflowRequestToInputDatasetAssociation, WorkflowRequestToInputDatasetCollectionAssociation
            # and WorkflowRequestInputStepParameter objects.
            for step in workflow.steps:
                normalized_key = step.id
                if step.type == "parameter_input":
                    if normalized_key in param_map:
                        value = param_map.pop(normalized_key)
                        normalized_inputs[normalized_key] = value["input"]

        steps_by_id = workflow.steps_by_id
        # Set workflow inputs.
        for key, input_dict in normalized_inputs.items():
            if input_dict is None:
                continue
            step = steps_by_id[key]
            if step.type == "parameter_input":
                # Parameter inputs carry plain values, nothing to resolve.
                continue
            if "src" not in input_dict:
                raise exceptions.RequestParameterInvalidException(
                    f"Not input source type defined for input '{input_dict}'."
                )
            if "id" not in input_dict:
                raise exceptions.RequestParameterInvalidException(f"Not input id defined for input '{input_dict}'.")
            if "content" in input_dict:
                raise exceptions.RequestParameterInvalidException(
                    f"Input cannot specify explicit 'content' attribute {input_dict}'."
                )
            input_source = input_dict["src"]
            input_id = input_dict["id"]
            try:
                if input_source == "ldda":
                    ldda = trans.sa_session.get(LibraryDatasetDatasetAssociation, trans.security.decode_id(input_id))
                    assert ldda
                    assert trans.user_is_admin or trans.app.security_agent.can_access_dataset(
                        trans.get_current_user_roles(), ldda.dataset
                    )
                    content = ldda.to_history_dataset_association(history, add_to_history=add_to_history)
                elif input_source == "ld":
                    library_dataset = trans.sa_session.get(LibraryDataset, trans.security.decode_id(input_id))
                    assert library_dataset
                    ldda = library_dataset.library_dataset_dataset_association
                    assert ldda
                    assert trans.user_is_admin or trans.app.security_agent.can_access_dataset(
                        trans.get_current_user_roles(), ldda.dataset
                    )
                    content = ldda.to_history_dataset_association(history, add_to_history=add_to_history)
                elif input_source == "hda":
                    # Get dataset handle, add to dict and history if necessary
                    content = trans.sa_session.get(HistoryDatasetAssociation, trans.security.decode_id(input_id))
                    if content is None:
                        # Unknown id: report it the same way as the library
                        # branches instead of failing on ``content.dataset``.
                        raise exceptions.ItemAccessibilityException(
                            f"Invalid workflow input '{input_id}' specified"
                        )
                    assert trans.user_is_admin or trans.app.security_agent.can_access_dataset(
                        trans.get_current_user_roles(), content.dataset
                    )
                elif input_source == "hdca":
                    content = app.dataset_collection_manager.get_dataset_collection_instance(trans, "history", input_id)
                else:
                    raise exceptions.RequestParameterInvalidException(
                        f"Unknown workflow input source '{input_source}' specified."
                    )
                if add_to_history and content.history != history:
                    if isinstance(content, HistoryDatasetCollectionAssociation):
                        content = content.copy(element_destination=history, flush=False)
                    else:
                        content = content.copy(flush=False)
                    history.stage_addition(content)
                input_dict["content"] = content
            except AssertionError:
                # Failed existence/access assertions above surface as an accessibility error.
                raise exceptions.ItemAccessibilityException(f"Invalid workflow input '{input_id}' specified")

        # Replace each resolved input dict by the model object it was resolved to;
        # other values (e.g. parameter inputs) are left untouched.
        for key in set(normalized_inputs.keys()):
            value = normalized_inputs[key]
            if isinstance(value, dict) and "content" in value:
                normalized_inputs[key] = value["content"]

        resource_params = payload.get("resource_params", {})
        if resource_params:
            # quick attempt to validate parameters, just handle select options now since is what
            # is needed for DTD - arbitrary plugins can define arbitrary logic at runtime in the
            # destination function. In the future this should be extended to allow arbitrary
            # pluggable validation.
            resource_mapper_function = get_resource_mapper_function(trans.app)
            # TODO: Do we need to do anything with the stored_workflow or can this be removed.
            resource_parameters = resource_mapper_function(trans=trans, stored_workflow=None, workflow=workflow)
            for resource_parameter in resource_parameters:
                if resource_parameter.get("type") == "select":
                    name = resource_parameter.get("name")
                    if name in resource_params:
                        value = resource_params[name]
                        # TODO: How should be handle the case where no selection is made by the user
                        # This can happen when there is a select on the page but the user has no options to select
                        # Here I have the validation pass it through. An alternative may be to remove the parameter if
                        # it is None.
                        valid_option = value is None or any(
                            value == option_elem.get("value") for option_elem in resource_parameter.get("data")
                        )
                        if not valid_option:
                            raise exceptions.RequestParameterInvalidException(
                                f"Invalid value for parameter '{name}' found."
                            )
        history.add_pending_items()
        preferred_object_store_id = payload.get("preferred_object_store_id")
        preferred_outputs_object_store_id = payload.get("preferred_outputs_object_store_id")
        preferred_intermediate_object_store_id = payload.get("preferred_intermediate_object_store_id")
        if payload.get("effective_outputs"):
            raise exceptions.RequestParameterInvalidException(
                "Cannot declare effective outputs on invocation in this fashion."
            )

        # A single store id and the split outputs/intermediate ids are mutually exclusive.
        split_object_store_config = bool(
            preferred_outputs_object_store_id is not None or preferred_intermediate_object_store_id is not None
        )
        if split_object_store_config and preferred_object_store_id:
            raise exceptions.RequestParameterInvalidException(
                "May specified either 'preferred_object_store_id' or one/both of 'preferred_outputs_object_store_id' and 'preferred_intermediate_object_store_id' but not both"
            )

        run_configs.append(
            WorkflowRunConfig(
                target_history=history,
                replacement_dict=payload.get("replacement_params", {}),
                inputs=normalized_inputs,
                param_map=param_map,
                allow_tool_state_corrections=allow_tool_state_corrections,
                use_cached_job=use_cached_job,
                resource_params=resource_params,
                preferred_object_store_id=preferred_object_store_id,
                preferred_outputs_object_store_id=preferred_outputs_object_store_id,
                preferred_intermediate_object_store_id=preferred_intermediate_object_store_id,
            )
        )

    return run_configs
def workflow_run_config_to_request(
    trans: "GalaxyWebTransaction", run_config: WorkflowRunConfig, workflow: "Workflow"
) -> WorkflowInvocation:
    """Build a ``WorkflowInvocation`` recording everything held in ``run_config``.

    A new invocation is created for ``run_config.target_history`` and added to
    the session alongside that history. For every step of ``workflow`` the
    encoded runtime state is attached as a ``WorkflowRequestStepState``;
    subworkflow steps additionally get a nested invocation built by calling this
    function recursively. The replacement, step, resource and meta parameters
    of ``run_config`` are stored as ``WorkflowRequestInputParameter`` rows and
    its inputs are attached through ``WorkflowInvocation.add_input``.

    :returns: the populated (not yet committed by this function) invocation.
    """
    param_types = WorkflowRequestInputParameter.types

    workflow_invocation = WorkflowInvocation()
    workflow_invocation.uuid = uuid.uuid1()
    workflow_invocation.history = run_config.target_history
    ensure_object_added_to_session(workflow_invocation, object_in_session=run_config.target_history)

    def add_parameter(name: str, value: str, param_type: WorkflowRequestInputParameter.types) -> None:
        # Record one name/value pair of the given parameter type on the invocation.
        parameter = WorkflowRequestInputParameter(
            name=name,
            value=value,
            type=param_type,
        )
        workflow_invocation.input_parameters.append(parameter)

    for step in workflow.steps:
        assert step.module
        serializable_runtime_state = step.module.encode_runtime_state(step, step.state)

        step_state = WorkflowRequestStepState()
        step_state.workflow_step = step
        log.info("Creating a step_state for step.id %s", step.id)
        step_state.value = serializable_runtime_state
        workflow_invocation.step_states.append(step_state)

        if step.type == "subworkflow":
            subworkflow = step.subworkflow
            assert subworkflow
            effective_outputs: Optional[List[EffectiveOutput]] = None
            if run_config.preferred_intermediate_object_store_id or run_config.preferred_outputs_object_store_id:
                # With split object store preferences, record which subworkflow
                # outputs correspond to workflow outputs declared on this step.
                effective_outputs = []
                for step_output in step.workflow_outputs:
                    subworkflow_output = subworkflow.workflow_output_for(step_output.output_name)
                    if subworkflow_output is not None:
                        effective_outputs.append(
                            EffectiveOutput(
                                output_name=subworkflow_output.output_name,
                                step_id=subworkflow_output.workflow_step_id,
                            )
                        )

            # NOTE(review): the lookup below uses step.order_index while the other
            # consumers of param_map in this module key it by step id - confirm
            # this is intended before changing it.
            subworkflow_run_config = WorkflowRunConfig(
                target_history=run_config.target_history,
                replacement_dict=run_config.replacement_dict,
                copy_inputs_to_history=False,
                use_cached_job=run_config.use_cached_job,
                inputs={},
                param_map=run_config.param_map.get(step.order_index),
                allow_tool_state_corrections=run_config.allow_tool_state_corrections,
                resource_params=run_config.resource_params,
                preferred_object_store_id=run_config.preferred_object_store_id,
                preferred_intermediate_object_store_id=run_config.preferred_intermediate_object_store_id,
                preferred_outputs_object_store_id=run_config.preferred_outputs_object_store_id,
                effective_outputs=effective_outputs,
            )
            subworkflow_invocation = workflow_run_config_to_request(
                trans,
                subworkflow_run_config,
                subworkflow,
            )
            workflow_invocation.attach_subworkflow_invocation_for_step(
                step,
                subworkflow_invocation,
            )

    for name, value in run_config.replacement_dict.items():
        add_parameter(name, value, param_types.REPLACEMENT_PARAMETERS)
    for step_id, content in run_config.inputs.items():
        workflow_invocation.add_input(content, step_id)
    for step_id, param_dict in run_config.param_map.items():
        # Step parameters are stored JSON encoded under the stringified step id.
        add_parameter(str(step_id), json.dumps(param_dict), param_types.STEP_PARAMETERS)

    for key, value in run_config.resource_params.items():
        add_parameter(str(key), value, param_types.RESOURCE_PARAMETERS)

    # Boolean meta parameters are stored as the strings "true" / "false".
    add_parameter(
        "copy_inputs_to_history", "true" if run_config.copy_inputs_to_history else "false", param_types.META_PARAMETERS
    )
    add_parameter("use_cached_job", "true" if run_config.use_cached_job else "false", param_types.META_PARAMETERS)
    for param in [
        "preferred_object_store_id",
        "preferred_outputs_object_store_id",
        "preferred_intermediate_object_store_id",
    ]:
        value = getattr(run_config, param)
        if value:
            add_parameter(param, value, param_types.META_PARAMETERS)
    if run_config.effective_outputs is not None:
        # empty list needs to come through here...
        add_parameter("effective_outputs", json.dumps(run_config.effective_outputs), param_types.META_PARAMETERS)

    return workflow_invocation
def workflow_request_to_run_config(
    workflow_invocation: WorkflowInvocation, use_cached_job: bool = False
) -> WorkflowRunConfig:
    """Rebuild a ``WorkflowRunConfig`` from a stored ``workflow_invocation``.

    The invocation's input parameters are sorted back into replacement, step,
    resource and meta parameters; dataset, dataset collection and step parameter
    inputs are gathered into a single mapping keyed by workflow step id. Labels
    of parameter input steps also become replacement parameters unless a
    replacement parameter of that name is already stored.

    ``use_cached_job`` is only a fallback: a stored ``use_cached_job`` meta
    parameter takes precedence over it.

    Raises ``InconsistentDatabase`` when no ``copy_inputs_to_history`` meta
    parameter is stored on the invocation.
    """
    param_types = WorkflowRequestInputParameter.types

    boolean_meta = ("copy_inputs_to_history", "use_cached_job")
    # Preferred object store IDs - either split or join.
    object_store_meta = (
        "preferred_object_store_id",
        "preferred_outputs_object_store_id",
        "preferred_intermediate_object_store_id",
    )

    replacement_dict = {}
    param_map = {}
    resource_params = {}
    # Meta parameters found on the invocation, decoded, keyed by name.
    meta = {}

    for parameter in workflow_invocation.input_parameters:
        kind = parameter.type
        if kind == param_types.REPLACEMENT_PARAMETERS:
            replacement_dict[parameter.name] = parameter.value
        elif kind == param_types.META_PARAMETERS:
            meta_name = parameter.name
            if meta_name in boolean_meta:
                # Booleans are stored as the strings "true" / "false".
                meta[meta_name] = parameter.value == "true"
            elif meta_name in object_store_meta:
                meta[meta_name] = parameter.value
            elif meta_name == "effective_outputs":
                meta[meta_name] = json.loads(parameter.value)
        elif kind == param_types.RESOURCE_PARAMETERS:
            resource_params[parameter.name] = parameter.value
        elif kind == param_types.STEP_PARAMETERS:
            param_map[int(parameter.name)] = json.loads(parameter.value)

    inputs = {}
    for dataset_assoc in workflow_invocation.input_datasets:
        inputs[dataset_assoc.workflow_step_id] = dataset_assoc.dataset
    for collection_assoc in workflow_invocation.input_dataset_collections:
        inputs[collection_assoc.workflow_step_id] = collection_assoc.dataset_collection
    for parameter_assoc in workflow_invocation.input_step_parameters:
        parameter_value = parameter_assoc.parameter_value
        inputs[parameter_assoc.workflow_step_id] = parameter_value
        step_label = parameter_assoc.workflow_step.label
        if step_label and step_label not in replacement_dict:
            replacement_dict[step_label] = str(parameter_value)

    copy_inputs_to_history = meta.get("copy_inputs_to_history")
    if copy_inputs_to_history is None:
        raise exceptions.InconsistentDatabase(
            "Failed to find copy_inputs_to_history parameter loading workflow_invocation from database."
        )

    return WorkflowRunConfig(
        target_history=workflow_invocation.history,
        replacement_dict=replacement_dict,
        inputs=inputs,
        param_map=param_map,
        copy_inputs_to_history=copy_inputs_to_history,
        use_cached_job=meta.get("use_cached_job", use_cached_job),
        resource_params=resource_params,
        preferred_object_store_id=meta.get("preferred_object_store_id"),
        preferred_outputs_object_store_id=meta.get("preferred_outputs_object_store_id"),
        preferred_intermediate_object_store_id=meta.get("preferred_intermediate_object_store_id"),
        effective_outputs=meta.get("effective_outputs"),
    )