Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tools.parameters.wrapped
from collections import UserDict
from typing import (
Any,
Dict,
List,
Sequence,
Union,
)
from galaxy.tools.parameters.basic import (
DataCollectionToolParameter,
DataToolParameter,
SelectToolParameter,
)
from galaxy.tools.parameters.grouping import (
Conditional,
Repeat,
Section,
)
from galaxy.tools.wrappers import (
DatasetCollectionWrapper,
DatasetFilenameWrapper,
DatasetListWrapper,
ElementIdentifierMapper,
InputValueWrapper,
SelectToolParameterWrapper,
)
from galaxy.util.permutations import (
looks_like_flattened_repeat_key,
split_flattened_repeat_key,
)
# Sentinel stored in ``WrappedParameters._params`` until the wrapped parameter
# dict has been built by the first access of ``WrappedParameters.params``.
# It is compared by identity (``is``), so any real value - including ``None``
# or an empty dict - is distinguishable from "not computed yet".
PARAMS_UNWRAPPED = object()
class LegacyUnprefixedDict(UserDict):
    """Track and provide access to prefixed and unprefixed tool parameter values.

    It used to be valid to access members of conditionals without specifying
    the conditional. This dict provides a fallback for those old rules: when a
    key is not stored directly, a registered legacy alias is consulted and the
    value stored under the aliased (new) key is returned instead.

    Keys that are stored directly always take precedence over aliases.
    """

    def __init__(self, dict=None, **kwargs):
        """Create the mapping, optionally seeded from ``dict`` and ``kwargs``.

        The parameter is named ``dict`` to mirror ``UserDict.__init__``.
        """
        # Maps legacy (old, unprefixed) key -> current (new, prefixed) key.
        self._legacy_mapping: Dict[str, str] = {}
        super().__init__(dict, **kwargs)

    def set_legacy_alias(self, new_key: str, old_key: str):
        """Register ``old_key`` as a fallback name for the value stored at ``new_key``."""
        self._legacy_mapping[old_key] = new_key

    def __getitem__(self, key):
        """Return the value for ``key``, falling back to its legacy alias target.

        Raises ``KeyError`` when neither ``key`` nor its alias target is stored.
        """
        if key not in self.data and key in self._legacy_mapping:
            return super().__getitem__(self._legacy_mapping[key])
        return super().__getitem__(key)

    def __contains__(self, key: object) -> bool:
        """Return True when ``key`` can be resolved by ``__getitem__``.

        A legacy alias only counts as present when the key it points to is
        actually stored. Otherwise ``key in d`` would be True while ``d[key]``
        raises ``KeyError``, which also breaks ``get()`` implementations that
        test membership before indexing.
        """
        if super().__contains__(key):
            return True
        # Typed loosely because ``key`` is an arbitrary object at this point.
        legacy_mapping: Dict[Any, str] = self._legacy_mapping
        return key in legacy_mapping and legacy_mapping[key] in self.data
def copy_identifiers(source, destination):
    """Copy every ``...|__identifier__`` entry of ``source`` into ``destination``.

    Nothing happens unless ``source`` is a dict; matching entries are only
    written when ``destination`` is a dict as well. ``destination`` is mutated
    in place and nothing is returned.
    """
    if not isinstance(source, dict):
        return
    for name, identifier in source.items():
        # The suffix test comes first so keys are inspected even when the
        # destination turns out not to be a dict.
        if name.endswith("|__identifier__") and isinstance(destination, dict):
            destination[name] = identifier
class WrappedParameters:
    """Lazily build a wrapped copy of a tool's incoming parameter values.

    The incoming values are copied and each entry is replaced by a wrapper
    object chosen from the type of the corresponding tool input. The work is
    done once, on first access of :attr:`params`.
    """

    def __init__(self, trans, tool, incoming, input_datasets=None):
        self.trans = trans
        self.tool = tool
        self.incoming = incoming
        self._input_datasets = input_datasets
        # Replaced by the wrapped dict the first time ``params`` is read.
        self._params = PARAMS_UNWRAPPED

    @property
    def params(self):
        """Return the wrapped parameter dict, computing and caching it on first use."""
        if self._params is not PARAMS_UNWRAPPED:
            return self._params
        wrapped = make_dict_copy(self.incoming)
        self.wrap_values(self.tool.inputs, wrapped, skip_missing_values=not self.tool.check_values)
        self._params = wrapped
        return wrapped

    def wrap_values(self, inputs, input_values, skip_missing_values=False):
        """Replace the entries of ``input_values`` with wrappers, in place.

        ``inputs`` maps names to tool input definitions and ``input_values``
        holds the value for each of them. Repeats, conditionals and sections
        are descended into recursively; every other input has its value
        replaced by a wrapper. With ``skip_missing_values`` set, inputs that
        have no entry in ``input_values`` are ignored instead of raising
        ``KeyError``.
        """
        identifier_mapper = ElementIdentifierMapper(self._input_datasets)
        # Wrap tool inputs as necessary
        for param in inputs.values():
            if skip_missing_values and param.name not in input_values:
                continue
            value = input_values[param.name]
            copy_identifiers(destination=value, source=input_values)
            if isinstance(param, Repeat):
                for instance_values in value:
                    copy_identifiers(destination=instance_values, source=value)
                    self.wrap_values(param.inputs, instance_values, skip_missing_values=skip_missing_values)
            elif isinstance(param, Conditional):
                # Only the inputs of the currently selected case are wrapped.
                active_case = param.cases[value["__current_case__"]]
                self.wrap_values(active_case.inputs, value, skip_missing_values=skip_missing_values)
            elif isinstance(param, Section):
                self.wrap_values(param.inputs, value, skip_missing_values=skip_missing_values)
            else:
                input_values[param.name] = self._wrap_leaf(param, value, input_values, identifier_mapper)

    def _wrap_leaf(self, param, value, input_values, identifier_mapper):
        """Return the wrapper for the value of a single non-grouping input."""
        tool = self.tool
        if isinstance(param, DataToolParameter):
            if param.multiple:
                dataset_instances = DatasetListWrapper.to_dataset_instances(value)
                return DatasetListWrapper(
                    None,
                    dataset_instances,
                    datatypes_registry=self.trans.app.datatypes_registry,
                    tool=tool,
                    name=param.name,
                    formats=param.formats,
                )
            wrapper_kwds = {
                "datatypes_registry": self.trans.app.datatypes_registry,
                "tool": tool,
                "name": param.name,
                "formats": param.formats,
            }
            element_identifier = identifier_mapper.identifier(value, input_values)
            if element_identifier:
                # The identifier is only passed along when the mapper found one.
                wrapper_kwds["identifier"] = element_identifier
            return DatasetFilenameWrapper(value, **wrapper_kwds)
        if isinstance(param, SelectToolParameter):
            return SelectToolParameterWrapper(param, value, other_values=self.incoming)
        if isinstance(param, DataCollectionToolParameter):
            return DatasetCollectionWrapper(
                None,
                value,
                datatypes_registry=self.trans.app.datatypes_registry,
                tool=tool,
                name=param.name,
            )
        return InputValueWrapper(param, value, self.incoming, tool.profile)
def make_dict_copy(from_dict):
    """
    Make a copy of input dictionary from_dict such that all values that are
    dictionaries or lists result in creation of a new dictionary or list
    ( a sort of deepcopy ).

    Dict subclasses are recognised through ``isinstance`` - the same test
    ``make_list_copy`` applies - so a nested mapping is copied regardless of
    whether it sits inside a dict or a list. Copies are always plain ``dict`` /
    ``list`` objects; any other value is shared with the original by reference.
    """
    copy_from_dict = {}
    for key, value in from_dict.items():
        if isinstance(value, dict):
            copy_from_dict[key] = make_dict_copy(value)
        elif isinstance(value, list):
            copy_from_dict[key] = make_list_copy(value)
        else:
            copy_from_dict[key] = value
    return copy_from_dict


def make_list_copy(from_list):
    """
    List counterpart of ``make_dict_copy``: return a new list in which nested
    dictionaries and lists are copied recursively and every other element is
    shared with ``from_list``.
    """
    new_list = []
    for value in from_list:
        if isinstance(value, dict):
            new_list.append(make_dict_copy(value))
        elif isinstance(value, list):
            new_list.append(make_list_copy(value))
        else:
            new_list.append(value)
    return new_list
def process_key(incoming_key: str, incoming_value: Any, d: Dict[str, Any]):
    """Store ``incoming_value`` in the nested dict ``d`` at the location named by ``incoming_key``.

    ``incoming_key`` is a flat key whose levels are separated by ``|``. The
    first level is consumed here and the remainder is handled recursively:

    * no ``|`` at all: the value is stored directly under the key;
    * first level recognised by ``looks_like_flattened_repeat_key``: the
      list stored under the repeat name is padded with empty dicts up to the
      parsed index and recursion continues inside that element;
    * anything else (section / conditional): recursion continues inside the
      dict stored under the first level, which is created when missing.

    ``d`` is modified in place; nothing is returned.
    """
    head, separator, remainder = incoming_key.partition("|")

    if not separator:
        # Regular parameter
        if incoming_key in d and not incoming_value:
            # In case we get an empty repeat after we already filled in a repeat element
            return
        d[incoming_key] = incoming_value
        return

    subdict: Dict[Any, Any]
    if looks_like_flattened_repeat_key(head):
        # Repeat
        input_name, index = split_flattened_repeat_key(head)
        instances: List[Dict[Any, Any]] = d.setdefault(input_name, [])
        # Grow the list just enough for ``index`` to be addressable.
        instances.extend({} for _ in range(len(instances), index + 1))
        subdict = instances[index]
    else:
        # Section / Conditional
        subdict = d.setdefault(head, {})

    process_key(remainder, incoming_value=incoming_value, d=subdict)
def nested_key_to_path(key: str) -> Sequence[Union[str, int]]:
    """
    Turn a '|'-separated tool state key into a list of path components.

    Within a multi-level key, every level of the form ``<name>_<digits>`` is
    expanded into the name followed by the integer index, e.g.
    "cond|repeat_0|paramA" -> ["cond", "repeat", 0, "paramA"].
    A key without any '|' is returned as a single-element list, unsplit.

    Return value can be used with `boltons.iterutils.get_path`.
    """
    segments = key.split("|")
    if len(segments) == 1:
        return segments

    path: List[Union[str, int]] = []
    for segment in segments:
        name, underscore, suffix = segment.rpartition("_")
        if underscore and suffix.isdigit():
            path += [name, int(suffix)]
        else:
            path.append(segment)
    return path
def flat_to_nested_state(incoming: Dict[str, Any]):
    """Build a nested state dict from the flat, '|'-separated keys of ``incoming``.

    Each entry is handed to ``process_key`` in iteration order; ``incoming``
    itself is only read here.
    """
    nested_state: Dict[str, Any] = {}
    for flat_key in incoming:
        process_key(flat_key, incoming[flat_key], nested_state)
    return nested_state
# Public API of this module. The helpers ``copy_identifiers`` and
# ``make_list_copy`` as well as the ``PARAMS_UNWRAPPED`` sentinel are
# not listed and therefore not exported by ``from ... import *``.
__all__ = (
"LegacyUnprefixedDict",
"WrappedParameters",
"make_dict_copy",
"process_key",
"flat_to_nested_state",
"nested_key_to_path",
)