import abc
import itertools
import re
from typing import (
List,
Type,
)
import yaml
from galaxy.util.resources import resource_string
def get_rules_specification():
    """Return the parsed contents of the packaged ``rules_dsl_spec.yml`` resource."""
    raw_specification = resource_string(__package__, "rules_dsl_spec.yml")
    return yaml.safe_load(raw_specification)
def _ensure_rule_contains_keys(rule, keys):
    """Verify ``rule`` has every required key with a value of the expected class.

    ``keys`` maps a required key name to the class (or tuple of classes, as
    accepted by ``isinstance``) its value must be an instance of.

    Raises ``ValueError`` for the first key that is missing or mistyped.
    """
    for required_key, expected_class in keys.items():
        if required_key not in rule:
            raise ValueError(f"Rule of type [{rule['type']}] does not contain key [{required_key}].")
        if isinstance(rule[required_key], expected_class):
            continue
        raise ValueError(
            f"Rule of type [{rule['type']}] does not contain correct value type for key [{required_key}]."
        )
def _ensure_key_value_in(rule, key, values):
    """Raise ``ValueError`` unless ``rule[key]`` is one of ``values``."""
    candidate = rule[key]
    if candidate in values:
        return
    raise ValueError(f"Invalid value [{candidate}] for [{key}] encountered.")
def _ensure_valid_pattern(expression):
    """Check that ``expression`` compiles as a regular expression.

    Nothing is returned; any error raised by ``re.compile`` propagates to
    the caller.
    """
    # The compiled pattern is discarded: only the failure on bad input matters.
    re.compile(expression)
def apply_regex(regex, target, data, replacement=None, group_count=None):
    """Return a copy of ``data`` with regex-derived value(s) appended to each row.

    For every row, ``regex`` is searched in ``row[target]``:

    - if ``replacement`` is given, the match is expanded through that template
      and the resulting string is appended;
    - otherwise, if ``group_count`` is truthy, every capture group is appended
      (the pattern must produce exactly ``group_count`` groups);
    - otherwise the whole match is appended.

    Raises ``Exception`` if the pattern does not match a row's value (in all
    modes, including when ``replacement`` is supplied) or if the number of
    capture groups differs from ``group_count``.
    """
    pattern = re.compile(regex)

    def new_row(row):
        source = row[target]
        match = pattern.search(source)
        # Check for a miss before any use of ``match`` so that the replacement
        # path reports the same descriptive error as the other paths instead
        # of failing on ``None.expand``.
        if not match:
            raise Exception(f"Problem applying regular expression [{regex}] to [{source}].")
        if replacement is not None:
            return row + [match.expand(replacement)]
        if group_count:
            if len(match.groups()) != group_count:
                raise Exception("Problem applying regular expression, wrong number of groups found.")
            return row + list(match.groups())
        return row + [match.group(0)]

    return [new_row(row) for row in data]
class BaseRuleDefinition(metaclass=abc.ABCMeta):
    """Abstract interface implemented by every rule plugin class in this module."""

    @property
    @abc.abstractmethod
    def rule_type(self):
        """Short string describing type of rule (plugin class) to use."""
        raise NotImplementedError

    @abc.abstractmethod
    def validate_rule(self, rule):
        """Validate dictified rule definition of this type."""
        raise NotImplementedError

    @abc.abstractmethod
    def apply(self, rule, data, sources):
        """Apply validated, dictified rule definition to supplied data."""
        raise NotImplementedError
class AddColumnGroupTagValueRuleDefinition(BaseRuleDefinition):
    """Append a column holding the value of a ``group:<name>:<value>`` source tag."""

    rule_type = "add_column_group_tag_value"

    def validate_rule(self, rule):
        """Require a string ``value`` key (the group name to look up)."""
        _ensure_rule_contains_keys(rule, {"value": str})

    def apply(self, rule, data, sources):
        """Append the group tag value found on each row's source.

        The tags of ``sources[i]`` are scanned in sorted order and the first
        one starting with ``group:<rule value>:`` supplies the new cell (the
        text after that prefix). Rows without such a tag receive
        ``rule["default_value"]``, or ``""`` when that key is absent.
        Sources are returned unchanged.
        """
        prefix = f"group:{rule['value']}:"
        prefix_length = len(prefix)
        extended_rows = []
        for position, row in enumerate(data):
            source_tags = sources[position]["tags"]
            tag_value = next(
                (tag[prefix_length:] for tag in sorted(source_tags) if tag.startswith(prefix)),
                None,
            )
            if tag_value is None:
                tag_value = rule.get("default_value", "")
            extended_rows.append(row + [tag_value])
        return extended_rows, sources
class AddColumnConcatenateRuleDefinition(BaseRuleDefinition):
    """Append a column that is the ``+`` combination of two existing columns."""

    rule_type = "add_column_concatenate"

    def validate_rule(self, rule):
        """Require integer ``target_column_0`` and ``target_column_1`` keys."""
        _ensure_rule_contains_keys(rule, {"target_column_0": int, "target_column_1": int})

    def apply(self, rule, data, sources):
        """Append ``row[target_column_0] + row[target_column_1]`` to every row."""
        first_index = rule["target_column_0"]
        second_index = rule["target_column_1"]
        extended_rows = [row + [row[first_index] + row[second_index]] for row in data]
        return extended_rows, sources
class AddColumnBasenameRuleDefinition(BaseRuleDefinition):
    """Append a column holding the part of a column's value after its last ``/``."""

    rule_type = "add_column_basename"

    def validate_rule(self, rule):
        """Require an integer ``target_column`` key."""
        _ensure_rule_contains_keys(rule, {"target_column": int})

    def apply(self, rule, data, sources):
        """Append the trailing run of non-``/`` characters of the target column.

        Sources are returned unchanged.
        """
        column = rule["target_column"]
        # Named so it does not shadow the module-level ``re`` import.
        basename_pattern = r"[^/]*$"
        return apply_regex(basename_pattern, column, data), sources
class AddColumnRegexRuleDefinition(BaseRuleDefinition):
    """Append column(s) derived by applying a regular expression to a column."""

    rule_type = "add_column_regex"

    def validate_rule(self, rule):
        """Require an integer ``target_column`` and a compilable string ``expression``."""
        _ensure_rule_contains_keys(rule, {"target_column": int, "expression": str})
        _ensure_valid_pattern(rule["expression"])

    def apply(self, rule, data, sources):
        """Delegate to ``apply_regex`` using the rule's settings.

        The optional ``replacement`` and ``group_count`` keys are passed
        through (``None`` when absent). Sources are returned unchanged.
        """
        column_index, regex = rule["target_column"], rule["expression"]
        extended_rows = apply_regex(
            regex,
            column_index,
            data,
            replacement=rule.get("replacement"),
            group_count=rule.get("group_count"),
        )
        return extended_rows, sources
class AddColumnRownumRuleDefinition(BaseRuleDefinition):
    """Append a column containing each row's number, formatted as a string."""

    rule_type = "add_column_rownum"

    def validate_rule(self, rule):
        """Require an integer ``start`` key (the number given to the first row)."""
        _ensure_rule_contains_keys(rule, {"start": int})

    def apply(self, rule, data, sources):
        """Append ``start + row position`` (as decimal text) to every row."""
        offset = rule["start"]
        numbered_rows = [row + ["%d" % (position + offset)] for position, row in enumerate(data)]
        return numbered_rows, sources
class AddColumnValueRuleDefinition(BaseRuleDefinition):
    """Append a column with the same fixed value on every row."""

    rule_type = "add_column_value"

    def validate_rule(self, rule):
        """Require a string ``value`` key."""
        _ensure_rule_contains_keys(rule, {"value": str})

    def apply(self, rule, data, sources):
        """Append ``str(rule["value"])`` to every row; sources are unchanged."""
        constant = rule["value"]
        extended_rows = [row + [str(constant)] for row in data]
        return extended_rows, sources
class AddColumnSubstrRuleDefinition(BaseRuleDefinition):
    """Append a column containing a prefix- or suffix-based slice of a column."""

    rule_type = "add_column_substr"

    def validate_rule(self, rule):
        """Require ``target_column``/``length`` ints and a known ``substr_type``."""
        required = {
            "target_column": int,
            "length": int,
            "substr_type": str,
        }
        _ensure_rule_contains_keys(rule, required)
        _ensure_key_value_in(rule, "substr_type", ["keep_prefix", "drop_prefix", "keep_suffix", "drop_suffix"])

    def apply(self, rule, data, sources):
        """Append the selected slice of ``row[target_column]`` to every row.

        ``keep_prefix`` keeps the first ``length`` items, ``drop_prefix``
        removes them, ``keep_suffix`` keeps the last ``length`` items and any
        other ``substr_type`` removes the last ``length`` items. For the
        suffix modes the computed boundary is clamped so it never goes below 0.
        """
        target = rule["target_column"]
        length = rule["length"]
        substr_type = rule["substr_type"]

        def new_row(row):
            text = row[target]
            size = len(text)
            if substr_type == "keep_prefix":
                piece = text[0:length]
            elif substr_type == "drop_prefix":
                piece = text[length:size]
            elif substr_type == "keep_suffix":
                piece = text[max(size - length, 0) : size]
            else:
                piece = text[0 : max(size - length, 0)]
            return row + [piece]

        return [new_row(row) for row in data], sources
class RemoveColumnsRuleDefinition(BaseRuleDefinition):
    """Drop the listed column positions from every row."""

    rule_type = "remove_columns"

    def validate_rule(self, rule):
        """Require a list-valued ``target_columns`` key."""
        _ensure_rule_contains_keys(rule, {"target_columns": list})

    def apply(self, rule, data, sources):
        """Return rows without the cells whose index is in ``target_columns``."""
        dropped = rule["target_columns"]
        trimmed_rows = [
            [cell for position, cell in enumerate(row) if position not in dropped] for row in data
        ]
        return trimmed_rows, sources
def _filter_index(func, iterable):
    """Return the items of ``iterable`` whose position makes ``func(position)`` truthy."""
    return [item for position, item in enumerate(iterable) if func(position)]
class AddFilterRegexRuleDefinition(BaseRuleDefinition):
    """Keep or drop rows depending on whether a column matches a regular expression."""

    rule_type = "add_filter_regex"

    def validate_rule(self, rule):
        """Require ``target_column`` (int), ``invert`` (bool) and a compilable ``expression``."""
        _ensure_rule_contains_keys(
            rule,
            {
                "target_column": int,
                "invert": bool,
                "expression": str,
            },
        )
        _ensure_valid_pattern(rule["expression"])

    def apply(self, rule, data, sources):
        """Filter ``data`` and ``sources`` in lockstep by a regex search.

        With ``invert`` false, rows whose target column contains a match are
        kept; with ``invert`` true, rows without a match are kept.
        """
        target_column = rule["target_column"]
        invert = rule["invert"]
        # Compile once up front: the predicate below runs for every row, and
        # runs twice per row (once for data, once for sources).
        pattern = re.compile(rule["expression"])

        def _filter(index):
            val = data[index][target_column]
            return not invert if pattern.search(val) else invert

        return _filter_index(_filter, data), _filter_index(_filter, sources)
class AddFilterCountRuleDefinition(BaseRuleDefinition):
    """Keep or drop rows based on their position relative to the first/last ``count`` rows."""

    rule_type = "add_filter_count"

    def validate_rule(self, rule):
        """Require ``count`` (int), ``invert`` (bool) and ``which`` in ``first``/``last``."""
        required = {
            "count": int,
            "invert": bool,
            "which": str,
        }
        _ensure_rule_contains_keys(rule, required)
        _ensure_key_value_in(rule, "which", ["first", "last"])

    def apply(self, rule, data, sources):
        """Filter ``data`` and ``sources`` in lockstep by row position.

        With ``invert`` false the rows *outside* the selected window are kept:
        positions ``>= count`` for ``which == "first"``, otherwise positions
        ``< len(data) - count``. With ``invert`` true the selection is reversed.
        """
        total_rows = len(data)
        invert = rule["invert"]
        count = rule["count"]
        which = rule["which"]

        def _outside_window(position):
            if which == "first":
                return position >= count
            return position < (total_rows - count)

        def _keep(position):
            if _outside_window(position):
                return not invert
            return invert

        return _filter_index(_keep, data), _filter_index(_keep, sources)
class AddFilterEmptyRuleDefinition(BaseRuleDefinition):
    """Keep or drop rows depending on whether a column's value has zero length."""

    rule_type = "add_filter_empty"

    def validate_rule(self, rule):
        """Require ``target_column`` (int) and ``invert`` (bool)."""
        _ensure_rule_contains_keys(rule, {"target_column": int, "invert": bool})

    def apply(self, rule, data, sources):
        """Filter ``data`` and ``sources`` in lockstep by emptiness of a column.

        With ``invert`` false, rows whose target column has non-zero length
        are kept; with ``invert`` true, rows whose target column is empty are kept.
        """
        invert = rule["invert"]
        target_column = rule["target_column"]

        def _keep(position):
            cell = data[position][target_column]
            if len(cell) == 0:
                return invert
            return not invert

        return _filter_index(_keep, data), _filter_index(_keep, sources)
class AddFilterMatchesRuleDefinition(BaseRuleDefinition):
    """Keep or drop rows depending on whether a column equals a fixed value."""

    rule_type = "add_filter_matches"

    def validate_rule(self, rule):
        """Require ``target_column`` (int), ``invert`` (bool) and ``value`` (str)."""
        required = {
            "target_column": int,
            "invert": bool,
            "value": str,
        }
        _ensure_rule_contains_keys(rule, required)

    def apply(self, rule, data, sources):
        """Filter ``data`` and ``sources`` in lockstep by equality with ``value``.

        With ``invert`` false, rows whose target column equals ``value`` are
        kept; with ``invert`` true, the other rows are kept.
        """
        invert = rule["invert"]
        target_column = rule["target_column"]
        expected = rule["value"]

        def _keep(position):
            if data[position][target_column] == expected:
                return not invert
            return invert

        return _filter_index(_keep, data), _filter_index(_keep, sources)
class AddFilterCompareRuleDefinition(BaseRuleDefinition):
    """Keep rows whose column, read as a float, satisfies a numeric comparison."""

    rule_type = "add_filter_compare"

    def validate_rule(self, rule):
        """Require ``target_column`` (int), ``value`` (int) and a known ``compare_type``."""
        _ensure_rule_contains_keys(
            rule,
            {
                "target_column": int,
                "value": int,
                "compare_type": str,
            },
        )
        _ensure_key_value_in(
            rule, "compare_type", ["less_than", "less_than_equal", "greater_than", "greater_than_equal"]
        )

    def apply(self, rule, data, sources):
        """Filter ``data`` and ``sources`` in lockstep by a numeric comparison.

        Each row's target column is converted with ``float`` and compared to
        ``rule["value"]`` using ``compare_type``; rows for which the
        comparison holds are kept.

        Raises ``ValueError`` if ``compare_type`` is not one of the four
        supported names (possible only when the rule was not validated).
        """
        target_column = rule["target_column"]
        value = rule["value"]
        compare_type = rule["compare_type"]

        def _filter(index):
            target_value = float(data[index][target_column])
            if compare_type == "less_than":
                return target_value < value
            if compare_type == "less_than_equal":
                return target_value <= value
            if compare_type == "greater_than":
                return target_value > value
            if compare_type == "greater_than_equal":
                return target_value >= value
            # Fail explicitly instead of falling through with no result bound.
            raise ValueError(f"Invalid value [{compare_type}] for [compare_type] encountered.")

        return _filter_index(_filter, data), _filter_index(_filter, sources)
class SortRuleDefinition(BaseRuleDefinition):
    """Sort rows (and their sources alongside them) by one column."""

    rule_type = "sort"

    def validate_rule(self, rule):
        """Require ``target_column`` (int) and ``numeric`` (bool)."""
        required = {
            "target_column": int,
            "numeric": bool,
        }
        _ensure_rule_contains_keys(rule, required)

    def apply(self, rule, data, sources):
        """Return ``data`` and ``sources`` reordered by the target column.

        Rows are paired with their sources, sorted on ``row[target_column]``
        (converted with ``float`` when ``numeric`` is true) and then unpaired.
        """
        target = rule["target_column"]
        numeric = rule["numeric"]

        def sort_key(pair):
            cell = pair[0][target]
            return float(cell) if numeric else cell

        ordered_pairs = sorted(zip(data, sources), key=sort_key)
        new_data = [row for row, _source in ordered_pairs]
        new_sources = [source for _row, source in ordered_pairs]
        return new_data, new_sources
class SwapColumnsRuleDefinition(BaseRuleDefinition):
    """Exchange the values of two columns in every row."""

    rule_type = "swap_columns"

    def validate_rule(self, rule):
        """Require integer ``target_column_0`` and ``target_column_1`` keys."""
        required = {
            "target_column_0": int,
            "target_column_1": int,
        }
        _ensure_rule_contains_keys(rule, required)

    def apply(self, rule, data, sources):
        """Return copies of the rows with the two target cells exchanged."""
        first_index = rule["target_column_0"]
        second_index = rule["target_column_1"]

        def new_row(row):
            # Work on a copy so the caller's row is left untouched.
            swapped = row[:]
            swapped[first_index], swapped[second_index] = row[second_index], row[first_index]
            return swapped

        return [new_row(row) for row in data], sources
class SplitColumnsRuleDefinition(BaseRuleDefinition):
    """Turn every row into two rows, distributing selected columns between them."""

    rule_type = "split_columns"

    def validate_rule(self, rule):
        """Require list-valued ``target_columns_0`` and ``target_columns_1`` keys."""
        required = {
            "target_columns_0": list,
            "target_columns_1": list,
        }
        _ensure_rule_contains_keys(rule, required)

    def apply(self, rule, data, sources):
        """Split each row in two and duplicate each source to match.

        Cells at positions in ``target_columns_0`` go only to the first new
        row, cells at positions in ``target_columns_1`` (and not in
        ``target_columns_0``) go only to the second, and all other cells are
        copied to both.
        """
        target_columns_0 = rule["target_columns_0"]
        target_columns_1 = rule["target_columns_1"]

        def split_row(row):
            first_half, second_half = [], []
            for position, cell in enumerate(row):
                only_first = position in target_columns_0
                if only_first or position not in target_columns_1:
                    first_half.append(cell)
                if not only_first:
                    second_half.append(cell)
            return [first_half, second_half]

        def duplicate(source):
            return [source, source]

        split_data = flat_map(split_row, data)
        doubled_sources = flat_map(duplicate, sources)
        return split_data, doubled_sources
def flat_map(f, items):
    """Apply ``f`` to each item and return all resulting elements in one flat list."""
    return [element for item in items for element in f(item)]
class RuleSet:
    """A list of dictified rules plus optional column mappings, built from a dict."""

    def __init__(self, rule_set_as_dict):
        """Store the ``rules`` list and the optional ``mapping`` list (default ``[]``)."""
        self.raw_rules = rule_set_as_dict["rules"]
        self.raw_mapping = rule_set_as_dict.get("mapping", [])

    @property
    def rules(self):
        """The raw rule dictionaries, as supplied."""
        return self.raw_rules

    def _rules_with_definitions(self):
        """Yield ``(rule, definition)`` pairs, looking definitions up by ``rule["type"]``."""
        for raw_rule in self.raw_rules:
            yield (raw_rule, RULES_DEFINITIONS[raw_rule["type"]])

    def apply(self, data, sources):
        """Validate and apply each rule in order, threading data and sources through."""
        for raw_rule, definition in self._rules_with_definitions():
            definition.validate_rule(raw_rule)
            data, sources = definition.apply(raw_rule, data, sources)
        return data, sources

    @property
    def has_errors(self):
        """``True`` if looking up or validating any rule raises, else ``False``."""
        try:
            for raw_rule, definition in self._rules_with_definitions():
                definition.validate_rule(raw_rule)
        except Exception:
            return True
        return False

    @property
    def mapping_as_dict(self):
        """The mappings keyed by their ``type``; later entries win on duplicates."""
        return {entry["type"]: entry for entry in self.raw_mapping}

    # Everything above is generic. The properties below are specific to Galaxy
    # collections; consider moving them into a RuleSet subclass for collection
    # creation.

    @property
    def identifier_columns(self):
        """Columns of ``list_identifiers`` followed by the first ``paired_identifier`` column."""
        by_type = self.mapping_as_dict
        columns = []
        if "list_identifiers" in by_type:
            columns.extend(by_type["list_identifiers"]["columns"])
        if "paired_identifier" in by_type:
            columns.append(by_type["paired_identifier"]["columns"][0])
        return columns

    @property
    def collection_type(self):
        """Colon-joined type string: one ``list`` per list column, then ``paired`` if mapped."""
        by_type = self.mapping_as_dict
        list_columns = by_type.get("list_identifiers", {"columns": []})["columns"]
        levels = ["list" for _column in list_columns]
        if "paired_identifier" in by_type:
            levels.append("paired")
        return ":".join(levels)

    @property
    def display(self):
        """Multi-line text listing the raw rules and the raw column mappings."""
        rule_lines = "".join(f"- {r}\n" for r in self.raw_rules)
        mapping_lines = "".join(f"- {m}\n" for m in self.raw_mapping)
        return "Rules:\n" + rule_lines + "Column Definitions:\n" + mapping_lines
# All rule definition classes to register, in registration order.
# NOTE(review): AddColumnMetadataRuleDefinition is not defined in the portion
# of the file reviewed here — presumably it is declared elsewhere; confirm.
RULES_DEFINITION_CLASSES: List[Type[BaseRuleDefinition]] = [
    AddColumnMetadataRuleDefinition,
    AddColumnGroupTagValueRuleDefinition,
    AddColumnConcatenateRuleDefinition,
    AddColumnBasenameRuleDefinition,
    AddColumnRegexRuleDefinition,
    AddColumnRownumRuleDefinition,
    AddColumnValueRuleDefinition,
    AddColumnSubstrRuleDefinition,
    RemoveColumnsRuleDefinition,
    AddFilterRegexRuleDefinition,
    AddFilterCountRuleDefinition,
    AddFilterEmptyRuleDefinition,
    AddFilterMatchesRuleDefinition,
    AddFilterCompareRuleDefinition,
    SortRuleDefinition,
    SwapColumnsRuleDefinition,
    SplitColumnsRuleDefinition,
]
# Maps each class's ``rule_type`` string to one instance of that class, created
# at import time. If two classes share a ``rule_type``, the later one in the
# list above replaces the earlier one.
RULES_DEFINITIONS = {}
for rule_class in RULES_DEFINITION_CLASSES:
    RULES_DEFINITIONS[rule_class.rule_type] = rule_class()