Source code for galaxy.tools.parameters.meta
import copy
import itertools
import logging
from collections import namedtuple
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Tuple,
)

from galaxy import (
    exceptions,
    util,
)
from galaxy.model import HistoryDatasetCollectionAssociation
from galaxy.model.dataset_collections import (
    matching,
    subcollections,
)
from galaxy.util.permutations import (
    build_combos,
    input_classification,
    is_in_state,
    state_copy,
    state_get_value,
    state_remove_value,
    state_set_value,
)

from . import visit_input_values
from .wrapped import process_key
from .._types import (
    InputFormatT,
    ToolRequestT,
    ToolStateJobInstanceT,
)

log = logging.getLogger(__name__)

WorkflowParameterExpansion = namedtuple(
    "WorkflowParameterExpansion", ["param_combinations", "param_keys", "input_combinations"]
)

# ParamKey/InputKey address batch values by workflow step and by top-level
# input, respectively; definitions inferred from their usage below
# (``.step_id``/``.key``/``.input_id``).
ParamKey = namedtuple("ParamKey", ["step_id", "key"])
InputKey = namedtuple("InputKey", ["input_id"])


def expand_workflow_inputs(param_inputs, inputs=None):
    """
    Expands incoming encoded multiple payloads into the set of all individual payload combinations.

    >>> expansion = expand_workflow_inputs({'1': {'input': {'batch': True, 'product': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}})
    >>> print(["%s" % (p['1']['input']['hid']) for p in expansion.param_combinations])
    ['1', '2']
    >>> expansion = expand_workflow_inputs({'1': {'input': {'batch': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}})
    >>> print(["%s" % (p['1']['input']['hid']) for p in expansion.param_combinations])
    ['1', '2']
    >>> expansion = expand_workflow_inputs({'1': {'input': {'batch': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}, '2': {'input': {'batch': True, 'values': [{'hid': '3'}, {'hid': '4'}] }}})
    >>> print(["%s%s" % (p['1']['input']['hid'], p['2']['input']['hid']) for p in expansion.param_combinations])
    ['13', '24']
    >>> expansion = expand_workflow_inputs({'1': {'input': {'batch': True, 'product': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}, '2': {'input': {'batch': True, 'values': [{'hid': '3'}, {'hid': '4'}, {'hid': '5'}] }}})
    >>> print(["%s%s" % (p['1']['input']['hid'], p['2']['input']['hid']) for p in expansion.param_combinations])
    ['13', '23', '14', '24', '15', '25']
    >>> expansion = expand_workflow_inputs({'1': {'input': {'batch': True, 'product': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}, '2': {'input': {'batch': True, 'product': True, 'values': [{'hid': '3'}, {'hid': '4'}, {'hid': '5'}] }}, '3': {'input': {'batch': True, 'product': True, 'values': [{'hid': '6'}, {'hid': '7'}, {'hid': '8'}] }}})
    >>> print(["%s%s%s" % (p['1']['input']['hid'], p['2']['input']['hid'], p['3']['input']['hid']) for p in expansion.param_combinations])
    ['136', '137', '138', '146', '147', '148', '156', '157', '158', '236', '237', '238', '246', '247', '248', '256', '257', '258']
    >>> expansion = expand_workflow_inputs(None, inputs={'myinput': {'batch': True, 'product': True, 'values': [{'hid': '1'}, {'hid': '2'}] }})
    >>> print(["%s" % (p['myinput']['hid']) for p in expansion.input_combinations])
    ['1', '2']
    """
    param_inputs = param_inputs or {}
    inputs = inputs or {}
    linked_n = None
    linked = []
    product = []
    linked_keys = []
    product_keys = []

    def is_batch(value):
        return (
            isinstance(value, dict)
            and "batch" in value
            and value["batch"] is True
            and "values" in value
            and isinstance(value["values"], list)
        )

    for step_id, step in sorted(param_inputs.items()):
        for key, value in sorted(step.items()):
            if is_batch(value):
                nval = len(value["values"])
                if "product" in value and value["product"] is True:
                    product.append(value["values"])
                    product_keys.append(ParamKey(step_id, key))
                else:
                    if linked_n is None:
                        linked_n = nval
                    elif linked_n != nval or nval == 0:
                        raise exceptions.RequestParameterInvalidException(
                            "Failed to match linked batch selections. Please select an equal number of data files."
                        )
                    linked.append(value["values"])
                    linked_keys.append(ParamKey(step_id, key))

    # Force it to a list to allow modification...
    input_items = list(inputs.items())
    for input_id, value in input_items:
        if is_batch(value):
            nval = len(value["values"])
            if "product" in value and value["product"] is True:
                product.append(value["values"])
                product_keys.append(InputKey(input_id))
            else:
                if linked_n is None:
                    linked_n = nval
                elif linked_n != nval or nval == 0:
                    raise exceptions.RequestParameterInvalidException(
                        "Failed to match linked batch selections. Please select an equal number of data files."
                    )
                linked.append(value["values"])
                linked_keys.append(InputKey(input_id))
        elif isinstance(value, dict) and "batch" in value:
            # Remove the batch wrapper and render the simplified input form the
            # rest of the workflow code expects.
            inputs[input_id] = value["values"][0]

    param_combinations = []
    input_combinations = []
    params_keys = []
    linked = linked or [[None]]
    product = product or [[None]]
    linked_keys = linked_keys or [None]
    product_keys = product_keys or [None]
    for linked_values, product_values in itertools.product(zip(*linked), itertools.product(*product)):
        new_params = copy.deepcopy(param_inputs)
        new_inputs = copy.deepcopy(inputs)
        new_keys = []
        for input_key, value in list(zip(linked_keys, linked_values)) + list(zip(product_keys, product_values)):
            if input_key:
                if isinstance(input_key, ParamKey):
                    step_id = input_key.step_id
                    key = input_key.key
                    assert step_id is not None
                    new_params[step_id][key] = value
                    if "hid" in value:
                        new_keys.append(str(value["hid"]))
                else:
                    input_id = input_key.input_id
                    assert input_id is not None
                    new_inputs[input_id] = value
                    if "hid" in value:
                        new_keys.append(str(value["hid"]))
        params_keys.append(new_keys)
        param_combinations.append(new_params)
        input_combinations.append(new_inputs)
    return WorkflowParameterExpansion(param_combinations, params_keys, input_combinations)


ExpandedT = Tuple[List[ToolStateJobInstanceT], Optional[matching.MatchingCollections]]


def expand_flat_parameters_to_nested(incoming_copy: ToolRequestT) -> Dict[str, Any]:
    nested_dict: Dict[str, Any] = {}
    for incoming_key, incoming_value in incoming_copy.items():
        if not incoming_key.startswith("__"):
            process_key(incoming_key, incoming_value=incoming_value, d=nested_dict)
    return nested_dict
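
# A hedged usage sketch for expand_flat_parameters_to_nested; the key names
# below are illustrative, not taken from this module. ``process_key`` splits
# "|"-delimited legacy keys into nested state, so something like
#
#     expand_flat_parameters_to_nested({"cond|seq": "s", "__some_internal__": "x"})
#
# would be expected to yield ``{"cond": {"seq": "s"}}``, with the
# dunder-prefixed key skipped.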


def expand_meta_parameters(trans, tool, incoming: ToolRequestT, input_format: InputFormatT) -> ExpandedT:
    """
    Take in a dictionary of raw incoming parameters and expand it into a list
    of individual parameter sets (one set of parameters per tool execution).
    """
    for key in list(incoming.keys()):
        if key.endswith("|__identifier__"):
            incoming.pop(key)

    # If we're going to multiply input dataset combinations,
    # order matters, so the following reorders incoming
    # according to tool.inputs (which is ordered).
    incoming_copy = incoming.copy()
    if input_format == "legacy":
        nested_dict = expand_flat_parameters_to_nested(incoming_copy)
    else:
        nested_dict = incoming_copy

    collections_to_match = matching.CollectionsToMatch()

    def classifier_from_value(value, input_key):
        if isinstance(value, dict) and "values" in value:
            # Explicit meta wrapper for inputs...
            is_batch = value.get("batch", False)
            is_linked = value.get("linked", True)
            if is_batch and is_linked:
                classification = input_classification.MATCHED
            elif is_batch:
                classification = input_classification.MULTIPLIED
            else:
                classification = input_classification.SINGLE
            if __collection_multirun_parameter(value):
                collection_value = value["values"][0]
                values = __expand_collection_parameter(
                    trans, input_key, collection_value, collections_to_match, linked=is_linked
                )
            else:
                values = value["values"]
        else:
            classification = input_classification.SINGLE
            values = value
        return classification, values

    nested = input_format != "legacy"
    if not nested:
        reordered_incoming = reorder_parameters(tool, incoming_copy, nested_dict, nested)
        incoming_template = reordered_incoming

        def classifier_flat(input_key):
            return classifier_from_value(incoming[input_key], input_key)

        single_inputs, matched_multi_inputs, multiplied_multi_inputs = split_inputs_flat(
            incoming_template, classifier_flat
        )
    else:
        reordered_incoming = reorder_parameters(tool, incoming_copy, nested_dict, nested)
        incoming_template = reordered_incoming
        single_inputs, matched_multi_inputs, multiplied_multi_inputs = split_inputs_nested(
            tool.inputs, incoming_template, classifier_from_value
        )

    expanded_incomings = build_combos(single_inputs, matched_multi_inputs, multiplied_multi_inputs, nested=nested)
    if collections_to_match.has_collections():
        collection_info = trans.app.dataset_collection_manager.match_collections(collections_to_match)
    else:
        collection_info = None
    return expanded_incomings, collection_info
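
# A hedged sketch of the expansion performed above (identifiers invented for
# illustration): an incoming value such as
#
#     {"input1": {"batch": True, "values": [{"src": "hda", "id": "a"},
#                                           {"src": "hda", "id": "b"}]}}
#
# classifies "input1" as MATCHED (batch values are linked by default) and
# should expand into two job-instance states, one per value; with
# "linked": False the input would instead be MULTIPLIED and crossed against
# other batch inputs by ``build_combos``.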


def reorder_parameters(tool, incoming, nested_dict, nested):
    # If we're going to multiply input dataset combinations,
    # order matters, so the following reorders incoming
    # according to tool.inputs (which is ordered).
    incoming_copy = state_copy(incoming, nested)
    reordered_incoming = {}

    def visitor(input, value, prefix, prefixed_name, prefixed_label, error, **kwargs):
        if is_in_state(incoming_copy, prefixed_name, nested):
            value_to_copy_over = state_get_value(incoming_copy, prefixed_name, nested)
            state_set_value(reordered_incoming, prefixed_name, value_to_copy_over, nested)
            state_remove_value(incoming_copy, prefixed_name, nested)

    visit_input_values(inputs=tool.inputs, input_values=nested_dict, callback=visitor)

    def merge_into(from_object, into_object):
        if isinstance(from_object, dict):
            for key, value in from_object.items():
                if key not in into_object:
                    into_object[key] = value
                else:
                    into_target = into_object[key]
                    merge_into(value, into_target)
        elif isinstance(from_object, list):
            for index in range(len(from_object)):
                if len(into_object) <= index:
                    into_object.append(from_object[index])
                else:
                    merge_into(from_object[index], into_object[index])

    merge_into(incoming_copy, reordered_incoming)
    return reordered_incoming
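
# A hedged illustration of ``merge_into`` semantics (values invented): keys
# already present in the target are kept, missing ones are copied over, so
#
#     target = {"b": {"c": 9}}
#     merge_into({"a": 1, "b": {"c": 2}}, target)
#
# should leave ``target`` as ``{"b": {"c": 9}, "a": 1}``.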


def split_inputs_flat(inputs: Dict[str, Any], classifier):
    single_inputs: Dict[str, Any] = {}
    matched_multi_inputs: Dict[str, Any] = {}
    multiplied_multi_inputs: Dict[str, Any] = {}

    for input_key in inputs:
        input_type, expanded_val = classifier(input_key)
        if input_type == input_classification.SINGLE:
            single_inputs[input_key] = expanded_val
        elif input_type == input_classification.MATCHED:
            matched_multi_inputs[input_key] = expanded_val
        elif input_type == input_classification.MULTIPLIED:
            multiplied_multi_inputs[input_key] = expanded_val

    return (single_inputs, matched_multi_inputs, multiplied_multi_inputs)


def split_inputs_nested(inputs, nested_dict, classifier):
    single_inputs: Dict[str, Any] = {}
    matched_multi_inputs: Dict[str, Any] = {}
    multiplied_multi_inputs: Dict[str, Any] = {}
    unset_value = object()

    def visitor(input, value, prefix, prefixed_name, prefixed_label, error, **kwargs):
        if value is unset_value:
            # don't want to inject extra nulls into state
            return
        input_type, expanded_val = classifier(value, prefixed_name)
        if input_type == input_classification.SINGLE:
            single_inputs[prefixed_name] = expanded_val
        elif input_type == input_classification.MATCHED:
            matched_multi_inputs[prefixed_name] = expanded_val
        elif input_type == input_classification.MULTIPLIED:
            multiplied_multi_inputs[prefixed_name] = expanded_val

    visit_input_values(
        inputs=inputs, input_values=nested_dict, callback=visitor, allow_case_inference=True, unset_value=unset_value
    )
    single_inputs_nested = expand_flat_parameters_to_nested(single_inputs)
    return (single_inputs_nested, matched_multi_inputs, multiplied_multi_inputs)
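
# A hedged sketch for split_inputs_nested (parameter names invented): given
# nested state like
#
#     {"reference": {"src": "hda", "id": "r"},
#      "reads": {"batch": True, "values": [{"src": "hda", "id": "a"}]}}
#
# the visitor would file "reference" under single inputs and "reads" under
# matched multi-inputs; note that single inputs are collected under flat
# prefixed names and re-nested via expand_flat_parameters_to_nested before
# being returned.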


def __expand_collection_parameter(trans, input_key, incoming_val, collections_to_match, linked=False):
    # For a subcollection multirun of a data_collection parameter, the value
    # will be "hdca_id|subcollection_type"; otherwise it will just be hdca_id.
    if "|" in incoming_val:
        encoded_hdc_id, subcollection_type = incoming_val.split("|", 1)
    else:
        try:
            src = incoming_val["src"]
            if src != "hdca":
                raise exceptions.ToolMetaParameterException(f"Invalid dataset collection source type {src}")
            encoded_hdc_id = incoming_val["id"]
            subcollection_type = incoming_val.get("map_over_type", None)
        except TypeError:
            encoded_hdc_id = incoming_val
            subcollection_type = None
    hdc_id = trans.app.security.decode_id(encoded_hdc_id)
    hdc = trans.sa_session.get(HistoryDatasetCollectionAssociation, hdc_id)
    if not hdc.collection.populated_optimized:
        raise exceptions.ToolInputsNotReadyException("An input collection is not populated.")
    collections_to_match.add(input_key, hdc, subcollection_type=subcollection_type, linked=linked)
    if subcollection_type is not None:
        subcollection_elements = subcollections.split_dataset_collection_instance(hdc, subcollection_type)
        return subcollection_elements
    else:
        hdas = []
        for element in hdc.collection.dataset_elements:
            hda = element.dataset_instance
            hda.element_identifier = element.element_identifier
            hdas.append(hda)
        return hdas


def __collection_multirun_parameter(value):
    is_batch = value.get("batch", False)
    if not is_batch:
        return False

    batch_values = util.listify(value["values"])
    if len(batch_values) == 1:
        batch_over = batch_values[0]
        if isinstance(batch_over, dict) and ("src" in batch_over) and (batch_over["src"] in {"hdca", "dce"}):
            return True
    return False
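
# A hedged example for __collection_multirun_parameter (the encoded id is made
# up): a value like
#
#     {"batch": True, "values": [{"src": "hdca", "id": "f2db41e1fa331b3e"}]}
#
# should be recognized as a collection multirun, while a batch over plain
# datasets (e.g. {"src": "hda", ...}) should not.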