Source code for galaxy.tool_util.parser.output_actions

"""
Support for dynamically modifying output attributes.
"""

import logging
import os.path
import re

from galaxy import util

try:
    from galaxy.util.template import fill_template
except ImportError:
    fill_template = None  # type: ignore[assignment]

log = logging.getLogger(__name__)


class ToolOutputActionGroup:
    """
    Manages a set of tool output dataset actions directives
    """

    tag = "group"

    def __init__(self, parent, config_elem):
        """Collect the actions described by the children of ``config_elem``.

        ``conditional`` children become ``ToolOutputActionConditional``
        instances and ``action`` children are built through
        ``ToolOutputAction.from_elem``.  Any other tag is logged at debug
        level and skipped.  ``config_elem`` may be ``None``, in which case
        the group is empty.
        """
        self.parent = parent
        self.actions = []
        # Explicit None check: the group must also accept an element that
        # simply has no children.
        if config_elem is None:
            return
        for child in config_elem:
            child_tag = child.tag
            if child_tag == "conditional":
                built = ToolOutputActionConditional(self, child)
            elif child_tag == "action":
                built = ToolOutputAction.from_elem(self, child)
            else:
                log.debug(f"Unknown ToolOutputAction tag specified: {child.tag}")
                continue
            self.actions.append(built)

    def apply_action(self, output_dataset, other_values) -> None:
        """Apply every contained action, in order, to ``output_dataset``."""
        for member in self.actions:
            member.apply_action(output_dataset, other_values)

    @property
    def tool(self):
        """The tool object, looked up through the parent."""
        return self.parent.tool

    def __len__(self):
        """Number of actions held by this group."""
        return len(self.actions)
class ToolOutputActionConditionalWhen(ToolOutputActionGroup):
    """Action group that is only applied when ``is_case`` returns true.

    Subclasses supply ``is_case``; this base class raises ``TypeError`` there.
    """

    tag = "when"

    @classmethod
    def from_elem(cls, parent, when_elem):
        """Loads the proper when by attributes of elem"""
        literal = when_elem.get("value", None)
        if literal is not None:
            return ValueToolOutputActionConditionalWhen(parent, when_elem, literal)
        # No 'value' attribute: fall back to the datatype based variant.
        extension = when_elem.get("datatype_isinstance", None)
        if extension is not None:
            return DatatypeIsInstanceToolOutputActionConditionalWhen(parent, when_elem, extension)
        raise TypeError("When type not implemented")

    def __init__(self, parent, config_elem, value):
        """Build the contained actions and remember the value to test against."""
        super().__init__(parent, config_elem)
        self.value = value

    def is_case(self, other_values):
        """Decide whether this case matches; must be overridden."""
        raise TypeError("Not implemented")

    def get_ref(self, other_values):
        """Walk ``other_values`` along the parent's name path and return the result.

        Asserts that every path component is present at its level.
        """
        current = other_values
        for ref_name in self.parent.name:
            assert ref_name in current, f"Required dependency '{ref_name}' not found in incoming values"
            current = current.get(ref_name)
        return current

    def apply_action(self, output_dataset, other_values) -> None:
        """Apply the contained actions only if this case matches."""
        if not self.is_case(other_values):
            return
        super().apply_action(output_dataset, other_values)
class ValueToolOutputActionConditionalWhen(ToolOutputActionConditionalWhen):
    """Case that matches when the referenced value equals the configured value."""

    tag = "when value"

    def is_case(self, other_values) -> bool:
        """Return whether the referenced value compares equal to ``self.value``."""
        return self.get_ref(other_values) == self.value
class DatatypeIsInstanceToolOutputActionConditionalWhen(ToolOutputActionConditionalWhen):
    """Case that matches when the referenced object's datatype is an instance
    of the class registered for a given extension."""

    tag = "when datatype_isinstance"

    def __init__(self, parent, config_elem, value):
        """Resolve the extension ``value`` to a datatype class.

        The class is taken as ``type(...)`` of whatever the tool's datatypes
        registry returns for the extension, and replaces ``self.value``.
        """
        super().__init__(parent, config_elem, value)
        registry = self.tool.app.datatypes_registry
        resolved = registry.get_datatype_by_extension(value)
        self.value = type(resolved)

    def is_case(self, other_values) -> bool:
        """Return whether the referenced object's ``datatype`` is an instance of ``self.value``."""
        candidate = self.get_ref(other_values)
        return isinstance(candidate.datatype, self.value)
class ToolOutputActionConditional:
    """A named reference plus the ``when`` cases that are tested against it."""

    tag = "conditional"

    def __init__(self, parent, config_elem):
        """Read the required dotted ``name`` attribute and build one case per ``when`` child."""
        self.parent = parent
        raw_name = config_elem.get("name", None)
        self.name = raw_name
        assert raw_name is not None, "Required 'name' attribute missing from ToolOutputActionConditional"
        # The dotted name is stored as a path of keys.
        self.name = raw_name.split(".")
        self.cases = []
        for when_elem in config_elem.findall("when"):
            case = ToolOutputActionConditionalWhen.from_elem(self, when_elem)
            self.cases.append(case)

    def apply_action(self, output_dataset, other_values) -> None:
        """Offer the dataset to every case; each case decides whether it applies."""
        for candidate in self.cases:
            candidate.apply_action(output_dataset, other_values)

    @property
    def tool(self):
        """The tool object, looked up through the parent."""
        return self.parent.tool
class ToolOutputAction:
    """Base class for a single output action; subclasses implement ``apply_action``."""

    tag = "action"

    @classmethod
    def from_elem(cls, parent, elem):
        """Loads the proper action by the type attribute of elem"""
        action_type = elem.get("type", None)
        assert action_type is not None, "Required 'type' attribute missing from ToolOutputAction"
        factory = action_types[action_type]
        return factory(parent, elem)

    def __init__(self, parent, elem):
        """Store the optional ``default`` attribute and build the option from the ``option`` child.

        ``elem.find("option")`` may return ``None``; that value is passed
        straight to ``ToolOutputActionOption.from_elem``.
        """
        self.parent = parent
        self.default = elem.get("default", None)
        self.option = ToolOutputActionOption.from_elem(self, elem.find("option"))

    def apply_action(self, output_dataset, other_values):
        """Apply the action to ``output_dataset``; must be overridden."""
        raise TypeError("Not implemented")

    @property
    def tool(self):
        """The tool object, looked up through the parent."""
        return self.parent.tool
class ToolOutputActionOption:
    """Base class for the value source of an action, with an optional filter chain."""

    tag = "object"

    @classmethod
    def from_elem(cls, parent, elem):
        """Loads the proper action by the type attribute of elem"""
        if elem is not None:
            option_type = elem.get("type", None)
            assert option_type is not None, "Required 'type' attribute missing from ToolOutputActionOption"
        else:
            # no ToolOutputActionOption's have been defined, use implicit NullToolOutputActionOption
            option_type = NullToolOutputActionOption.tag
        return option_types[option_type](parent, elem)

    def __init__(self, parent, elem):
        """Build one filter per ``filter`` child of ``elem``; ``elem`` may be ``None``."""
        self.parent = parent
        self.filters = []
        if elem is None:
            return
        for filter_elem in elem.findall("filter"):
            built = ToolOutputActionOptionFilter.from_elem(self, filter_elem)
            self.filters.append(built)

    def get_value(self, other_values):
        """Compute the option value; must be overridden."""
        raise TypeError("Not implemented")

    @property
    def tool(self):
        """The tool object, looked up through the parent."""
        return self.parent.tool
class NullToolOutputActionOption(ToolOutputActionOption):
    """Option used when an action declares no ``option`` element; it never yields a value."""

    tag = "null_option"

    def get_value(self, other_values):
        """Always return ``None``."""
        return None
class FromFileToolOutputActionOption(ToolOutputActionOption):
    """Option whose candidate rows are read from a delimited text file."""

    tag = "from_file"

    def __init__(self, parent, elem):
        """Parse the option attributes and load the whole file into ``self.options``.

        Required attributes: ``name`` (file path; relative paths are resolved
        against the tool data path of the app config) and ``column``.
        Optional attributes: ``offset`` (default -1) and ``separator``
        (default is a tab).  Each line is stripped of trailing newline
        characters and split on the separator.

        Raises AssertionError when a required attribute is missing.
        """
        super().__init__(parent, elem)
        self.name = elem.get("name", None)
        assert self.name is not None, "Required 'name' attribute missing from FromFileToolOutputActionOption"
        self.column = elem.get("column", None)
        assert self.column is not None, "Required 'column' attribute missing from FromFileToolOutputActionOption"
        self.column = int(self.column)
        self.offset = elem.get("offset", -1)
        self.offset = int(self.offset)
        self.separator = elem.get("separator", "\t")
        self.options = []
        data_file = self.name
        if not os.path.isabs(data_file):
            data_file = os.path.join(self.tool.app.config.tool_data_path, data_file)
        # Use a context manager so the handle is closed deterministically
        # instead of being left for the garbage collector.
        with open(data_file) as handle:
            for line in handle:
                self.options.append(line.rstrip("\n\r").split(self.separator))

    def get_value(self, other_values):
        """Run the rows through the filters and return one cell as a string.

        The cell is ``options[self.offset][self.column]`` of the filtered
        rows.  Returns ``None`` when no rows remain or when the lookup fails
        (the failure is logged at debug level).
        """
        options = self.options
        for option_filter in self.filters:
            options = option_filter.filter_options(options, other_values)
        try:
            if options:
                return str(options[self.offset][self.column])
        except Exception as e:
            # Best effort: a bad offset/column simply yields no value.
            log.debug(f"Error in FromFileToolOutputActionOption get_value: {e}")
        return None
class FromParamToolOutputActionOption(ToolOutputActionOption):
    """Option whose value is derived from an incoming parameter value."""

    tag = "from_param"

    def __init__(self, parent, elem):
        """Parse the option attributes.

        Required attribute: ``name`` (dotted path into the incoming values).
        Optional attributes: ``column`` (default 0), ``offset`` (default -1)
        and ``param_attribute`` (dotted attribute path applied to the
        referenced value).

        Raises AssertionError when ``name`` is missing.
        """
        super().__init__(parent, elem)
        self.name = elem.get("name", None)
        # The message previously named FromFileToolOutputActionOption (copy/paste slip).
        assert self.name is not None, "Required 'name' attribute missing from FromParamToolOutputActionOption"
        self.name = self.name.split(".")
        self.column = elem.get("column", 0)
        self.column = int(self.column)
        self.offset = elem.get("offset", -1)
        self.offset = int(self.offset)
        self.param_attribute = elem.get("param_attribute", [])
        if self.param_attribute:
            self.param_attribute = self.param_attribute.split(".")

    def get_value(self, other_values):
        """Resolve the parameter, follow ``param_attribute`` and return one cell as a string.

        The resolved value is wrapped as the single row ``[[str(value)]]``,
        passed through the filters, and the cell at
        ``options[self.offset][self.column]`` is returned.  Returns ``None``
        when no rows remain or the lookup fails (logged at debug level).
        Asserts that every component of ``name`` is present in the incoming
        values.
        """
        value = other_values
        for ref_name in self.name:
            assert ref_name in value, f"Required dependency '{ref_name}' not found in incoming values"
            value = value.get(ref_name)
        for attr_name in self.param_attribute:
            # if the value is a list from a repeat tag you can access the first element of the repeat with
            # artificial 'first' attribute_name. For example: .. param_attribute="first.input_mate1.ext"
            if isinstance(value, list) and attr_name == "first":
                value = value[0]
            elif isinstance(value, dict):
                value = value[attr_name]
            elif hasattr(value, "collection"):
                # if this is an HDCA for instance let reverse.ext grab
                # the reverse element and then continue for loop to grab
                # dataset extension
                try:
                    value = value.collection[attr_name].element_object
                except KeyError:
                    value = value.child_collection[attr_name].element_object
            else:
                value = getattr(value, attr_name)
        options = [[str(value)]]
        for option_filter in self.filters:
            options = option_filter.filter_options(options, other_values)
        try:
            if options:
                return str(options[self.offset][self.column])
        except Exception as e:
            # Best effort: a bad offset/column simply yields no value.
            log.debug(f"Error in FromParamToolOutputActionOption get_value: {e}")
        return None
class FromDataTableOutputActionOption(ToolOutputActionOption):
    """Option whose candidate rows come from a named tool data table."""

    tag = "from_data_table"

    # TODO: allow accessing by column 'name' not just index
    def __init__(self, parent, elem):
        """Look up the data table named by the required ``name`` attribute.

        If the table is known to the app, its fields become the candidate
        rows and ``column`` (required) and ``offset`` (default -1) are parsed.
        Otherwise the option has no rows and the table name is recorded in
        ``missing_tool_data_table_name``.
        """
        super().__init__(parent, elem)
        self.name = elem.get("name", None)
        assert self.name is not None, "Required 'name' attribute missing from FromDataTableOutputActionOption"
        self.missing_tool_data_table_name = None
        if self.name not in self.tool.app.tool_data_tables:
            self.options = []
            self.missing_tool_data_table_name = self.name
            return
        self.options = self.tool.app.tool_data_tables[self.name].get_fields()
        self.column = elem.get("column", None)
        assert self.column is not None, "Required 'column' attribute missing from FromDataTableOutputActionOption"
        self.column = int(self.column)
        self.offset = int(elem.get("offset", -1))

    def get_value(self, other_values):
        """Filter the rows and return ``options[self.offset][self.column]`` as a string.

        Returns ``None`` when no rows remain or the lookup fails (logged at
        debug level).
        """
        rows = self.options if self.options else []
        for option_filter in self.filters:
            rows = option_filter.filter_options(rows, other_values)
        try:
            if rows:
                return str(rows[self.offset][self.column])
        except Exception as e:
            log.debug(f"Error in FromDataTableOutputActionOption get_value: {e}")
        return None
class MetadataToolOutputAction(ToolOutputAction):
    """Action that sets one metadata attribute on the output dataset."""

    tag = "metadata"

    def __init__(self, parent, elem):
        """Read the required ``name`` attribute (the metadata attribute to set)."""
        super().__init__(parent, elem)
        self.name = elem.get("name", None)
        assert self.name is not None, "Required 'name' attribute missing from MetadataToolOutputAction"

    def apply_action(self, output_dataset, other_values) -> None:
        """Set ``output_dataset.metadata.<name>`` from the default or the option value.

        Nothing is set when the resulting value is ``None``.
        """
        value = self.option.get_value(other_values)
        # TODO: figure out correct type based on MetadataElementSpec,
        # but MetadataElementSpec doesn't actually define a type (but it should).
        # That would avoid the ad-hoc comma splitting here for defaults.
        if self.default:
            # For historical reasons the default value takes preference over value,
            # and we only treat the default value as potentially containing cheetah.
            if fill_template is None:
                # fallback when Cheetah not available, equivalent to how this was handled prior 23.0
                # definitely not needed for CWL tool parsing
                log.warning("Cheetah not installed, falling back to legacy 'apply_action' behavior.")  # type: ignore[unreachable]
                value = self.default
            else:
                rendered = fill_template(
                    self.default,
                    context=other_values,
                    python_template_version=other_values.get("__python_template_version__"),
                )
                value = rendered.split(",")
        if value is None:
            return
        setattr(output_dataset.metadata, self.name, value)
class FormatToolOutputAction(ToolOutputAction):
    """Action that sets the extension of the output dataset."""

    tag = "format"

    def __init__(self, parent, elem):
        """Initialise the base action and (re)read the optional ``default`` attribute."""
        super().__init__(parent, elem)
        self.default = elem.get("default", None)

    def apply_action(self, output_dataset, other_values) -> None:
        """Assign the option value, or the default if the option yields ``None``.

        The dataset is left untouched when both are ``None``.
        """
        chosen = self.option.get_value(other_values)
        if chosen is None:
            chosen = self.default
        if chosen is not None:
            output_dataset.extension = chosen
class ToolOutputActionOptionFilter:
    """Base class for filters that transform the candidate rows of an option."""

    tag = "filter"

    @classmethod
    def from_elem(cls, parent, elem):
        """Loads the proper action by the type attribute of elem"""
        filter_type = elem.get("type", None)
        assert filter_type is not None, "Required 'type' attribute missing from ToolOutputActionOptionFilter"
        factory = filter_types[filter_type]
        return factory(parent, elem)

    def __init__(self, parent, elem):
        """Remember the owning option; ``elem`` is not used by the base class."""
        self.parent = parent

    def filter_options(self, options, other_values):
        """Return the filtered rows; must be overridden."""
        raise TypeError("Not implemented")

    @property
    def tool(self):
        """The tool object, looked up through the parent."""
        return self.parent.tool
class ParamValueToolOutputActionOptionFilter(ToolOutputActionOptionFilter):
    """Keep or drop rows by comparing one column with a parameter or literal value."""

    tag = "param_value"

    def __init__(self, parent, elem):
        """Parse the filter attributes.

        One of ``ref`` (dotted path into the incoming values) or ``value``
        (literal) must be given, together with ``column``.  Optional:
        ``keep`` (default "True"), ``compare``, ``cast`` and
        ``param_attribute`` (dotted attribute path applied to the referenced
        value).
        """
        super().__init__(parent, elem)
        ref = elem.get("ref", None)
        self.ref = ref.split(".") if ref else ref
        self.value = elem.get("value", None)
        assert (
            self.ref != self.value
        ), "Required 'ref' or 'value' attribute missing from ParamValueToolOutputActionOptionFilter"
        self.column = elem.get("column", None)
        assert (
            self.column is not None
        ), "Required 'column' attribute missing from ParamValueToolOutputActionOptionFilter"
        self.column = int(self.column)
        self.keep = util.string_as_bool(elem.get("keep", "True"))
        self.compare = parse_compare_type(elem.get("compare", None))
        self.cast = parse_cast_attribute(elem.get("cast", None))
        attribute_path = elem.get("param_attribute", [])
        self.param_attribute = attribute_path.split(".") if attribute_path else attribute_path

    def filter_options(self, options, other_values):
        """Return the rows whose column comparison result equals ``self.keep``.

        Both sides of the comparison are passed through ``self.cast``.  Rows
        for which the cast, column access or comparison raises are logged at
        debug level and dropped.
        """
        if self.ref:
            # find ref value
            target = other_values
            for ref_name in self.ref:
                assert ref_name in target, f"Required dependency '{ref_name}' not found in incoming values"
                target = target.get(ref_name)
            for attr_name in self.param_attribute:
                target = getattr(target, attr_name)
            target = str(target)
        else:
            target = self.value
        target = self.cast(target)
        kept = []
        for row in options:
            try:
                matched = self.compare(self.cast(row[self.column]), target)
                if self.keep == matched:
                    kept.append(row)
            except Exception as e:
                log.debug(e)
                continue  # likely a bad cast or column out of range
        return kept
class InsertColumnToolOutputActionOptionFilter(ToolOutputActionOptionFilter):
    """Add a column, holding a parameter or literal value, to every row."""

    tag = "insert_column"

    def __init__(self, parent, elem):
        """Parse the filter attributes.

        One of ``ref`` or ``value`` must be given.  ``column`` is optional;
        when absent the new cell is appended.  ``iterate`` (default "False")
        makes the inserted value an integer that increases by one per row.
        """
        super().__init__(parent, elem)
        ref = elem.get("ref", None)
        self.ref = ref.split(".") if ref else ref
        self.value = elem.get("value", None)
        assert (
            self.ref != self.value
        ), "Required 'ref' or 'value' attribute missing from InsertColumnToolOutputActionOptionFilter"
        self.column = elem.get("column", None)  # None is append
        if self.column:
            self.column = int(self.column)
        # TODO not supported in xsd
        self.iterate = util.string_as_bool(elem.get("iterate", "False"))

    def filter_options(self, options, other_values):
        """Return new rows with the value inserted at ``self.column`` (or appended)."""
        if self.ref:
            # find ref value
            current = other_values
            for ref_name in self.ref:
                assert ref_name in current, f"Required dependency '{ref_name}' not found in incoming values"
                current = current.get(ref_name)
            current = str(current)
        else:
            current = self.value
        if self.iterate:
            current = int(current)
        result = []
        for row in options:
            cell = str(current)
            if self.column is None:
                widened = row + [cell]
            else:
                # Copy first so the caller's row is not mutated by insert().
                widened = list(row)
                widened.insert(self.column, cell)
            result.append(widened)
            if self.iterate:
                current += 1
        return result
class MultipleSplitterFilter(ToolOutputActionOptionFilter):
    """Expand each row into one row per separated piece of a given column."""

    tag = "multiple_splitter"

    def __init__(self, parent, elem):
        """Read the required ``column`` and the optional ``separator`` (default ",")."""
        super().__init__(parent, elem)
        self.column = elem.get("column", None)
        assert self.column is not None, "Required 'column' attribute missing from MultipleSplitterFilter"
        self.column = int(self.column)
        self.separator = elem.get("separator", ",")

    def filter_options(self, options, other_values):
        """Return the expanded rows; the other columns are repeated unchanged."""
        expanded = []
        for fields in options:
            pieces = fields[self.column].split(self.separator)
            head = fields[0 : self.column]
            tail = fields[self.column + 1 :]
            for piece in pieces:
                expanded.append(head + [piece] + tail)
        return expanded
class ColumnStripFilter(ToolOutputActionOptionFilter):
    """Apply ``str.strip`` to one column of every row."""

    tag = "column_strip"

    def __init__(self, parent, elem):
        """Read the required ``column`` and the optional ``strip`` characters.

        ``strip`` defaults to ``None`` and is passed as-is to ``str.strip``.
        """
        super().__init__(parent, elem)
        self.column = elem.get("column", None)
        assert self.column is not None, "Required 'column' attribute missing from ColumnStripFilter"
        self.column = int(self.column)
        self.strip = elem.get("strip", None)

    def filter_options(self, options, other_values):
        """Return new rows in which the configured column has been stripped."""
        return [
            fields[0 : self.column] + [fields[self.column].strip(self.strip)] + fields[self.column + 1 :]
            for fields in options
        ]
class ColumnReplaceFilter(ToolOutputActionOptionFilter):
    """Run ``str.replace`` on one column of every row.

    Both the text to search for and its replacement can be either a literal
    (``old_value`` / ``new_value``) or taken from another column of the same
    row (``old_column`` / ``new_column``).
    """

    tag = "column_replace"

    def __init__(self, parent, elem):
        """Parse the filter attributes.

        Exactly one of ``old_column`` / ``old_value`` and exactly one of
        ``new_column`` / ``new_value`` must be supplied (checked on the raw
        attribute strings), together with ``column``.  Column attributes are
        converted to ``int``.

        Raises AssertionError when the attribute combination is invalid.
        """
        super().__init__(parent, elem)
        self.old_column = elem.get("old_column", None)
        self.old_value = elem.get("old_value", None)
        self.new_value = elem.get("new_value", None)
        self.new_column = elem.get("new_column", None)
        assert bool(self.old_column) ^ bool(self.old_value) and bool(self.new_column) ^ bool(
            self.new_value
        ), "Required 'old_column' or 'old_value' and 'new_column' or 'new_value' attribute missing from ColumnReplaceFilter"
        self.column = elem.get("column", None)
        assert self.column is not None, "Required 'column' attribute missing from ColumnReplaceFilter"
        self.column = int(self.column)
        if self.old_column is not None:
            self.old_column = int(self.old_column)
        if self.new_column is not None:
            self.new_column = int(self.new_column)

    def filter_options(self, options, other_values):
        """Return new rows in which the configured column has had the replacement applied."""
        rval = []
        for fields in options:
            # Compare against None, not truthiness: after int() conversion a
            # column index of 0 is falsy but is still a valid column.
            if self.old_column is not None:
                old_value = fields[self.old_column]
            else:
                old_value = self.old_value
            if self.new_column is not None:
                new_value = fields[self.new_column]
            else:
                new_value = self.new_value
            rval.append(
                fields[0 : self.column]
                + [fields[self.column].replace(old_value, new_value)]
                + fields[self.column + 1 :]
            )
        return rval
class MetadataValueFilter(ToolOutputActionOptionFilter):
    """Keep or drop rows by comparing one column with a metadata value of a referenced object."""

    tag = "metadata_value"

    def __init__(self, parent, elem):
        """Parse the filter attributes.

        Required: ``ref`` (dotted path into the incoming values), ``name``
        (metadata attribute name) and ``column``.  Optional: ``keep``
        (default "True") and ``compare``.
        """
        super().__init__(parent, elem)
        self.ref = elem.get("ref", None)
        assert self.ref is not None, "Required 'ref' attribute missing from MetadataValueFilter"
        self.ref = self.ref.split(".")
        self.name = elem.get("name", None)
        assert self.name is not None, "Required 'name' attribute missing from MetadataValueFilter"
        self.column = elem.get("column", None)
        assert self.column is not None, "Required 'column' attribute missing from MetadataValueFilter"
        self.column = int(self.column)
        self.keep = util.string_as_bool(elem.get("keep", "True"))
        self.compare = parse_compare_type(elem.get("compare", None))

    def filter_options(self, options, other_values):
        """Return the rows whose comparison against the stringified metadata value equals ``self.keep``."""
        target = other_values
        for ref_name in self.ref:
            assert ref_name in target, f"Required dependency '{ref_name}' not found in incoming values"
            target = target.get(ref_name)
        expected = str(getattr(target.metadata, self.name))
        return [fields for fields in options if self.keep == (self.compare(fields[self.column], expected))]
class BooleanFilter(ToolOutputActionOptionFilter):
    """Keep or drop rows according to the truthiness of one (optionally cast) column."""

    tag = "boolean"

    def __init__(self, parent, elem):
        """Read the required ``column`` and the optional ``keep`` (default "True") and ``cast``."""
        super().__init__(parent, elem)
        self.column = elem.get("column", None)
        assert self.column is not None, "Required 'column' attribute missing from BooleanFilter"
        self.column = int(self.column)
        self.keep = util.string_as_bool(elem.get("keep", "True"))
        self.cast = parse_cast_attribute(elem.get("cast", None))

    def filter_options(self, options, other_values):
        """Return the rows whose column truthiness equals ``self.keep``."""
        selected = []
        for fields in options:
            try:
                flag = self.cast(fields[self.column])
            except Exception:
                flag = False  # unable to cast or access value; treat as false
            if self.keep == bool(flag):
                selected.append(fields)
        return selected
class StringFunctionFilter(ToolOutputActionOptionFilter):
    """Apply a whitelisted ``str`` method to one column of every row."""

    tag = "string_function"

    def __init__(self, parent, elem):
        """Read the required ``column`` and the ``name`` of the string function.

        Only "lower" and "upper" are accepted; the name is resolved to the
        corresponding unbound ``str`` method.
        """
        super().__init__(parent, elem)
        self.column = elem.get("column", None)
        assert self.column is not None, "Required 'column' attribute missing from StringFunctionFilter"
        self.column = int(self.column)
        self.function = elem.get("name", None)
        assert self.function in [
            "lower",
            "upper",
        ], "Required function 'name' missing or invalid from StringFunctionFilter"  # add function names as needed
        self.function = getattr(str, self.function)

    def filter_options(self, options, other_values):
        """Return new rows in which the configured column has been transformed."""
        return [
            fields[0 : self.column] + [self.function(fields[self.column])] + fields[self.column + 1 :]
            for fields in options
        ]
# tag to class lookups
# Each registry maps a class-level ``tag`` string to the class itself; the
# ``from_elem`` factories index these with the element's ``type`` attribute.
action_types = {
    action_cls.tag: action_cls
    for action_cls in (
        MetadataToolOutputAction,
        FormatToolOutputAction,
    )
}

option_types = {
    option_cls.tag: option_cls
    for option_cls in (
        NullToolOutputActionOption,
        FromFileToolOutputActionOption,
        FromParamToolOutputActionOption,
        FromDataTableOutputActionOption,
    )
}

filter_types = {
    filter_cls.tag: filter_cls
    for filter_cls in (
        ParamValueToolOutputActionOptionFilter,
        InsertColumnToolOutputActionOptionFilter,
        MultipleSplitterFilter,
        ColumnStripFilter,
        MetadataValueFilter,
        BooleanFilter,
        StringFunctionFilter,
        ColumnReplaceFilter,
    )
}


# helper classes


# determine cast function
# TODO add float
def parse_cast_attribute(cast):
    """Translate a ``cast`` attribute value into a one-argument callable.

    "string_as_bool", "int" and "str" map to the matching converter; any
    other value (including ``None``) yields a function that returns its
    argument unchanged.
    """
    if cast == "string_as_bool":
        return util.string_as_bool
    if cast == "int":
        return int
    if cast == "str":
        return str

    # return value as-is
    def passthrough(x):
        return x

    return passthrough
# comparison
def parse_compare_type(compare):
    """Return the comparison function registered under ``compare``.

    ``None`` selects "eq".  Asserts that the name is a key of
    ``compare_types``.
    """
    selected = "eq" if compare is None else compare
    assert selected in compare_types, f"Invalid compare type specified: {selected}"
    return compare_types[selected]
def compare_eq(value1, value2):
    """Return ``value1 == value2``."""
    return value1 == value2


def compare_neq(value1, value2):
    """Return ``value1 != value2``."""
    return value1 != value2


def compare_gt(value1, value2):
    """Return ``value1 > value2``."""
    return value1 > value2


def compare_gte(value1, value2):
    """Return ``value1 >= value2``."""
    return value1 >= value2


def compare_lt(value1, value2):
    """Return ``value1 < value2``."""
    return value1 < value2


def compare_lte(value1, value2):
    """Return ``value1 <= value2``."""
    return value1 <= value2


def compare_in(value1, value2):
    """Return ``value1 in value2``."""
    return value1 in value2


def compare_startswith(value1, value2):
    """Return ``value1.startswith(value2)``."""
    return value1.startswith(value2)


def compare_endswith(value1, value2):
    """Return ``value1.endswith(value2)``."""
    return value1.endswith(value2)


def compare_re_search(value1, value2):
    """Return True if the regular expression ``value2`` matches anywhere in ``value1``.

    The argument order follows the other comparators: ``value1`` is the
    value under test and ``value2`` is what it is tested against (here the
    pattern).
    """
    return bool(re.search(value2, value1))


# Maps the value of a ``compare`` attribute to its comparison function.
# Every function takes (value1, value2) and returns a truth value.
compare_types = {
    "eq": compare_eq,
    "neq": compare_neq,
    "gt": compare_gt,
    "gte": compare_gte,
    "lt": compare_lt,
    "lte": compare_lte,
    "in": compare_in,
    "startswith": compare_startswith,
    "endswith": compare_endswith,
    "re_search": compare_re_search,
}