Source code for galaxy.tools.parameters.dynamic_options
"""
Support for generating the options for a SelectToolParameter dynamically (based
on the values of other parameters or other aspects of the current state)
"""
import logging
import os
from six import StringIO
import galaxy.tools
from galaxy.model import (
HistoryDatasetAssociation,
HistoryDatasetCollectionAssociation,
User
)
from galaxy.util import string_as_bool
from . import validation
log = logging.getLogger(__name__)
class Filter(object):
"""
A filter takes the current options list and modifies it.
"""
@classmethod
def from_element(cls, d_option, elem):
"""Loads the proper filter by the type attribute of elem"""
type = elem.get('type', None)
assert type is not None, "Required 'type' attribute missing from filter"
return filter_types[type.strip()](d_option, elem)
def __init__(self, d_option, elem):
# Subclasses rely on self.dynamic_option when filtering
self.dynamic_option = d_option
self.elem = elem
def get_dependency_name(self):
"""Returns the name of any dependencies, otherwise None"""
return None
def filter_options(self, options, trans, other_values):
"""Returns a list of options after the filter is applied"""
raise TypeError("Abstract Method")
class StaticValueFilter(Filter):
"""
Filters a list of options on a column by a static value.
Type: static_value
Required Attributes:
value: static value to compare to
column: column in options to compare with
Optional Attributes:
keep: Keep options matching value (True)
Discard options matching value (False)
"""
def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.value = elem.get("value", None)
assert self.value is not None, "Required 'value' attribute missing from filter"
column = elem.get("column", None)
assert column is not None, "Required 'column' attribute missing from filter, when loading from file"
self.column = d_option.column_spec_to_index(column)
self.keep = string_as_bool(elem.get("keep", 'True'))
def filter_options(self, options, trans, other_values):
rval = []
filter_value = self.value
try:
filter_value = User.expand_user_properties(trans.user, filter_value)
except Exception:
pass
for fields in options:
if (self.keep and fields[self.column] == filter_value) or (not self.keep and fields[self.column] != filter_value):
rval.append(fields)
return rval
class DataMetaFilter(Filter):
"""
Filters a list of options on a column by a dataset metadata value.
Type: data_meta
When no 'from' source has been specified in the <options> tag, this will populate the options list with (meta_value, meta_value, False).
Otherwise, options which do not match the metadata value in the column are discarded.
Required Attributes:
- ref: Name of input dataset
- key: Metadata key to use for comparison
- column: column in options to compare with (not required when not associated with input options)
Optional Attributes:
- multiple: Option values are multiple, split column by separator (False)
- separator: When multiple split by this (,)
"""
def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.ref_name = elem.get("ref", None)
assert self.ref_name is not None, "Required 'ref' attribute missing from filter"
d_option.has_dataset_dependencies = True
self.key = elem.get("key", None)
assert self.key is not None, "Required 'key' attribute missing from filter"
self.column = elem.get("column", None)
if self.column is None:
assert self.dynamic_option.file_fields is None and self.dynamic_option.dataset_ref_name is None, "Required 'column' attribute missing from filter, when loading from file"
else:
self.column = d_option.column_spec_to_index(self.column)
self.multiple = string_as_bool(elem.get("multiple", "False"))
self.separator = elem.get("separator", ",")
def filter_options(self, options, trans, other_values):
def compare_meta_value(file_value, dataset_value):
if isinstance(dataset_value, list):
if self.multiple:
file_value = file_value.split(self.separator)
for value in dataset_value:
if value not in file_value:
return False
return True
return file_value in dataset_value
if self.multiple:
return dataset_value in file_value.split(self.separator)
return file_value == dataset_value
ref = other_values.get(self.ref_name, None)
if isinstance(ref, HistoryDatasetCollectionAssociation):
ref = ref.to_hda_representative(self.multiple)
is_data = isinstance(ref, galaxy.tools.wrappers.DatasetFilenameWrapper)
is_data_list = isinstance(ref, galaxy.tools.wrappers.DatasetListWrapper) or isinstance(ref, list)
is_data_or_data_list = is_data or is_data_list
if not isinstance(ref, HistoryDatasetAssociation) and not is_data_or_data_list:
return [] # not a valid dataset
if is_data_list:
meta_value = None
for single_ref in ref:
this_meta_value = single_ref.metadata.get(self.key, None)
if this_meta_value == meta_value:
continue
elif meta_value is None:
meta_value = this_meta_value
else:
# Different values with mismatching metadata, return []
return []
else:
meta_value = ref.metadata.get(self.key, None)
if meta_value is None:
return [(disp_name, optval, selected) for disp_name, optval, selected in options]
if self.column is not None:
rval = []
for fields in options:
if compare_meta_value(fields[self.column], meta_value):
rval.append(fields)
return rval
else:
if not self.dynamic_option.columns:
self.dynamic_option.columns = {
"name" : 0,
"value" : 1,
"selected" : 2
}
self.dynamic_option.largest_index = 2
if not isinstance(meta_value, list):
meta_value = [meta_value]
for value in meta_value:
options.append((value, value, False))
return options
class ParamValueFilter(Filter):
"""
Filters a list of options on a column by the value of another input.
Type: param_value
Required Attributes:
- ref: Name of input value
- column: column in options to compare with
Optional Attributes:
- keep: Keep options matching value (True)
Discard options matching value (False)
- ref_attribute: Period (.) separated attribute chain of input (ref) to use as value for filter
"""
def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.ref_name = elem.get("ref", None)
assert self.ref_name is not None, "Required 'ref' attribute missing from filter"
column = elem.get("column", None)
assert column is not None, "Required 'column' attribute missing from filter"
self.column = d_option.column_spec_to_index(column)
self.keep = string_as_bool(elem.get("keep", 'True'))
self.ref_attribute = elem.get("ref_attribute", None)
if self.ref_attribute:
self.ref_attribute = self.ref_attribute.split('.')
else:
self.ref_attribute = []
def filter_options(self, options, trans, other_values):
if trans is not None and trans.workflow_building_mode:
return []
ref = other_values.get(self.ref_name, None)
for ref_attribute in self.ref_attribute:
if not hasattr(ref, ref_attribute):
return [] # ref does not have attribute, so we cannot filter, return empty list
ref = getattr(ref, ref_attribute)
ref = str(ref)
rval = []
for fields in options:
if (self.keep and fields[self.column] == ref) or (not self.keep and fields[self.column] != ref):
rval.append(fields)
return rval
class UniqueValueFilter(Filter):
"""
Filters a list of options to be unique by a column value.
Type: unique_value
Required Attributes:
column: column in options to compare with
"""
def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
column = elem.get("column", None)
assert column is not None, "Required 'column' attribute missing from filter"
self.column = d_option.column_spec_to_index(column)
def filter_options(self, options, trans, other_values):
rval = []
skip_list = []
for fields in options:
if fields[self.column] not in skip_list:
rval.append(fields)
skip_list.append(fields[self.column])
return rval
class MultipleSplitterFilter(Filter):
"""
Turns a single line of options into multiple lines, by splitting a column and creating a line for each item.
Type: multiple_splitter
Required Attributes:
column: column in options to split; may be a comma-separated list of columns
Optional Attributes:
separator: Split column by this (,)
"""
def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.separator = elem.get("separator", ",")
columns = elem.get("column", None)
assert columns is not None, "Required 'column' attribute missing from filter"
self.columns = [d_option.column_spec_to_index(column) for column in columns.split(",")]
def filter_options(self, options, trans, other_values):
rval = []
for fields in options:
for column in self.columns:
for field in fields[column].split(self.separator):
rval.append(fields[0:column] + [field] + fields[column + 1:])
return rval
class AttributeValueSplitterFilter(Filter):
"""
Filters a list of attribute-value pairs to be unique attribute names.
Type: attribute_value_splitter
Required Attributes:
column: column in options containing attribute-value pairs; may be a comma-separated list of columns
Optional Attributes:
pair_separator: Split column by this (,)
name_val_separator: Split name-value pair by this ( whitespace )
"""
def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.pair_separator = elem.get("pair_separator", ",")
self.name_val_separator = elem.get("name_val_separator", None)
columns = elem.get("column", None)
assert columns is not None, "Required 'column' attribute missing from filter"
self.columns = [d_option.column_spec_to_index(column) for column in columns.split(",")]
def filter_options(self, options, trans, other_values):
attr_names = []
rval = []
for fields in options:
for column in self.columns:
for pair in fields[column].split(self.pair_separator):
ary = pair.split(self.name_val_separator)
if len(ary) == 2:
name = ary[0]
if name not in attr_names:
rval.append(fields[0:column] + [name] + fields[column:])
attr_names.append(name)
return rval
class AdditionalValueFilter(Filter):
"""
Adds a single static value to an options list.
Type: add_value
Required Attributes:
value: value to appear in select list
Optional Attributes:
name: Display name to appear in select list (value)
index: Index of option list to add value (APPEND)
"""
def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.value = elem.get("value", None)
assert self.value is not None, "Required 'value' attribute missing from filter"
self.name = elem.get("name", None)
if self.name is None:
self.name = self.value
self.index = elem.get("index", None)
if self.index is not None:
self.index = int(self.index)
def filter_options(self, options, trans, other_values):
rval = list(options)
add_value = []
for _ in range(self.dynamic_option.largest_index + 1):
add_value.append("")
value_col = self.dynamic_option.columns.get('value', 0)
name_col = self.dynamic_option.columns.get('name', value_col)
# Set name first, then value, in case they are the same column
add_value[name_col] = self.name
add_value[value_col] = self.value
if self.index is not None:
rval.insert(self.index, add_value)
else:
rval.append(add_value)
return rval
class RemoveValueFilter(Filter):
"""
Removes a value from an options list.
Type: remove_value
Required Attributes::
value: value to remove from select list
or
ref: param to refer to
or
meta_ref: dataset to refer to
key: metadata key to compare to
"""
def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
self.value = elem.get("value", None)
self.ref_name = elem.get("ref", None)
self.meta_ref = elem.get("meta_ref", None)
self.metadata_key = elem.get("key", None)
assert self.value is not None or self.ref_name is not None or (self.meta_ref is not None and self.metadata_key is not None), ValueError("Required 'value', or 'ref', or 'meta_ref' and 'key' attributes missing from filter")
self.multiple = string_as_bool(elem.get("multiple", "False"))
self.separator = elem.get("separator", ",")
def filter_options(self, options, trans, other_values):
if trans is not None and trans.workflow_building_mode:
return options
def compare_value(option_value, filter_value):
if isinstance(filter_value, list):
if self.multiple:
option_value = option_value.split(self.separator)
for value in filter_value:
if value not in option_value:
return False
return True
return option_value in filter_value
if self.multiple:
return filter_value in option_value.split(self.separator)
return option_value == filter_value
value = self.value
if value is None:
if self.ref_name is not None:
value = other_values.get(self.ref_name)
else:
data_ref = other_values.get(self.meta_ref)
if isinstance(data_ref, HistoryDatasetCollectionAssociation):
data_ref = data_ref.to_hda_representative()
if not isinstance(data_ref, HistoryDatasetAssociation) and not isinstance(data_ref, galaxy.tools.wrappers.DatasetFilenameWrapper):
return options # cannot modify options
value = data_ref.metadata.get(self.metadata_key, None)
# Default to the second column (i.e. 1) since this used to work only on options produced by the data_meta filter
value_col = self.dynamic_option.columns.get('value', 1)
return [option for option in options if not compare_value(option[value_col], value)]
class SortByColumnFilter(Filter):
"""
Sorts an options list by a column
Type: sort_by
Required Attributes:
column: column to sort by
"""
def __init__(self, d_option, elem):
Filter.__init__(self, d_option, elem)
column = elem.get("column", None)
assert column is not None, "Required 'column' attribute missing from filter"
self.column = d_option.column_spec_to_index(column)
def filter_options(self, options, trans, other_values):
rval = []
for fields in options:
for j in range(0, len(rval)):
if fields[self.column] < rval[j][self.column]:
rval.insert(j, fields)
break
else:
rval.append(fields)
return rval
filter_types = dict(data_meta=DataMetaFilter,
param_value=ParamValueFilter,
static_value=StaticValueFilter,
unique_value=UniqueValueFilter,
multiple_splitter=MultipleSplitterFilter,
attribute_value_splitter=AttributeValueSplitterFilter,
add_value=AdditionalValueFilter,
remove_value=RemoveValueFilter,
sort_by=SortByColumnFilter)
class DynamicOptions(object):
"""Handles dynamically generated SelectToolParameter options"""
def __init__(self, elem, tool_param):
def load_from_parameter(from_parameter, transform_lines=None):
obj = self.tool_param
for field in from_parameter.split('.'):
obj = getattr(obj, field)
if transform_lines:
obj = eval(transform_lines, {'self': self, 'obj': obj})
return self.parse_file_fields(obj)
self.tool_param = tool_param
self.columns = {}
self.filters = []
self.file_fields = None
self.largest_index = 0
self.dataset_ref_name = None
# True if the options generation depends on one or more other parameters
# that are dataset inputs
self.has_dataset_dependencies = False
self.validators = []
self.converter_safe = True
# Parse the <options> tag
self.separator = elem.get('separator', '\t')
self.line_startswith = elem.get('startswith', None)
data_file = elem.get('from_file', None)
self.index_file = None
self.missing_index_file = None
dataset_file = elem.get('from_dataset', None)
from_parameter = elem.get('from_parameter', None)
self.tool_data_table_name = elem.get('from_data_table', None)
# Options are defined from a data table loaded by the app
self._tool_data_table = None
self.elem = elem
self.column_elem = elem.find("column")
self.tool_data_table # Need to touch tool data table once to populate self.columns
# Options are defined by parsing tabular text data from a data file
# on disk, a dataset, or the value of another parameter
if not self.tool_data_table_name and (data_file is not None or dataset_file is not None or from_parameter is not None):
self.parse_column_definitions(elem)
if data_file is not None:
data_file = data_file.strip()
if not os.path.isabs(data_file):
full_path = os.path.join(self.tool_param.tool.app.config.tool_data_path, data_file)
if os.path.exists(full_path):
self.index_file = data_file
with open(full_path) as fh:
self.file_fields = self.parse_file_fields(fh)
else:
self.missing_index_file = data_file
elif dataset_file is not None:
self.dataset_ref_name = dataset_file
self.has_dataset_dependencies = True
self.converter_safe = False
elif from_parameter is not None:
transform_lines = elem.get('transform_lines', None)
self.file_fields = list(load_from_parameter(from_parameter, transform_lines))
# Load filters
for filter_elem in elem.findall('filter'):
self.filters.append(Filter.from_element(self, filter_elem))
# Load Validators
for validator in elem.findall('validator'):
self.validators.append(validation.Validator.from_element(self.tool_param, validator))
if self.dataset_ref_name:
tool_param.data_ref = self.dataset_ref_name
@property
def tool_data_table(self):
if self.tool_data_table_name:
tool_data_table = self.tool_param.tool.app.tool_data_tables.get(self.tool_data_table_name, None)
if tool_data_table:
# Column definitions are optional, but if provided override those from the table
if self.column_elem is not None:
self.parse_column_definitions(self.elem)
else:
self.columns = tool_data_table.columns
# Set self.missing_index_file if the index file to
# which the tool_data_table refers does not exist.
if tool_data_table.missing_index_file:
self.missing_index_file = tool_data_table.missing_index_file
return tool_data_table
return None
@property
def missing_tool_data_table_name(self):
if not self.tool_data_table:
log.warning("Data table named '%s' is required by tool but not configured" % self.tool_data_table_name)
return self.tool_data_table_name
return None
def parse_column_definitions(self, elem):
for column_elem in elem.findall('column'):
name = column_elem.get('name', None)
assert name is not None, "Required 'name' attribute missing from column def"
index = column_elem.get('index', None)
assert index is not None, "Required 'index' attribute missing from column def"
index = int(index)
self.columns[name] = index
if index > self.largest_index:
self.largest_index = index
assert 'value' in self.columns, "Required 'value' column missing from column def"
if 'name' not in self.columns:
self.columns['name'] = self.columns['value']
def parse_file_fields(self, reader):
rval = []
field_count = None
for line in reader:
if line.startswith('#') or (self.line_startswith and not line.startswith(self.line_startswith)):
continue
line = line.rstrip("\n\r")
if line:
fields = line.split(self.separator)
if self.largest_index < len(fields):
if not field_count:
field_count = len(fields)
elif field_count != len(fields):
try:
name = reader.name
except AttributeError:
name = "a configuration file"
# Perhaps this should be an error, but even a warning is useful.
log.warning("Inconsistent number of fields (%i vs %i) in %s using separator %r, check line: %r" %
(field_count, len(fields), name, self.separator, line))
rval.append(fields)
return rval
def get_dependency_names(self):
"""
Return the names of parameters these options depend on -- both data
and other param types.
"""
rval = []
if self.dataset_ref_name:
rval.append(self.dataset_ref_name)
for filter in self.filters:
depend = filter.get_dependency_name()
if depend:
rval.append(depend)
return rval
def get_fields(self, trans, other_values):
if self.dataset_ref_name:
dataset = other_values.get(self.dataset_ref_name, None)
if not dataset or not hasattr(dataset, 'file_name'):
return [] # no valid dataset in history
# Ensure parsing dynamic options does not consume more than a megabyte worth of memory.
path = dataset.file_name
if os.path.getsize(path) < 1048576:
with open(path) as fh:
options = self.parse_file_fields(fh)
else:
# Pass just the first megabyte to parse_file_fields.
log.warning("Attempting to load options from large file, reading just first megabyte")
with open(path, 'r') as fh:
contents = fh.read(1048576)
options = self.parse_file_fields(StringIO(contents))
elif self.tool_data_table:
options = self.tool_data_table.get_fields()
elif self.file_fields:
options = list(self.file_fields)
else:
options = []
for filter in self.filters:
options = filter.filter_options(options, trans, other_values)
return options
def get_fields_by_value(self, value, trans, other_values):
"""
Return a list of fields with column 'value' matching provided value.
"""
rval = []
val_index = self.columns['value']
for fields in self.get_fields(trans, other_values):
if fields[val_index] == value:
rval.append(fields)
return rval
def get_field_by_name_for_value(self, field_name, value, trans, other_values):
"""
Get contents of field by name for specified value.
"""
rval = []
if isinstance(field_name, int):
field_index = field_name
else:
assert field_name in self.columns, "Requested '%s' column missing from column def" % field_name
field_index = self.columns[field_name]
if not isinstance(value, list):
value = [value]
for val in value:
for fields in self.get_fields_by_value(val, trans, other_values):
rval.append(fields[field_index])
return rval
def get_options(self, trans, other_values):
rval = []
if self.file_fields is not None or self.tool_data_table is not None or self.dataset_ref_name is not None or self.missing_index_file:
options = self.get_fields(trans, other_values)
for fields in options:
rval.append((fields[self.columns['name']], fields[self.columns['value']], False))
else:
for filter in self.filters:
rval = filter.filter_options(rval, trans, other_values)
return rval
def column_spec_to_index(self, column_spec):
"""
Convert a column specification (as read from the config file), to an
index. A column specification can just be a number, a column name, or
a column alias.
"""
# Name?
if column_spec in self.columns:
return self.columns[column_spec]
# Int?
return int(column_spec)