Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release (if it exists), or view the top of the latest release's documentation.

Source code for galaxy.tools.parameters.dataset_matcher

from logging import getLogger

import galaxy.model

# Module-level logger, named after this module via ``__name__``.
log = getLogger(__name__)


def set_dataset_matcher_factory(trans, tool):
    """Create a ``DatasetMatcherFactory`` for ``tool`` and store it on ``trans``.

    The factory is stored as ``trans.dataset_matcher_factory``, which is the
    attribute ``get_dataset_matcher_factory`` looks up.
    """
    factory = DatasetMatcherFactory(trans, tool)
    trans.dataset_matcher_factory = factory


def unset_dataset_matcher_factory(trans):
    """Clear any factory stored on ``trans`` by resetting the attribute to ``None``."""
    setattr(trans, "dataset_matcher_factory", None)


def get_dataset_matcher_factory(trans):
    """Return the factory stored on ``trans``, or a fresh tool-less one.

    A missing or falsy ``trans.dataset_matcher_factory`` results in a new
    ``DatasetMatcherFactory(trans)``; the new factory is not stored on
    ``trans``.
    """
    existing = getattr(trans, "dataset_matcher_factory", None)
    if existing:
        return existing
    return DatasetMatcherFactory(trans)


class DatasetMatcherFactory(object):
    """Create dataset matchers that share a tool context and a format cache.

    On construction the factory walks the tool's inputs (when a tool is
    given) to collect every ``data`` / ``data_collection`` input, and
    decides whether collection matching may use the summary-based matcher.
    """

    def __init__(self, trans, tool=None):
        """Record ``trans``/``tool`` and precompute matching state.

        :param trans: object later used to reach ``trans.app.datatypes_registry``.
        :param tool: optional tool; when truthy its ``valid_input_states``
            is used, otherwise ``galaxy.model.Dataset.valid_input_states``.
        """
        self._trans = trans
        self._tool = tool
        self._data_inputs = []
        self._matches_format_cache = {}
        self.valid_input_states = (
            tool.valid_input_states if tool else galaxy.model.Dataset.valid_input_states
        )

        summary_ok = False
        if tool is not None:
            for tool_input in tool.inputs.values():
                self._collect_data_inputs(tool_input)

            require_public = self._tool and self._tool.tool_type == 'data_destination'
            if not require_public and self._data_inputs:
                # Summary matching is allowed only when no collected data
                # input defines options.
                summary_ok = not any(data_input.options for data_input in self._data_inputs)
        self._can_process_summary = summary_ok

    def matches_any_format(self, hda_extension, formats):
        """Return True if ``hda_extension`` matches at least one of ``formats``."""
        return any(self.matches_format(hda_extension, candidate) for candidate in formats)

    def matches_format(self, hda_extension, format):
        """Return whether the datatype for ``hda_extension`` matches ``format``.

        Results are memoized per ``(hda_extension, format)`` pair so the
        datatype lookup happens at most once per combination.
        """
        cached = self._matches_format_cache.setdefault(hda_extension, {})
        if format not in cached:
            registry = self._trans.app.datatypes_registry
            datatype = galaxy.model.datatype_for_extension(hda_extension, datatypes_registry=registry)
            cached[format] = datatype.matches_any([format])
        return cached[format]

    def _collect_data_inputs(self, input):
        """Recursively append data and data-collection inputs to ``_data_inputs``.

        Container inputs (repeat, upload_dataset, section, conditional) are
        descended into; any other input type is ignored.
        """
        kind = input.type
        if kind in ("repeat", "upload_dataset", "section"):
            for nested in input.inputs.values():
                self._collect_data_inputs(nested)
        elif kind == "conditional":
            for case in input.cases:
                for nested in case.inputs.values():
                    self._collect_data_inputs(nested)
        elif kind in ("data", "data_collection"):
            self._data_inputs.append(input)

    def dataset_matcher(self, param, other_values):
        """Return a ``DatasetMatcher`` for ``param`` bound to this factory."""
        return DatasetMatcher(self, self._trans, param, other_values)

    def dataset_collection_matcher(self, dataset_matcher):
        """Return the collection matcher appropriate for this factory.

        The summary-based matcher is used when summary processing was
        enabled at construction; otherwise the element-by-element matcher.
        """
        if not self._can_process_summary:
            return DatasetCollectionMatcher(dataset_matcher)
        return SummaryDatasetCollectionMatcher(self, dataset_matcher)


class DatasetMatcher(object):
    """ Utility class to aid DataToolParameter and similar classes in reasoning
    about what HDAs could match or are selected for a parameter and value.

    Goal here is to both encapsulate and reuse logic related to filtering,
    datatype matching, hiding errored dataset, finding implicit conversions,
    and permission handling.
    """

    def __init__(self, dataset_matcher_factory, trans, param, other_values):
        """Bind the matcher to a factory, transaction and parameter.

        :param dataset_matcher_factory: supplies ``valid_input_states`` and
            ``matches_any_format``.
        :param trans: used for the public-dataset security check.
        :param param: the parameter being matched; its ``tool``, ``options``
            and ``formats`` attributes are read.
        :param other_values: other values passed to
            ``param.options.get_options`` to compute the filter value.
        """
        self.dataset_matcher_factory = dataset_matcher_factory
        self.trans = trans
        self.param = param
        self.tool = param.tool
        filter_value = None
        if param.options and other_values:
            try:
                # The first field of the first option is the filter value.
                filter_value = param.options.get_options(trans, other_values)[0][0]
            except IndexError:
                pass  # no valid options
        self.filter_value = filter_value

    def valid_hda_match(self, hda, check_implicit_conversions=True):
        """ Return False if this parameter can not be matched to the supplied
        HDA, otherwise return a description of the match (either a
        HdaDirectMatch describing a direct match or a HdaImplicitMatch
        describing an implicit conversion.)
        """
        rval = False
        formats = self.param.formats
        if self.dataset_matcher_factory.matches_any_format(hda.extension, formats):
            rval = HdaDirectMatch(hda)
        else:
            if not check_implicit_conversions:
                return False
            target_ext, converted_dataset = hda.find_conversion_destination(formats)
            if not target_ext:
                return False
            original_hda = hda
            if converted_dataset:
                # From here on (including the filter check below) work with
                # the already-converted dataset.
                hda = converted_dataset
            rval = HdaImplicitMatch(hda, target_ext, original_hda)
        if self.filter(hda):
            return False
        return rval

    def hda_match(self, hda, check_implicit_conversions=True, ensure_visible=True):
        """ If HDA is accessible, return information about whether it could
        match this parameter and if so how. See valid_hda_match for more
        information.

        Returns None when the dataset state is not a valid input state or,
        with ``ensure_visible`` set, when the HDA is not visible.
        """
        dataset = hda.dataset
        valid_state = dataset.state in self.dataset_matcher_factory.valid_input_states
        if valid_state and (not ensure_visible or hda.visible):
            # If we are sending data to an external application, then we need to make sure there are no roles
            # associated with the dataset that restrict its access from "public".
            require_public = self.tool and self.tool.tool_type == 'data_destination'
            if require_public and not self.trans.app.security_agent.dataset_is_public(dataset):
                return False
            return self.valid_hda_match(hda, check_implicit_conversions=check_implicit_conversions)
        return None

    def filter(self, hda):
        """ Filter out this value based on other values for job (if
        applicable).

        Returns a truthy value when ``hda`` should be excluded. When the
        parameter has no options, the falsy ``param.options`` value itself
        is returned.
        """
        param = self.param
        return param.options and param.get_options_filter_attribute(hda) != self.filter_value
class HdaDirectMatch(object):
    """ Supplied HDA was a valid option directly (did not need to find implicit
    conversion).
    """

    def __init__(self, hda):
        self.hda = hda

    @property
    def implicit_conversion(self):
        """Always False: a direct match involves no implicit conversion."""
        return False


class HdaImplicitMatch(object):
    """ Supplied HDA was a valid option only through an implicit conversion
    to ``target_ext``.

    ``original_hda`` is the HDA that was supplied; ``hda`` is whatever the
    caller passes as the dataset to use for the match.
    """

    def __init__(self, hda, target_ext, original_hda):
        self.original_hda = original_hda
        self.hda = hda
        self.target_ext = target_ext

    @property
    def implicit_conversion(self):
        """Always True: this match relies on an implicit conversion."""
        return True


class SummaryDatasetCollectionMatcher(object):
    """Match a dataset collection using its states/extensions summary
    instead of inspecting each element.
    """

    def __init__(self, dataset_matcher_factory, dataset_matcher):
        self.dataset_matcher_factory = dataset_matcher_factory
        self.dataset_matcher = dataset_matcher

    def hdca_match(self, history_dataset_collection_association, reduction=False):
        """Return True if the association's collection is a valid match.

        The collection is rejected when ``reduction`` is requested and its
        ``collection_type`` has a ":" past the first character, when it is
        not ``populated_optimized``, when any summarized state is not a
        valid input state, or when any summarized extension matches none of
        the parameter's formats.
        """
        dataset_collection = history_dataset_collection_association.collection
        if reduction and dataset_collection.collection_type.find(":") > 0:
            return False

        if not dataset_collection.populated_optimized:
            return False

        states, extensions = dataset_collection.dataset_states_and_extensions_summary
        factory = self.dataset_matcher_factory
        if not all(state in factory.valid_input_states for state in states):
            return False

        formats = self.dataset_matcher.param.formats
        return all(factory.matches_any_format(extension, formats) for extension in extensions)
class DatasetCollectionMatcher(object):
    """Match a dataset collection by checking each of its elements with a
    dataset matcher, recursing into child collections.
    """

    def __init__(self, dataset_matcher):
        """Store the ``dataset_matcher`` whose ``hda_match`` validates elements."""
        self.dataset_matcher = dataset_matcher

    def __valid_element(self, element):
        """Return a truthy value if ``element`` is a valid match."""
        # Simplify things for now and assume these are hdas and not implicit
        # converts. One could imagine handling both of those cases down the
        # road.
        if element.ldda:
            return False
        child_collection = element.child_collection
        if child_collection:
            return self.dataset_collection_match(child_collection)
        hda = element.hda
        if not hda:
            return False
        hda_match = self.dataset_matcher.hda_match(hda, ensure_visible=False)
        return hda_match and not hda_match.implicit_conversion

    def hdca_match(self, history_dataset_collection_association, reduction=False):
        """Return whether the association's collection matches.

        With ``reduction`` set, a collection whose ``collection_type`` has a
        ":" past the first character is rejected outright.
        """
        dataset_collection = history_dataset_collection_association.collection
        if reduction and dataset_collection.collection_type.find(":") > 0:
            return False
        return self.dataset_collection_match(dataset_collection)

    def dataset_collection_match(self, dataset_collection):
        """Return True if the collection is populated and every element is valid."""
        # If dataset collection not yet populated, cannot determine if it
        # would be a valid match for this parameter.
        if not dataset_collection.populated:
            return False
        return all(self.__valid_element(element) for element in dataset_collection.elements)
# Names exported by ``from <module> import *``.
__all__ = ('DatasetMatcher', 'DatasetCollectionMatcher')