Source code for galaxy.tools.parameters.meta
import copy
import itertools
import logging
from collections import namedtuple
from typing import (
from galaxy import (
from galaxy.model import HistoryDatasetCollectionAssociation
from galaxy.model.dataset_collections import (
from galaxy.util import permutations
from . import visit_input_values
log = logging.getLogger(__name__)
WorkflowParameterExpansion = namedtuple(
"WorkflowParameterExpansion", ["param_combinations", "param_keys", "input_combinations"]
[docs]def expand_workflow_inputs(param_inputs, inputs=None):
Expands incoming encoded multiple payloads, into the set of all individual payload combinations
>>> expansion = expand_workflow_inputs({'1': {'input': {'batch': True, 'product': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}})
>>> print(["%s" % (p['1']['input']['hid']) for p in expansion.param_combinations])
['1', '2']
>>> expansion = expand_workflow_inputs({'1': {'input': {'batch': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}})
>>> print(["%s" % (p['1']['input']['hid']) for p in expansion.param_combinations])
['1', '2']
>>> expansion = expand_workflow_inputs({'1': {'input': {'batch': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}, '2': {'input': {'batch': True, 'values': [{'hid': '3'}, {'hid': '4'}] }}})
>>> print(["%s%s" % (p['1']['input']['hid'], p['2']['input']['hid']) for p in expansion.param_combinations])
['13', '24']
>>> expansion = expand_workflow_inputs({'1': {'input': {'batch': True, 'product': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}, '2': {'input': {'batch': True, 'values': [{'hid': '3'}, {'hid': '4'}, {'hid': '5'}] }}})
>>> print(["%s%s" % (p['1']['input']['hid'], p['2']['input']['hid']) for p in expansion.param_combinations])
['13', '23', '14', '24', '15', '25']
>>> expansion = expand_workflow_inputs({'1': {'input': {'batch': True, 'product': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}, '2': {'input': {'batch': True, 'product': True, 'values': [{'hid': '3'}, {'hid': '4'}, {'hid': '5'}] }}, '3': {'input': {'batch': True, 'product': True, 'values': [{'hid': '6'}, {'hid': '7'}, {'hid': '8'}] }}})
>>> print(["%s%s%s" % (p['1']['input']['hid'], p['2']['input']['hid'], p['3']['input']['hid']) for p in expansion.param_combinations])
['136', '137', '138', '146', '147', '148', '156', '157', '158', '236', '237', '238', '246', '247', '248', '256', '257', '258']
>>> expansion = expand_workflow_inputs(None, inputs={'myinput': {'batch': True, 'product': True, 'values': [{'hid': '1'}, {'hid': '2'}] }})
>>> print(["%s" % (p['myinput']['hid']) for p in expansion.input_combinations])
['1', '2']
param_inputs = param_inputs or {}
inputs = inputs or {}
linked_n = None
linked = []
product = []
linked_keys = []
product_keys = []
def is_batch(value):
return (
isinstance(value, dict)
and "batch" in value
and value["batch"] is True
and "values" in value
and isinstance(value["values"], list)
for step_id, step in sorted(param_inputs.items()):
for key, value in sorted(step.items()):
if is_batch(value):
nval = len(value["values"])
if "product" in value and value["product"] is True:
product_keys.append(ParamKey(step_id, key))
if linked_n is None:
linked_n = nval
elif linked_n != nval or nval == 0:
raise exceptions.RequestParameterInvalidException(
"Failed to match linked batch selections. Please select equal number of data files."
linked_keys.append(ParamKey(step_id, key))
# Force it to a list to allow modification...
input_items = list(inputs.items())
for input_id, value in input_items:
if is_batch(value):
nval = len(value["values"])
if "product" in value and value["product"] is True:
if linked_n is None:
linked_n = nval
elif linked_n != nval or nval == 0:
raise exceptions.RequestParameterInvalidException(
"Failed to match linked batch selections. Please select equal number of data files."
elif isinstance(value, dict) and "batch" in value:
# remove batch wrapper and render simplified input form rest of workflow
# code expects
inputs[input_id] = value["values"][0]
param_combinations = []
input_combinations = []
params_keys = []
linked = linked or [[None]]
product = product or [[None]]
linked_keys = linked_keys or [None]
product_keys = product_keys or [None]
for linked_values, product_values in itertools.product(zip(*linked), itertools.product(*product)):
new_params = copy.deepcopy(param_inputs)
new_inputs = copy.deepcopy(inputs)
new_keys = []
for input_key, value in list(zip(linked_keys, linked_values)) + list(zip(product_keys, product_values)):
if input_key:
if isinstance(input_key, ParamKey):
step_id = input_key.step_id
key = input_key.key
assert step_id is not None
new_params[step_id][key] = value
if "hid" in value:
input_id = input_key.input_id
assert input_id is not None
new_inputs[input_id] = value
if "hid" in value:
return WorkflowParameterExpansion(param_combinations, params_keys, input_combinations)
[docs]def process_key(incoming_key, incoming_value, d):
key_parts = incoming_key.split("|")
if len(key_parts) == 1:
# Regular parameter
if incoming_key in d and not incoming_value:
# In case we get an empty repeat after we already filled in a repeat element
d[incoming_key] = incoming_value
elif key_parts[0].rsplit("_", 1)[-1].isdigit():
# Repeat
input_name, index = key_parts[0].rsplit("_", 1)
index = int(index)
d.setdefault(input_name, [])
newlist = [{} for _ in range(index + 1)]
d[input_name].extend(newlist[len(d[input_name]) :])
subdict = d[input_name][index]
process_key("|".join(key_parts[1:]), incoming_value=incoming_value, d=subdict)
# Section / Conditional
input_name = key_parts[0]
subdict = {}
d[input_name] = subdict
process_key("|".join(key_parts[1:]), incoming_value=incoming_value, d=subdict)
ExpandedT = Tuple[List[Dict[str, Any]], Optional[matching.MatchingCollections]]
[docs]def expand_meta_parameters(trans, tool, incoming) -> ExpandedT:
Take in a dictionary of raw incoming parameters and expand to a list
of expanded incoming parameters (one set of parameters per tool
for key in list(incoming.keys()):
if key.endswith("|__identifier__"):
# If we're going to multiply input dataset combinations
# order matters, so the following reorders incoming
# according to tool.inputs (which is ordered).
incoming_copy = incoming.copy()
nested_dict: Dict[str, Any] = {}
for incoming_key, incoming_value in incoming_copy.items():
if not incoming_key.startswith("__"):
process_key(incoming_key, incoming_value=incoming_value, d=nested_dict)
reordered_incoming = {}
def visitor(input, value, prefix, prefixed_name, prefixed_label, error, **kwargs):
if prefixed_name in incoming_copy:
reordered_incoming[prefixed_name] = incoming_copy[prefixed_name]
del incoming_copy[prefixed_name]
visit_input_values(inputs=tool.inputs, input_values=nested_dict, callback=visitor)
def classifier(input_key):
value = incoming[input_key]
if isinstance(value, dict) and "values" in value:
# Explicit meta wrapper for inputs...
is_batch = value.get("batch", False)
is_linked = value.get("linked", True)
if is_batch and is_linked:
classification = permutations.input_classification.MATCHED
elif is_batch:
classification = permutations.input_classification.MULTIPLIED
classification = permutations.input_classification.SINGLE
if __collection_multirun_parameter(value):
collection_value = value["values"][0]
values = __expand_collection_parameter(
trans, input_key, collection_value, collections_to_match, linked=is_linked
values = value["values"]
classification = permutations.input_classification.SINGLE
values = value
return classification, values
collections_to_match = matching.CollectionsToMatch()
# Stick an unexpanded version of multirun keys so they can be replaced,
# by expand_mult_inputs.
incoming_template = reordered_incoming
expanded_incomings = permutations.expand_multi_inputs(incoming_template, classifier)
if collections_to_match.has_collections():
collection_info = trans.app.dataset_collection_manager.match_collections(collections_to_match)
collection_info = None
return expanded_incomings, collection_info
def __expand_collection_parameter(trans, input_key, incoming_val, collections_to_match, linked=False):
# If subcollectin multirun of data_collection param - value will
# be "hdca_id|subcollection_type" else it will just be hdca_id
if "|" in incoming_val:
encoded_hdc_id, subcollection_type = incoming_val.split("|", 1)
src = incoming_val["src"]
if src != "hdca":
raise exceptions.ToolMetaParameterException(f"Invalid dataset collection source type {src}")
encoded_hdc_id = incoming_val["id"]
subcollection_type = incoming_val.get("map_over_type", None)
except TypeError:
encoded_hdc_id = incoming_val
subcollection_type = None
hdc_id = trans.app.security.decode_id(encoded_hdc_id)
hdc = trans.sa_session.get(HistoryDatasetCollectionAssociation, hdc_id)
collections_to_match.add(input_key, hdc, subcollection_type=subcollection_type, linked=linked)
if subcollection_type is not None:
subcollection_elements = subcollections.split_dataset_collection_instance(hdc, subcollection_type)
return subcollection_elements
hdas = []
for element in hdc.collection.dataset_elements:
hda = element.dataset_instance
hda.element_identifier = element.element_identifier
return hdas
def __collection_multirun_parameter(value):
is_batch = value.get("batch", False)
if not is_batch:
return False
batch_values = util.listify(value["values"])
if len(batch_values) == 1:
batch_over = batch_values[0]
if isinstance(batch_over, dict) and ("src" in batch_over) and (batch_over["src"] in {"hdca", "dce"}):
return True
return False