Source code for galaxy.tool_util.parser.output_actions
"""
Support for dynamically modifying output attributes.
"""
import logging
import os.path
import re
from galaxy import util
try:
from galaxy.util.template import fill_template
except ImportError:
fill_template = None # type: ignore[assignment, unused-ignore]
log = logging.getLogger(__name__)
[docs]class ToolOutputActionGroup:
"""
Manages a set of tool output dataset actions directives
"""
tag = "group"
[docs] def __init__(self, parent, config_elem):
self.parent = parent
self.actions = []
if config_elem is not None:
for elem in config_elem:
if elem.tag == "conditional":
self.actions.append(ToolOutputActionConditional(self, elem))
elif elem.tag == "action":
self.actions.append(ToolOutputAction.from_elem(self, elem))
else:
log.debug(f"Unknown ToolOutputAction tag specified: {elem.tag}")
[docs] def apply_action(self, output_dataset, other_values) -> None:
for action in self.actions:
action.apply_action(output_dataset, other_values)
@property
def tool(self):
return self.parent.tool
def __len__(self):
return len(self.actions)
[docs]class ToolOutputActionConditionalWhen(ToolOutputActionGroup):
tag = "when"
[docs] @classmethod
def from_elem(cls, parent, when_elem):
"""Loads the proper when by attributes of elem"""
when_value = when_elem.get("value", None)
if when_value is not None:
return ValueToolOutputActionConditionalWhen(parent, when_elem, when_value)
else:
when_value = when_elem.get("datatype_isinstance", None)
if when_value is not None:
return DatatypeIsInstanceToolOutputActionConditionalWhen(parent, when_elem, when_value)
raise TypeError("When type not implemented")
[docs] def __init__(self, parent, config_elem, value):
super().__init__(parent, config_elem)
self.value = value
[docs] def get_ref(self, other_values):
ref = other_values
for ref_name in self.parent.name:
assert ref_name in ref, f"Required dependency '{ref_name}' not found in incoming values"
ref = ref.get(ref_name)
return ref
[docs] def apply_action(self, output_dataset, other_values) -> None:
if self.is_case(other_values):
super().apply_action(output_dataset, other_values)
[docs]class ValueToolOutputActionConditionalWhen(ToolOutputActionConditionalWhen):
tag = "when value"
[docs] def is_case(self, other_values) -> bool:
ref = self.get_ref(other_values)
return ref == self.value
[docs]class DatatypeIsInstanceToolOutputActionConditionalWhen(ToolOutputActionConditionalWhen):
tag = "when datatype_isinstance"
[docs] def __init__(self, parent, config_elem, value):
super().__init__(parent, config_elem, value)
self.value = type(self.tool.app.datatypes_registry.get_datatype_by_extension(value))
[docs] def is_case(self, other_values) -> bool:
ref = self.get_ref(other_values)
return isinstance(ref.datatype, self.value)
[docs]class ToolOutputActionConditional:
tag = "conditional"
[docs] def __init__(self, parent, config_elem):
self.parent = parent
self.name = config_elem.get("name", None)
assert self.name is not None, "Required 'name' attribute missing from ToolOutputActionConditional"
self.name = self.name.split(".")
self.cases = []
for when_elem in config_elem.findall("when"):
self.cases.append(ToolOutputActionConditionalWhen.from_elem(self, when_elem))
[docs] def apply_action(self, output_dataset, other_values) -> None:
for case in self.cases:
case.apply_action(output_dataset, other_values)
@property
def tool(self):
return self.parent.tool
[docs]class ToolOutputAction:
tag = "action"
[docs] @classmethod
def from_elem(cls, parent, elem):
"""Loads the proper action by the type attribute of elem"""
action_type = elem.get("type", None)
assert action_type is not None, "Required 'type' attribute missing from ToolOutputAction"
return action_types[action_type](parent, elem)
[docs] def __init__(self, parent, elem):
self.parent = parent
self.default = elem.get("default", None)
option_elem = elem.find("option")
self.option = ToolOutputActionOption.from_elem(self, option_elem)
@property
def tool(self):
return self.parent.tool
[docs]class ToolOutputActionOption:
tag = "object"
[docs] @classmethod
def from_elem(cls, parent, elem):
"""Loads the proper action by the type attribute of elem"""
if elem is None:
option_type = (
NullToolOutputActionOption.tag
) # no ToolOutputActionOption's have been defined, use implicit NullToolOutputActionOption
else:
option_type = elem.get("type", None)
assert option_type is not None, "Required 'type' attribute missing from ToolOutputActionOption"
return option_types[option_type](parent, elem)
[docs] def __init__(self, parent, elem):
self.parent = parent
self.filters = []
if elem is not None:
for filter_elem in elem.findall("filter"):
self.filters.append(ToolOutputActionOptionFilter.from_elem(self, filter_elem))
@property
def tool(self):
return self.parent.tool
[docs]class FromFileToolOutputActionOption(ToolOutputActionOption):
tag = "from_file"
[docs] def __init__(self, parent, elem):
super().__init__(parent, elem)
self.name = elem.get("name", None)
assert self.name is not None, "Required 'name' attribute missing from FromFileToolOutputActionOption"
self.column = elem.get("column", None)
assert self.column is not None, "Required 'column' attribute missing from FromFileToolOutputActionOption"
self.column = int(self.column)
self.offset = elem.get("offset", -1)
self.offset = int(self.offset)
self.separator = elem.get("separator", "\t")
self.options = []
data_file = self.name
if not os.path.isabs(data_file):
data_file = os.path.join(self.tool.app.config.tool_data_path, data_file)
for line in open(data_file):
self.options.append(line.rstrip("\n\r").split(self.separator))
[docs] def get_value(self, other_values):
options = self.options
for filter in self.filters:
options = filter.filter_options(options, other_values)
try:
if options:
return str(options[self.offset][self.column])
except Exception as e:
log.debug(f"Error in FromFileToolOutputActionOption get_value: {e}")
return None
[docs]class FromParamToolOutputActionOption(ToolOutputActionOption):
tag = "from_param"
[docs] def __init__(self, parent, elem):
super().__init__(parent, elem)
self.name = elem.get("name", None)
assert self.name is not None, "Required 'name' attribute missing from FromFileToolOutputActionOption"
self.name = self.name.split(".")
self.column = elem.get("column", 0)
self.column = int(self.column)
self.offset = elem.get("offset", -1)
self.offset = int(self.offset)
self.param_attribute = elem.get("param_attribute", [])
if self.param_attribute:
self.param_attribute = self.param_attribute.split(".")
[docs] def get_value(self, other_values):
value = other_values
for ref_name in self.name:
assert ref_name in value, f"Required dependency '{ref_name}' not found in incoming values"
value = value.get(ref_name)
for attr_name in self.param_attribute:
# if the value is a list from a repeat tag you can access the first element of the repeat with
# artifical 'first' attribute_name. For example: .. param_attribute="first.input_mate1.ext"
if isinstance(value, list) and attr_name == "first":
value = value[0]
elif isinstance(value, dict):
value = value[attr_name]
elif hasattr(value, "collection"):
# if this is an HDCA for instance let reverse.ext grab
# the reverse element and then continue for loop to grab
# dataset extension
try:
value = value.collection[attr_name].element_object
except KeyError:
value = value.child_collection[attr_name].element_object
else:
value = getattr(value, attr_name)
options = [[str(value)]]
for filter in self.filters:
options = filter.filter_options(options, other_values)
try:
if options:
return str(options[self.offset][self.column])
except Exception as e:
log.debug(f"Error in FromParamToolOutputActionOption get_value: {e}")
return None
[docs]class FromDataTableOutputActionOption(ToolOutputActionOption):
tag = "from_data_table"
# TODO: allow accessing by column 'name' not just index
[docs] def __init__(self, parent, elem):
super().__init__(parent, elem)
self.name = elem.get("name", None)
assert self.name is not None, "Required 'name' attribute missing from FromDataTableOutputActionOption"
self.missing_tool_data_table_name = None
if self.name in self.tool.app.tool_data_tables:
self.options = self.tool.app.tool_data_tables[self.name].get_fields()
self.column = elem.get("column", None)
assert self.column is not None, "Required 'column' attribute missing from FromDataTableOutputActionOption"
self.column = int(self.column)
self.offset = elem.get("offset", -1)
self.offset = int(self.offset)
else:
self.options = []
self.missing_tool_data_table_name = self.name
[docs] def get_value(self, other_values):
if self.options:
options = self.options
else:
options = []
for filter in self.filters:
options = filter.filter_options(options, other_values)
try:
if options:
return str(options[self.offset][self.column])
except Exception as e:
log.debug(f"Error in FromDataTableOutputActionOption get_value: {e}")
return None
[docs]class MetadataToolOutputAction(ToolOutputAction):
tag = "metadata"
[docs] def __init__(self, parent, elem):
super().__init__(parent, elem)
self.name = elem.get("name", None)
assert self.name is not None, "Required 'name' attribute missing from MetadataToolOutputAction"
[docs] def apply_action(self, output_dataset, other_values) -> None:
value = self.option.get_value(other_values)
# TODO: figure out correct type based on MetadataElementSpec,
# but MetadataElementSpec doesn't actually define a type (but it should).
# That would avoid the ad-hoc comma splitting here for defaults.
if self.default:
# For historical reasons the default value takes preference over value,
# and we only treat the default value as potentially containing cheetah.
if fill_template is not None:
value = fill_template(
self.default,
context=other_values,
python_template_version=other_values.get("__python_template_version__"),
).split(",")
else:
# fallback when Cheetah not available, equivalent to how this was handled prior 23.0
# definitely not needed for CWL tool parsing
log.warning("Cheetah not installed, falling back to legacy 'apply_action' behavior.") # type: ignore[unreachable, unused-ignore]
value = self.default
if value is not None:
setattr(output_dataset.metadata, self.name, value)
[docs]class FormatToolOutputAction(ToolOutputAction):
tag = "format"
[docs] def __init__(self, parent, elem):
super().__init__(parent, elem)
self.default = elem.get("default", None)
[docs] def apply_action(self, output_dataset, other_values) -> None:
value = self.option.get_value(other_values)
if value is None and self.default is not None:
value = self.default
if value is not None:
output_dataset.extension = value
[docs]class ToolOutputActionOptionFilter:
tag = "filter"
[docs] @classmethod
def from_elem(cls, parent, elem):
"""Loads the proper action by the type attribute of elem"""
filter_type = elem.get("type", None)
assert filter_type is not None, "Required 'type' attribute missing from ToolOutputActionOptionFilter"
return filter_types[filter_type](parent, elem)
@property
def tool(self):
return self.parent.tool
[docs]class ParamValueToolOutputActionOptionFilter(ToolOutputActionOptionFilter):
tag = "param_value"
[docs] def __init__(self, parent, elem):
super().__init__(parent, elem)
self.ref = elem.get("ref", None)
if self.ref:
self.ref = self.ref.split(".")
self.value = elem.get("value", None)
assert (
self.ref != self.value
), "Required 'ref' or 'value' attribute missing from ParamValueToolOutputActionOptionFilter"
self.column = elem.get("column", None)
assert (
self.column is not None
), "Required 'column' attribute missing from ParamValueToolOutputActionOptionFilter"
self.column = int(self.column)
self.keep = util.string_as_bool(elem.get("keep", "True"))
self.compare = parse_compare_type(elem.get("compare", None))
self.cast = parse_cast_attribute(elem.get("cast", None))
self.param_attribute = elem.get("param_attribute", [])
if self.param_attribute:
self.param_attribute = self.param_attribute.split(".")
[docs] def filter_options(self, options, other_values):
if self.ref:
# find ref value
value = other_values
for ref_name in self.ref:
assert ref_name in value, f"Required dependency '{ref_name}' not found in incoming values"
value = value.get(ref_name)
for attr_name in self.param_attribute:
value = getattr(value, attr_name)
value = str(value)
else:
value = self.value
value = self.cast(value)
rval = []
for fields in options:
try:
if self.keep == (self.compare(self.cast(fields[self.column]), value)):
rval.append(fields)
except Exception as e:
log.debug(e)
continue # likely a bad cast or column out of range
return rval
[docs]class InsertColumnToolOutputActionOptionFilter(ToolOutputActionOptionFilter):
tag = "insert_column"
[docs] def __init__(self, parent, elem):
super().__init__(parent, elem)
self.ref = elem.get("ref", None)
if self.ref:
self.ref = self.ref.split(".")
self.value = elem.get("value", None)
assert (
self.ref != self.value
), "Required 'ref' or 'value' attribute missing from InsertColumnToolOutputActionOptionFilter"
self.column = elem.get("column", None) # None is append
if self.column:
self.column = int(self.column)
# TODO not supported in xsd
self.iterate = util.string_as_bool(elem.get("iterate", "False"))
[docs] def filter_options(self, options, other_values):
if self.ref:
# find ref value
value = other_values
for ref_name in self.ref:
assert ref_name in value, f"Required dependency '{ref_name}' not found in incoming values"
value = value.get(ref_name)
value = str(value)
else:
value = self.value
if self.iterate:
value = int(value)
rval = []
for fields in options:
if self.column is None:
rval.append(fields + [str(value)])
else:
fields = list(fields)
fields.insert(self.column, str(value))
rval.append(fields)
if self.iterate:
value += 1
return rval
[docs]class MultipleSplitterFilter(ToolOutputActionOptionFilter):
tag = "multiple_splitter"
[docs] def __init__(self, parent, elem):
super().__init__(parent, elem)
self.column = elem.get("column", None)
assert self.column is not None, "Required 'column' attribute missing from MultipleSplitterFilter"
self.column = int(self.column)
self.separator = elem.get("separator", ",")
[docs] def filter_options(self, options, other_values):
rval = []
for fields in options:
for field in fields[self.column].split(self.separator):
rval.append(fields[0 : self.column] + [field] + fields[self.column + 1 :])
return rval
[docs]class ColumnStripFilter(ToolOutputActionOptionFilter):
tag = "column_strip"
[docs] def __init__(self, parent, elem):
super().__init__(parent, elem)
self.column = elem.get("column", None)
assert self.column is not None, "Required 'column' attribute missing from ColumnStripFilter"
self.column = int(self.column)
self.strip = elem.get("strip", None)
[docs] def filter_options(self, options, other_values):
rval = []
for fields in options:
rval.append(fields[0 : self.column] + [fields[self.column].strip(self.strip)] + fields[self.column + 1 :])
return rval
[docs]class ColumnReplaceFilter(ToolOutputActionOptionFilter):
tag = "column_replace"
[docs] def __init__(self, parent, elem):
super().__init__(parent, elem)
self.old_column = elem.get("old_column", None)
self.old_value = elem.get("old_value", None)
self.new_value = elem.get("new_value", None)
self.new_column = elem.get("new_column", None)
assert bool(self.old_column) ^ bool(self.old_value) and bool(self.new_column) ^ bool(
self.new_value
), "Required 'old_column' or 'old_value' and 'new_column' or 'new_value' attribute missing from ColumnReplaceFilter"
self.column = elem.get("column", None)
assert self.column is not None, "Required 'column' attribute missing from ColumnReplaceFilter"
self.column = int(self.column)
if self.old_column is not None:
self.old_column = int(self.old_column)
if self.new_column is not None:
self.new_column = int(self.new_column)
[docs] def filter_options(self, options, other_values):
rval = []
for fields in options:
if self.old_column:
old_value = fields[self.old_column]
else:
old_value = self.old_value
if self.new_column:
new_value = fields[self.new_column]
else:
new_value = self.new_value
rval.append(
fields[0 : self.column]
+ [fields[self.column].replace(old_value, new_value)]
+ fields[self.column + 1 :]
)
return rval
[docs]class MetadataValueFilter(ToolOutputActionOptionFilter):
tag = "metadata_value"
[docs] def __init__(self, parent, elem):
super().__init__(parent, elem)
self.ref = elem.get("ref", None)
assert self.ref is not None, "Required 'ref' attribute missing from MetadataValueFilter"
self.ref = self.ref.split(".")
self.name = elem.get("name", None)
assert self.name is not None, "Required 'name' attribute missing from MetadataValueFilter"
self.column = elem.get("column", None)
assert self.column is not None, "Required 'column' attribute missing from MetadataValueFilter"
self.column = int(self.column)
self.keep = util.string_as_bool(elem.get("keep", "True"))
self.compare = parse_compare_type(elem.get("compare", None))
[docs] def filter_options(self, options, other_values):
ref = other_values
for ref_name in self.ref:
assert ref_name in ref, f"Required dependency '{ref_name}' not found in incoming values"
ref = ref.get(ref_name)
value = str(getattr(ref.metadata, self.name))
rval = []
for fields in options:
if self.keep == (self.compare(fields[self.column], value)):
rval.append(fields)
return rval
[docs]class BooleanFilter(ToolOutputActionOptionFilter):
tag = "boolean"
[docs] def __init__(self, parent, elem):
super().__init__(parent, elem)
self.column = elem.get("column", None)
assert self.column is not None, "Required 'column' attribute missing from BooleanFilter"
self.column = int(self.column)
self.keep = util.string_as_bool(elem.get("keep", "True"))
self.cast = parse_cast_attribute(elem.get("cast", None))
[docs] def filter_options(self, options, other_values):
rval = []
for fields in options:
try:
value = fields[self.column]
value = self.cast(value)
except Exception:
value = False # unable to cast or access value; treat as false
if self.keep == bool(value):
rval.append(fields)
return rval
[docs]class StringFunctionFilter(ToolOutputActionOptionFilter):
tag = "string_function"
[docs] def __init__(self, parent, elem):
super().__init__(parent, elem)
self.column = elem.get("column", None)
assert self.column is not None, "Required 'column' attribute missing from StringFunctionFilter"
self.column = int(self.column)
self.function = elem.get("name", None)
assert self.function in [
"lower",
"upper",
], "Required function 'name' missing or invalid from StringFunctionFilter" # add function names as needed
self.function = getattr(str, self.function)
[docs] def filter_options(self, options, other_values):
rval = []
for fields in options:
rval.append(fields[0 : self.column] + [self.function(fields[self.column])] + fields[self.column + 1 :])
return rval
# tag to class lookups
action_types = {}
for action_type in [MetadataToolOutputAction, FormatToolOutputAction]:
action_types[action_type.tag] = action_type
option_types = {}
for option_type in [
NullToolOutputActionOption,
FromFileToolOutputActionOption,
FromParamToolOutputActionOption,
FromDataTableOutputActionOption,
]:
option_types[option_type.tag] = option_type
filter_types = {}
for filter_type in [
ParamValueToolOutputActionOptionFilter,
InsertColumnToolOutputActionOptionFilter,
MultipleSplitterFilter,
ColumnStripFilter,
MetadataValueFilter,
BooleanFilter,
StringFunctionFilter,
ColumnReplaceFilter,
]:
filter_types[filter_type.tag] = filter_type
# helper classes
# determine cast function
# TODO add float
[docs]def parse_cast_attribute(cast):
if cast == "string_as_bool":
cast = util.string_as_bool
elif cast == "int":
cast = int
elif cast == "str":
cast = str
else:
# return value as-is
def cast(x):
return x
return cast
# comparison
[docs]def parse_compare_type(compare):
if compare is None:
compare = "eq"
assert compare in compare_types, f"Invalid compare type specified: {compare}"
return compare_types[compare]
[docs]def compare_re_search(value1, value2):
# checks pattern=value2 in value1
return bool(re.search(value2, value1))
compare_types = {
"eq": compare_eq,
"neq": compare_neq,
"gt": compare_gt,
"gte": compare_gte,
"lt": compare_lt,
"lte": compare_lte,
"in": compare_in,
"startswith": compare_startswith,
"endswith": compare_endswith,
"re_search": compare_re_search,
}