Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.datatypes.dataproviders.line
"""
Dataproviders that iterate over lines from their sources.
"""
import collections
import logging
import re
from . import base
log = logging.getLogger(__name__)
_TODO = """
line offsets (skip to some place in a file) needs to work more efficiently than simply iterating till we're there
capture tell() when provider is done
def stop( self ): self.endpoint = source.tell(); raise StopIteration()
a lot of the hierarchy here could be flattened since we're implementing pipes
"""
[docs]class FilteredLineDataProvider(base.LimitedOffsetDataProvider):
"""
Data provider that yields lines of data from its source allowing
optional control over which line to start on and how many lines
to return.
"""
DEFAULT_COMMENT_CHAR = "#"
settings = {
"strip_lines": "bool",
"strip_newlines": "bool",
"provide_blank": "bool",
"comment_char": "str",
}
[docs] def __init__(
self,
source,
strip_lines=True,
strip_newlines=False,
provide_blank=False,
comment_char=DEFAULT_COMMENT_CHAR,
**kwargs,
):
"""
:param strip_lines: remove whitespace from the beginning an ending
of each line (or not).
Optional: defaults to True
:type strip_lines: bool
:param strip_newlines: remove newlines only
(only functions when ``strip_lines`` is false)
Optional: defaults to False
:type strip_lines: bool
:param provide_blank: are empty lines considered valid and provided?
Optional: defaults to False
:type provide_blank: bool
:param comment_char: character(s) that indicate a line isn't data (a comment)
and should not be provided.
Optional: defaults to '#'
:type comment_char: str
"""
super().__init__(source, **kwargs)
self.strip_lines = strip_lines
self.strip_newlines = strip_newlines
self.provide_blank = provide_blank
self.comment_char = comment_char
[docs] def filter(self, line):
"""
Determines whether to provide line or not.
:param line: the incoming line from the source
:type line: str
:returns: a line or `None`
"""
if line is not None:
# ??: shouldn't it strip newlines regardless, if not why not use on of the base.dprovs
if self.strip_lines:
line = line.strip()
elif self.strip_newlines:
line = line.strip("\n")
if not self.provide_blank and line == "":
return None
elif self.comment_char and line.startswith(self.comment_char):
return None
return super().filter(line)
[docs]class RegexLineDataProvider(FilteredLineDataProvider):
"""
Data provider that yields only those lines of data from its source
that do (or do not when `invert` is True) match one or more of the given list
of regexs.
.. note:: the regex matches are effectively OR'd (if **any** regex matches
the line it is considered valid and will be provided).
"""
settings = {
"regex_list": "list:escaped",
"invert": "bool",
}
[docs] def __init__(self, source, regex_list=None, invert=False, **kwargs):
"""
:param regex_list: list of strings or regular expression strings that will
be `match`ed to each line
Optional: defaults to `None` (no matching)
:type regex_list: list (of str)
:param invert: if `True` will provide only lines that **do not match**.
Optional: defaults to False
:type invert: bool
"""
super().__init__(source, **kwargs)
self.regex_list = regex_list if isinstance(regex_list, list) else []
self.compiled_regex_list = [re.compile(regex) for regex in self.regex_list]
self.invert = invert
# NOTE: no support for flags
[docs] def filter(self, line):
# NOTE: filter_fn will occur BEFORE any matching
line = super().filter(line)
if line is not None and self.compiled_regex_list:
line = self.filter_by_regex(line)
return line
[docs] def filter_by_regex(self, line):
matches = any(regex.match(line) for regex in self.compiled_regex_list)
if self.invert:
return line if not matches else None
return line if matches else None
# ============================================================================= MICELLAINEOUS OR UNIMPLEMENTED
# ----------------------------------------------------------------------------- block data providers
[docs]class BlockDataProvider(base.LimitedOffsetDataProvider):
"""
Class that uses formats where multiple lines combine to describe a single
datum. The data output will be a list of either map/dicts or sub-arrays.
Uses FilteredLineDataProvider as its source (kwargs **not** passed).
e.g. Fasta, GenBank, MAF, hg log
Note: mem intensive (gathers list of lines before output)
"""
[docs] def __init__(self, source, new_block_delim_fn=None, block_filter_fn=None, **kwargs):
"""
:param new_block_delim_fn: T/F function to determine whether a given line
is the start of a new block.
:type new_block_delim_fn: function
:param block_filter_fn: function that determines if a block is valid and
will be provided.
Optional: defaults to `None` (no filtering)
:type block_filter_fn: function
"""
# composition - not inheritance
# TODO: not a fan of this:
(filter_fn, limit, offset) = (kwargs.pop("filter_fn", None), kwargs.pop("limit", None), kwargs.pop("offset", 0))
line_provider = FilteredLineDataProvider(source, **kwargs)
super().__init__(line_provider, filter_fn=filter_fn, limit=limit, offset=offset)
self.new_block_delim_fn = new_block_delim_fn
self.block_filter_fn = block_filter_fn
self.init_new_block()
[docs] def init_new_block(self):
"""
Set up internal data for next block.
"""
# called in __init__ and after yielding the prev. block
self.block_lines = collections.deque([])
def __iter__(self):
"""
Overridden to provide last block.
"""
parent_gen = super().__iter__()
yield from parent_gen
if (last_block := self.handle_last_block()) is not None:
self.num_data_returned += 1
yield last_block
[docs] def filter(self, line):
"""
Line filter here being used to aggregate/assemble lines into a block
and determine whether the line indicates a new block.
:param line: the incoming line from the source
:type line: str
:returns: a block or `None`
"""
line = super().filter(line)
# TODO: HACK
self.num_data_read -= 1
if line is None:
return None
block_to_return = None
if self.is_new_block(line):
# if we're already in a block, return the prev. block and add the line to a new block
if self.block_lines:
block_to_return = self.assemble_current_block()
block_to_return = self.filter_block(block_to_return)
self.num_data_read += 1
self.init_new_block()
self.add_line_to_block(line)
return block_to_return
[docs] def is_new_block(self, line):
"""
Returns True if the given line indicates the start of a new block
(and the current block should be provided) or False if not.
"""
if self.new_block_delim_fn:
return self.new_block_delim_fn(line)
return True
# NOTE:
# some formats have one block attr per line
# some formats rely on having access to multiple lines to make sensible data
# So, building the block from the lines can happen in either:
# add_line_to_block AND/OR assemble_current_block
[docs] def add_line_to_block(self, line):
"""
Integrate the given line into the current block.
Called per line.
"""
# here either:
# consume the line (using it to add attrs to self.block)
# save the line (appending to self.block_lines) for use in assemble_current_block
self.block_lines.append(line)
[docs] def assemble_current_block(self):
"""
Build the current data into a block.
Called per block (just before providing).
"""
# empty block_lines and assemble block
return [self.block_lines.popleft() for i in range(len(self.block_lines))]
[docs] def filter_block(self, block):
"""
Is the current block a valid/desired datum.
Called per block (just before providing).
"""
if self.block_filter_fn:
return self.block_filter_fn(block)
return block
[docs] def handle_last_block(self):
"""
Handle any blocks remaining after the main loop.
"""
if self.limit is not None and self.num_data_returned >= self.limit:
return None
last_block = self.assemble_current_block()
self.num_data_read += 1
last_block = self.filter_block(last_block)
if last_block is not None:
self.num_valid_data_read += 1
return last_block