Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.datatypes.dataproviders.column
"""
Providers that yield lists of lists, generally where each line of a source
is further subdivided into multiple values (e.g. the columns of a line).
"""
import logging
import re
from urllib.parse import unquote_plus
from . import line
# Developer notes stored as a module-level string; `_TODO` is not referenced
# by any code visible in this file.
_TODO = """
move ColumnarDataProvider parsers to more sensible location
TransposedColumnarDataProvider: provides each column as a single array
- see existing visualizations/dataprovider/basic.ColumnDataProvider
"""
# Module-level logger, named after this module via `__name__`.
log = logging.getLogger(__name__)
# ----------------------------------------------------------------------------- base classes
class ColumnarDataProvider(line.RegexLineDataProvider):
    """
    Data provider that provides a list of columns from the lines of its source.

    Columns are returned in the order given in indeces, so this provider can
    re-arrange columns.

    If any desired index is outside the actual number of columns
    in the source, this provider will None-pad the output and you are guaranteed
    the same number of columns as the number of indeces asked for (even if they
    are filled with None).
    """

    settings = {
        "indeces": "list:int",
        "column_count": "int",
        "column_types": "list:str",
        "parse_columns": "bool",
        "deliminator": "str",
        "filters": "list:str",
    }

    def __init__(
        self,
        source,
        indeces=None,
        column_count=None,
        column_types=None,
        parsers=None,
        parse_columns=True,
        deliminator="\t",
        filters=None,
        **kwargs,
    ):
        """
        :param indeces: a list of indeces of columns to gather from each row
            Optional: will default to `None`.
            If `None`, this provider will return all columns (even when a
            particular row contains more/less than others).
            If a row/line does not contain an element at a given index, the
            provider will-return/fill-with a `None` value as the element.
        :type indeces: list or None

        :param column_count: an alternate means of defining indeces, use an int
            here to effectively provide the first N columns.
            Optional: will default to `None`.
        :type column_count: int

        :param column_types: a list of string names of types that the
            provider will use to look up an appropriate parser for the column.
            (e.g. 'int', 'float', 'str', 'bool')
            Optional: will default to parsing all columns as strings.
        :type column_types: list of strings

        :param parsers: a dictionary keyed with column type strings
            and with values that are functions to use when parsing those
            types. These are layered over the defaults and are only used
            when `parse_columns` is true.
            Optional: will default to using the function `get_default_parsers`.
        :type parsers: dictionary

        :param parse_columns: attempt to parse columns? Optional: defaults to `True`.
        :type parse_columns: bool

        :param deliminator: character(s) used to split each row/line of the source.
            Optional: defaults to the tab character.
        :type deliminator: str

        :param filters: a list of filter strings of the form
            `<column>-<op>-<val>` (see `parse_filter`). Strings that cannot be
            turned into a filter function are ignored.
            Optional: defaults to no filters.
        :type filters: list of strings

        .. note:: that the subclass constructors are passed kwargs - so their
            params (limit, offset, etc.) are also applicable here.
        """
        # TODO: other columnar formats: csv, etc.
        super().__init__(source, **kwargs)

        # IMPLICIT: if no indeces, column_count, or column_types passed: return all columns
        self.selected_column_indeces = indeces
        self.column_count = column_count
        self.column_types = column_types or []

        # if no column count given, try to infer from indeces or column_types
        if not self.column_count:
            if self.selected_column_indeces:
                self.column_count = len(self.selected_column_indeces)
            elif self.column_types:
                self.column_count = len(self.column_types)

        # if no indeces given, infer from column_count
        if not self.selected_column_indeces and self.column_count:
            self.selected_column_indeces = list(range(self.column_count))

        self.deliminator = deliminator

        # how/whether to parse each column value
        self.parsers = {}
        if parse_columns:
            self.parsers = self.get_default_parsers()
            # overwrite with user desired parsers
            self.parsers.update(parsers or {})

        # `parse_filter` reads `self.column_types`, so filters must be built
        # only after the column configuration above is in place.
        self.column_filters = []
        for filter_ in filters or []:
            parsed = self.parse_filter(filter_)
            # TODO: might be better to error on bad filter/None here
            if callable(parsed):
                self.column_filters.append(parsed)

    def parse_filter(self, filter_param_str):
        """
        Build a filter function from a string of the form `<column>-<op>-<val>`.

        `column` is the integer position of the column within the parsed
        (selected) columns and is also used to look up the column's type in
        `self.column_types`; that type decides which `create_*_filter` method
        builds the function. Only the first two dashes are separators, so
        `val` may itself contain dashes (e.g. a negative number).

        :param filter_param_str: the filter description
        :type filter_param_str: str
        :returns: a function taking the list of parsed columns and returning
            a truthy value when the row passes,
            or `None` if the string is malformed, the column is not an
            integer, the column has no declared type, or the type/op is
            not supported.
        """
        parts = filter_param_str.split("-", 2)
        if len(parts) < 3:
            return None
        column, op, val = parts

        try:
            column = int(column)
        except ValueError:
            # a non-numeric column is just another malformed filter
            return None
        # the column must have a declared type (valid positions are 0..len-1)
        if not 0 <= column < len(self.column_types):
            return None

        column_type = self.column_types[column]
        if column_type in ("float", "int"):
            return self.create_numeric_filter(column, op, val)
        # compare by equality: `x in ("str")` would be a substring test
        if column_type == "str":
            return self.create_string_filter(column, op, val)
        if column_type == "list":
            return self.create_list_filter(column, op, val)
        return None

    def create_numeric_filter(self, column, op, val):
        """
        Return a filter function that will be passed the array
        of parsed columns. Return None if no filter function can be
        created for the given params.

        The function will compare the column at index `column` against `val`
        using the given op where op is one of:

        - lt: less than
        - le: less than or equal to
        - eq: equal to
        - ne: not equal to
        - ge: greater than or equal to
        - gt: greater than

        `val` is cast as float here and will return None if there's a parsing error.

        A column value of `None` (a missing or unparsable cell) never
        satisfies an ordering op (lt, le, ge, gt); for eq/ne it is compared
        as-is, so it is never `eq` and always `ne`.
        """
        try:
            val = float(val)
        except ValueError:
            return None

        if op == "eq":
            return lambda d: d[column] == val
        if op == "ne":
            return lambda d: d[column] != val

        orderings = {
            "lt": lambda a, b: a < b,
            "le": lambda a, b: a <= b,
            "ge": lambda a, b: a >= b,
            "gt": lambda a, b: a > b,
        }
        compare = orderings.get(op)
        if compare is None:
            return None

        def ordering_filter(d):
            value = d[column]
            # ordering None against a float raises TypeError in Python 3
            return value is not None and compare(value, val)

        return ordering_filter

    def create_string_filter(self, column, op, val):
        """
        Return a filter function that will be passed the array
        of parsed columns. Return None if no filter function can be
        created for the given params (including an invalid regular expression).

        The function will compare the column at index `column` against `val`
        using the given op where op is one of:

        - eq: exactly matches
        - has: the column contains the substring `val`
        - re: the column matches the regular expression in `val`
          (`val` is url-unquoted first and matched from the start of the column)

        A column value of `None` (a missing cell) never satisfies `has` or `re`.
        """
        if op == "eq":
            return lambda d: d[column] == val
        if op == "has":
            return lambda d: d[column] is not None and val in d[column]
        if op == "re":
            try:
                pattern = re.compile(unquote_plus(val))
            except re.error:
                return None
            return lambda d: d[column] is not None and pattern.match(d[column]) is not None
        return None

    def create_list_filter(self, column, op, val):
        """
        Return a filter function that will be passed the array
        of parsed columns. Return None if no filter function can be
        created for the given params.

        The function will compare the column at index `column` against `val`
        using the given op where op is one of:

        - eq: `val` (after being parsed as a 'list') equals the column value
        - has: `val` is contained in the column value (`val in column`)

        A column value of `None` (a missing cell) never satisfies `has`.
        """
        if op == "eq":
            val = self.parse_value(val, "list")
            return lambda d: d[column] == val
        if op == "has":
            return lambda d: d[column] is not None and val in d[column]
        return None

    def get_default_parsers(self):
        """
        Return parser dictionary keyed for each columnar type
        (as defined in datatypes).

        .. note:: primitives only by default (str, int, float, boolean, None).
            Other (more complex) types are retrieved as strings.

        :returns: a dictionary of the form:
            `{ <parser type name> : <function used to parse type> }`
        """
        # TODO: move to module level (or datatypes, util)
        return {
            # str is default and not needed here
            "int": int,
            "float": float,
            # NOTE(review): `bool` applied to a string is True for any
            # non-empty string (including "False" and "0") - confirm intended.
            "bool": bool,
            # unfortunately, 'list' is used in dataset metadata both for
            # query style maps (9th col gff) AND comma-sep strings.
            # (disabled for now)
            # 'list' : lambda v: v.split( ',' ),
            # 'csv' : lambda v: v.split( ',' ),
            # i don't like how urlparses does sub-lists...
            # 'querystr' : lambda v: dict([ ( p.split( '=', 1 ) if '=' in p else ( p, True ) )
            #                               for p in v.split( ';', 1 ) ])
            # 'scifloat': #floating point which may be in scientific notation
            # always with the 1 base, biologists?
            # 'int1' : ( lambda i: int( i ) - 1 ),
            # 'gffval': string or '.' for None
            # 'gffint': # int or '.' for None
            # 'gffphase': # 0, 1, 2, or '.' for None
            # 'gffstrand': # -, +, ?, or '.' for None, etc.
        }

    def filter(self, line):
        """
        Apply the parent class's line filter, then split the surviving line
        into parsed columns and apply the column filters.

        :param line: the line to filter
        :returns: the list of parsed columns, or `None` if the line was
            rejected by the parent filter or by any column filter.
        """
        line = super().filter(line)
        if line is None:
            return line
        columns = self.parse_columns_from_line(line)
        return self.filter_by_columns(columns)

    def parse_columns_from_line(self, line):
        """
        Returns a list of the desired, parsed columns.

        :param line: the line to parse
        :type line: str
        :returns: one value per selected index (or per column of the line if
            no indeces were configured), `None` where the line has no such column.
        """
        # TODO: too much going on in this loop - the above should all be precomputed AMAP...
        all_columns = line.split(self.deliminator)
        # if no indeces were passed to init, return all columns
        selected_indeces = self.selected_column_indeces or range(len(all_columns))
        # the position in the output (not the source index) selects the column type
        return [
            self.parse_column_at_index(all_columns, parser_index, column_index)
            for parser_index, column_index in enumerate(selected_indeces)
        ]

    def parse_column_at_index(self, columns, parser_index, index):
        """
        Return the parsed value of `columns[index]`.

        :param columns: the raw (string) columns split from a line
        :param parser_index: the position used to look up the column type
            in `self.column_types`
        :param index: the position of the wanted value within `columns`
        :returns: the parsed value, or `None` if `index` is not within `columns`.
        """
        try:
            raw_value = columns[index]
        except IndexError:
            # if a selected index is not within columns, return None
            return None
        return self.parse_value(raw_value, self.get_column_type(parser_index))

    def parse_value(self, val, type):
        """
        Attempt to parse and return the given value based on the given type.

        :param val: the column value to parse (often a string)
        :param type: the string type 'name' used to find the appropriate parser
        :returns: the parsed value
            or `val` unchanged if `type` is 'str', `None`, or has no entry in `parsers`
            or `None` if there was a parser error (ValueError)
        """
        if type == "str" or type is None:
            return val
        parser = self.parsers.get(type)
        if parser is None:
            # no parser - return as string
            return val
        try:
            return parser(val)
        except ValueError:
            # bad value - return None
            return None

    def get_column_type(self, index):
        """
        Get the column type for the parser from `self.column_types` or `None`
        if the type is unavailable.

        :param index: the column index
        :returns: string name of type (e.g. 'float', 'int', etc.)
        """
        try:
            return self.column_types[index]
        except IndexError:
            return None

    def filter_by_columns(self, columns):
        """
        Run every configured column filter against `columns`.

        :param columns: the list of parsed columns for one line
        :returns: `columns` if every filter returns a truthy value
            (or there are no filters), otherwise `None`.
        """
        if all(filter_fn(columns) for filter_fn in self.column_filters):
            return columns
        return None
class DictDataProvider(ColumnarDataProvider):
    """
    Data provider that pairs `column_names` with the columns parsed from each
    line of the source and yields the result as a dictionary.

    Using `column_names` together with `indeces` allows 'picking'
    key/value pairs from the source.

    .. note:: The subclass constructors are passed kwargs - so their
        params (limit, offset, etc.) are also applicable here.
    """

    settings = {
        "column_names": "list:str",
    }

    def __init__(self, source, column_names=None, **kwargs):
        """
        :param column_names: an ordered list of strings that will be used as the keys
            for each column in the returned dictionaries.
            Names and column values are paired with `zip`, so each returned
            dictionary has no more entries than there are column names.
            Optional: defaults to an empty list.
        :type column_names: list of strings
        """
        # TODO: allow passing in a map instead of name->index { 'name1': index1, ... }
        super().__init__(source, **kwargs)
        self.column_names = column_names if column_names else []

    def __iter__(self):
        """
        Yield one dictionary per item produced by the parent iterator,
        keyed by `self.column_names`.
        """
        for values in super().__iter__():
            yield dict(zip(self.column_names, values))