Source code for galaxy.datatypes.dataproviders.hierarchy

"""
Dataproviders that iterate over hierarchical (tree-structured) data, such as XML, from their sources.
"""

import logging

from galaxy.util import (
    Element,
    etree,
)
from . import line

# Presumably a placeholder for module-level TODO notes; it currently holds
# only a single newline.
_TODO = """
"""

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


# ----------------------------------------------------------------------------- hierarchal/tree data providers
class HierarchalDataProvider(line.BlockDataProvider):
    """
    Base provider for formats in which a datum may have a parent or
    children data, e.g. XML, HTML, GFF3, Phylogenetic.
    """

    def __init__(self, source, **kwargs):
        """
        :param source: the data source; passed unchanged, together with any
            keyword arguments, to the parent class initializer
        """
        # TODO: (and defer to better (than I can write) parsers for each subtype)
        super().__init__(source, **kwargs)
# ----------------------------------------------------------------------------- xml
class XMLDataProvider(HierarchalDataProvider):
    """
    Data provider that converts selected XML elements to dictionaries.

    The source is parsed incrementally with ``etree.iterparse``; each element
    whose tag matches ``selector`` is emitted as a dictionary with the keys
    ``tag``, ``text``, ``attrib`` and (when it has descendants within
    ``max_depth``) ``children``.
    """

    # using lxml.etree's iterparse method to keep mem down
    # TODO: this, however (AFAIK), prevents the use of xpath
    settings = {
        "selector": "str",  # urlencoded
        "max_depth": "int",
    }
    ITERPARSE_ALL_EVENTS = ("start", "end", "start-ns", "end-ns")
    # TODO: move appropo into super

    def __init__(self, source, selector=None, max_depth=None, **kwargs):
        """
        :param selector: some partial string in the desired tags to return
        :param max_depth: the number of generations of descendants to return
        """
        self.selector = selector
        self.max_depth = max_depth
        # filled in during iteration from the parser's 'start-ns' events
        self.namespaces = {}

        super().__init__(source, **kwargs)

    def matches_selector(self, element, selector=None):
        """
        Returns true if the ``element`` matches the ``selector``.

        A ``selector`` of ``None`` matches everything; otherwise ``element``
        must be an ``Element`` whose tag contains ``selector`` as a substring.

        :param element: an XML ``Element``
        :param selector: some partial string in the desired tags to return

        Change point for more sophisticated selectors.
        """
        # search for partial match of selector to the element tag
        # TODO: add more flexibility here w/o re-implementing xpath
        # TODO: fails with '#' - browser thinks it's an anchor - use urlencode
        # TODO: need removal/replacement of etree namespacing here - then move to string match
        return bool((selector is None) or (isinstance(element, Element) and selector in element.tag))

    def element_as_dict(self, element):
        """
        Converts an XML element (its text, tag, and attributes) to dictionary form.

        :param element: an XML ``Element``
        :returns: a dict with the keys ``tag``, ``text`` (stripped, or ``None``
            when the element has no text) and ``attrib``
        """
        # TODO: Key collision is unlikely here, but still should be better handled
        return {
            "tag": element.tag,
            "text": element.text.strip() if element.text else None,
            # needs shallow copy to protect v. element.clear()
            "attrib": dict(element.attrib),
        }

    def get_children(self, element, max_depth=None):
        """
        Yield all children of element (and their children - recursively)
        in dictionary form.

        :param element: an XML ``Element``
        :param max_depth: the number of generations of descendants to return;
            any non-int value (e.g. ``None``) means no depth limit
        """
        if not isinstance(max_depth, int) or max_depth >= 1:
            for child in element:
                child_data = self.element_as_dict(child)

                # an unlimited depth stays unlimited; a limited one shrinks per generation
                next_depth = max_depth - 1 if isinstance(max_depth, int) else None
                grand_children = list(self.get_children(child, next_depth))
                if grand_children:
                    child_data["children"] = grand_children

                yield child_data

    def __iter__(self):
        """
        Incrementally parse the source and yield each selected element as a
        dictionary (see ``element_as_dict`` and ``get_children``).

        Only the outermost matching element is selected: matches nested inside
        an already selected element are returned as its children rather than as
        separate data. ``self.offset``, ``self.limit`` and the ``num_*``
        counters are not defined in this class and are expected to be provided
        by a parent class.
        """
        # a for loop obtains the iterator itself, no explicit iter() needed
        context = etree.iterparse(self.source, events=self.ITERPARSE_ALL_EVENTS)

        selected_element = None
        for event, element in context:
            if event == "start-ns":
                # for this event the parser hands over a pair, not an element
                ns, uri = element
                self.namespaces[ns] = uri

            elif event == "start":
                if (selected_element is None) and (self.matches_selector(element, self.selector)):
                    # start tag of selected element - wait for 'end' to emit/yield
                    selected_element = element

            elif event == "end":
                if selected_element is None:
                    # This element closed outside of any selection, so it can never be
                    # part of an emitted datum: release its content right away instead
                    # of letting the whole document accumulate in the parser's tree.
                    element.clear()

                elif element == selected_element:
                    self.num_valid_data_read += 1

                    # offset
                    if self.num_valid_data_read > self.offset:
                        # convert to dict and yield
                        selected_element_dict = self.element_as_dict(selected_element)
                        children = list(self.get_children(selected_element, self.max_depth))
                        if children:
                            selected_element_dict["children"] = children
                        yield selected_element_dict

                        # limit
                        self.num_data_returned += 1
                        if self.limit is not None and self.num_data_returned >= self.limit:
                            break

                    # the datum has been copied into plain dicts - free the parsed subtree
                    selected_element.clear()
                    selected_element = None

                # NOTE(review): counted once per closed element (any 'end' event);
                # confirm this placement against the upstream file's indentation
                self.num_data_read += 1