Source code for galaxy.datatypes.dataproviders.line

"""
Dataproviders that iterate over lines from their sources.
"""

import collections
import logging
import re

from . import base

log = logging.getLogger(__name__)

_TODO = """
line offsets (skip to some place in a file) needs to work more efficiently than simply iterating till we're there
    capture tell() when provider is done
        def stop( self ): self.endpoint = source.tell(); raise StopIteration()
a lot of the hierarchy here could be flattened since we're implementing pipes
"""


[docs]class FilteredLineDataProvider(base.LimitedOffsetDataProvider): """ Data provider that yields lines of data from its source allowing optional control over which line to start on and how many lines to return. """ DEFAULT_COMMENT_CHAR = "#" settings = { "strip_lines": "bool", "strip_newlines": "bool", "provide_blank": "bool", "comment_char": "str", }
[docs] def __init__( self, source, strip_lines=True, strip_newlines=False, provide_blank=False, comment_char=DEFAULT_COMMENT_CHAR, **kwargs, ): """ :param strip_lines: remove whitespace from the beginning an ending of each line (or not). Optional: defaults to True :type strip_lines: bool :param strip_newlines: remove newlines only (only functions when ``strip_lines`` is false) Optional: defaults to False :type strip_lines: bool :param provide_blank: are empty lines considered valid and provided? Optional: defaults to False :type provide_blank: bool :param comment_char: character(s) that indicate a line isn't data (a comment) and should not be provided. Optional: defaults to '#' :type comment_char: str """ super().__init__(source, **kwargs) self.strip_lines = strip_lines self.strip_newlines = strip_newlines self.provide_blank = provide_blank self.comment_char = comment_char
[docs] def filter(self, line): """ Determines whether to provide line or not. :param line: the incoming line from the source :type line: str :returns: a line or `None` """ if line is not None: # ??: shouldn't it strip newlines regardless, if not why not use on of the base.dprovs if self.strip_lines: line = line.strip() elif self.strip_newlines: line = line.strip("\n") if not self.provide_blank and line == "": return None elif self.comment_char and line.startswith(self.comment_char): return None return super().filter(line)
[docs]class RegexLineDataProvider(FilteredLineDataProvider): """ Data provider that yields only those lines of data from its source that do (or do not when `invert` is True) match one or more of the given list of regexs. .. note:: the regex matches are effectively OR'd (if **any** regex matches the line it is considered valid and will be provided). """ settings = { "regex_list": "list:escaped", "invert": "bool", }
[docs] def __init__(self, source, regex_list=None, invert=False, **kwargs): """ :param regex_list: list of strings or regular expression strings that will be `match`ed to each line Optional: defaults to `None` (no matching) :type regex_list: list (of str) :param invert: if `True` will provide only lines that **do not match**. Optional: defaults to False :type invert: bool """ super().__init__(source, **kwargs) self.regex_list = regex_list if isinstance(regex_list, list) else [] self.compiled_regex_list = [re.compile(regex) for regex in self.regex_list] self.invert = invert
# NOTE: no support for flags
[docs] def filter(self, line): # NOTE: filter_fn will occur BEFORE any matching line = super().filter(line) if line is not None and self.compiled_regex_list: line = self.filter_by_regex(line) return line
[docs] def filter_by_regex(self, line): matches = any(regex.match(line) for regex in self.compiled_regex_list) if self.invert: return line if not matches else None return line if matches else None
# ============================================================================= MICELLAINEOUS OR UNIMPLEMENTED # ----------------------------------------------------------------------------- block data providers
[docs]class BlockDataProvider(base.LimitedOffsetDataProvider): """ Class that uses formats where multiple lines combine to describe a single datum. The data output will be a list of either map/dicts or sub-arrays. Uses FilteredLineDataProvider as its source (kwargs **not** passed). e.g. Fasta, GenBank, MAF, hg log Note: mem intensive (gathers list of lines before output) """
[docs] def __init__(self, source, new_block_delim_fn=None, block_filter_fn=None, **kwargs): """ :param new_block_delim_fn: T/F function to determine whether a given line is the start of a new block. :type new_block_delim_fn: function :param block_filter_fn: function that determines if a block is valid and will be provided. Optional: defaults to `None` (no filtering) :type block_filter_fn: function """ # composition - not inheritance # TODO: not a fan of this: (filter_fn, limit, offset) = (kwargs.pop("filter_fn", None), kwargs.pop("limit", None), kwargs.pop("offset", 0)) line_provider = FilteredLineDataProvider(source, **kwargs) super().__init__(line_provider, filter_fn=filter_fn, limit=limit, offset=offset) self.new_block_delim_fn = new_block_delim_fn self.block_filter_fn = block_filter_fn self.init_new_block()
[docs] def init_new_block(self): """ Set up internal data for next block. """ # called in __init__ and after yielding the prev. block self.block_lines = collections.deque([])
def __iter__(self): """ Overridden to provide last block. """ parent_gen = super().__iter__() yield from parent_gen if (last_block := self.handle_last_block()) is not None: self.num_data_returned += 1 yield last_block
[docs] def filter(self, line): """ Line filter here being used to aggregate/assemble lines into a block and determine whether the line indicates a new block. :param line: the incoming line from the source :type line: str :returns: a block or `None` """ line = super().filter(line) # TODO: HACK self.num_data_read -= 1 if line is None: return None block_to_return = None if self.is_new_block(line): # if we're already in a block, return the prev. block and add the line to a new block if self.block_lines: block_to_return = self.assemble_current_block() block_to_return = self.filter_block(block_to_return) self.num_data_read += 1 self.init_new_block() self.add_line_to_block(line) return block_to_return
[docs] def is_new_block(self, line): """ Returns True if the given line indicates the start of a new block (and the current block should be provided) or False if not. """ if self.new_block_delim_fn: return self.new_block_delim_fn(line) return True
# NOTE: # some formats have one block attr per line # some formats rely on having access to multiple lines to make sensible data # So, building the block from the lines can happen in either: # add_line_to_block AND/OR assemble_current_block
[docs] def add_line_to_block(self, line): """ Integrate the given line into the current block. Called per line. """ # here either: # consume the line (using it to add attrs to self.block) # save the line (appending to self.block_lines) for use in assemble_current_block self.block_lines.append(line)
[docs] def assemble_current_block(self): """ Build the current data into a block. Called per block (just before providing). """ # empty block_lines and assemble block return list(self.block_lines.popleft() for i in range(len(self.block_lines)))
[docs] def filter_block(self, block): """ Is the current block a valid/desired datum. Called per block (just before providing). """ if self.block_filter_fn: return self.block_filter_fn(block) return block
[docs] def handle_last_block(self): """ Handle any blocks remaining after the main loop. """ if self.limit is not None and self.num_data_returned >= self.limit: return None last_block = self.assemble_current_block() self.num_data_read += 1 last_block = self.filter_block(last_block) if last_block is not None: self.num_valid_data_read += 1 return last_block