Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.util.rules_dsl

import abc
import itertools
import re
from typing import (
    List,
    Type,
)

import yaml

from galaxy.util.resources import resource_string


[docs]def get_rules_specification(): return yaml.safe_load(resource_string(__package__, "rules_dsl_spec.yml"))
def _ensure_rule_contains_keys(rule, keys): for key, instance_class in keys.items(): if key not in rule: raise ValueError(f"Rule of type [{rule['type']}] does not contain key [{key}].") value = rule[key] if not isinstance(value, instance_class): raise ValueError(f"Rule of type [{rule['type']}] does not contain correct value type for key [{key}].") def _ensure_key_value_in(rule, key, values): value = rule[key] if value not in values: raise ValueError(f"Invalid value [{value}] for [{key}] encountered.") def _ensure_valid_pattern(expression): re.compile(expression)
[docs]def apply_regex(regex, target, data, replacement=None, group_count=None): pattern = re.compile(regex) def new_row(row): source = row[target] if replacement is None: match = pattern.search(source) if not match: raise Exception(f"Problem applying regular expression [{regex}] to [{source}].") if group_count: if len(match.groups()) != group_count: raise Exception("Problem applying regular expression, wrong number of groups found.") result = row + list(match.groups()) else: result = row + [match.group(0)] else: result = row + [pattern.search(source).expand(replacement)] return result new_data = list(map(new_row, data)) return new_data
[docs]class BaseRuleDefinition(metaclass=abc.ABCMeta): @abc.abstractproperty def rule_type(self): """Short string describing type of rule (plugin class) to use."""
[docs] @abc.abstractmethod def validate_rule(self, rule): """Validate dictified rule definition of this type."""
[docs] @abc.abstractmethod def apply(self, rule, data, sources): """Apply validated, dictified rule definition to supplied data."""
[docs]class AddColumnMetadataRuleDefinition(BaseRuleDefinition): rule_type = "add_column_metadata"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"value": str})
[docs] def apply(self, rule, data, sources): rule_value = rule["value"] if rule_value.startswith("identifier"): identifier_index = int(rule_value[len("identifier") :]) new_rows = [] for index, row in enumerate(data): new_rows.append(row + [sources[index]["identifiers"][identifier_index]]) elif rule_value == "tags": def sorted_tags(index): tags = sorted(sources[index]["tags"]) return [",".join(tags)] new_rows = [] for index, row in enumerate(data): new_rows.append(row + sorted_tags(index)) return new_rows, sources
[docs]class AddColumnGroupTagValueRuleDefinition(BaseRuleDefinition): rule_type = "add_column_group_tag_value"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"value": str})
[docs] def apply(self, rule, data, sources): rule_value = rule["value"] tag_prefix = f"group:{rule_value}:" new_rows = [] for index, row in enumerate(data): group_tag_value = None source = sources[index] tags = source["tags"] for tag in sorted(tags): if tag.startswith(tag_prefix): group_tag_value = tag[len(tag_prefix) :] break if group_tag_value is None: group_tag_value = rule.get("default_value", "") new_rows.append(row + [group_tag_value]) return new_rows, sources
[docs]class AddColumnConcatenateRuleDefinition(BaseRuleDefinition): rule_type = "add_column_concatenate"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"target_column_0": int, "target_column_1": int})
[docs] def apply(self, rule, data, sources): column_0 = rule["target_column_0"] column_1 = rule["target_column_1"] new_rows = [] for row in data: new_rows.append(row + [row[column_0] + row[column_1]]) return new_rows, sources
[docs]class AddColumnBasenameRuleDefinition(BaseRuleDefinition): rule_type = "add_column_basename"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"target_column": int})
[docs] def apply(self, rule, data, sources): column = rule["target_column"] re = r"[^/]*$" return apply_regex(re, column, data), sources
[docs]class AddColumnRegexRuleDefinition(BaseRuleDefinition): rule_type = "add_column_regex"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"target_column": int, "expression": str}) _ensure_valid_pattern(rule["expression"])
[docs] def apply(self, rule, data, sources): target = rule["target_column"] expression = rule["expression"] replacement = rule.get("replacement") group_count = rule.get("group_count") return apply_regex(expression, target, data, replacement, group_count), sources
[docs]class AddColumnRownumRuleDefinition(BaseRuleDefinition): rule_type = "add_column_rownum"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"start": int})
[docs] def apply(self, rule, data, sources): start = rule["start"] new_rows = [] for index, row in enumerate(data): new_rows.append(row + ["%d" % (index + start)]) return new_rows, sources
[docs]class AddColumnValueRuleDefinition(BaseRuleDefinition): rule_type = "add_column_value"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"value": str})
[docs] def apply(self, rule, data, sources): value = rule["value"] new_rows = [] for row in data: new_rows.append(row + [str(value)]) return new_rows, sources
[docs]class AddColumnSubstrRuleDefinition(BaseRuleDefinition): rule_type = "add_column_substr"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_column": int, "length": int, "substr_type": str, }, ) _ensure_key_value_in(rule, "substr_type", ["keep_prefix", "drop_prefix", "keep_suffix", "drop_suffix"])
[docs] def apply(self, rule, data, sources): target = rule["target_column"] length = rule["length"] substr_type = rule["substr_type"] def new_row(row): original_value = row[target] start = 0 end = len(original_value) if substr_type == "keep_prefix": end = length elif substr_type == "drop_prefix": start = length elif substr_type == "keep_suffix": start = end - length if start < 0: start = 0 else: end = end - length if end < 0: end = 0 return row + [original_value[start:end]] return list(map(new_row, data)), sources
[docs]class RemoveColumnsRuleDefinition(BaseRuleDefinition): rule_type = "remove_columns"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_columns": list, }, )
[docs] def apply(self, rule, data, sources): target_columns = rule["target_columns"] def new_row(row): new = [] for index, val in enumerate(row): if index not in target_columns: new.append(val) return new return list(map(new_row, data)), sources
def _filter_index(func, iterable): result = [] for index, x in enumerate(iterable): if func(index): result.append(x) return result
[docs]class AddFilterRegexRuleDefinition(BaseRuleDefinition): rule_type = "add_filter_regex"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_column": int, "invert": bool, "expression": str, }, ) _ensure_valid_pattern(rule["expression"])
[docs] def apply(self, rule, data, sources): target_column = rule["target_column"] invert = rule["invert"] regex = rule["expression"] def _filter(index): row = data[index] val = row[target_column] pattern = re.compile(regex) return not invert if pattern.search(val) else invert return _filter_index(_filter, data), _filter_index(_filter, sources)
[docs]class AddFilterCountRuleDefinition(BaseRuleDefinition): rule_type = "add_filter_count"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "count": int, "invert": bool, "which": str, }, ) _ensure_key_value_in(rule, "which", ["first", "last"])
[docs] def apply(self, rule, data, sources): num_rows = len(data) invert = rule["invert"] n = rule["count"] which = rule["which"] def _filter(index): if which == "first": matches = index >= n else: matches = index < (num_rows - n) return not invert if matches else invert return _filter_index(_filter, data), _filter_index(_filter, sources)
[docs]class AddFilterEmptyRuleDefinition(BaseRuleDefinition): rule_type = "add_filter_empty"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys(rule, {"target_column": int, "invert": bool})
[docs] def apply(self, rule, data, sources): invert = rule["invert"] target_column = rule["target_column"] def _filter(index): non_empty = len(data[index][target_column]) != 0 return not invert if non_empty else invert return _filter_index(_filter, data), _filter_index(_filter, sources)
[docs]class AddFilterMatchesRuleDefinition(BaseRuleDefinition): rule_type = "add_filter_matches"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_column": int, "invert": bool, "value": str, }, )
[docs] def apply(self, rule, data, sources): invert = rule["invert"] target_column = rule["target_column"] value = rule["value"] def _filter(index): row = data[index] val = row[target_column] return not invert if val == value else invert return _filter_index(_filter, data), _filter_index(_filter, sources)
[docs]class AddFilterCompareRuleDefinition(BaseRuleDefinition): rule_type = "add_filter_compare"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_column": int, "value": int, "compare_type": str, }, ) _ensure_key_value_in( rule, "compare_type", ["less_than", "less_than_equal", "greater_than", "greater_than_equal"] )
[docs] def apply(self, rule, data, sources): target_column = rule["target_column"] value = rule["value"] compare_type = rule["compare_type"] def _filter(index): row = data[index] target_value = float(row[target_column]) if compare_type == "less_than": matches = target_value < value elif compare_type == "less_than_equal": matches = target_value <= value elif compare_type == "greater_than": matches = target_value > value elif compare_type == "greater_than_equal": matches = target_value >= value return matches return _filter_index(_filter, data), _filter_index(_filter, sources)
[docs]class SortRuleDefinition(BaseRuleDefinition): rule_type = "sort"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_column": int, "numeric": bool, }, )
[docs] def apply(self, rule, data, sources): target = rule["target_column"] numeric = rule["numeric"] sortable = zip(data, sources) def sort_func(item): a_val = item[0][target] if numeric: a_val = float(a_val) return a_val sorted_data = sorted(sortable, key=sort_func) new_data = [] new_sources = [] for (row, source) in sorted_data: new_data.append(row) new_sources.append(source) return new_data, new_sources
[docs]class SwapColumnsRuleDefinition(BaseRuleDefinition): rule_type = "swap_columns"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_column_0": int, "target_column_1": int, }, )
[docs] def apply(self, rule, data, sources): target_column_0 = rule["target_column_0"] target_column_1 = rule["target_column_1"] def new_row(row): row_copy = row[:] row_copy[target_column_0] = row[target_column_1] row_copy[target_column_1] = row[target_column_0] return row_copy return list(map(new_row, data)), sources
[docs]class SplitColumnsRuleDefinition(BaseRuleDefinition): rule_type = "split_columns"
[docs] def validate_rule(self, rule): _ensure_rule_contains_keys( rule, { "target_columns_0": list, "target_columns_1": list, }, )
[docs] def apply(self, rule, data, sources): target_columns_0 = rule["target_columns_0"] target_columns_1 = rule["target_columns_1"] def split_row(row): new_row_0 = [] new_row_1 = [] for index, el in enumerate(row): if index in target_columns_0: new_row_0.append(el) elif index in target_columns_1: new_row_1.append(el) else: new_row_0.append(el) new_row_1.append(el) return [new_row_0, new_row_1] data = flat_map(split_row, data) sources = flat_map(lambda x: [x, x], sources) return data, sources
[docs]def flat_map(f, items): return list(itertools.chain.from_iterable(map(f, items)))
[docs]class RuleSet:
[docs] def __init__(self, rule_set_as_dict): self.raw_rules = rule_set_as_dict["rules"] self.raw_mapping = rule_set_as_dict.get("mapping", [])
@property def rules(self): return self.raw_rules def _rules_with_definitions(self): for rule in self.raw_rules: yield (rule, RULES_DEFINITIONS[rule["type"]])
[docs] def apply(self, data, sources): for rule, rule_definition in self._rules_with_definitions(): rule_definition.validate_rule(rule) data, sources = rule_definition.apply(rule, data, sources) return data, sources
@property def has_errors(self): errored = False try: for rule, rule_definition in self._rules_with_definitions(): rule_definition.validate_rule(rule) except Exception: errored = True return errored @property def mapping_as_dict(self): as_dict = {} for mapping in self.raw_mapping: as_dict[mapping["type"]] = mapping return as_dict # Rest of this is generic, things here are Galaxy collection specific, think about about # subclass of RuleSet for collection creation. @property def identifier_columns(self): mapping_as_dict = self.mapping_as_dict identifier_columns = [] if "list_identifiers" in mapping_as_dict: identifier_columns.extend(mapping_as_dict["list_identifiers"]["columns"]) if "paired_identifier" in mapping_as_dict: identifier_columns.append(mapping_as_dict["paired_identifier"]["columns"][0]) return identifier_columns @property def collection_type(self): mapping_as_dict = self.mapping_as_dict list_columns = mapping_as_dict.get("list_identifiers", {"columns": []})["columns"] collection_type = ":".join(map(lambda c: "list", list_columns)) if "paired_identifier" in mapping_as_dict: if collection_type: collection_type += ":paired" else: collection_type = "paired" return collection_type @property def display(self): message = "Rules:\n" message += "".join(f"- {r}\n" for r in self.raw_rules) message += "Column Definitions:\n" message += "".join(f"- {m}\n" for m in self.raw_mapping) return message
RULES_DEFINITION_CLASSES: List[Type[BaseRuleDefinition]] = [ AddColumnMetadataRuleDefinition, AddColumnGroupTagValueRuleDefinition, AddColumnConcatenateRuleDefinition, AddColumnBasenameRuleDefinition, AddColumnRegexRuleDefinition, AddColumnRownumRuleDefinition, AddColumnValueRuleDefinition, AddColumnSubstrRuleDefinition, RemoveColumnsRuleDefinition, AddFilterRegexRuleDefinition, AddFilterCountRuleDefinition, AddFilterEmptyRuleDefinition, AddFilterMatchesRuleDefinition, AddFilterCompareRuleDefinition, SortRuleDefinition, SwapColumnsRuleDefinition, SplitColumnsRuleDefinition, ] RULES_DEFINITIONS = {} for rule_class in RULES_DEFINITION_CLASSES: RULES_DEFINITIONS[rule_class.rule_type] = rule_class()