Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.util.rules_dsl

import abc
import itertools
import re
from typing import (
    List,
    Type,
)

import yaml

from galaxy.util.resources import resource_string


[docs]def get_rules_specification(): return yaml.safe_load(resource_string(__name__, "rules_dsl_spec.yml"))
def _ensure_rule_contains_keys(rule, keys): for key, instance_class in keys.items(): if key not in rule: raise ValueError(f"Rule of type [{rule['type']}] does not contain key [{key}].") value = rule[key] if not isinstance(value, instance_class): raise ValueError(f"Rule of type [{rule['type']}] does not contain correct value type for key [{key}].") def _ensure_key_value_in(rule, key, values): value = rule[key] if value not in values: raise ValueError(f"Invalid value [{value}] for [{key}] encountered.") def _ensure_valid_pattern(expression): re.compile(expression)
[docs]def apply_regex(regex, target, data, replacement=None, group_count=None): pattern = re.compile(regex) def new_row(row): source = row[target] if replacement is None: match = pattern.search(source) if not match: raise Exception(f"Problem applying regular expression [{regex}] to [{source}].") if group_count: if len(match.groups()) != group_count: raise Exception("Problem applying regular expression, wrong number of groups found.") result = row + list(match.groups()) else: result = row + [match.group(0)] else: result = row + [pattern.search(source).expand(replacement)] return result new_data = list(map(new_row, data)) return new_data
[docs]class BaseRuleDefinition(metaclass=abc.ABCMeta): @property @abc.abstractmethod def rule_type(self): """Short string describing type of rule (plugin class) to use."""
[docs] @abc.abstractmethod def validate_rule(self, rule): """Validate dictified rule definition of this type."""
[docs] @abc.abstractmethod def apply(self, rule, data, sources): """Apply validated, dictified rule definition to supplied data."""
[docs]class AddColumnMetadataRuleDefinition(BaseRuleDefinition): rule_type = "add_column_metadata"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"value": str})
[docs] def apply(self, rule, data, sources): rule_value = rule["value"] if rule_value.startswith("identifier"): identifier_index = int(rule_value[len("identifier") :]) new_rows = [] for index, row in enumerate(data): new_rows.append(row + [sources[index]["identifiers"][identifier_index]]) elif rule_value == "tags": def sorted_tags(index): tags = sorted(sources[index]["tags"]) return [",".join(tags)] new_rows = [] for index, row in enumerate(data): new_rows.append(row + sorted_tags(index)) return new_rows, sources
[docs]class AddColumnGroupTagValueRuleDefinition(BaseRuleDefinition): rule_type = "add_column_group_tag_value"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"value": str})
[docs] def apply(self, rule, data, sources): rule_value = rule["value"] tag_prefix = f"group:{rule_value}:" new_rows = [] for index, row in enumerate(data): group_tag_value = None source = sources[index] tags = source["tags"] for tag in sorted(tags): if tag.startswith(tag_prefix): group_tag_value = tag[len(tag_prefix) :] break if group_tag_value is None: group_tag_value = rule.get("default_value", "") new_rows.append(row + [group_tag_value]) return new_rows, sources
[docs]class AddColumnConcatenateRuleDefinition(BaseRuleDefinition): rule_type = "add_column_concatenate"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"target_column_0": int, "target_column_1": int})
[docs] def apply(self, rule, data, sources): column_0 = rule["target_column_0"] column_1 = rule["target_column_1"] new_rows = [] for row in data: new_rows.append(row + [row[column_0] + row[column_1]]) return new_rows, sources
[docs]class AddColumnBasenameRuleDefinition(BaseRuleDefinition): rule_type = "add_column_basename"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"target_column": int})
[docs] def apply(self, rule, data, sources): column = rule["target_column"] re = r"[^/]*$" return apply_regex(re, column, data), sources
[docs]class AddColumnRegexRuleDefinition(BaseRuleDefinition): rule_type = "add_column_regex"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"target_column": int, "expression": str}) _ensure_valid_pattern(rule["expression"])
[docs] def apply(self, rule, data, sources): target = rule["target_column"] expression = rule["expression"] replacement = rule.get("replacement") group_count = rule.get("group_count") return apply_regex(expression, target, data, replacement, group_count), sources
[docs]class AddColumnRownumRuleDefinition(BaseRuleDefinition): rule_type = "add_column_rownum"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"start": int})
[docs] def apply(self, rule, data, sources): start = rule["start"] new_rows = [] for index, row in enumerate(data): new_rows.append(row + [f"{index + start}"]) return new_rows, sources
[docs]class AddColumnValueRuleDefinition(BaseRuleDefinition): rule_type = "add_column_value"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"value": str})
[docs] def apply(self, rule, data, sources): value = rule["value"] new_rows = [] for row in data: new_rows.append(row + [str(value)]) return new_rows, sources
[docs]class AddColumnSubstrRuleDefinition(BaseRuleDefinition): rule_type = "add_column_substr"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_column": int, "length": int, "substr_type": str, }, ) _ensure_key_value_in(rule, "substr_type", ["keep_prefix", "drop_prefix", "keep_suffix", "drop_suffix"])
[docs] def apply(self, rule, data, sources): target = rule["target_column"] length = rule["length"] substr_type = rule["substr_type"] def new_row(row): original_value = row[target] start = 0 end = len(original_value) if substr_type == "keep_prefix": end = length elif substr_type == "drop_prefix": start = length elif substr_type == "keep_suffix": start = end - length if start < 0: start = 0 else: end = end - length if end < 0: end = 0 return row + [original_value[start:end]] return list(map(new_row, data)), sources
[docs]class RemoveColumnsRuleDefinition(BaseRuleDefinition): rule_type = "remove_columns"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_columns": list, }, )
[docs] def apply(self, rule, data, sources): target_columns = rule["target_columns"] def new_row(row): new = [] for index, val in enumerate(row): if index not in target_columns: new.append(val) return new return list(map(new_row, data)), sources
def _filter_index(func, iterable): result = [] for index, x in enumerate(iterable): if func(index): result.append(x) return result
[docs]class AddFilterRegexRuleDefinition(BaseRuleDefinition): rule_type = "add_filter_regex"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_column": int, "invert": bool, "expression": str, }, ) _ensure_valid_pattern(rule["expression"])
[docs] def apply(self, rule, data, sources): target_column = rule["target_column"] invert = rule["invert"] regex = rule["expression"] def _filter(index): row = data[index] val = row[target_column] pattern = re.compile(regex) return not invert if pattern.search(val) else invert return _filter_index(_filter, data), _filter_index(_filter, sources)
[docs]class AddFilterCountRuleDefinition(BaseRuleDefinition): rule_type = "add_filter_count"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "count": int, "invert": bool, "which": str, }, ) _ensure_key_value_in(rule, "which", ["first", "last"])
[docs] def apply(self, rule, data, sources): num_rows = len(data) invert = rule["invert"] n = rule["count"] which = rule["which"] def _filter(index): if which == "first": matches = index >= n else: matches = index < (num_rows - n) return not invert if matches else invert return _filter_index(_filter, data), _filter_index(_filter, sources)
[docs]class AddFilterEmptyRuleDefinition(BaseRuleDefinition): rule_type = "add_filter_empty"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"target_column": int, "invert": bool})
[docs] def apply(self, rule, data, sources): invert = rule["invert"] target_column = rule["target_column"] def _filter(index): non_empty = len(data[index][target_column]) != 0 return not invert if non_empty else invert return _filter_index(_filter, data), _filter_index(_filter, sources)
[docs]class AddFilterMatchesRuleDefinition(BaseRuleDefinition): rule_type = "add_filter_matches"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_column": int, "invert": bool, "value": str, }, )
[docs] def apply(self, rule, data, sources): invert = rule["invert"] target_column = rule["target_column"] value = rule["value"] def _filter(index): row = data[index] val = row[target_column] return not invert if val == value else invert return _filter_index(_filter, data), _filter_index(_filter, sources)
[docs]class AddFilterCompareRuleDefinition(BaseRuleDefinition): rule_type = "add_filter_compare"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_column": int, "value": int, "compare_type": str, }, ) _ensure_key_value_in( rule, "compare_type", ["less_than", "less_than_equal", "greater_than", "greater_than_equal"] )
[docs] def apply(self, rule, data, sources): target_column = rule["target_column"] value = rule["value"] compare_type = rule["compare_type"] def _filter(index): row = data[index] target_value = float(row[target_column]) if compare_type == "less_than": matches = target_value < value elif compare_type == "less_than_equal": matches = target_value <= value elif compare_type == "greater_than": matches = target_value > value elif compare_type == "greater_than_equal": matches = target_value >= value return matches return _filter_index(_filter, data), _filter_index(_filter, sources)
[docs]class SortRuleDefinition(BaseRuleDefinition): rule_type = "sort"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_column": int, "numeric": bool, }, )
[docs] def apply(self, rule, data, sources): target = rule["target_column"] numeric = rule["numeric"] sortable = zip(data, sources) def sort_func(item): a_val = item[0][target] if numeric: a_val = float(a_val) return a_val sorted_data = sorted(sortable, key=sort_func) new_data = [] new_sources = [] for row, source in sorted_data: new_data.append(row) new_sources.append(source) return new_data, new_sources
[docs]class SwapColumnsRuleDefinition(BaseRuleDefinition): rule_type = "swap_columns"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_column_0": int, "target_column_1": int, }, )
[docs] def apply(self, rule, data, sources): target_column_0 = rule["target_column_0"] target_column_1 = rule["target_column_1"] def new_row(row): row_copy = row[:] row_copy[target_column_0] = row[target_column_1] row_copy[target_column_1] = row[target_column_0] return row_copy return list(map(new_row, data)), sources
[docs]class SplitColumnsRuleDefinition(BaseRuleDefinition): rule_type = "split_columns"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_columns_0": list, "target_columns_1": list, }, )
[docs] def apply(self, rule, data, sources): target_columns_0 = rule["target_columns_0"] target_columns_1 = rule["target_columns_1"] def split_row(row): new_row_0 = [] new_row_1 = [] for index, el in enumerate(row): if index in target_columns_0: new_row_0.append(el) elif index in target_columns_1: new_row_1.append(el) else: new_row_0.append(el) new_row_1.append(el) return [new_row_0, new_row_1] data = flat_map(split_row, data) sources = flat_map(lambda x: [x, x], sources) return data, sources
[docs]def flat_map(f, items): return list(itertools.chain.from_iterable(map(f, items)))
[docs]class RuleSet:
[docs] def __init__(self, rule_set_as_dict): self.raw_rules = rule_set_as_dict["rules"] self.raw_mapping = rule_set_as_dict.get("mapping", [])
@property def rules(self): return self.raw_rules def _rules_with_definitions(self): for rule in self.raw_rules: yield (rule, RULES_DEFINITIONS[rule["type"]])
[docs] def apply(self, data, sources): for rule, rule_definition in self._rules_with_definitions(): rule_definition.validate_rule(rule) data, sources = rule_definition.apply(rule, data, sources) return data, sources
@property def has_errors(self): errored = False try: for rule, rule_definition in self._rules_with_definitions(): rule_definition.validate_rule(rule) except Exception: errored = True return errored @property def mapping_as_dict(self): as_dict = {} for mapping in self.raw_mapping: as_dict[mapping["type"]] = mapping return as_dict # Rest of this is generic, things here are Galaxy collection specific, think about about # subclass of RuleSet for collection creation. @property def identifier_columns(self): mapping_as_dict = self.mapping_as_dict identifier_columns = [] if "list_identifiers" in mapping_as_dict: identifier_columns.extend(mapping_as_dict["list_identifiers"]["columns"]) if "paired_identifier" in mapping_as_dict: identifier_columns.append(mapping_as_dict["paired_identifier"]["columns"][0]) return identifier_columns @property def collection_type(self): mapping_as_dict = self.mapping_as_dict list_columns = mapping_as_dict.get("list_identifiers", {"columns": []})["columns"] collection_type = ":".join("list" for c in list_columns) if "paired_identifier" in mapping_as_dict: if collection_type: collection_type += ":paired" else: collection_type = "paired" return collection_type @property def display(self): message = "Rules:\n" message += "".join(f"- {r}\n" for r in self.raw_rules) message += "Column Definitions:\n" message += "".join(f"- {m}\n" for m in self.raw_mapping) return message
RULES_DEFINITION_CLASSES: List[Type[BaseRuleDefinition]] = [ AddColumnMetadataRuleDefinition, AddColumnGroupTagValueRuleDefinition, AddColumnConcatenateRuleDefinition, AddColumnBasenameRuleDefinition, AddColumnRegexRuleDefinition, AddColumnRownumRuleDefinition, AddColumnValueRuleDefinition, AddColumnSubstrRuleDefinition, RemoveColumnsRuleDefinition, AddFilterRegexRuleDefinition, AddFilterCountRuleDefinition, AddFilterEmptyRuleDefinition, AddFilterMatchesRuleDefinition, AddFilterCompareRuleDefinition, SortRuleDefinition, SwapColumnsRuleDefinition, SplitColumnsRuleDefinition, ] RULES_DEFINITIONS = {} for rule_class in RULES_DEFINITION_CLASSES: RULES_DEFINITIONS[rule_class.rule_type] = rule_class()