Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.datatypes.dataproviders.hierarchy

"""
Dataproviders that iterate over lines from their sources.
"""

import logging

from galaxy.util import (
    Element,
    etree,
)
from . import line

_TODO = """
"""

log = logging.getLogger(__name__)


# ----------------------------------------------------------------------------- hierarchal/tree data providers
[docs]class HierarchalDataProvider(line.BlockDataProvider): """ Class that uses formats where a datum may have a parent or children data. e.g. XML, HTML, GFF3, Phylogenetic """
[docs] def __init__(self, source, **kwargs): # TODO: (and defer to better (than I can write) parsers for each subtype) super().__init__(source, **kwargs)
# ----------------------------------------------------------------------------- xml
[docs]class XMLDataProvider(HierarchalDataProvider): """ Data provider that converts selected XML elements to dictionaries. """ # using lxml.etree's iterparse method to keep mem down # TODO: this, however (AFAIK), prevents the use of xpath settings = { "selector": "str", # urlencoded "max_depth": "int", } ITERPARSE_ALL_EVENTS = ("start", "end", "start-ns", "end-ns") # TODO: move appropo into super
[docs] def __init__(self, source, selector=None, max_depth=None, **kwargs): """ :param selector: some partial string in the desired tags to return :param max_depth: the number of generations of descendents to return """ self.selector = selector self.max_depth = max_depth self.namespaces = {} super().__init__(source, **kwargs)
[docs] def matches_selector(self, element, selector=None): """ Returns true if the ``element`` matches the ``selector``. :param element: an XML ``Element`` :param selector: some partial string in the desired tags to return Change point for more sophisticated selectors. """ # search for partial match of selector to the element tag # TODO: add more flexibility here w/o re-implementing xpath # TODO: fails with '#' - browser thinks it's an anchor - use urlencode # TODO: need removal/replacement of etree namespacing here - then move to string match return bool((selector is None) or (isinstance(element, Element) and selector in element.tag))
[docs] def element_as_dict(self, element): """ Converts an XML element (its text, tag, and attributes) to dictionary form. :param element: an XML ``Element`` """ # TODO: Key collision is unlikely here, but still should be better handled return { "tag": element.tag, "text": element.text.strip() if element.text else None, # needs shallow copy to protect v. element.clear() "attrib": dict(element.attrib), }
[docs] def get_children(self, element, max_depth=None): """ Yield all children of element (and their children - recursively) in dictionary form. :param element: an XML ``Element`` :param max_depth: the number of generations of descendents to return """ if not isinstance(max_depth, int) or max_depth >= 1: for child in element: child_data = self.element_as_dict(child) next_depth = max_depth - 1 if isinstance(max_depth, int) else None grand_children = list(self.get_children(child, next_depth)) if grand_children: child_data["children"] = grand_children yield child_data
def __iter__(self): context = etree.iterparse(self.source, events=self.ITERPARSE_ALL_EVENTS) context = iter(context) selected_element = None for event, element in context: if event == "start-ns": ns, uri = element self.namespaces[ns] = uri elif event == "start": if (selected_element is None) and (self.matches_selector(element, self.selector)): # start tag of selected element - wait for 'end' to emit/yield selected_element = element elif event == "end": if (selected_element is not None) and (element == selected_element): self.num_valid_data_read += 1 # offset if self.num_valid_data_read > self.offset: # convert to dict and yield selected_element_dict = self.element_as_dict(selected_element) children = list(self.get_children(selected_element, self.max_depth)) if children: selected_element_dict["children"] = children yield selected_element_dict # limit self.num_data_returned += 1 if self.limit is not None and self.num_data_returned >= self.limit: break selected_element.clear() selected_element = None self.num_data_read += 1