Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tool_util.parser.output_actions

"""
Support for dynamically modifying output attributes.
"""

import logging
import os.path
import re

from galaxy import util

try:
    from galaxy.util.template import fill_template
except ImportError:
    fill_template = None  # type: ignore[assignment]

log = logging.getLogger(__name__)


class ToolOutputActionGroup:
    """Ordered collection of output-dataset action directives built from a config element."""

    tag = "group"

    def __init__(self, parent, config_elem):
        """Collect one directive per recognised child of ``config_elem``.

        ``conditional`` children become ``ToolOutputActionConditional`` objects and
        ``action`` children are built through ``ToolOutputAction.from_elem``; any
        other tag is logged at debug level and skipped.  A ``None`` element
        produces an empty group.
        """
        self.parent = parent
        self.actions = []
        for elem in config_elem if config_elem is not None else ():
            kind = elem.tag
            if kind == "conditional":
                directive = ToolOutputActionConditional(self, elem)
            elif kind == "action":
                directive = ToolOutputAction.from_elem(self, elem)
            else:
                log.debug(f"Unknown ToolOutputAction tag specified: {elem.tag}")
                continue
            self.actions.append(directive)

    def apply_action(self, output_dataset, other_values) -> None:
        """Forward the call to every contained directive, in insertion order."""
        for directive in self.actions:
            directive.apply_action(output_dataset, other_values)

    @property
    def tool(self):
        """The ``tool`` attribute of the parent object."""
        owner = self.parent
        return owner.tool

    def __len__(self):
        """Number of directives held by this group."""
        return len(self.actions)
class ToolOutputActionConditionalWhen(ToolOutputActionGroup):
    """Action group whose directives only run when ``is_case`` reports a match.

    This base class resolves the referenced value and gates ``apply_action``;
    subclasses provide the actual ``is_case`` test.
    """

    tag = "when"

    @classmethod
    def from_elem(cls, parent, when_elem):
        """Build the when-subclass that matches the attribute present on ``when_elem``.

        The ``value`` attribute is looked at first, then ``datatype_isinstance``.
        Raises ``TypeError`` when neither attribute is present.
        """
        literal = when_elem.get("value", None)
        if literal is not None:
            return ValueToolOutputActionConditionalWhen(parent, when_elem, literal)
        extension = when_elem.get("datatype_isinstance", None)
        if extension is not None:
            return DatatypeIsInstanceToolOutputActionConditionalWhen(parent, when_elem, extension)
        raise TypeError("When type not implemented")

    def __init__(self, parent, config_elem, value):
        """Parse the nested directives and remember the value to test against."""
        super().__init__(parent, config_elem)
        self.value = value

    def is_case(self, other_values):
        """Placeholder; always raises ``TypeError`` on this base class."""
        raise TypeError("Not implemented")

    def get_ref(self, other_values):
        """Walk ``other_values`` along the parent's ``name`` path and return the result.

        Each path component must be present at its level (checked with ``assert``).
        """
        current = other_values
        for ref_name in self.parent.name:
            assert ref_name in current, f"Required dependency '{ref_name}' not found in incoming values"
            current = current.get(ref_name)
        return current

    def apply_action(self, output_dataset, other_values) -> None:
        """Run the nested directives only when ``is_case`` is truthy."""
        if not self.is_case(other_values):
            return
        super().apply_action(output_dataset, other_values)
class ValueToolOutputActionConditionalWhen(ToolOutputActionConditionalWhen):
    """When-case that matches on equality with the configured ``value``."""

    tag = "when value"

    def is_case(self, other_values) -> bool:
        """Return whether the referenced incoming value equals ``self.value``."""
        return self.get_ref(other_values) == self.value
class DatatypeIsInstanceToolOutputActionConditionalWhen(ToolOutputActionConditionalWhen):
    """When-case that matches on the class of the referenced value's ``datatype``."""

    tag = "when datatype_isinstance"

    def __init__(self, parent, config_elem, value):
        """Replace the extension string in ``value`` by the class of the datatype it maps to.

        The lookup goes through ``self.tool.app.datatypes_registry``.
        """
        super().__init__(parent, config_elem, value)
        registry = self.tool.app.datatypes_registry
        datatype = registry.get_datatype_by_extension(value)
        self.value = type(datatype)

    def is_case(self, other_values) -> bool:
        """Return whether the referenced value's ``datatype`` is an instance of the stored class."""
        referenced = self.get_ref(other_values)
        return isinstance(referenced.datatype, self.value)
class ToolOutputActionConditional:
    """A ``conditional`` directive: a dotted reference name plus a list of ``when`` cases."""

    tag = "conditional"

    def __init__(self, parent, config_elem):
        """Read the mandatory ``name`` attribute and build one case per ``when`` child.

        ``name`` is stored split on ``"."``; a missing attribute fails an ``assert``.
        """
        self.parent = parent
        dotted = config_elem.get("name", None)
        assert dotted is not None, "Required 'name' attribute missing from ToolOutputActionConditional"
        self.name = dotted.split(".")
        self.cases = []
        self.cases.extend(
            ToolOutputActionConditionalWhen.from_elem(self, when_elem) for when_elem in config_elem.findall("when")
        )

    def apply_action(self, output_dataset, other_values) -> None:
        """Offer the dataset to every case; each case decides for itself whether to act."""
        for when_case in self.cases:
            when_case.apply_action(output_dataset, other_values)

    @property
    def tool(self):
        """The ``tool`` attribute of the parent object."""
        owner = self.parent
        return owner.tool
class ToolOutputAction:
    """Base class for a single ``action`` directive; subclasses implement ``apply_action``."""

    tag = "action"

    @classmethod
    def from_elem(cls, parent, elem):
        """Instantiate the action class registered in ``action_types`` for ``elem``'s ``type``.

        A missing ``type`` attribute fails an ``assert``.
        """
        action_type = elem.get("type", None)
        assert action_type is not None, "Required 'type' attribute missing from ToolOutputAction"
        action_cls = action_types[action_type]
        return action_cls(parent, elem)

    def __init__(self, parent, elem):
        """Store the parent, the optional ``default`` attribute and the parsed ``option`` child."""
        self.parent = parent
        self.default = elem.get("default", None)
        # elem.find() yields None when there is no <option>; from_elem handles that case.
        self.option = ToolOutputActionOption.from_elem(self, elem.find("option"))

    def apply_action(self, output_dataset, other_values):
        """Placeholder; always raises ``TypeError`` on this base class."""
        raise TypeError("Not implemented")

    @property
    def tool(self):
        """The ``tool`` attribute of the parent object."""
        owner = self.parent
        return owner.tool
class ToolOutputActionOption:
    """Base class for the value source of an action; subclasses implement ``get_value``."""

    tag = "object"

    @classmethod
    def from_elem(cls, parent, elem):
        """Instantiate the option class registered in ``option_types`` for ``elem``.

        When ``elem`` is ``None`` (no option was declared) the implicit
        ``NullToolOutputActionOption`` is used; otherwise the ``type`` attribute
        is required and checked with ``assert``.
        """
        if elem is not None:
            option_type = elem.get("type", None)
            assert option_type is not None, "Required 'type' attribute missing from ToolOutputActionOption"
        else:
            # no ToolOutputActionOption's have been defined, use implicit NullToolOutputActionOption
            option_type = NullToolOutputActionOption.tag
        option_cls = option_types[option_type]
        return option_cls(parent, elem)

    def __init__(self, parent, elem):
        """Store the parent and build one filter per ``filter`` child of ``elem`` (if any)."""
        self.parent = parent
        self.filters = []
        if elem is None:
            return
        for filter_elem in elem.findall("filter"):
            self.filters.append(ToolOutputActionOptionFilter.from_elem(self, filter_elem))

    def get_value(self, other_values):
        """Placeholder; always raises ``TypeError`` on this base class."""
        raise TypeError("Not implemented")

    @property
    def tool(self):
        """The ``tool`` attribute of the parent object."""
        owner = self.parent
        return owner.tool
class NullToolOutputActionOption(ToolOutputActionOption):
    """Option used when no option was declared; it never supplies a value."""

    tag = "null_option"

    def get_value(self, other_values):
        """Always return ``None``, regardless of ``other_values``."""
        return None
class FromFileToolOutputActionOption(ToolOutputActionOption):
    """Option whose candidate rows are read from a delimited text file at construction time."""

    tag = "from_file"

    def __init__(self, parent, elem):
        """Read the option attributes and load every line of the data file.

        Required attributes (checked with ``assert``): ``name`` (file path) and
        ``column``.  Optional: ``offset`` (default ``-1``) and ``separator``
        (default tab).  A relative ``name`` is resolved against
        ``self.tool.app.config.tool_data_path``.
        """
        super().__init__(parent, elem)
        self.name = elem.get("name", None)
        assert self.name is not None, "Required 'name' attribute missing from FromFileToolOutputActionOption"
        self.column = elem.get("column", None)
        assert self.column is not None, "Required 'column' attribute missing from FromFileToolOutputActionOption"
        self.column = int(self.column)
        self.offset = elem.get("offset", -1)
        self.offset = int(self.offset)
        self.separator = elem.get("separator", "\t")
        self.options = []
        data_file = self.name
        if not os.path.isabs(data_file):
            data_file = os.path.join(self.tool.app.config.tool_data_path, data_file)
        # Use a context manager so the handle is closed even if reading fails part-way.
        with open(data_file) as data_fh:
            for line in data_fh:
                self.options.append(line.rstrip("\n\r").split(self.separator))

    def get_value(self, other_values):
        """Run the rows through every filter and return one cell as a string.

        The cell is ``options[self.offset][self.column]`` of the filtered rows.
        Returns ``None`` when no rows remain or when the lookup fails; lookup
        failures are logged at debug level rather than raised.
        """
        options = self.options
        for option_filter in self.filters:
            options = option_filter.filter_options(options, other_values)
        try:
            if options:
                return str(options[self.offset][self.column])
        except Exception as e:
            # Deliberately best-effort: a bad offset/column yields no value instead of an error.
            log.debug(f"Error in FromFileToolOutputActionOption get_value: {e}")
        return None
class FromParamToolOutputActionOption(ToolOutputActionOption):
    """Option whose single candidate value is taken from an incoming parameter."""

    tag = "from_param"

    def __init__(self, parent, elem):
        """Read the option attributes.

        ``name`` is required (checked with ``assert``) and stored split on ``"."``.
        Optional: ``column`` (default ``0``), ``offset`` (default ``-1``) and
        ``param_attribute`` (a dotted attribute path, stored split on ``"."``;
        an empty list when absent).
        """
        super().__init__(parent, elem)
        self.name = elem.get("name", None)
        # The message previously named FromFileToolOutputActionOption (copy-paste slip).
        assert self.name is not None, "Required 'name' attribute missing from FromParamToolOutputActionOption"
        self.name = self.name.split(".")
        self.column = elem.get("column", 0)
        self.column = int(self.column)
        self.offset = elem.get("offset", -1)
        self.offset = int(self.offset)
        self.param_attribute = elem.get("param_attribute", [])
        if self.param_attribute:
            self.param_attribute = self.param_attribute.split(".")

    def get_value(self, other_values):
        """Resolve the parameter, follow ``param_attribute``, filter, and return one cell.

        The resolved value is stringified into a one-row, one-column table, passed
        through every filter, and ``options[self.offset][self.column]`` is returned
        as a string.  Returns ``None`` when no rows remain or when the final lookup
        fails (the failure is logged at debug level).
        """
        value = other_values
        for ref_name in self.name:
            assert ref_name in value, f"Required dependency '{ref_name}' not found in incoming values"
            value = value.get(ref_name)
        for attr_name in self.param_attribute:
            # if the value is a list from a repeat tag you can access the first element of the repeat with
            # the artificial 'first' attribute_name. For example: .. param_attribute="first.input_mate1.ext"
            if isinstance(value, list) and attr_name == "first":
                value = value[0]
            elif isinstance(value, dict):
                value = value[attr_name]
            elif hasattr(value, "collection"):
                # if this is an HDCA for instance let reverse.ext grab
                # the reverse element and then continue for loop to grab
                # dataset extension
                try:
                    value = value.collection[attr_name].element_object
                except KeyError:
                    value = value.child_collection[attr_name].element_object
            else:
                value = getattr(value, attr_name)
        options = [[str(value)]]
        for option_filter in self.filters:
            options = option_filter.filter_options(options, other_values)
        try:
            if options:
                return str(options[self.offset][self.column])
        except Exception as e:
            # Deliberately best-effort: a bad offset/column yields no value instead of an error.
            log.debug(f"Error in FromParamToolOutputActionOption get_value: {e}")
        return None
class FromDataTableOutputActionOption(ToolOutputActionOption):
    """Option whose candidate rows come from one of the application's tool data tables."""

    tag = "from_data_table"

    # TODO: allow accessing by column 'name' not just index
    def __init__(self, parent, elem):
        """Read the option attributes and fetch the rows of the named data table.

        ``name`` is required (checked with ``assert``).  When the table exists in
        ``self.tool.app.tool_data_tables`` its fields are loaded and ``column``
        (required) and ``offset`` (default ``-1``) are parsed.  When it does not,
        ``options`` stays empty, the name is recorded in
        ``missing_tool_data_table_name``, and ``column``/``offset`` are not set.
        """
        super().__init__(parent, elem)
        self.name = elem.get("name", None)
        assert self.name is not None, "Required 'name' attribute missing from FromDataTableOutputActionOption"
        self.missing_tool_data_table_name = None
        if self.name not in self.tool.app.tool_data_tables:
            self.options = []
            self.missing_tool_data_table_name = self.name
            return
        self.options = self.tool.app.tool_data_tables[self.name].get_fields()
        raw_column = elem.get("column", None)
        self.column = raw_column
        assert raw_column is not None, "Required 'column' attribute missing from FromDataTableOutputActionOption"
        self.column = int(raw_column)
        self.offset = int(elem.get("offset", -1))

    def get_value(self, other_values):
        """Run the rows through every filter and return one cell as a string.

        Returns ``None`` when no rows remain or when the lookup fails (the
        failure is logged at debug level).
        """
        rows = self.options or []
        for row_filter in self.filters:
            rows = row_filter.filter_options(rows, other_values)
        try:
            if rows:
                return str(rows[self.offset][self.column])
        except Exception as e:
            log.debug(f"Error in FromDataTableOutputActionOption get_value: {e}")
        return None
class MetadataToolOutputAction(ToolOutputAction):
    """Action that assigns a value to one named metadata attribute of the output dataset."""

    tag = "metadata"

    def __init__(self, parent, elem):
        """Read the mandatory ``name`` attribute (checked with ``assert``)."""
        super().__init__(parent, elem)
        metadata_name = elem.get("name", None)
        self.name = metadata_name
        assert metadata_name is not None, "Required 'name' attribute missing from MetadataToolOutputAction"

    def apply_action(self, output_dataset, other_values) -> None:
        """Compute the metadata value and, unless it is ``None``, set it on the dataset.

        A truthy ``default`` overrides whatever the option produced: it is rendered
        with ``fill_template`` and split on commas, or used verbatim when
        ``fill_template`` is unavailable.
        """
        value = self.option.get_value(other_values)
        # TODO: figure out correct type based on MetadataElementSpec,
        # but MetadataElementSpec doesn't actually define a type (but it should).
        # That would avoid the ad-hoc comma splitting here for defaults.
        if self.default:
            # For historical reasons the default value takes preference over value,
            # and we only treat the default value as potentially containing cheetah.
            if fill_template is None:
                # fallback when Cheetah not available, equivalent to how this was handled prior 23.0
                # definitely not needed for CWL tool parsing
                log.warning("Cheetah not installed, falling back to legacy 'apply_action' behavior.")  # type: ignore[unreachable]
                value = self.default
            else:
                rendered = fill_template(
                    self.default,
                    context=other_values,
                    python_template_version=other_values.get("__python_template_version__"),
                )
                value = rendered.split(",")
        if value is None:
            return
        setattr(output_dataset.metadata, self.name, value)
class FormatToolOutputAction(ToolOutputAction):
    """Action that sets the ``extension`` of the output dataset."""

    tag = "format"

    def __init__(self, parent, elem):
        """Initialise via the base class and (re)read the optional ``default`` attribute."""
        super().__init__(parent, elem)
        fallback = elem.get("default", None)
        self.default = fallback

    def apply_action(self, output_dataset, other_values) -> None:
        """Assign the option's value, or ``default`` when the option yields ``None``.

        Nothing is assigned when both are ``None``.
        """
        extension = self.option.get_value(other_values)
        if extension is None:
            if self.default is None:
                return
            extension = self.default
        output_dataset.extension = extension
class ToolOutputActionOptionFilter:
    """Base class for option-row filters; subclasses implement ``filter_options``."""

    tag = "filter"

    @classmethod
    def from_elem(cls, parent, elem):
        """Instantiate the filter class registered in ``filter_types`` for ``elem``'s ``type``.

        A missing ``type`` attribute fails an ``assert``.
        """
        filter_type = elem.get("type", None)
        assert filter_type is not None, "Required 'type' attribute missing from ToolOutputActionOptionFilter"
        filter_cls = filter_types[filter_type]
        return filter_cls(parent, elem)

    def __init__(self, parent, elem):
        """Store the parent; ``elem`` is not used at this level."""
        self.parent = parent

    def filter_options(self, options, other_values):
        """Placeholder; always raises ``TypeError`` on this base class."""
        raise TypeError("Not implemented")

    @property
    def tool(self):
        """The ``tool`` attribute of the parent object."""
        owner = self.parent
        return owner.tool
class ParamValueToolOutputActionOptionFilter(ToolOutputActionOptionFilter):
    """Keep or drop rows by comparing one column against a literal or a referenced parameter."""

    tag = "param_value"

    def __init__(self, parent, elem):
        """Read the filter attributes.

        ``ref`` (dotted path, stored split) and ``value`` must differ, and
        ``column`` is required; both are checked with ``assert``.  Optional:
        ``keep`` (default ``"True"``), ``compare``, ``cast`` and
        ``param_attribute`` (dotted path, stored split; empty list when absent).
        """
        super().__init__(parent, elem)
        self.ref = elem.get("ref", None)
        if self.ref:
            self.ref = self.ref.split(".")
        self.value = elem.get("value", None)
        assert (
            self.ref != self.value
        ), "Required 'ref' or 'value' attribute missing from ParamValueToolOutputActionOptionFilter"
        self.column = elem.get("column", None)
        assert (
            self.column is not None
        ), "Required 'column' attribute missing from ParamValueToolOutputActionOptionFilter"
        self.column = int(self.column)
        self.keep = util.string_as_bool(elem.get("keep", "True"))
        self.compare = parse_compare_type(elem.get("compare", None))
        self.cast = parse_cast_attribute(elem.get("cast", None))
        attribute_path = elem.get("param_attribute", [])
        self.param_attribute = attribute_path.split(".") if attribute_path else attribute_path

    def filter_options(self, options, other_values):
        """Return the rows whose comparison outcome equals ``self.keep``.

        The comparison target is the referenced parameter (stringified, after
        following ``param_attribute``) when ``ref`` is set, otherwise the literal
        ``value``; both it and the row's cell are passed through ``self.cast``.
        Rows whose cast, lookup or comparison raises are logged and skipped.
        """
        if self.ref:
            # find ref value
            target = other_values
            for ref_name in self.ref:
                assert ref_name in target, f"Required dependency '{ref_name}' not found in incoming values"
                target = target.get(ref_name)
            for attr_name in self.param_attribute:
                target = getattr(target, attr_name)
            target = str(target)
        else:
            target = self.value
        target = self.cast(target)
        kept = []
        for fields in options:
            try:
                outcome = self.compare(self.cast(fields[self.column]), target)
                if self.keep == outcome:
                    kept.append(fields)
            except Exception as e:
                # likely a bad cast or column out of range
                log.debug(e)
        return kept
class InsertColumnToolOutputActionOptionFilter(ToolOutputActionOptionFilter):
    """Add a column holding a literal or a referenced parameter value to every row."""

    tag = "insert_column"

    def __init__(self, parent, elem):
        """Read the filter attributes.

        ``ref`` (dotted path, stored split) and ``value`` must differ (checked
        with ``assert``).  ``column`` is optional; when absent the new cell is
        appended.  ``iterate`` defaults to ``"False"``.
        """
        super().__init__(parent, elem)
        self.ref = elem.get("ref", None)
        if self.ref:
            self.ref = self.ref.split(".")
        self.value = elem.get("value", None)
        assert (
            self.ref != self.value
        ), "Required 'ref' or 'value' attribute missing from InsertColumnToolOutputActionOptionFilter"
        position = elem.get("column", None)  # None is append
        self.column = int(position) if position else position
        # TODO not supported in xsd
        self.iterate = util.string_as_bool(elem.get("iterate", "False"))

    def filter_options(self, options, other_values):
        """Return new rows with the value inserted at ``self.column`` (or appended).

        With ``iterate`` set, the value is converted to ``int`` and incremented
        by one after each row.
        """
        if self.ref:
            # find ref value
            resolved = other_values
            for ref_name in self.ref:
                assert ref_name in resolved, f"Required dependency '{ref_name}' not found in incoming values"
                resolved = resolved.get(ref_name)
            value = str(resolved)
        else:
            value = self.value
        if self.iterate:
            value = int(value)
        widened = []
        for fields in options:
            if self.column is None:
                row = fields + [str(value)]
            else:
                row = list(fields)
                row.insert(self.column, str(value))
            widened.append(row)
            if self.iterate:
                value += 1
        return widened
class MultipleSplitterFilter(ToolOutputActionOptionFilter):
    """Expand each row into one row per separator-delimited piece of a chosen column."""

    tag = "multiple_splitter"

    def __init__(self, parent, elem):
        """Read ``column`` (required, checked with ``assert``) and ``separator`` (default ``","``)."""
        super().__init__(parent, elem)
        raw_column = elem.get("column", None)
        self.column = raw_column
        assert raw_column is not None, "Required 'column' attribute missing from MultipleSplitterFilter"
        self.column = int(raw_column)
        self.separator = elem.get("separator", ",")

    def filter_options(self, options, other_values):
        """Return the expanded rows; every other column is copied unchanged."""
        expanded = []
        for fields in options:
            head = fields[0 : self.column]
            tail = fields[self.column + 1 :]
            expanded.extend(head + [piece] + tail for piece in fields[self.column].split(self.separator))
        return expanded
class ColumnStripFilter(ToolOutputActionOptionFilter):
    """Apply ``str.strip`` to one column of every row."""

    tag = "column_strip"

    def __init__(self, parent, elem):
        """Read ``column`` (required, checked with ``assert``) and the optional ``strip`` characters."""
        super().__init__(parent, elem)
        raw_column = elem.get("column", None)
        self.column = raw_column
        assert raw_column is not None, "Required 'column' attribute missing from ColumnStripFilter"
        self.column = int(raw_column)
        self.strip = elem.get("strip", None)

    def filter_options(self, options, other_values):
        """Return new rows with the chosen column stripped of ``self.strip``.

        ``self.strip`` may be ``None``, in which case ``str.strip`` removes whitespace.
        """
        return [
            fields[0 : self.column] + [fields[self.column].strip(self.strip)] + fields[self.column + 1 :]
            for fields in options
        ]
class ColumnReplaceFilter(ToolOutputActionOptionFilter):
    """Run ``str.replace`` on one column, taking old/new text from literals or other columns."""

    tag = "column_replace"

    def __init__(self, parent, elem):
        """Read the filter attributes.

        Exactly one of ``old_column``/``old_value`` and exactly one of
        ``new_column``/``new_value`` must be given, and ``column`` is required;
        both conditions are checked with ``assert``.  Column attributes are
        converted to ``int``.
        """
        super().__init__(parent, elem)
        self.old_column = elem.get("old_column", None)
        self.old_value = elem.get("old_value", None)
        self.new_value = elem.get("new_value", None)
        self.new_column = elem.get("new_column", None)
        assert bool(self.old_column) ^ bool(self.old_value) and bool(self.new_column) ^ bool(
            self.new_value
        ), "Required 'old_column' or 'old_value' and 'new_column' or 'new_value' attribute missing from ColumnReplaceFilter"
        self.column = elem.get("column", None)
        assert self.column is not None, "Required 'column' attribute missing from ColumnReplaceFilter"
        self.column = int(self.column)
        if self.old_column is not None:
            self.old_column = int(self.old_column)
        if self.new_column is not None:
            self.new_column = int(self.new_column)

    def filter_options(self, options, other_values):
        """Return new rows with the replacement applied to ``self.column``.

        For each row the text to find comes from ``old_column`` when configured,
        otherwise from ``old_value``; likewise for the replacement text.
        """
        rval = []
        for fields in options:
            # Compare against None rather than relying on truthiness: the columns are
            # ints by now, and index 0 is a valid column that must not be treated as unset.
            if self.old_column is not None:
                old_value = fields[self.old_column]
            else:
                old_value = self.old_value
            if self.new_column is not None:
                new_value = fields[self.new_column]
            else:
                new_value = self.new_value
            rval.append(
                fields[0 : self.column]
                + [fields[self.column].replace(old_value, new_value)]
                + fields[self.column + 1 :]
            )
        return rval
class MetadataValueFilter(ToolOutputActionOptionFilter):
    """Keep or drop rows by comparing one column against a metadata value of a referenced input."""

    tag = "metadata_value"

    def __init__(self, parent, elem):
        """Read the filter attributes.

        ``ref`` (dotted path, stored split), ``name`` and ``column`` are required
        and checked with ``assert``.  Optional: ``keep`` (default ``"True"``) and
        ``compare``.
        """
        super().__init__(parent, elem)
        dotted_ref = elem.get("ref", None)
        self.ref = dotted_ref
        assert dotted_ref is not None, "Required 'ref' attribute missing from MetadataValueFilter"
        self.ref = dotted_ref.split(".")
        self.name = elem.get("name", None)
        assert self.name is not None, "Required 'name' attribute missing from MetadataValueFilter"
        raw_column = elem.get("column", None)
        self.column = raw_column
        assert raw_column is not None, "Required 'column' attribute missing from MetadataValueFilter"
        self.column = int(raw_column)
        self.keep = util.string_as_bool(elem.get("keep", "True"))
        self.compare = parse_compare_type(elem.get("compare", None))

    def filter_options(self, options, other_values):
        """Return the rows whose comparison outcome equals ``self.keep``.

        The comparison target is ``str(getattr(ref.metadata, self.name))`` of the
        object found by walking ``other_values`` along ``self.ref``.
        """
        source = other_values
        for ref_name in self.ref:
            assert ref_name in source, f"Required dependency '{ref_name}' not found in incoming values"
            source = source.get(ref_name)
        target = str(getattr(source.metadata, self.name))
        return [fields for fields in options if self.keep == (self.compare(fields[self.column], target))]
class BooleanFilter(ToolOutputActionOptionFilter):
    """Keep or drop rows according to the truthiness of one (optionally cast) column."""

    tag = "boolean"

    def __init__(self, parent, elem):
        """Read ``column`` (required, checked with ``assert``), ``keep`` (default ``"True"``) and ``cast``."""
        super().__init__(parent, elem)
        raw_column = elem.get("column", None)
        self.column = raw_column
        assert raw_column is not None, "Required 'column' attribute missing from BooleanFilter"
        self.column = int(raw_column)
        self.keep = util.string_as_bool(elem.get("keep", "True"))
        self.cast = parse_cast_attribute(elem.get("cast", None))

    def filter_options(self, options, other_values):
        """Return the rows whose cast cell has truthiness equal to ``self.keep``.

        A cell that cannot be read or cast counts as false.
        """
        kept = []
        for fields in options:
            try:
                cell = self.cast(fields[self.column])
            except Exception:
                cell = False  # unable to cast or access value; treat as false
            if self.keep == bool(cell):
                kept.append(fields)
        return kept
class StringFunctionFilter(ToolOutputActionOptionFilter):
    """Apply a whitelisted ``str`` method (``lower`` or ``upper``) to one column of every row."""

    tag = "string_function"

    def __init__(self, parent, elem):
        """Read ``column`` and the function ``name``; both are validated with ``assert``.

        ``self.function`` ends up holding the unbound ``str`` method.
        """
        super().__init__(parent, elem)
        raw_column = elem.get("column", None)
        self.column = raw_column
        assert raw_column is not None, "Required 'column' attribute missing from StringFunctionFilter"
        self.column = int(raw_column)
        function_name = elem.get("name", None)
        self.function = function_name
        assert function_name in [
            "lower",
            "upper",
        ], "Required function 'name' missing or invalid from StringFunctionFilter"  # add function names as needed
        self.function = getattr(str, function_name)

    def filter_options(self, options, other_values):
        """Return new rows with ``self.function`` applied to the chosen column."""
        return [
            fields[0 : self.column] + [self.function(fields[self.column])] + fields[self.column + 1 :]
            for fields in options
        ]
# tag to class lookups action_types = {} for action_type in [MetadataToolOutputAction, FormatToolOutputAction]: action_types[action_type.tag] = action_type option_types = {} for option_type in [ NullToolOutputActionOption, FromFileToolOutputActionOption, FromParamToolOutputActionOption, FromDataTableOutputActionOption, ]: option_types[option_type.tag] = option_type filter_types = {} for filter_type in [ ParamValueToolOutputActionOptionFilter, InsertColumnToolOutputActionOptionFilter, MultipleSplitterFilter, ColumnStripFilter, MetadataValueFilter, BooleanFilter, StringFunctionFilter, ColumnReplaceFilter, ]: filter_types[filter_type.tag] = filter_type # helper classes # determine cast function # TODO add float
def parse_cast_attribute(cast):
    """Translate a ``cast`` attribute string into a one-argument conversion callable.

    Recognised names are ``"string_as_bool"``, ``"int"`` and ``"str"``; anything
    else (including ``None``) yields a callable that returns its argument as-is.
    """
    if cast == "string_as_bool":
        return util.string_as_bool
    if cast == "int":
        return int
    if cast == "str":
        return str

    # return value as-is
    def passthrough(x):
        return x

    return passthrough
# comparison
def parse_compare_type(compare):
    """Return the comparison function registered in ``compare_types`` under ``compare``.

    ``None`` selects ``"eq"``; an unregistered name fails an ``assert``.
    """
    compare = "eq" if compare is None else compare
    assert compare in compare_types, f"Invalid compare type specified: {compare}"
    return compare_types[compare]
def compare_eq(value1, value2):
    """Return ``value1 == value2``."""
    return value1 == value2


def compare_neq(value1, value2):
    """Return ``value1 != value2``."""
    return value1 != value2


def compare_gt(value1, value2):
    """Return ``value1 > value2``."""
    return value1 > value2


def compare_gte(value1, value2):
    """Return ``value1 >= value2``."""
    return value1 >= value2


def compare_lt(value1, value2):
    """Return ``value1 < value2``."""
    return value1 < value2


def compare_lte(value1, value2):
    """Return ``value1 <= value2``."""
    return value1 <= value2


def compare_in(value1, value2):
    """Return ``value1 in value2``."""
    return value1 in value2


def compare_startswith(value1, value2):
    """Return whether ``value1`` starts with ``value2``."""
    return value1.startswith(value2)


def compare_endswith(value1, value2):
    """Return whether ``value1`` ends with ``value2``."""
    return value1.endswith(value2)


def compare_re_search(value1, value2):
    """Return whether the regular expression ``value2`` matches anywhere in ``value1``.

    Follows the argument order of the other comparisons: ``value1`` is the text
    being tested and ``value2`` is what it is tested against (here, the pattern).
    """
    return bool(re.search(value2, value1))


# Maps the ``compare`` attribute of a filter to its implementation. Every value
# takes (value1, value2) and returns the outcome of the comparison.
compare_types = {
    "eq": compare_eq,
    "neq": compare_neq,
    "gt": compare_gt,
    "gte": compare_gte,
    "lt": compare_lt,
    "lte": compare_lte,
    "in": compare_in,
    "startswith": compare_startswith,
    "endswith": compare_endswith,
    "re_search": compare_re_search,
}