Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tools.parameters.dynamic_options
"""
Support for generating the options for a SelectToolParameter dynamically (based
on the values of other parameters or other aspects of the current state)
"""
import copy
import logging
import os
import re
from io import StringIO
from galaxy.model import (
HistoryDatasetAssociation,
HistoryDatasetCollectionAssociation,
User
)
from galaxy.tools.wrappers import (
DatasetFilenameWrapper,
DatasetListWrapper
)
from galaxy.util import string_as_bool
from . import validation
log = logging.getLogger(__name__)
[docs]class Filter:
"""
A filter takes the current options list and modifies it.
"""
[docs] @classmethod
def from_element(cls, d_option, elem):
"""Loads the proper filter by the type attribute of elem"""
type = elem.get('type', None)
assert type is not None, "Required 'type' attribute missing from filter"
return filter_types[type.strip()](d_option, elem)
[docs] def get_dependency_name(self):
"""Returns the name of any dependencies, otherwise None"""
return None
[docs] def filter_options(self, options, trans, other_values):
"""Returns a list of options after the filter is applied"""
raise TypeError("Abstract Method")
[docs]class StaticValueFilter(Filter):
"""
Filters a list of options on a column by a static value.
Type: static_value
Required Attributes:
value: static value to compare to
column: column in options to compare with
Optional Attributes:
keep: Keep columns matching value (True)
Discard columns matching value (False)
"""
[docs] def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.value = elem.get("value", None)
assert self.value is not None, "Required 'value' attribute missing from filter"
column = elem.get("column", None)
assert column is not None, "Required 'column' attribute missing from filter, when loading from file"
self.column = d_option.column_spec_to_index(column)
self.keep = string_as_bool(elem.get("keep", 'True'))
[docs] def filter_options(self, options, trans, other_values):
rval = []
filter_value = self.value
try:
filter_value = User.expand_user_properties(trans.user, filter_value)
except Exception:
pass
for fields in options:
if self.keep == (filter_value == fields[self.column]):
rval.append(fields)
return rval
[docs]class RegexpFilter(Filter):
"""
Filters a list of options on a column by a regular expression.
Type: regexp
Required Attributes:
value: regular expression to compare to
column: column in options to compare with
Optional Attributes:
keep: Keep columns matching the regexp (True)
Discard columns matching the regexp (False)
"""
[docs] def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.value = elem.get("value", None)
assert self.value is not None, "Required 'value' attribute missing from filter"
column = elem.get("column", None)
assert column is not None, "Required 'column' attribute missing from filter, when loading from file"
self.column = d_option.column_spec_to_index(column)
self.keep = string_as_bool(elem.get("keep", 'True'))
[docs] def filter_options(self, options, trans, other_values):
rval = []
filter_value = self.value
try:
filter_value = User.expand_user_properties(trans.user, filter_value)
except Exception:
pass
filter_pattern = re.compile(filter_value)
for fields in options:
if self.keep == (not filter_pattern.match(fields[self.column]) is None):
rval.append(fields)
return rval
[docs]class DataMetaFilter(Filter):
"""
Filters a list of options on a column by a dataset metadata value.
Type: data_meta
When no 'from' source has been specified in the <options> tag, this will populate the options list with (meta_value, meta_value, False).
Otherwise, options which do not match the metadata value in the column are discarded.
Required Attributes:
- ref: Name of input dataset
- key: Metadata key to use for comparison
- column: column in options to compare with (not required when not associated with input options)
Optional Attributes:
- multiple: Option values are multiple, split column by separator (True)
- separator: When multiple split by this (,)
"""
[docs] def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.ref_name = elem.get("ref", None)
assert self.ref_name is not None, "Required 'ref' attribute missing from filter"
d_option.has_dataset_dependencies = True
self.key = elem.get("key", None)
assert self.key is not None, "Required 'key' attribute missing from filter"
self.column = elem.get("column", None)
if self.column is None:
assert self.dynamic_option.file_fields is None and self.dynamic_option.dataset_ref_name is None, "Required 'column' attribute missing from filter, when loading from file"
else:
self.column = d_option.column_spec_to_index(self.column)
self.multiple = string_as_bool(elem.get("multiple", "False"))
self.separator = elem.get("separator", ",")
[docs] def filter_options(self, options, trans, other_values):
def _add_meta(meta_value, m):
if isinstance(m, list):
meta_value |= set(m)
elif isinstance(m, dict):
meta_value |= {f"{k},{v}" for k, v in m.items()}
elif isinstance(m, str) and os.path.isfile(m):
with open(m) as fh:
for line in fh:
meta_value.add(line)
else:
meta_value.add(m)
def compare_meta_value(file_value, dataset_value):
if isinstance(dataset_value, set):
if self.multiple:
file_value = file_value.split(self.separator)
for value in dataset_value:
if value not in file_value:
return False
return True
return file_value in dataset_value
if self.multiple:
return dataset_value in file_value.split(self.separator)
return file_value == dataset_value
try:
ref = _get_ref_data(other_values, self.ref_name)
except KeyError: # no such dataset
log.warning("could not filter by metadata: %s unknown" % self.ref_name)
return []
except ValueError: # not a valid dataset
log.warning("could not filter by metadata: %s not a data or collection parameter" % self.ref_name)
return []
# get the metadata value.
# - for lists: (of data sets) and collections the meta data values of all
# elements is determined
# - for data sets: the meta data value
# in both cases only meta data that is set (i.e. differs from the no_value)
# is considered
meta_value = set()
for r in ref:
if not r.metadata.element_is_set(self.key):
continue
_add_meta(meta_value, r.metadata.get(self.key))
# if no meta data value could be determined just return a copy
# of the original options
if len(meta_value) == 0:
return copy.deepcopy(options)
if self.column is not None:
rval = []
for fields in options:
if compare_meta_value(fields[self.column], meta_value):
rval.append(fields)
return rval
else:
if not self.dynamic_option.columns:
self.dynamic_option.columns = {
"name": 0,
"value": 1,
"selected": 2
}
self.dynamic_option.largest_index = 2
for value in meta_value:
options.append((value, value, False))
return options
[docs]class ParamValueFilter(Filter):
"""
Filters a list of options on a column by the value of another input.
Type: param_value
Required Attributes:
- ref: Name of input value
- column: column in options to compare with
Optional Attributes:
- keep: Keep columns matching value (True)
Discard columns matching value (False)
- ref_attribute: Period (.) separated attribute chain of input (ref) to use as value for filter
"""
[docs] def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.ref_name = elem.get("ref", None)
assert self.ref_name is not None, "Required 'ref' attribute missing from filter"
column = elem.get("column", None)
assert column is not None, "Required 'column' attribute missing from filter"
self.column = d_option.column_spec_to_index(column)
self.keep = string_as_bool(elem.get("keep", 'True'))
self.ref_attribute = elem.get("ref_attribute", None)
if self.ref_attribute:
self.ref_attribute = self.ref_attribute.split('.')
else:
self.ref_attribute = []
[docs] def filter_options(self, options, trans, other_values):
if trans is not None and trans.workflow_building_mode:
return []
ref = other_values.get(self.ref_name, None)
for ref_attribute in self.ref_attribute:
if not hasattr(ref, ref_attribute):
return [] # ref does not have attribute, so we cannot filter, return empty list
ref = getattr(ref, ref_attribute)
ref = str(ref)
rval = []
for fields in options:
if self.keep == (fields[self.column] == ref):
rval.append(fields)
return rval
[docs]class UniqueValueFilter(Filter):
"""
Filters a list of options to be unique by a column value.
Type: unique_value
Required Attributes:
column: column in options to compare with
"""
[docs] def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
column = elem.get("column", None)
assert column is not None, "Required 'column' attribute missing from filter"
self.column = d_option.column_spec_to_index(column)
[docs] def filter_options(self, options, trans, other_values):
rval = []
skip_list = []
for fields in options:
if fields[self.column] not in skip_list:
rval.append(fields)
skip_list.append(fields[self.column])
return rval
[docs]class MultipleSplitterFilter(Filter):
"""
Turns a single line of options into multiple lines, by splitting a column and creating a line for each item.
Type: multiple_splitter
Required Attributes:
column: column in options to compare with
Optional Attributes:
separator: Split column by this (,)
"""
[docs] def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.separator = elem.get("separator", ",")
columns = elem.get("column", None)
assert columns is not None, "Required 'column' attribute missing from filter"
self.columns = [d_option.column_spec_to_index(column) for column in columns.split(",")]
[docs] def filter_options(self, options, trans, other_values):
rval = []
for fields in options:
for column in self.columns:
for field in fields[column].split(self.separator):
rval.append(fields[0:column] + [field] + fields[column + 1:])
return rval
[docs]class AttributeValueSplitterFilter(Filter):
"""
Filters a list of attribute-value pairs to be unique attribute names.
DEPRECATED: just replace with 2 rounds of MultipleSplitterFilter
Type: attribute_value_splitter
Required Attributes:
column: column in options to compare with
Optional Attributes:
pair_separator: Split column by this (,)
name_val_separator: Split name-value pair by this ( whitespace )
"""
[docs] def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.pair_separator = elem.get("pair_separator", ",")
self.name_val_separator = elem.get("name_val_separator", None)
columns = elem.get("column", None)
assert columns is not None, "Required 'column' attribute missing from filter"
self.columns = [d_option.column_spec_to_index(column) for column in columns.split(",")]
[docs] def filter_options(self, options, trans, other_values):
attr_names = []
rval = []
for fields in options:
for column in self.columns:
for pair in fields[column].split(self.pair_separator):
ary = pair.split(self.name_val_separator)
if len(ary) == 2:
name = ary[0]
if name not in attr_names:
rval.append(fields[0:column] + [name] + fields[column:])
attr_names.append(name)
return rval
[docs]class AdditionalValueFilter(Filter):
"""
Adds a single static value to an options list.
Type: add_value
Required Attributes:
value: value to appear in select list
Optional Attributes:
name: Display name to appear in select list (value)
index: Index of option list to add value (APPEND)
"""
[docs] def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.value = elem.get("value", None)
assert self.value is not None, "Required 'value' attribute missing from filter"
self.name = elem.get("name", None)
if self.name is None:
self.name = self.value
self.index = elem.get("index", None)
if self.index is not None:
self.index = int(self.index)
[docs] def filter_options(self, options, trans, other_values):
rval = list(options)
add_value = []
for _ in range(self.dynamic_option.largest_index + 1):
add_value.append("")
value_col = self.dynamic_option.columns.get('value', 0)
name_col = self.dynamic_option.columns.get('name', value_col)
# Set name first, then value, in case they are the same column
add_value[name_col] = self.name
add_value[value_col] = self.value
if self.index is not None:
rval.insert(self.index, add_value)
else:
rval.append(add_value)
return rval
[docs]class RemoveValueFilter(Filter):
"""
Removes a value from an options list.
Type: remove_value
Required Attributes::
value: value to remove from select list
or
ref: param to refer to
or
meta_ref: dataset to refer to
key: metadata key to compare to
"""
[docs] def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.value = elem.get("value", None)
self.ref_name = elem.get("ref", None)
self.meta_ref = elem.get("meta_ref", None)
self.metadata_key = elem.get("key", None)
assert self.value is not None or self.ref_name is not None or (self.meta_ref is not None and self.metadata_key is not None), ValueError("Required 'value', or 'ref', or 'meta_ref' and 'key' attributes missing from filter")
self.multiple = string_as_bool(elem.get("multiple", "False"))
self.separator = elem.get("separator", ",")
[docs] def filter_options(self, options, trans, other_values):
if trans is not None and trans.workflow_building_mode:
return options
def compare_value(option_value, filter_value):
if isinstance(filter_value, list):
if self.multiple:
option_value = option_value.split(self.separator)
for value in filter_value:
if value not in option_value:
return False
return True
return option_value in filter_value
if self.multiple:
return filter_value in option_value.split(self.separator)
return option_value == filter_value
value = self.value
if value is None:
if self.ref_name is not None:
value = other_values.get(self.ref_name)
else:
data_ref = other_values.get(self.meta_ref)
if isinstance(data_ref, HistoryDatasetCollectionAssociation):
data_ref = data_ref.to_hda_representative()
if not isinstance(data_ref, HistoryDatasetAssociation) and not isinstance(data_ref, DatasetFilenameWrapper):
return options # cannot modify options
value = data_ref.metadata.get(self.metadata_key, None)
# Default to the second column (i.e. 1) since this used to work only on options produced by the data_meta filter
value_col = self.dynamic_option.columns.get('value', 1)
return [option for option in options if not compare_value(option[value_col], value)]
[docs]class SortByColumnFilter(Filter):
"""
Sorts an options list by a column
Type: sort_by
Required Attributes:
column: column to sort by
"""
[docs] def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
column = elem.get("column", None)
assert column is not None, "Required 'column' attribute missing from filter"
self.column = d_option.column_spec_to_index(column)
[docs] def filter_options(self, options, trans, other_values):
return sorted(options, key=lambda x: x[self.column])
filter_types = dict(data_meta=DataMetaFilter,
param_value=ParamValueFilter,
static_value=StaticValueFilter,
regexp=RegexpFilter,
unique_value=UniqueValueFilter,
multiple_splitter=MultipleSplitterFilter,
attribute_value_splitter=AttributeValueSplitterFilter,
add_value=AdditionalValueFilter,
remove_value=RemoveValueFilter,
sort_by=SortByColumnFilter)
[docs]class DynamicOptions:
"""Handles dynamically generated SelectToolParameter options"""
[docs] def __init__(self, elem, tool_param):
def load_from_parameter(from_parameter, transform_lines=None):
obj = self.tool_param
for field in from_parameter.split('.'):
obj = getattr(obj, field)
if transform_lines:
obj = eval(transform_lines, {'self': self, 'obj': obj})
return self.parse_file_fields(obj)
self.tool_param = tool_param
self.columns = {}
self.filters = []
self.file_fields = None
self.largest_index = 0
self.dataset_ref_name = None
# True if the options generation depends on one or more other parameters
# that are dataset inputs
self.has_dataset_dependencies = False
self.validators = []
self.converter_safe = True
# Parse the <options> tag
self.separator = elem.get('separator', '\t')
self.line_startswith = elem.get('startswith', None)
data_file = elem.get('from_file', None)
self.index_file = None
self.missing_index_file = None
dataset_file = elem.get('from_dataset', None)
from_parameter = elem.get('from_parameter', None)
self.tool_data_table_name = elem.get('from_data_table', None)
# Options are defined from a data table loaded by the app
self._tool_data_table = None
self.elem = elem
self.column_elem = elem.find("column")
self.tool_data_table # Need to touch tool data table once to populate self.columns
# Options are defined by parsing tabular text data from a data file
# on disk, a dataset, or the value of another parameter
if not self.tool_data_table_name and (data_file is not None or dataset_file is not None or from_parameter is not None):
self.parse_column_definitions(elem)
if data_file is not None:
data_file = data_file.strip()
if not os.path.isabs(data_file):
full_path = os.path.join(self.tool_param.tool.app.config.tool_data_path, data_file)
if os.path.exists(full_path):
self.index_file = data_file
with open(full_path) as fh:
self.file_fields = self.parse_file_fields(fh)
else:
self.missing_index_file = data_file
elif dataset_file is not None:
self.dataset_ref_name = dataset_file
self.has_dataset_dependencies = True
self.converter_safe = False
elif from_parameter is not None:
transform_lines = elem.get('transform_lines', None)
self.file_fields = list(load_from_parameter(from_parameter, transform_lines))
# Load filters
for filter_elem in elem.findall('filter'):
self.filters.append(Filter.from_element(self, filter_elem))
# Load Validators
for validator in elem.findall('validator'):
self.validators.append(validation.Validator.from_element(self.tool_param, validator))
if self.dataset_ref_name:
tool_param.data_ref = self.dataset_ref_name
@property
def tool_data_table(self):
if self.tool_data_table_name:
tool_data_table = self.tool_param.tool.app.tool_data_tables.get(self.tool_data_table_name, None)
if tool_data_table:
# Column definitions are optional, but if provided override those from the table
if self.column_elem is not None:
self.parse_column_definitions(self.elem)
else:
self.columns = tool_data_table.columns
# Set self.missing_index_file if the index file to
# which the tool_data_table refers does not exist.
if tool_data_table.missing_index_file:
self.missing_index_file = tool_data_table.missing_index_file
return tool_data_table
return None
@property
def missing_tool_data_table_name(self):
if not self.tool_data_table:
log.warning("Data table named '%s' is required by tool but not configured" % self.tool_data_table_name)
return self.tool_data_table_name
return None
[docs] def parse_column_definitions(self, elem):
for column_elem in elem.findall('column'):
name = column_elem.get('name', None)
assert name is not None, "Required 'name' attribute missing from column def"
index = column_elem.get('index', None)
assert index is not None, "Required 'index' attribute missing from column def"
index = int(index)
self.columns[name] = index
if index > self.largest_index:
self.largest_index = index
assert 'value' in self.columns, "Required 'value' column missing from column def"
if 'name' not in self.columns:
self.columns['name'] = self.columns['value']
[docs] def parse_file_fields(self, reader):
rval = []
field_count = None
for line in reader:
if line.startswith('#') or (self.line_startswith and not line.startswith(self.line_startswith)):
continue
line = line.rstrip("\n\r")
if line:
fields = line.split(self.separator)
if self.largest_index < len(fields):
if not field_count:
field_count = len(fields)
elif field_count != len(fields):
try:
name = reader.name
except AttributeError:
name = "a configuration file"
# Perhaps this should be an error, but even a warning is useful.
log.warning("Inconsistent number of fields (%i vs %i) in %s using separator %r, check line: %r" %
(field_count, len(fields), name, self.separator, line))
rval.append(fields)
return rval
[docs] def get_dependency_names(self):
"""
Return the names of parameters these options depend on -- both data
and other param types.
"""
rval = []
if self.dataset_ref_name:
rval.append(self.dataset_ref_name)
for filter in self.filters:
depend = filter.get_dependency_name()
if depend:
rval.append(depend)
return rval
[docs] def get_fields(self, trans, other_values):
if self.dataset_ref_name:
try:
datasets = _get_ref_data(other_values, self.dataset_ref_name)
except KeyError: # no such dataset
log.warning("could not create dynamic options from_dataset: %s unknown" % self.dataset_ref_name)
return []
except ValueError: # not a valid dataset
log.warning("could not create dynamic options from_dataset: %s not a data or collection parameter" % self.dataset_ref_name)
return []
options = []
for dataset in datasets:
if not hasattr(dataset, 'file_name'):
continue
# Ensure parsing dynamic options does not consume more than a megabyte worth memory.
path = dataset.file_name
if os.path.getsize(path) < 1048576:
with open(path) as fh:
options += self.parse_file_fields(fh)
else:
# Pass just the first megabyte to parse_file_fields.
log.warning("Attempting to load options from large file, reading just first megabyte")
with open(path) as fh:
contents = fh.read(1048576)
options += self.parse_file_fields(StringIO(contents))
elif self.tool_data_table:
options = self.tool_data_table.get_fields()
elif self.file_fields:
options = list(self.file_fields)
else:
options = []
for filter in self.filters:
options = filter.filter_options(options, trans, other_values)
return options
[docs] def get_fields_by_value(self, value, trans, other_values):
"""
Return a list of fields with column 'value' matching provided value.
"""
rval = []
val_index = self.columns['value']
for fields in self.get_fields(trans, other_values):
if fields[val_index] == value:
rval.append(fields)
return rval
[docs] def get_field_by_name_for_value(self, field_name, value, trans, other_values):
"""
Get contents of field by name for specified value.
"""
rval = []
if isinstance(field_name, int):
field_index = field_name
else:
assert field_name in self.columns, "Requested '%s' column missing from column def" % field_name
field_index = self.columns[field_name]
if not isinstance(value, list):
value = [value]
for val in value:
for fields in self.get_fields_by_value(val, trans, other_values):
rval.append(fields[field_index])
return rval
[docs] def get_options(self, trans, other_values):
rval = []
if self.file_fields is not None or self.tool_data_table is not None or self.dataset_ref_name is not None or self.missing_index_file:
options = self.get_fields(trans, other_values)
for fields in options:
rval.append((fields[self.columns['name']], fields[self.columns['value']], False))
else:
for filter in self.filters:
rval = filter.filter_options(rval, trans, other_values)
return rval
[docs] def column_spec_to_index(self, column_spec):
"""
Convert a column specification (as read from the config file), to an
index. A column specification can just be a number, a column name, or
a column alias.
"""
# Name?
if column_spec in self.columns:
return self.columns[column_spec]
# Int?
return int(column_spec)
def _get_ref_data(other_values, ref_name):
"""
get the list of data sets from ref_name
- a KeyError is raised if no such element exists
- a ValueError is raised if the element is not of the type DatasetFilenameWrapper, HistoryDatasetAssociation, DatasetListWrapper, HistoryDatasetCollectionAssociation, list
"""
ref = other_values[ref_name]
if not isinstance(ref, (DatasetFilenameWrapper, HistoryDatasetAssociation, DatasetListWrapper, HistoryDatasetCollectionAssociation, list)):
raise ValueError
if isinstance(ref, (DatasetFilenameWrapper, HistoryDatasetAssociation)):
ref = [ref]
elif isinstance(ref, HistoryDatasetCollectionAssociation):
ref = ref.to_hda_representative(multiple=True)
return ref