Source code for galaxy.util.xml_macros

import os
from copy import deepcopy
from typing import (
    Dict,
    List,
    Optional,
    Tuple,
)

from galaxy.util import (
    Element,
    ElementTree,
    parse_xml,
)
from galaxy.util.path import StrPath

REQUIRED_PARAMETER = object()


[docs]def load_with_references(path: StrPath) -> Tuple[ElementTree, Optional[List[str]]]: """Load XML documentation from file system and preprocesses XML macros. Return the XML representation of the expanded tree and paths to referenced files that were imported (macros). """ tree = raw_xml_tree(path) root = tree.getroot() macros_el = _macros_el(root) if macros_el is None: return tree, [] macros: Dict[str, List[Element]] = {} macro_paths = _import_macros(macros_el, path, macros) macros_el.clear() # Collect tokens tokens = {} for m in macros.get("token", []): tokens[m.get("name")] = m.text or "" tokens = expand_nested_tokens(tokens) # Expand xml macros macro_dict = {} for m in macros.get("xml", []): macro_dict[m.get("name")] = XmlMacroDef(m) _expand_macros([root], macro_dict, tokens) # reinsert template macro which are used during tool execution for m in macros.get("template", []): macros_el.append(m) _expand_tokens_for_el(root, tokens) return tree, macro_paths
[docs]def load(path: StrPath) -> ElementTree: tree, _ = load_with_references(path) return tree
[docs]def template_macro_params(root): """ Look for template macros and populate param_dict (for cheetah) with these. """ param_dict = {} macros_el = _macros_el(root) macro_dict = _macros_of_type(macros_el, "template", lambda el: el.text) for key, value in macro_dict.items(): param_dict[key] = value return param_dict
[docs]def raw_xml_tree(path: StrPath) -> ElementTree: """Load raw (no macro expansion) tree representation of XML represented at the specified path. """ tree = parse_xml(path, strip_whitespace=False, remove_comments=True) return tree
[docs]def imported_macro_paths(root): macros_el = _macros_el(root) return _imported_macro_paths_from_el(macros_el)
def _import_macros(macros_el, path, macros) -> Optional[List[str]]: """ root the parsed XML tree path the path to the main xml document """ xml_base_dir = os.path.dirname(path) if macros_el is not None: macro_paths = _load_macros(macros_el, xml_base_dir, macros) # _xml_set_children(macros_el, macro_els) return macro_paths return None def _macros_el(root): return root.find("macros") def _macros_of_type(macros_el, type, el_func): if macros_el is None: return {} macro_els = macros_el.findall("macro") filtered_els = [(macro_el.get("name"), el_func(macro_el)) for macro_el in macro_els if macro_el.get("type") == type] return dict(filtered_els) def expand_nested_tokens(tokens): for token_name in tokens.keys(): for current_token_name, current_token_value in tokens.items(): if token_name in current_token_value: if token_name == current_token_name: raise Exception(f"Token '{token_name}' cannot contain itself") tokens[current_token_name] = current_token_value.replace(token_name, tokens[token_name]) return tokens def _expand_tokens(elements, tokens): if not tokens or elements is None: return for element in elements: _expand_tokens_for_el(element, tokens) def _expand_tokens_for_el(element, tokens): """ expand tokens in element and (recursively) in its children replacements of text attributes and attribute values are possible """ value = element.text if value: new_value = _expand_tokens_str(element.text, tokens) if new_value is not value: element.text = new_value for key, value in element.attrib.items(): new_value = _expand_tokens_str(value, tokens) if new_value is not value: element.attrib[key] = new_value new_key = _expand_tokens_str(key, tokens) if new_key is not key: element.attrib[new_key] = element.attrib[key] del element.attrib[key] # recursively expand in childrens _expand_tokens(list(element), tokens) def _expand_tokens_str(s, tokens): for key, value in tokens.items(): if key in s: s = s.replace(key, value) return s def _expand_macros(elements, macros, tokens, visited=None): if not macros and not tokens: return if visited is None: v = [] else: v = visited for element in elements: while True: expand_el = element.find(".//expand") if expand_el is None: break _expand_macro(expand_el, macros, tokens, v) def _expand_macro(expand_el, macros, tokens, visited): macro_name = expand_el.get("macro") assert macro_name is not None, "Attempted to expand macro with no 'macro' attribute defined." # check for cycles in the nested macro expansion assert ( macro_name not in visited ), f"Cycle in nested macros: already expanded {visited} can't expand '{macro_name}' again" visited.append(macro_name) assert macro_name in macros, f"No macro named {macro_name} found, known macros are {', '.join(macros.keys())}." macro_def = macros[macro_name] expanded_elements = deepcopy(macro_def.element) _expand_yield_statements(expanded_elements, expand_el) macro_tokens = macro_def.macro_tokens(expand_el) if macro_tokens: _expand_tokens(expanded_elements, macro_tokens) # Recursively expand contained macros. _expand_macros(expanded_elements, macros, tokens, visited) _xml_replace(expand_el, expanded_elements) del visited[-1] def _expand_yield_statements(macro_def, expand_el): """ Modifies the macro_def element by replacing 1. all named yield tags by the content of the corresponding token tags - token tags need to be direct children of the expand - processed in order of definition of the token tags 2. all unnamed yield tags by the non-token children of the expand tag """ # replace named yields for token_el in expand_el.findall("./token"): name = token_el.attrib.get("name", None) assert name is not None, "Found unnamed token" + str(token_el.attrib) yield_els = list(macro_def.findall(f".//yield[@name='{name}']")) assert len(yield_els) > 0, f"No named yield found for named token {name}" token_el_children = list(token_el) for yield_el in yield_els: _xml_replace(yield_el, token_el_children) # replace unnamed yields yield_els = list(macro_def.findall(".//yield")) expand_el_children = [c for c in expand_el if c.tag != "token"] for yield_el in yield_els: _xml_replace(yield_el, expand_el_children) def _load_macros(macros_el, xml_base_dir, macros) -> List[str]: # Import macros from external files. macro_paths = _load_imported_macros(macros_el, xml_base_dir, macros) # Load all directly defined macros. _load_embedded_macros(macros_el, xml_base_dir, macros) return macro_paths def _load_embedded_macros(macros_el, xml_base_dir, macros): if macros_el is None: return # attribute typed macro for macro in macros_el.iterfind("macro"): if "type" not in macro.attrib: macro.attrib["type"] = "xml" try: macros[macro.attrib["type"]].append(macro) except KeyError: macros[macro.attrib["type"]] = [macro] # type shortcuts (<xml> is a shortcut for <macro type="xml", # likewise for <template>. for tag in ["template", "xml", "token"]: for macro_el in macros_el.iterfind(tag): macro_el.attrib["type"] = tag macro_el.tag = "macro" try: macros[tag].append(macro_el) except KeyError: macros[tag] = [macro_el] def _load_imported_macros(macros_el, xml_base_dir, macros) -> List[str]: macro_paths = [] for tool_relative_import_path in _imported_macro_paths_from_el(macros_el): import_path = os.path.join(xml_base_dir, tool_relative_import_path) macro_paths.append(import_path) current_macro_paths = _load_macro_file(import_path, xml_base_dir, macros) macro_paths.extend(current_macro_paths) return macro_paths def _imported_macro_paths_from_el(macros_el): imported_macro_paths = [] macro_import_els = [] if macros_el is not None: macro_import_els = macros_el.findall("import") for macro_import_el in macro_import_els: raw_import_path = macro_import_el.text imported_macro_paths.append(raw_import_path) return imported_macro_paths def _load_macro_file(path: StrPath, xml_base_dir, macros) -> List[str]: tree = parse_xml(path, strip_whitespace=False) root = tree.getroot() return _load_macros(root, xml_base_dir, macros) def _xml_replace(query, targets): parent_el = query.find("..") matching_index = -1 # for index, el in enumerate(parent_el.iter('.')): ## Something like this for newer implementation for index, el in enumerate(list(parent_el)): if el == query: matching_index = index break assert matching_index >= 0 current_index = matching_index for target in targets: current_index += 1 parent_el.insert(current_index, deepcopy(target)) parent_el.remove(query) class XmlMacroDef: """ representation of a (Galaxy) XML macro stores the root element of the macro and the parameters. each parameter is represented as pair containing - the quote character, default '@' - parameter name parameter names can be given as comma separated list using the `token` attribute or as attributes `token_XXX` (where `XXX` is the name). The former option should be used to specify required attributes of the macro and the latter for optional attributes if the macro (the value of `token_XXX is used as default value). TODO: `token_quote` forbids `"quote"` as character name of optional parameters """ def __init__(self, el): self.element = el parameters = {} tokens = [] token_quote = "@" for key, value in el.attrib.items(): if key == "token_quote": token_quote = value if key == "tokens": for token in value.split(","): tokens.append((token, REQUIRED_PARAMETER)) elif key.startswith("token_"): token = key[len("token_") :] tokens.append((token, value)) for name, default in tokens: parameters[name] = (token_quote, default) self.parameters = parameters def macro_tokens(self, expand_el): """ get a dictionary mapping token names to values. The names are the parameter names surrounded by the quote character. Values are taken from the expand_el if absent default values of optional parameters are used. """ tokens = {} for key, (wrap_char, default_val) in self.parameters.items(): token_value = expand_el.attrib.get(key, default_val) if token_value is REQUIRED_PARAMETER: raise ValueError(f"Failed to expand macro - missing required parameter [{key}].") token_name = f"{wrap_char}{key.upper()}{wrap_char}" tokens[token_name] = token_value return tokens __all__ = ( "imported_macro_paths", "load", "load_with_references", "raw_xml_tree", "template_macro_params", )