Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.parameters.dynamic_options

"""
Support for generating the options for a SelectToolParameter dynamically (based
on the values of other parameters or other aspects of the current state)
"""
import logging
import os

from six import StringIO

import galaxy.tools
from galaxy.model import (
    HistoryDatasetAssociation,
    HistoryDatasetCollectionAssociation,
    User
)
from galaxy.util import string_as_bool
from . import validation

log = logging.getLogger(__name__)


class Filter(object):
    """
    Base class for option filters.

    A filter takes the current options list and returns a modified one.
    Subclasses are registered by name in the module-level ``filter_types``
    mapping and are looked up through :meth:`from_element`.
    """

    @classmethod
    def from_element(cls, d_option, elem):
        """
        Build the filter selected by the ``type`` attribute of ``elem``.

        Raises AssertionError when the attribute is absent and KeyError when
        the (whitespace-stripped) type is not present in ``filter_types``.
        """
        filter_type = elem.get('type', None)
        assert filter_type is not None, "Required 'type' attribute missing from filter"
        filter_class = filter_types[filter_type.strip()]
        return filter_class(d_option, elem)

    def __init__(self, d_option, elem):
        """Remember the owning dynamic options object and the defining element."""
        self.dynamic_option = d_option
        self.elem = elem

    def get_dependency_name(self):
        """Returns the name of any dependencies, otherwise None"""
        return None

    def filter_options(self, options, trans, other_values):
        """
        Returns a list of options after the filter is applied.

        The base implementation is abstract and always raises TypeError.
        """
        raise TypeError("Abstract Method")
class StaticValueFilter(Filter):
    """
    Filters a list of options on a column by a static value.

    Type: static_value

    Required Attributes:
        value: static value to compare to
        column: column in options to compare with
    Optional Attributes:
        keep: Keep columns matching value (True)
              Discard columns matching value (False)
    """

    def __init__(self, d_option, elem):
        """Read ``value``, ``column`` and ``keep`` from the filter element."""
        Filter.__init__(self, d_option, elem)
        self.value = elem.get("value", None)
        assert self.value is not None, "Required 'value' attribute missing from filter"
        column_spec = elem.get("column", None)
        assert column_spec is not None, "Required 'column' attribute missing from filter, when loading from file"
        self.column = d_option.column_spec_to_index(column_spec)
        self.keep = string_as_bool(elem.get("keep", 'True'))

    def filter_options(self, options, trans, other_values):
        """
        Return the options whose column equals the static value (``keep``
        true) or differs from it (``keep`` false).
        """
        expected = self.value
        try:
            expected = User.expand_user_properties(trans.user, expected)
        except Exception:
            # Best effort: on any failure the unexpanded value is compared.
            pass
        if self.keep:
            return [fields for fields in options if fields[self.column] == expected]
        return [fields for fields in options if fields[self.column] != expected]
class DataMetaFilter(Filter):
    """
    Filters a list of options on a column by a dataset metadata value.

    Type: data_meta

    When no 'from' source has been specified in the <options> tag, this will
    populate the options list with (meta_value, meta_value, False).
    Otherwise, options which do not match the metadata value in the column
    are discarded.

    Required Attributes:

        - ref: Name of input dataset
        - key: Metadata key to use for comparison
        - column: column in options to compare with (not required when not
          associated with input options)

    Optional Attributes:

        - multiple: Option values are multiple, split column by separator (True)
        - separator: When multiple split by this (,)
    """

    def __init__(self, d_option, elem):
        """Read the filter attributes and mark ``d_option`` as dataset dependent."""
        Filter.__init__(self, d_option, elem)
        self.ref_name = elem.get("ref", None)
        assert self.ref_name is not None, "Required 'ref' attribute missing from filter"
        d_option.has_dataset_dependencies = True
        self.key = elem.get("key", None)
        assert self.key is not None, "Required 'key' attribute missing from filter"
        self.column = elem.get("column", None)
        if self.column is not None:
            self.column = d_option.column_spec_to_index(self.column)
        else:
            # A column may only be omitted when the options have no file or dataset source.
            assert self.dynamic_option.file_fields is None and self.dynamic_option.dataset_ref_name is None, "Required 'column' attribute missing from filter, when loading from file"
        self.multiple = string_as_bool(elem.get("multiple", "False"))
        self.separator = elem.get("separator", ",")

    def get_dependency_name(self):
        """Return the name of the referenced dataset input."""
        return self.ref_name

    def filter_options(self, options, trans, other_values):
        """
        Filter ``options`` by the metadata value of the referenced dataset(s).

        Returns an empty list when the reference is not a dataset or when the
        members of a dataset list carry differing metadata values.  When no
        column is configured, the metadata value(s) are appended to the
        incoming ``options`` list itself as (value, value, False) entries.
        """
        def matches(file_value, dataset_value):
            if isinstance(dataset_value, list):
                if not self.multiple:
                    return file_value in dataset_value
                file_values = file_value.split(self.separator)
                return all(value in file_values for value in dataset_value)
            if self.multiple:
                return dataset_value in file_value.split(self.separator)
            return file_value == dataset_value

        ref = other_values.get(self.ref_name, None)
        if isinstance(ref, HistoryDatasetCollectionAssociation):
            ref = ref.to_hda_representative(self.multiple)
        is_data = isinstance(ref, galaxy.tools.wrappers.DatasetFilenameWrapper)
        is_data_list = isinstance(ref, (galaxy.tools.wrappers.DatasetListWrapper, list))
        if not (isinstance(ref, HistoryDatasetAssociation) or is_data or is_data_list):
            return []  # not a valid dataset

        if is_data_list:
            meta_value = None
            for member in ref:
                member_value = member.metadata.get(self.key, None)
                if member_value == meta_value:
                    continue
                if meta_value is not None:
                    # Different values with mismatching metadata, return []
                    return []
                meta_value = member_value
        else:
            meta_value = ref.metadata.get(self.key, None)

        if meta_value is None:
            return [(disp_name, optval, selected) for disp_name, optval, selected in options]

        if self.column is not None:
            return [fields for fields in options if matches(fields[self.column], meta_value)]

        # No column configured: the metadata value(s) become the options.
        if not self.dynamic_option.columns:
            self.dynamic_option.columns = {
                "name": 0,
                "value": 1,
                "selected": 2,
            }
            self.dynamic_option.largest_index = 2
        if not isinstance(meta_value, list):
            meta_value = [meta_value]
        for value in meta_value:
            options.append((value, value, False))
        return options
class ParamValueFilter(Filter):
    """
    Filters a list of options on a column by the value of another input.

    Type: param_value

    Required Attributes:

        - ref: Name of input value
        - column: column in options to compare with

    Optional Attributes:

        - keep: Keep columns matching value (True)
                Discard columns matching value (False)
        - ref_attribute: Period (.) separated attribute chain of input (ref)
          to use as value for filter
    """

    def __init__(self, d_option, elem):
        """Read ``ref``, ``column``, ``keep`` and ``ref_attribute`` from the element."""
        Filter.__init__(self, d_option, elem)
        self.ref_name = elem.get("ref", None)
        assert self.ref_name is not None, "Required 'ref' attribute missing from filter"
        column_spec = elem.get("column", None)
        assert column_spec is not None, "Required 'column' attribute missing from filter"
        self.column = d_option.column_spec_to_index(column_spec)
        self.keep = string_as_bool(elem.get("keep", 'True'))
        attribute_chain = elem.get("ref_attribute", None)
        # Stored as a list of attribute names; empty when no chain was given.
        self.ref_attribute = attribute_chain.split('.') if attribute_chain else []

    def get_dependency_name(self):
        """Return the name of the referenced input."""
        return self.ref_name

    def filter_options(self, options, trans, other_values):
        """
        Return the options whose column equals (``keep`` true) or differs from
        (``keep`` false) the string form of the referenced value.

        Returns an empty list in workflow building mode, or when an attribute
        of the ``ref_attribute`` chain is missing.
        """
        if trans is not None and trans.workflow_building_mode:
            return []
        ref = other_values.get(self.ref_name, None)
        for attribute in self.ref_attribute:
            if not hasattr(ref, attribute):
                return []  # ref does not have attribute, so we cannot filter, return empty list
            ref = getattr(ref, attribute)
        ref = str(ref)
        if self.keep:
            return [fields for fields in options if fields[self.column] == ref]
        return [fields for fields in options if fields[self.column] != ref]
class UniqueValueFilter(Filter):
    """
    Filters a list of options to be unique by a column value.

    Type: unique_value

    Required Attributes:
        column: column in options to compare with
    """

    def __init__(self, d_option, elem):
        """Resolve the required ``column`` attribute to a column index."""
        Filter.__init__(self, d_option, elem)
        column_spec = elem.get("column", None)
        assert column_spec is not None, "Required 'column' attribute missing from filter"
        self.column = d_option.column_spec_to_index(column_spec)

    def get_dependency_name(self):
        """Return the dataset reference name of the owning dynamic options."""
        return self.dynamic_option.dataset_ref_name

    def filter_options(self, options, trans, other_values):
        """Keep only the first option seen for each distinct column value."""
        unique_options = []
        # A list (not a set) so that unhashable column values keep working.
        seen_values = []
        for fields in options:
            column_value = fields[self.column]
            if column_value in seen_values:
                continue
            unique_options.append(fields)
            seen_values.append(column_value)
        return unique_options
class MultipleSplitterFilter(Filter):
    """
    Turns a single line of options into multiple lines, by splitting a column
    and creating a line for each item.

    Type: multiple_splitter

    Required Attributes:
        column: column in options to compare with
    Optional Attributes:
        separator: Split column by this (,)
    """

    def __init__(self, d_option, elem):
        """Read ``separator`` and the comma-separated ``column`` specification(s)."""
        Filter.__init__(self, d_option, elem)
        self.separator = elem.get("separator", ",")
        column_specs = elem.get("column", None)
        assert column_specs is not None, "Required 'column' attribute missing from filter"
        self.columns = [d_option.column_spec_to_index(spec) for spec in column_specs.split(",")]

    def filter_options(self, options, trans, other_values):
        """
        For every option and every configured column, emit one row per item
        obtained by splitting that column on the separator.
        """
        rval = []
        for fields in options:
            for column in self.columns:
                pieces = fields[column].split(self.separator)
                before = fields[0:column]
                after = fields[column + 1:]
                rval.extend(before + [piece] + after for piece in pieces)
        return rval
class AttributeValueSplitterFilter(Filter):
    """
    Filters a list of attribute-value pairs to be unique attribute names.

    Type: attribute_value_splitter

    Required Attributes:
        column: column in options to compare with
    Optional Attributes:
        pair_separator: Split column by this (,)
        name_val_separator: Split name-value pair by this ( whitespace )
    """

    def __init__(self, d_option, elem):
        """Read the separators and the comma-separated ``column`` specification(s)."""
        Filter.__init__(self, d_option, elem)
        self.pair_separator = elem.get("pair_separator", ",")
        # None makes str.split() split on runs of whitespace.
        self.name_val_separator = elem.get("name_val_separator", None)
        columns = elem.get("column", None)
        assert columns is not None, "Required 'column' attribute missing from filter"
        self.columns = [d_option.column_spec_to_index(column) for column in columns.split(",")]

    def filter_options(self, options, trans, other_values):
        """
        Emit one row for the first occurrence of every attribute name.

        Each configured column is split into pairs on ``pair_separator`` and
        each pair into name and value on ``name_val_separator``; pieces that
        do not split into exactly two parts are ignored.  Names are unique
        across all options and all configured columns.
        """
        # Names are always strings produced by str.split(), so a set gives
        # constant-time membership checks instead of scanning a list.
        seen_names = set()
        rval = []
        for fields in options:
            for column in self.columns:
                for pair in fields[column].split(self.pair_separator):
                    ary = pair.split(self.name_val_separator)
                    if len(ary) != 2:
                        continue
                    name = ary[0]
                    if name in seen_names:
                        continue
                    seen_names.add(name)
                    # NOTE(review): the name is inserted before the original
                    # column, which is retained (slice starts at ``column``,
                    # not ``column + 1``) -- confirm this is intended.
                    rval.append(fields[0:column] + [name] + fields[column:])
        return rval
class AdditionalValueFilter(Filter):
    """
    Adds a single static value to an options list.

    Type: add_value

    Required Attributes:
        value: value to appear in select list
    Optional Attributes:
        name: Display name to appear in select list (value)
        index: Index of option list to add value (APPEND)
    """

    def __init__(self, d_option, elem):
        """Read ``value``, ``name`` (defaults to value) and the optional ``index``."""
        Filter.__init__(self, d_option, elem)
        self.value = elem.get("value", None)
        assert self.value is not None, "Required 'value' attribute missing from filter"
        self.name = elem.get("name", None)
        if self.name is None:
            self.name = self.value
        self.index = elem.get("index", None)
        if self.index is not None:
            self.index = int(self.index)

    def filter_options(self, options, trans, other_values):
        """
        Return a copy of ``options`` with the static row inserted at ``index``
        or appended when no index was configured.
        """
        columns = self.dynamic_option.columns
        # One empty string per column, up to and including the largest index.
        new_row = [""] * (self.dynamic_option.largest_index + 1)
        value_col = columns.get('value', 0)
        name_col = columns.get('name', value_col)
        # Set name first, then value, in case they are the same column
        new_row[name_col] = self.name
        new_row[value_col] = self.value
        rval = list(options)
        if self.index is None:
            rval.append(new_row)
        else:
            rval.insert(self.index, new_row)
        return rval
class RemoveValueFilter(Filter):
    """
    Removes a value from an options list.

    Type: remove_value

    Required Attributes::

        value: value to remove from select list
            or
        ref: param to refer to
            or
        meta_ref: dataset to refer to
        key: metadata key to compare to

    Optional Attributes::

        multiple: Option values are multiple, split by separator (False)
        separator: When multiple split by this (,)
    """

    def __init__(self, d_option, elem):
        """
        Read the filter attributes.

        Raises AssertionError unless ``value``, or ``ref``, or both
        ``meta_ref`` and ``key`` are present.
        """
        Filter.__init__(self, d_option, elem)
        self.value = elem.get("value", None)
        self.ref_name = elem.get("ref", None)
        self.meta_ref = elem.get("meta_ref", None)
        self.metadata_key = elem.get("key", None)
        has_source = (
            self.value is not None
            or self.ref_name is not None
            or (self.meta_ref is not None and self.metadata_key is not None)
        )
        # The message is a plain string (it used to be wrapped in a ValueError
        # instance); the rendered assertion text is unchanged.
        assert has_source, "Required 'value', or 'ref', or 'meta_ref' and 'key' attributes missing from filter"
        self.multiple = string_as_bool(elem.get("multiple", "False"))
        self.separator = elem.get("separator", ",")

    def filter_options(self, options, trans, other_values):
        """
        Return the options whose value column does not match the value to remove.

        The value comes from the static ``value``, else from the ``ref``
        parameter, else from the ``key`` metadata of the ``meta_ref`` dataset.
        ``options`` is returned unchanged in workflow building mode or when
        ``meta_ref`` does not resolve to a dataset.
        """
        if trans is not None and trans.workflow_building_mode:
            return options

        def compare_value(option_value, filter_value):
            if isinstance(filter_value, list):
                if self.multiple:
                    option_values = option_value.split(self.separator)
                    return all(value in option_values for value in filter_value)
                return option_value in filter_value
            if self.multiple:
                return filter_value in option_value.split(self.separator)
            return option_value == filter_value

        value = self.value
        if value is None:
            if self.ref_name is not None:
                value = other_values.get(self.ref_name)
            else:
                data_ref = other_values.get(self.meta_ref)
                if isinstance(data_ref, HistoryDatasetCollectionAssociation):
                    data_ref = data_ref.to_hda_representative()
                if not isinstance(data_ref, HistoryDatasetAssociation) and not isinstance(data_ref, galaxy.tools.wrappers.DatasetFilenameWrapper):
                    return options  # cannot modify options
                value = data_ref.metadata.get(self.metadata_key, None)
        # Default to the second column (i.e. 1) since this used to work only on options produced by the data_meta filter
        value_col = self.dynamic_option.columns.get('value', 1)
        return [option for option in options if not compare_value(option[value_col], value)]
class SortByColumnFilter(Filter):
    """
    Sorts an options list by a column

    Type: sort_by

    Required Attributes:
        column: column to sort by
    """

    def __init__(self, d_option, elem):
        """Resolve the required ``column`` attribute to a column index."""
        Filter.__init__(self, d_option, elem)
        column = elem.get("column", None)
        assert column is not None, "Required 'column' attribute missing from filter"
        self.column = d_option.column_spec_to_index(column)

    def filter_options(self, options, trans, other_values):
        """
        Return a new list with the options ordered ascending by the
        configured column.

        The built-in sort is stable, so options with equal column values keep
        their incoming relative order.  The input list is not modified.
        """
        return sorted(options, key=lambda fields: fields[self.column])
# Maps the 'type' attribute of a <filter> element to the class implementing it.
filter_types = {
    'data_meta': DataMetaFilter,
    'param_value': ParamValueFilter,
    'static_value': StaticValueFilter,
    'unique_value': UniqueValueFilter,
    'multiple_splitter': MultipleSplitterFilter,
    'attribute_value_splitter': AttributeValueSplitterFilter,
    'add_value': AdditionalValueFilter,
    'remove_value': RemoveValueFilter,
    'sort_by': SortByColumnFilter,
}
class DynamicOptions(object):
    """Handles dynamically generated SelectToolParameter options"""

    def __init__(self, elem, tool_param):
        """
        Parse the <options> element ``elem`` belonging to ``tool_param``.

        Options come from a tool data table, a data file, a dataset, or the
        value of another parameter, depending on which ``from_*`` attribute is
        set.  Filters and validators declared inside the element are loaded
        afterwards.
        """
        def load_from_parameter(from_parameter, transform_lines=None):
            # Follow the dotted attribute path starting at the tool parameter.
            target = self.tool_param
            for attribute in from_parameter.split('.'):
                target = getattr(target, attribute)
            if transform_lines:
                # NOTE(review): evaluates an expression taken from the
                # 'transform_lines' attribute; safe only if that attribute
                # comes from a trusted source -- confirm.
                target = eval(transform_lines, {'self': self, 'obj': target})
            return self.parse_file_fields(target)

        self.tool_param = tool_param
        self.columns = {}
        self.filters = []
        self.file_fields = None
        self.largest_index = 0
        self.dataset_ref_name = None
        # True if the options generation depends on one or more other parameters
        # that are dataset inputs
        self.has_dataset_dependencies = False
        self.validators = []
        self.converter_safe = True

        # Parse the <options> tag
        self.separator = elem.get('separator', '\t')
        self.line_startswith = elem.get('startswith', None)
        data_file = elem.get('from_file', None)
        self.index_file = None
        self.missing_index_file = None
        dataset_file = elem.get('from_dataset', None)
        from_parameter = elem.get('from_parameter', None)
        self.tool_data_table_name = elem.get('from_data_table', None)
        # Options are defined from a data table loaded by the app
        self._tool_data_table = None
        self.elem = elem
        self.column_elem = elem.find("column")
        self.tool_data_table  # Need to touch tool data table once to populate self.columns

        # Options are defined by parsing tabular text data from a data file
        # on disk, a dataset, or the value of another parameter
        has_tabular_source = data_file is not None or dataset_file is not None or from_parameter is not None
        if not self.tool_data_table_name and has_tabular_source:
            self.parse_column_definitions(elem)
            if data_file is not None:
                data_file = data_file.strip()
                # Only relative paths are resolved (against the tool data path).
                if not os.path.isabs(data_file):
                    full_path = os.path.join(self.tool_param.tool.app.config.tool_data_path, data_file)
                    if os.path.exists(full_path):
                        self.index_file = data_file
                        with open(full_path) as fh:
                            self.file_fields = self.parse_file_fields(fh)
                    else:
                        self.missing_index_file = data_file
            elif dataset_file is not None:
                self.dataset_ref_name = dataset_file
                self.has_dataset_dependencies = True
                self.converter_safe = False
            elif from_parameter is not None:
                transform_lines = elem.get('transform_lines', None)
                self.file_fields = list(load_from_parameter(from_parameter, transform_lines))

        # Load filters
        for filter_elem in elem.findall('filter'):
            self.filters.append(Filter.from_element(self, filter_elem))

        # Load Validators
        for validator_elem in elem.findall('validator'):
            self.validators.append(validation.Validator.from_element(self.tool_param, validator_elem))

        if self.dataset_ref_name:
            tool_param.data_ref = self.dataset_ref_name

    @property
    def tool_data_table(self):
        """
        The configured tool data table, or None when no table name is set or
        the table is not found.

        Accessing the property also refreshes ``self.columns`` and, when the
        table reports one, ``self.missing_index_file``.
        """
        if not self.tool_data_table_name:
            return None
        table = self.tool_param.tool.app.tool_data_tables.get(self.tool_data_table_name, None)
        if not table:
            return None
        # Column definitions are optional, but if provided override those from the table
        if self.column_elem is not None:
            self.parse_column_definitions(self.elem)
        else:
            self.columns = table.columns
        # Set self.missing_index_file if the index file to
        # which the tool_data_table refers does not exist.
        if table.missing_index_file:
            self.missing_index_file = table.missing_index_file
        return table

    @property
    def missing_tool_data_table_name(self):
        """The data table name when the table cannot be resolved, else None."""
        if self.tool_data_table:
            return None
        log.warning("Data table named '%s' is required by tool but not configured" % self.tool_data_table_name)
        return self.tool_data_table_name

    def parse_column_definitions(self, elem):
        """
        Record the name -> index mapping of every <column> child of ``elem``.

        A 'value' column is required; 'name' defaults to the 'value' column.
        """
        for column_elem in elem.findall('column'):
            name = column_elem.get('name', None)
            assert name is not None, "Required 'name' attribute missing from column def"
            raw_index = column_elem.get('index', None)
            assert raw_index is not None, "Required 'index' attribute missing from column def"
            index = int(raw_index)
            self.columns[name] = index
            self.largest_index = max(self.largest_index, index)
        assert 'value' in self.columns, "Required 'value' column missing from column def"
        self.columns.setdefault('name', self.columns['value'])

    def parse_file_fields(self, reader):
        """
        Split every usable line of ``reader`` on the separator.

        Skipped: lines starting with '#', lines not starting with the
        configured prefix, empty lines, and lines with too few fields to reach
        the largest configured column index.
        """
        rows = []
        expected_count = None
        for line in reader:
            if line.startswith('#'):
                continue
            if self.line_startswith and not line.startswith(self.line_startswith):
                continue
            line = line.rstrip("\n\r")
            if not line:
                continue
            fields = line.split(self.separator)
            if not self.largest_index < len(fields):
                continue
            if not expected_count:
                expected_count = len(fields)
            elif expected_count != len(fields):
                try:
                    name = reader.name
                except AttributeError:
                    name = "a configuration file"
                # Perhaps this should be an error, but even a warning is useful.
                log.warning("Inconsistent number of fields (%i vs %i) in %s using separator %r, check line: %r" %
                            (expected_count, len(fields), name, self.separator, line))
            rows.append(fields)
        return rows

    def get_dependency_names(self):
        """
        Return the names of parameters these options depend on -- both data
        and other param types.
        """
        names = []
        if self.dataset_ref_name:
            names.append(self.dataset_ref_name)
        dependencies = (option_filter.get_dependency_name() for option_filter in self.filters)
        names.extend(dependency for dependency in dependencies if dependency)
        return names

    def get_fields(self, trans, other_values):
        """
        Load the raw option rows from the configured source and run them
        through every filter in order.
        """
        if self.dataset_ref_name:
            dataset = other_values.get(self.dataset_ref_name, None)
            if not (dataset and hasattr(dataset, 'file_name')):
                return []  # no valid dataset in history
            # Ensure parsing dynamic options does not consume more than a megabyte worth memory.
            max_bytes = 1048576
            path = dataset.file_name
            if os.path.getsize(path) < max_bytes:
                with open(path) as fh:
                    options = self.parse_file_fields(fh)
            else:
                # Pass just the first megabyte to parse_file_fields.
                log.warning("Attempting to load options from large file, reading just first megabyte")
                with open(path, 'r') as fh:
                    contents = fh.read(max_bytes)
                options = self.parse_file_fields(StringIO(contents))
        elif self.tool_data_table:
            options = self.tool_data_table.get_fields()
        elif self.file_fields:
            options = list(self.file_fields)
        else:
            options = []
        for option_filter in self.filters:
            options = option_filter.filter_options(options, trans, other_values)
        return options

    def get_fields_by_value(self, value, trans, other_values):
        """
        Return a list of fields with column 'value' matching provided value.
        """
        val_index = self.columns['value']
        return [fields for fields in self.get_fields(trans, other_values) if fields[val_index] == value]

    def get_field_by_name_for_value(self, field_name, value, trans, other_values):
        """
        Get contents of field by name for specified value.

        ``field_name`` may be a column name or an integer column index;
        ``value`` may be a single value or a list of values.
        """
        if isinstance(field_name, int):
            field_index = field_name
        else:
            assert field_name in self.columns, "Requested '%s' column missing from column def" % field_name
            field_index = self.columns[field_name]
        values = value if isinstance(value, list) else [value]
        rval = []
        for val in values:
            rval.extend(fields[field_index] for fields in self.get_fields_by_value(val, trans, other_values))
        return rval

    def get_options(self, trans, other_values):
        """
        Return the options as (name, value, False) tuples.

        Without any options source, the filters are applied to an initially
        empty list and their result is returned as is.
        """
        if self.file_fields is not None or self.tool_data_table is not None or self.dataset_ref_name is not None or self.missing_index_file:
            return [
                (fields[self.columns['name']], fields[self.columns['value']], False)
                for fields in self.get_fields(trans, other_values)
            ]
        rval = []
        for option_filter in self.filters:
            rval = option_filter.filter_options(rval, trans, other_values)
        return rval

    def column_spec_to_index(self, column_spec):
        """
        Convert a column specification (as read from the config file), to an
        index. A column specification can just be a number, a column name, or
        a column alias.
        """
        # Name?
        if column_spec in self.columns:
            return self.columns[column_spec]
        # Int?
        return int(column_spec)