Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.datatypes.dataproviders.line

"""
Dataproviders that iterate over lines from their sources.
"""

import collections
import logging
import re

from . import base

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

# Free-form developer notes kept in a string constant; none of the code
# visible in this module reads the value.
_TODO = """
line offsets (skip to some place in a file) needs to work more efficiently than simply iterating till we're there
    capture tell() when provider is done
        def stop( self ): self.endpoint = source.tell(); raise StopIteration()
a lot of the hierarchy here could be flattened since we're implementing pipes
"""


class FilteredLineDataProvider(base.LimitedOffsetDataProvider):
    """
    Data provider that yields lines of data from its source, allowing
    optional control over which line to start on and how many lines
    to return.

    Each line can be stripped, and blank lines and comment lines can be
    dropped, before the line is handed to the parent provider's filter.
    """

    DEFAULT_COMMENT_CHAR = "#"
    settings = {
        "strip_lines": "bool",
        "strip_newlines": "bool",
        "provide_blank": "bool",
        "comment_char": "str",
    }

    def __init__(
        self,
        source,
        strip_lines=True,
        strip_newlines=False,
        provide_blank=False,
        comment_char=DEFAULT_COMMENT_CHAR,
        **kwargs,
    ):
        """
        :param strip_lines: remove whitespace from the beginning and end
            of each line (or not).
            Optional: defaults to True
        :type strip_lines: bool

        :param strip_newlines: remove newlines only
            (only takes effect when ``strip_lines`` is false)
            Optional: defaults to False
        :type strip_newlines: bool

        :param provide_blank: are empty lines considered valid and provided?
            Optional: defaults to False
        :type provide_blank: bool

        :param comment_char: character(s) that indicate a line isn't data
            (a comment) and should not be provided.
            Optional: defaults to '#'
        :type comment_char: str

        Any remaining keyword arguments are forwarded to the parent provider.
        """
        super().__init__(source, **kwargs)
        # how each line is cleaned up
        self.strip_lines = strip_lines
        self.strip_newlines = strip_newlines
        # which lines are rejected
        self.provide_blank = provide_blank
        self.comment_char = comment_char

    def filter(self, line):
        """
        Determines whether to provide line or not.

        :param line: the incoming line from the source
        :type line: str
        :returns: `None` for a dropped line, otherwise whatever the parent
            filter returns for the (possibly stripped) line
        """
        # nothing to clean or test on a missing line: defer to the parent
        if line is None:
            return super().filter(line)

        # ??: should newlines be stripped regardless? if not, why not use one of the base providers
        if self.strip_lines:
            line = line.strip()
        elif self.strip_newlines:
            line = line.strip("\n")

        # blank lines are dropped unless explicitly requested
        if not self.provide_blank and line == "":
            return None
        # an empty/None comment_char switches comment detection off
        if self.comment_char and line.startswith(self.comment_char):
            return None
        return super().filter(line)
class RegexLineDataProvider(FilteredLineDataProvider):
    """
    Data provider that yields only those lines of data from its source
    that do (or do not when `invert` is True) match one or more of the given
    list of regexs.

    .. note:: the regex matches are effectively OR'd (if **any** regex
        matches the line it is considered valid and will be provided).

    .. note:: patterns are applied with ``match``, so a pattern has to match
        starting at the beginning of the (already filtered) line.
    """

    settings = {
        "regex_list": "list:escaped",
        "invert": "bool",
    }

    def __init__(self, source, regex_list=None, invert=False, **kwargs):
        """
        :param regex_list: list (or tuple) of strings or regular expression
            strings that will be `match`ed to each line.
            Any other value is treated as "no patterns".
            Optional: defaults to `None` (no matching)
        :type regex_list: list (of str)

        :param invert: if `True` will provide only lines that **do not match**.
            Optional: defaults to False
        :type invert: bool
        """
        super().__init__(source, **kwargs)
        # A tuple of patterns is as good as a list; previously it was silently
        # discarded, which switched regex filtering off without any warning.
        if isinstance(regex_list, tuple):
            regex_list = list(regex_list)
        # Anything that is not a list (None, a lone string, ...) keeps meaning
        # "no regex filtering".
        self.regex_list = regex_list if isinstance(regex_list, list) else []
        # NOTE: no support for flags
        self.compiled_regex_list = [re.compile(regex) for regex in self.regex_list]
        self.invert = invert

    def filter(self, line):
        """
        Apply the parent line filter, then the regex test.

        :param line: the incoming line from the source
        :type line: str
        :returns: the line, or `None` if it was rejected by either step
        """
        # NOTE: filter_fn will occur BEFORE any matching
        line = super().filter(line)
        # with no compiled patterns every surviving line passes through
        if line is not None and self.compiled_regex_list:
            line = self.filter_by_regex(line)
        return line

    def filter_by_regex(self, line):
        """
        Test the line against the compiled patterns.

        :param line: a line that already passed the parent filter
        :type line: str
        :returns: the line if it should be provided, `None` otherwise
        """
        matched = any(regex.match(line) for regex in self.compiled_regex_list)
        # `invert` flips which side of the match test is kept
        keep = not matched if self.invert else matched
        return line if keep else None
# ============================================================================= MISCELLANEOUS OR UNIMPLEMENTED
# ----------------------------------------------------------------------------- block data providers
class BlockDataProvider(base.LimitedOffsetDataProvider):
    """
    Class that uses formats where multiple lines combine to describe a single
    datum. The data output will be a list of either map/dicts or sub-arrays.

    Wraps the source in a FilteredLineDataProvider: ``filter_fn``, ``limit``
    and ``offset`` are kept for this (block level) provider, every other
    keyword argument is handed to the line provider.

    e.g. Fasta, GenBank, MAF, hg log
    Note: mem intensive (gathers list of lines before output)
    """

    def __init__(self, source, new_block_delim_fn=None, block_filter_fn=None, **kwargs):
        """
        :param new_block_delim_fn: T/F function to determine whether a given line
            is the start of a new block.
            Optional: defaults to `None` (every line starts a new block)
        :type new_block_delim_fn: function

        :param block_filter_fn: function that determines if a block is valid and
            will be provided. It receives the assembled block and its return
            value is what gets provided (`None` drops the block).
            Optional: defaults to `None` (no filtering)
        :type block_filter_fn: function
        """
        # composition - not inheritance
        # TODO: not a fan of this:
        # these three apply to blocks, so keep them away from the line provider
        filter_fn = kwargs.pop("filter_fn", None)
        limit = kwargs.pop("limit", None)
        offset = kwargs.pop("offset", 0)
        line_provider = FilteredLineDataProvider(source, **kwargs)
        super().__init__(line_provider, filter_fn=filter_fn, limit=limit, offset=offset)

        self.new_block_delim_fn = new_block_delim_fn
        self.block_filter_fn = block_filter_fn
        self.init_new_block()

    def init_new_block(self):
        """
        Set up internal data for next block.
        """
        # called in __init__ and after yielding the prev. block
        self.block_lines = collections.deque()

    def __iter__(self):
        """
        Overridden to provide last block.
        """
        yield from super().__iter__()

        # no delimiter line follows the final block, so it has to be flushed
        # explicitly once the parent iteration has finished
        last_block = self.handle_last_block()
        if last_block is not None:
            self.num_data_returned += 1
            yield last_block

    def filter(self, line):
        """
        Line filter here being used to aggregate/assemble lines into a block
        and determine whether the line indicates a new block.

        :param line: the incoming line from the source
        :type line: str
        :returns: a block or `None`
        """
        line = super().filter(line)
        # TODO: HACK - presumably the parent filter counted this line; take that
        # back so num_data_read counts blocks, not lines (confirm against base)
        self.num_data_read -= 1
        if line is None:
            return None

        block_to_return = None
        if self.is_new_block(line):
            # if we're already in a block, return the prev. block and add the line to a new block
            if self.block_lines:
                block_to_return = self.assemble_current_block()
                block_to_return = self.filter_block(block_to_return)
                self.num_data_read += 1

                self.init_new_block()

        self.add_line_to_block(line)
        return block_to_return

    def is_new_block(self, line):
        """
        Returns True if the given line indicates the start of a new block
        (and the current block should be provided) or False if not.

        Without a `new_block_delim_fn` every line is treated as a new block.
        """
        if self.new_block_delim_fn:
            return self.new_block_delim_fn(line)
        return True

    # NOTE:
    #   some formats have one block attr per line
    #   some formats rely on having access to multiple lines to make sensible data
    # So, building the block from the lines can happen in either:
    #   add_line_to_block AND/OR assemble_current_block

    def add_line_to_block(self, line):
        """
        Integrate the given line into the current block.

        Called per line.
        """
        # here either:
        #   consume the line (using it to add attrs to self.block)
        #   save the line (appending to self.block_lines) for use in assemble_current_block
        self.block_lines.append(line)

    def assemble_current_block(self):
        """
        Build the current data into a block.

        Called per block (just before providing).

        :returns: the gathered lines, in arrival order, as a list
        """
        # empty block_lines and assemble block
        block = list(self.block_lines)
        self.block_lines.clear()
        return block

    def filter_block(self, block):
        """
        Is the current block a valid/desired datum.

        Called per block (just before providing).

        :returns: the result of `block_filter_fn` when one was given,
            otherwise the block unchanged
        """
        if self.block_filter_fn:
            return self.block_filter_fn(block)
        return block

    def handle_last_block(self):
        """
        Handle any blocks remaining after the main loop.

        :returns: the final (filtered) block, or `None` if the limit has
            already been reached or the block filter rejected it
        """
        if self.limit is not None and self.num_data_returned >= self.limit:
            return None

        # NOTE(review): if no line was ever added, the default assembler
        # returns an empty list here, which is not None and is therefore still
        # provided - confirm callers expect a trailing empty block.
        # NOTE(review): no offset check is visible on this path - confirm the
        # last block should be provided when fewer blocks than the offset exist.
        last_block = self.assemble_current_block()
        self.num_data_read += 1

        last_block = self.filter_block(last_block)
        if last_block is not None:
            self.num_valid_data_read += 1

        return last_block