Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release, if it exists, or view the top of the latest release's documentation.

Source code for galaxy.model.dataset_collections.structure

"""Module for reasoning about the structure of, and for matching, hierarchical collections of data."""

import logging

log = logging.getLogger(__name__)


class Leaf:
    """Terminal node of a collection structure.

    A leaf has length 1, reports ``is_leaf`` as true and carries no state of
    its own, which is why ``clone`` can hand back the same object.
    """

    # Leaves have no children, so there is nothing left to discover.
    children_known = True

    def __len__(self):
        """Return 1: a leaf counts as a single element."""
        return 1

    @property
    def is_leaf(self):
        """Always ``True`` for a leaf."""
        return True

    def clone(self):
        """Return this same instance (leaves hold no per-instance state)."""
        return self

    def multiply(self, other_structure):
        """Return a clone of ``other_structure``.

        Multiplying a leaf by any structure yields that structure unchanged.
        """
        return other_structure.clone()

    def sliced_collection_type(self, collection):
        """Return ``collection`` unchanged.

        The method used to return the builtin ``input`` function, which was
        never a meaningful result; the argument itself is returned instead.
        """
        return collection

    def __str__(self):
        return "Leaf[]"


# Module-level leaf instance used wherever a structure bottoms out.
leaf = Leaf()
class BaseTree:
    """Common base for non-leaf structures.

    Stores the collection type description that the structure was built for.
    """

    def __init__(self, collection_type_description):
        """Remember ``collection_type_description`` on the instance."""
        self.collection_type_description = collection_type_description
class UninitializedTree(BaseTree):
    """Non-leaf structure whose collection type is set but whose children are not known."""

    # No children are stored on this structure.
    children_known = False

    @property
    def is_leaf(self):
        """Always ``False``: this is a (not yet populated) collection."""
        return False

    def __len__(self):
        """Always raise: the number of elements cannot be determined."""
        raise Exception("Unknown length")

    def __str__(self):
        return f"UninitializedTree[collection_type={self.collection_type_description}]"

    def clone(self):
        """Return this same instance; no copy is made."""
        return self

    def multiply(self, other_structure):
        """Combine this structure with ``other_structure``.

        Multiplying by a leaf returns ``self.clone()``. Otherwise the two
        collection type descriptions are multiplied and wrapped in a new
        ``UninitializedTree``.
        """
        if not other_structure.is_leaf:
            own_description = self.collection_type_description
            combined_description = own_description.multiply(other_structure.collection_type_description)
            return UninitializedTree(combined_description)
        return self.clone()
class Tree(BaseTree):
    """Non-leaf structure with a fully enumerated list of children.

    ``children`` is a sequence of ``(identifier, substructure)`` pairs.
    ``when_values`` is optional; during walking, a single entry is applied to
    every child, otherwise the entry at the child's position is used.
    """

    children_known = True

    def __init__(self, children, collection_type_description, when_values=None):
        super().__init__(collection_type_description)
        self.children = children
        self.when_values = when_values

    @staticmethod
    def for_dataset_collection(dataset_collection, collection_type_description):
        """Build a ``Tree`` mirroring the elements of ``dataset_collection``.

        When the type description reports subcollections, each element's
        ``child_collection`` is converted recursively; otherwise each element
        maps to the shared ``leaf``.
        """
        pairs = []
        for element in dataset_collection.elements:
            if not collection_type_description.has_subcollections():
                pairs.append((element.element_identifier, leaf))
                continue
            nested_collection = element.child_collection
            # Type description of children
            nested_description = collection_type_description.subcollection_type_description()
            subtree = Tree.for_dataset_collection(nested_collection, collection_type_description=nested_description)
            pairs.append((element.element_identifier, subtree))
        return Tree(pairs, collection_type_description)

    def walk_collections(self, hdca_dict):
        """Walk the ``.collection`` of every value in ``hdca_dict`` in lockstep.

        Returns a generator of ``(elements_by_key, when_value)`` tuples, one
        per leaf position of this tree.
        """
        collections_by_key = dict_map(lambda hdca: hdca.collection, hdca_dict)
        return self._walk_collections(collections_by_key)

    def _walk_collections(self, collection_dict):
        """Yield ``(elements_by_key, when_value)`` for every leaf under this tree.

        The ``when_value`` chosen at this level is the one reported for all
        leaves below the corresponding child; values produced by nested
        levels are discarded.
        """
        for position, (_identifier, substructure) in enumerate(self.children):
            candidates = self.when_values
            if not candidates:
                when_value = None
            elif len(candidates) == 1:
                when_value = candidates[0]
            else:
                when_value = candidates[position]

            if substructure.is_leaf:
                yield {key: collection[position] for key, collection in collection_dict.items()}, when_value
                continue

            nested = {key: collection[position].child_collection for key, collection in collection_dict.items()}
            for element, _nested_when_value in substructure._walk_collections(nested):
                yield element, when_value

    @property
    def is_leaf(self):
        """Always ``False`` for a tree."""
        return False

    def can_match(self, other_structure):
        """Return whether ``other_structure`` has a compatible type and shape.

        The collection types must be matchable, the child counts equal, and
        each pair of children must agree on being a leaf; nested children are
        compared recursively.
        """
        own_description = self.collection_type_description
        if not own_description.can_match_type(other_structure.collection_type_description):
            return False

        theirs_all = other_structure.children
        if len(self.children) != len(theirs_all):
            return False

        for my_child, other_child in zip(self.children, theirs_all):
            mine, theirs = my_child[1], other_child[1]
            # At least one is nested collection...
            if mine.is_leaf != theirs.is_leaf:
                return False
            if mine.is_leaf:
                continue
            if not mine.can_match(theirs):
                return False
        return True

    def __len__(self):
        """Return the summed length of all child substructures."""
        total = 0
        for child in self.children:
            total += len(child[1])
        return total

    def multiply(self, other_structure):
        """Return this tree with ``other_structure`` multiplied into each child.

        Multiplying by a leaf simply returns a clone of this tree.
        """
        if other_structure.is_leaf:
            return self.clone()

        combined_description = self.collection_type_description.multiply(other_structure.collection_type_description)
        multiplied = [(identifier, structure.multiply(other_structure)) for identifier, structure in self.children]
        return Tree(multiplied, combined_description)

    def clone(self):
        """Return a new ``Tree`` with every child substructure cloned."""
        copies = []
        for child in self.children:
            copies.append((child[0], child[1].clone()))
        return Tree(copies, self.collection_type_description)

    def __str__(self):
        rendered_children = ",".join(f"{child[0]}={child[1]}" for child in self.children)
        return f"Tree[collection_type={self.collection_type_description},children={rendered_children}]"
def tool_output_to_structure(get_sliced_input_collection_structure, tool_output, collections_manager):
    """Work out the structure a tool output will have.

    :param get_sliced_input_collection_structure: callable invoked with the
        output's ``structured_like`` or ``collection_type_source`` value; its
        result must expose ``collection_type_description``.
    :param tool_output: output definition providing ``collection``, ``name``
        and (for collection outputs) ``structure``.
    :param collections_manager: provides ``collection_type_descriptions``.
    :returns: ``leaf`` for non-collection outputs, otherwise the structure
        obtained for ``structured_like`` or an ``UninitializedTree`` for the
        resolved collection type.
    :raises Exception: when no collection type can be determined.
    """
    if not tool_output.collection:
        return leaf

    type_descriptions = collections_manager.collection_type_descriptions
    # Okay this is ToolCollectionOutputStructure not a Structure - different
    # concepts of structure.
    structured_like = tool_output.structure.structured_like
    collection_type = tool_output.structure.collection_type

    if structured_like:
        tree = get_sliced_input_collection_structure(structured_like)
        if collection_type:
            if tree.collection_type_description.collection_type != collection_type:
                # See tool paired_collection_map_over_structured_like - type should
                # override structured_like if they disagree.
                tree = UninitializedTree(type_descriptions.for_collection_type(collection_type))
    else:
        # Can't pre-compute the structure in this case, see if we can find a collection type.
        if collection_type is None and tool_output.structure.collection_type_source:
            source_structure = get_sliced_input_collection_structure(tool_output.structure.collection_type_source)
            collection_type = source_structure.collection_type_description.collection_type

        if not collection_type:
            raise Exception(f"Failed to determine collection type for mapping over output {tool_output.name}")

        tree = UninitializedTree(type_descriptions.for_collection_type(collection_type))

    if not tree.children_known and tree.collection_type_description.collection_type == "paired":
        # TODO: We don't need to return UninitializedTree for pairs I think, we should build
        # a paired tree for the known structure here.
        pass
    return tree
def dict_map(func, input_dict):
    """Return a new dict with the same keys and ``func`` applied to each value.

    Keys are visited in the iteration order of ``input_dict``; the input is
    not modified.
    """
    mapped = {}
    for key, value in input_dict.items():
        mapped[key] = func(value)
    return mapped
def get_structure(dataset_collection_instance, collection_type_description, leaf_subcollection_type=None):
    """Return the structure describing ``dataset_collection_instance``.

    Without ``leaf_subcollection_type`` a ``Tree`` is built from the
    instance's ``collection``. With it, the effective collection type
    description is used instead; if the instance additionally has a
    ``child_collection`` attribute, an ``UninitializedTree`` for
    ``leaf_subcollection_type`` is returned rather than a populated tree.
    """
    description = collection_type_description
    if leaf_subcollection_type:
        description = description.effective_collection_type_description(leaf_subcollection_type)
        if hasattr(dataset_collection_instance, "child_collection"):
            factory = description.collection_type_description_factory
            return UninitializedTree(factory.for_collection_type(leaf_subcollection_type))

    source_collection = dataset_collection_instance.collection
    return Tree.for_dataset_collection(source_collection, description)