Source code for galaxy.tools.parameters.dataset_matcher
from logging import getLogger

import galaxy.model

log = getLogger(__name__)


def set_dataset_matcher_factory(trans, tool):
    trans.dataset_matcher_factory = DatasetMatcherFactory(trans, tool)


def unset_dataset_matcher_factory(trans):
    trans.dataset_matcher_factory = None


def get_dataset_matcher_factory(trans):
    dataset_matcher_factory = getattr(trans, "dataset_matcher_factory", None)
    return dataset_matcher_factory or DatasetMatcherFactory(trans)
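

# Minimal end-to-end sketch (illustrative only, not part of this module's API):
# ``trans``, ``tool``, ``param``, ``other_values``, and ``hda`` are assumed to be
# supplied by the tool-evaluation code that normally imports these helpers.
def _example_match_single_hda(trans, tool, param, other_values, hda):
    # Cache a factory on the transaction for the duration of one tool evaluation,
    # reuse it for each data parameter, then clear it again.
    set_dataset_matcher_factory(trans, tool)
    try:
        factory = get_dataset_matcher_factory(trans)
        matcher = factory.dataset_matcher(param, other_values)
        # Falsy when the HDA cannot be used for this parameter; otherwise an
        # HdaDirectMatch or HdaImplicitMatch instance.
        return matcher.hda_match(hda)
    finally:
        unset_dataset_matcher_factory(trans)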


class DatasetMatcherFactory:
    """"""

    def __init__(self, trans, tool=None):
        self._trans = trans
        self._tool = tool
        self._data_inputs = []
        self._matches_format_cache = {}
        if tool:
            valid_input_states = tool.valid_input_states
        else:
            valid_input_states = galaxy.model.Dataset.valid_input_states
        self.valid_input_states = valid_input_states
        can_process_summary = False
        if tool is not None:
            for input in tool.inputs.values():
                self._collect_data_inputs(input)

            require_public = self._tool and self._tool.tool_type == "data_destination"
            if not require_public and self._data_inputs:
                can_process_summary = True
                for data_input in self._data_inputs:
                    if data_input.options:
                        can_process_summary = False
                        break
        self._can_process_summary = can_process_summary

    def matches_any_format(self, hda_extension, formats):
        for format in formats:
            if self.matches_format(hda_extension, format):
                return True
        return False

    def matches_format(self, hda_extension, format):
        # cache datatype checking combinations for fast recall
        if hda_extension not in self._matches_format_cache:
            self._matches_format_cache[hda_extension] = {}
        formats = self._matches_format_cache[hda_extension]
        if format not in formats:
            datatype = galaxy.model.datatype_for_extension(
                hda_extension, datatypes_registry=self._trans.app.datatypes_registry
            )
            formats[format] = datatype.matches_any([format])
        return formats[format]

    def _collect_data_inputs(self, input):
        type_name = input.type
        if type_name == "repeat" or type_name == "upload_dataset" or type_name == "section":
            for child_input in input.inputs.values():
                self._collect_data_inputs(child_input)
        elif type_name == "conditional":
            for case in input.cases:
                for child_input in case.inputs.values():
                    self._collect_data_inputs(child_input)
        elif type_name == "data" or type_name == "data_collection":
            self._data_inputs.append(input)

    def dataset_matcher(self, param, other_values):
        return DatasetMatcher(self, self._trans, param, other_values)

    def dataset_collection_matcher(self, dataset_matcher):
        if self._can_process_summary:
            return SummaryDatasetCollectionMatcher(self, self._trans, dataset_matcher)
        else:
            return DatasetCollectionMatcher(self._trans, dataset_matcher)
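

# Illustrative sketch (not part of the module): the factory memoizes datatype
# comparisons per (extension, format) pair in ``_matches_format_cache``, so
# checking many HDAs in a history hits the datatypes registry only once per
# combination. ``factory``, ``extensions``, and ``formats`` are assumed inputs.
def _example_filter_extensions(factory, extensions, formats):
    # Keep only the extensions that satisfy at least one accepted format.
    return [ext for ext in extensions if factory.matches_any_format(ext, formats)]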


class DatasetMatcher:
    """Utility class to aid DataToolParameter and similar classes in reasoning
    about what HDAs could match or are selected for a parameter and value.

    Goal here is to both encapsulate and reuse logic related to filtering,
    datatype matching, hiding errored datasets, finding implicit conversions,
    and permission handling.
    """

    def __init__(self, dataset_matcher_factory, trans, param, other_values):
        self.dataset_matcher_factory = dataset_matcher_factory
        self.trans = trans
        self.param = param
        self.tool = param.tool
        filter_values = set()
        if param.options and other_values:
            try:
                for v in param.options.get_options(trans, other_values):
                    filter_values.add(v[0])
            except IndexError:
                pass  # no valid options
        self.filter_values = filter_values

    def valid_hda_match(self, hda, check_implicit_conversions=True):
        """Return False if this parameter cannot be matched to the supplied
        HDA; otherwise return a description of the match (either an
        HdaDirectMatch describing a direct match or an HdaImplicitMatch
        describing an implicit conversion).
        """
        rval = False
        formats = self.param.formats
        direct_match, target_ext, converted_dataset = hda.find_conversion_destination(formats)
        if direct_match:
            rval = HdaDirectMatch(hda)
        else:
            if not check_implicit_conversions:
                return False
            if target_ext:
                original_hda = hda
                if converted_dataset:
                    hda = converted_dataset
                rval = HdaImplicitMatch(hda, target_ext, original_hda)
            else:
                return False
        if self.filter(hda):
            return False
        return rval

    def hda_match(self, hda, check_implicit_conversions=True, ensure_visible=True):
        """If HDA is accessible, return information about whether it could
        match this parameter and if so how. See valid_hda_match for more
        information.
        """
        dataset = hda.dataset
        valid_state = dataset.state in self.dataset_matcher_factory.valid_input_states
        if valid_state and (not ensure_visible or hda.visible):
            # If we are sending data to an external application, then we need to make sure there are no roles
            # associated with the dataset that restrict its access from "public".
            require_public = self.tool and self.tool.tool_type == "data_destination"
            if require_public and not self.trans.app.security_agent.dataset_is_public(dataset):
                return False
            return self.valid_hda_match(hda, check_implicit_conversions=check_implicit_conversions)

    def filter(self, hda):
        """Filter out this value based on other values for job (if
        applicable).
        """
        param = self.param
        return param.options and param.get_options_filter_attribute(hda) not in self.filter_values
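

# Sketch of consuming a match result (illustrative only; ``matcher`` and ``hda``
# are assumed from calling code). ``hda_match`` returns a falsy value (``False``
# or ``None``) when the HDA is not usable, an HdaDirectMatch when the extension
# satisfies the parameter directly, or an HdaImplicitMatch when a datatype
# conversion would be required.
def _example_describe_match(matcher, hda):
    match = matcher.hda_match(hda)
    if not match:
        return "not selectable"
    if match.implicit_conversion:
        return f"selectable via implicit conversion to {match.target_ext}"
    return "selectable directly"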


class HdaDirectMatch:
    """Supplied HDA was a valid option directly (did not need to find implicit
    conversion).
    """

    def __init__(self, hda):
        self.hda = hda

    @property
    def implicit_conversion(self):
        return False


class HdaImplicitMatch:
    """Supplied HDA was a valid option only via an implicit conversion of the
    original HDA to the target extension.
    """

    def __init__(self, hda, target_ext, original_hda):
        self.original_hda = original_hda
        self.hda = hda
        self.target_ext = target_ext

    @property
    def implicit_conversion(self):
        return True


class HdcaDirectMatch:
    implicit_conversion = False

    def __init__(self):
        pass


class HdcaImplicitMatch:
    implicit_conversion = True

    def __init__(self):
        pass


class SummaryDatasetCollectionMatcher:
    def __init__(self, dataset_matcher_factory, trans, dataset_matcher):
        self.dataset_matcher_factory = dataset_matcher_factory
        self._trans = trans
        self.dataset_matcher = dataset_matcher

    def hdca_match(self, history_dataset_collection_association):
        dataset_collection = history_dataset_collection_association.collection
        if not dataset_collection.populated_optimized:
            return False
        (states, extensions) = dataset_collection.dataset_states_and_extensions_summary
        for state in states:
            if state not in self.dataset_matcher_factory.valid_input_states:
                return False
        formats = self.dataset_matcher.param.formats
        uses_implicit_conversion = False
        for extension in extensions:
            datatypes_registry = self._trans.app.datatypes_registry
            direct_match, converted_ext, _ = datatypes_registry.find_conversion_destination_for_dataset_by_extensions(
                extension, formats
            )
            if direct_match:
                continue
            if not converted_ext:
                return False
            else:
                uses_implicit_conversion = True
        return HdcaImplicitMatch() if uses_implicit_conversion else HdcaDirectMatch()


class DatasetCollectionMatcher:
    def __init__(self, trans, dataset_matcher):
        self.dataset_matcher = dataset_matcher
        self._trans = trans

    def __valid_element(self, element):
        # Simplify things for now and assume these are hdas and not implicit
        # converts. One could imagine handling both of those cases down the
        # road.
        if element.ldda:
            return False
        if child_collection := element.child_collection:
            return self.dataset_collection_match(child_collection)
        hda = element.hda
        if not hda:
            return False
        hda_match = self.dataset_matcher.hda_match(hda, ensure_visible=False)
        return hda_match

    def hdca_match(self, history_dataset_collection_association):
        dataset_collection = history_dataset_collection_association.collection
        return self.dataset_collection_match(dataset_collection)

    def dataset_collection_match(self, dataset_collection):
        # If dataset collection not yet populated, cannot determine if it
        # would be a valid match for this parameter.
        if not dataset_collection.populated_optimized:
            return False
        valid = True
        uses_implicit_conversion = False
        for element in dataset_collection.elements:
            match_element = self.__valid_element(element)
            if not match_element:
                valid = False
                break
            elif match_element.implicit_conversion:
                uses_implicit_conversion = True
        return valid and (HdcaImplicitMatch() if uses_implicit_conversion else HdcaDirectMatch())
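

# Sketch of matching a collection parameter (illustrative only; ``factory``,
# ``param``, ``other_values``, and ``hdca`` are assumed from calling code). The
# factory returns SummaryDatasetCollectionMatcher when it can rely on the
# collection's aggregated state/extension summary, and falls back to the
# element-wise DatasetCollectionMatcher otherwise; both expose ``hdca_match``.
def _example_match_collection(factory, param, other_values, hdca):
    matcher = factory.dataset_matcher(param, other_values)
    collection_matcher = factory.dataset_collection_matcher(matcher)
    # Returns False, an HdcaDirectMatch, or an HdcaImplicitMatch.
    return collection_matcher.hdca_match(hdca)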
__all__ = ("get_dataset_matcher_factory", "set_dataset_matcher_factory", "unset_dataset_matcher_factory")