"""
Dataproviders that either:
- use the file contents and/or metadata of a Galaxy DatasetInstance as
  their source, or
- provide data in some way relevant to bioinformatic data
  (e.g. parsing genomic regions from their source).
"""
import logging
import sys
from bx import (
seq as bx_seq,
wiggle as bx_wig,
)
from galaxy.util import sqlite
from galaxy.util.compression_utils import get_fileobj
from . import (
base,
column,
external,
line,
)
_TODO = """
use bx as much as possible
gff3 hierarchies
change SamtoolsDataProvider to use pysam
"""
log = logging.getLogger(__name__)
# ----------------------------------------------------------------------------- base for using a Glx dataset
class DatasetDataProvider(base.DataProvider):
"""
Class that uses the file contents and/or metadata from a Galaxy DatasetInstance
as its source.
DatasetDataProvider can be seen as the intersection between a datatype's
metadata and a dataset's file contents. It (so far) mainly provides helper
and convenience methods for using dataset metadata to set up and control how
the data is provided.
"""
def __init__(self, dataset, **kwargs):
"""
:param dataset: the Galaxy dataset whose file will be the source
:type dataset: model.DatasetInstance
"""
# precondition: dataset is a galaxy.model.DatasetInstance
self.dataset = dataset
# this dataset file is obviously the source
# TODO: this might be a good place to interface with the object_store...
mode = "rb" if dataset.datatype.is_binary else "r"
super().__init__(get_fileobj(dataset.get_file_name(), mode))
# TODO: this is a bit of a mess
# TODO: merge the next two
def get_indeces_by_column_names(self, list_of_column_names):
"""
Return the list of column indeces when given a list of column_names.
:param list_of_column_names: the names of the columns of which to get indeces.
:type list_of_column_names: list of strs
:raises KeyError: if column_names are not found
:raises ValueError: if an entry in list_of_column_names is not in column_names
"""
metadata_column_names = (
self.dataset.metadata.column_names or getattr(self.dataset.datatype, "column_names", None) or None
)
if not metadata_column_names:
raise KeyError(
f"No column_names found for datatype: {self.dataset.datatype}, dataset: {self.dataset}"
)
indeces = []
for column_name in list_of_column_names:
indeces.append(metadata_column_names.index(column_name))
return indeces
def get_genomic_region_indeces(self, check=False):
"""
Return a list of column indeces for 'chromCol', 'startCol', 'endCol' from
a source representing a genomic region.
:param check: if True will raise a ValueError if any were not found.
:type check: bool
:raises ValueError: if check is `True` and one or more indeces were not found.
:returns: list of column indeces for the named columns.
"""
region_column_names = ("chromCol", "startCol", "endCol")
region_indices = [self.get_metadata_column_index_by_name(name) for name in region_column_names]
if check and not all(_ is not None for _ in region_indices):
raise ValueError(f"Could not determine proper column indices for chrom, start, end: {str(region_indices)}")
return region_indices
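# Example usage (illustrative sketch, not part of the provider API; assumes `hda` is a
# galaxy.model.DatasetInstance whose metadata defines chromCol/startCol/endCol):
#
#   provider = DatasetDataProvider(hda)
#   chrom_ix, start_ix, end_ix = provider.get_genomic_region_indeces(check=True)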
class ConvertedDatasetDataProvider(DatasetDataProvider):
"""
Class that uses the file contents of a dataset after conversion to a different
format.
"""
def __init__(self, dataset, **kwargs):
raise NotImplementedError("Abstract class")
# self.original_dataset = dataset
# self.converted_dataset = self.convert_dataset(dataset, **kwargs)
# super(ConvertedDatasetDataProvider, self).__init__(self.converted_dataset, **kwargs)
# NOTE: now self.converted_dataset == self.dataset
def convert_dataset(self, dataset, **kwargs):
"""
Convert the given dataset in some way.
"""
return dataset
# ----------------------------------------------------------------------------- uses metadata for settings
class DatasetColumnarDataProvider(column.ColumnarDataProvider):
"""
Data provider that uses a DatasetDataProvider as its source and the
dataset's metadata to build settings for the ColumnarDataProvider it
inherits from.
"""
def __init__(self, dataset, **kwargs):
"""
All kwargs are inherited from ColumnarDataProvider.
.. seealso:: column.ColumnarDataProvider
If no kwargs are given, this class will attempt to get those kwargs
from the dataset source's metadata.
If any kwarg is given, it will override and be used in place of
any metadata available.
"""
dataset_source = DatasetDataProvider(dataset)
if not kwargs.get("column_types", None):
indeces = kwargs.get("indeces", None)
kwargs["column_types"] = dataset_source.get_metadata_column_types(indeces=indeces)
super().__init__(dataset_source, **kwargs)
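# Example usage (illustrative sketch; assumes `hda` is a tabular galaxy.model.DatasetInstance):
#
#   provider = DatasetColumnarDataProvider(hda, indeces=[0, 2])
#   for columns in provider:
#       # each iteration yields a list with the parsed values of columns 0 and 2
#       print(columns)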
class DatasetDictDataProvider(column.DictDataProvider):
"""
Data provider that uses a DatasetDataProvider as its source and the
dataset's metadata to build settings for the DictDataProvider it
inherits from.
"""
def __init__(self, dataset, **kwargs):
"""
All kwargs are inherited from DictDataProvider.
.. seealso:: column.DictDataProvider
If no kwargs are given, this class will attempt to get those kwargs
from the dataset source's metadata.
If any kwarg is given, it will override and be used in place of
any metadata available.
The relationship between column_names and indeces is more complex:
+-----------------+-------------------------------+-----------------------+
|                 | Indeces given                 | Indeces NOT given     |
+=================+===============================+=======================+
| Names given     | pull indeces, rename w/ names | pull by name          |
+-----------------+-------------------------------+-----------------------+
| Names NOT given | pull indeces, name w/ meta    | pull all, name w/meta |
+-----------------+-------------------------------+-----------------------+
"""
dataset_source = DatasetDataProvider(dataset)
# TODO: getting too complicated - simplify at some lvl, somehow
# if no column_types given, get column_types from indeces (or all if indeces == None)
indeces = kwargs.get("indeces", None)
column_names = kwargs.get("column_names", None)
if not indeces and column_names:
# pull columns by name
indeces = kwargs["indeces"] = dataset_source.get_indeces_by_column_names(column_names)
elif indeces and not column_names:
# pull using indeces, name with meta
column_names = kwargs["column_names"] = dataset_source.get_metadata_column_names(indeces=indeces)
elif not indeces and not column_names:
# pull all indeces and name using metadata
column_names = kwargs["column_names"] = dataset_source.get_metadata_column_names(indeces=indeces)
# if no column_types given, use metadata column_types
if not kwargs.get("column_types", None):
kwargs["column_types"] = dataset_source.get_metadata_column_types(indeces=indeces)
super().__init__(dataset_source, **kwargs)
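# Example usage (illustrative sketch; assumes `hda` is a tabular galaxy.model.DatasetInstance
# whose metadata column_names include 'name' and 'score' - hypothetical column names):
#
#   provider = DatasetDictDataProvider(hda, column_names=["name", "score"])
#   for row in provider:
#       # each iteration yields a dict keyed by the requested column names
#       print(row["name"], row["score"])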
# ----------------------------------------------------------------------------- provides a bio-relevant datum
class GenomicRegionDataProvider(column.ColumnarDataProvider):
"""
Data provider that parses chromosome, start, and end data from a file
using the dataset's metadata settings.
Is a ColumnarDataProvider that uses a DatasetDataProvider as its source.
If `named_columns` is true, will return dictionaries with the keys
'chrom', 'start', 'end'.
"""
# dictionary keys when named_columns=True
COLUMN_NAMES = ["chrom", "start", "end"]
settings = {
"chrom_column": "int",
"start_column": "int",
"end_column": "int",
"named_columns": "bool",
}
def __init__(self, dataset, chrom_column=None, start_column=None, end_column=None, named_columns=False, **kwargs):
"""
:param dataset: the Galaxy dataset whose file will be the source
:type dataset: model.DatasetInstance
:param chrom_column: optionally specify the chrom column index
:type chrom_column: int
:param start_column: optionally specify the start column index
:type start_column: int
:param end_column: optionally specify the end column index
:type end_column: int
:param named_columns: optionally return dictionaries keying each column
with 'chrom', 'start', or 'end'.
Optional: defaults to False
:type named_columns: bool
"""
# TODO: allow passing in a string format e.g. "{chrom}:{start}-{end}"
dataset_source = DatasetDataProvider(dataset)
if chrom_column is None:
chrom_column = dataset_source.get_metadata_column_index_by_name("chromCol")
if start_column is None:
start_column = dataset_source.get_metadata_column_index_by_name("startCol")
if end_column is None:
end_column = dataset_source.get_metadata_column_index_by_name("endCol")
indeces = [chrom_column, start_column, end_column]
if not all(_ is not None for _ in indeces):
raise ValueError("Could not determine proper column indeces for" + f" chrom, start, end: {str(indeces)}")
kwargs.update({"indeces": indeces})
if not kwargs.get("column_types", None):
kwargs.update({"column_types": dataset_source.get_metadata_column_types(indeces=indeces)})
self.named_columns = named_columns
if self.named_columns:
self.column_names = self.COLUMN_NAMES
super().__init__(dataset_source, **kwargs)
def __iter__(self):
parent_gen = super().__iter__()
for column_values in parent_gen:
if self.named_columns:
yield dict(zip(self.column_names, column_values))
else:
yield column_values
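# Example usage (illustrative sketch; assumes `hda` is an interval-like
# galaxy.model.DatasetInstance with chromCol/startCol/endCol metadata):
#
#   provider = GenomicRegionDataProvider(hda, named_columns=True)
#   for region in provider:
#       print(region["chrom"], region["start"], region["end"])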
# TODO: this optionally provides the same data as the above and makes GenomicRegionDataProvider redundant
# GenomicRegionDataProvider is a better name, though
class IntervalDataProvider(column.ColumnarDataProvider):
"""
Data provider that parses chromosome, start, and end data (as well as strand
and name if set in the metadata) using the dataset's metadata settings.
If `named_columns` is true, will return dictionaries with the keys
'chrom', 'start', 'end' (and 'strand' and 'name' if available).
"""
COLUMN_NAMES = ["chrom", "start", "end", "strand", "name"]
settings = {
"chrom_column": "int",
"start_column": "int",
"end_column": "int",
"strand_column": "int",
"name_column": "int",
"named_columns": "bool",
}
def __init__(
self,
dataset,
chrom_column=None,
start_column=None,
end_column=None,
strand_column=None,
name_column=None,
named_columns=False,
**kwargs,
):
"""
:param dataset: the Galaxy dataset whose file will be the source
:type dataset: model.DatasetInstance
:param named_columns: optionally return dictionaries keying each column
with 'chrom', 'start', 'end', 'strand', or 'name'.
Optional: defaults to False
:type named_columns: bool
"""
# TODO: allow passing in a string format e.g. "{chrom}:{start}-{end}"
dataset_source = DatasetDataProvider(dataset)
# get genomic indeces and add strand and name
self.column_names = []
indeces = []
# TODO: this is sort of involved and ugly
if chrom_column is None:
chrom_column = dataset_source.get_metadata_column_index_by_name("chromCol")
if chrom_column is not None:
self.column_names.append("chrom")
indeces.append(chrom_column)
if start_column is None:
start_column = dataset_source.get_metadata_column_index_by_name("startCol")
if start_column is not None:
self.column_names.append("start")
indeces.append(start_column)
if end_column is None:
end_column = dataset_source.get_metadata_column_index_by_name("endCol")
if end_column is not None:
self.column_names.append("end")
indeces.append(end_column)
if strand_column is None:
strand_column = dataset_source.get_metadata_column_index_by_name("strandCol")
if strand_column is not None:
self.column_names.append("strand")
indeces.append(strand_column)
if name_column is None:
name_column = dataset_source.get_metadata_column_index_by_name("nameCol")
if name_column is not None:
self.column_names.append("name")
indeces.append(name_column)
kwargs.update({"indeces": indeces})
if not kwargs.get("column_types", None):
kwargs.update({"column_types": dataset_source.get_metadata_column_types(indeces=indeces)})
self.named_columns = named_columns
super().__init__(dataset_source, **kwargs)
def __iter__(self):
parent_gen = super().__iter__()
for column_values in parent_gen:
if self.named_columns:
yield dict(zip(self.column_names, column_values))
else:
yield column_values
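# Example usage (illustrative sketch; assumes `hda` is a BED-like galaxy.model.DatasetInstance
# whose metadata also defines strandCol and nameCol):
#
#   provider = IntervalDataProvider(hda, named_columns=True)
#   for interval in provider:
#       # keys present depend on which columns could be resolved from metadata
#       print(interval.get("strand"), interval.get("name"))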
# TODO: ideally with these next two - you'd allow pulling some region from the sequence
# WITHOUT reading the entire seq into memory - possibly apply some version of limit/offset
class FastaDataProvider(base.FilteredDataProvider):
"""
Class that returns fasta format data in a list of maps of the form::
{
id: <fasta header id>,
seq: <joined lines of nucleotide/amino data>
}
"""
settings = {
"ids": "list:str",
}
def __init__(self, source, ids=None, **kwargs):
"""
:param ids: optionally return only ids (and sequences) that are in this list.
Optional: defaults to None (provide all ids)
:type ids: list or None
"""
source = bx_seq.fasta.FastaReader(source)
# TODO: validate is a fasta
super().__init__(source, **kwargs)
self.ids = ids
# TODO: filtering by self.ids is not yet applied in __iter__
def __iter__(self):
parent_gen = super().__iter__()
for fasta_record in parent_gen:
yield {"id": fasta_record.name, "seq": fasta_record.text}
class TwoBitFastaDataProvider(DatasetDataProvider):
"""
Class that returns fasta format data in a list of maps of the form::
{
id: <fasta header id>,
seq: <joined lines of nucleotide/amino data>
}
"""
settings = {
"ids": "list:str",
}
def __init__(self, source, ids=None, **kwargs):
"""
:param ids: optionally return only ids (and sequences) that are in this list.
Optional: defaults to None (provide all ids)
:type ids: list or None
"""
source = bx_seq.twobit.TwoBitFile(source)
# TODO: validate is a 2bit
# skip DatasetDataProvider.__init__ here: `source` is an already-parsed TwoBitFile, not a DatasetInstance
super(DatasetDataProvider, self).__init__(source, **kwargs)
# could do in order provided with twobit
self.ids = ids or self.source.keys()
def __iter__(self):
for id_ in self.ids:
yield {"id": id_, "seq": self.source[id_]}
# TODO:
class WiggleDataProvider(base.LimitedOffsetDataProvider):
"""
Class that returns chrom, pos, data from a wiggle source.
"""
COLUMN_NAMES = ["chrom", "pos", "value"]
settings = {
"named_columns": "bool",
"column_names": "list:str",
}
def __init__(self, source, named_columns=False, column_names=None, **kwargs):
"""
:param named_columns: optionally return dictionaries keying each column
with 'chrom', 'pos', or 'value'.
Optional: defaults to False
:type named_columns: bool
:param column_names: an ordered list of strings that will be used as the keys
for each column in the returned dictionaries.
Each returned dictionary will contain at most as many key/value pairs as
there are column names provided.
:type column_names: list of str
"""
# TODO: validate is a wig
# keep a reference to the raw source since the Reader won't retain one
self.raw_source = source
self.parser = bx_wig.Reader(source)
super().__init__(self.parser, **kwargs)
self.named_columns = named_columns
self.column_names = column_names or self.COLUMN_NAMES
def __iter__(self):
parent_gen = super().__iter__()
for three_tuple in parent_gen:
if self.named_columns:
yield dict(zip(self.column_names, three_tuple))
else:
# list is not strictly necessary - but consistent
yield list(three_tuple)
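# Example usage (illustrative sketch; assumes `wig_file` is an open file-like handle to
# wiggle-formatted data and that the inherited limit kwarg is available):
#
#   provider = WiggleDataProvider(wig_file, named_columns=True, limit=10)
#   for datum in provider:
#       print(datum["chrom"], datum["pos"], datum["value"])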
class BigWigDataProvider(base.LimitedOffsetDataProvider):
"""
Class that returns chrom, pos, data from a bigwig source.
"""
COLUMN_NAMES = ["chrom", "pos", "value"]
settings = {
"named_columns": "bool",
"column_names": "list:str",
}
def __init__(self, source, chrom, start, end, named_columns=False, column_names=None, **kwargs):
"""
:param chrom: which chromosome within the bigwig file to extract data for
:type chrom: str
:param start: the start of the region from which to extract data
:type start: int
:param end: the end of the region from which to extract data
:type end: int
:param named_columns: optionally return dictionaries keying each column
with 'chrom', 'pos', or 'value'.
Optional: defaults to False
:type named_columns: bool
:param column_names: an ordered list of strings that will be used as the keys
for each column in the returned dictionaries.
Each returned dictionary will contain at most as many key/value pairs as
there are column names provided.
:type column_names: list of str
"""
raise NotImplementedError("Work in progress")
# TODO: validate is a wig
# still good to maintain a ref to the raw source bc Reader won't
# self.raw_source = source
# self.parser = bx_bbi.bigwig_file.BigWigFile(source)
# super(BigWigDataProvider, self).__init__(self.parser, **kwargs)
# self.named_columns = named_columns
# self.column_names = column_names or self.COLUMN_NAMES
def __iter__(self):
parent_gen = super().__iter__()
for three_tuple in parent_gen:
if self.named_columns:
yield dict(zip(self.column_names, three_tuple))
else:
# list is not strictly necessary - but consistent
yield list(three_tuple)
# ----------------------------------------------------------------------------- binary, external conversion or tool
class DatasetSubprocessDataProvider(external.SubprocessDataProvider):
"""
Create a source from running a subprocess on a dataset's file.
Uses a subprocess as its source and has a dataset (generally used as an input
file for the process).
"""
# TODO: below should be a subclass of this and not RegexSubprocess
def __init__(self, dataset, *args, **kwargs):
"""
:param args: the list of strings used to build commands.
:type args: variadic function args
"""
raise NotImplementedError("Abstract class")
# super(DatasetSubprocessDataProvider, self).__init__(*args, **kwargs)
# self.dataset = dataset
class SQliteDataProvider(base.DataProvider):
"""
Data provider that uses a sqlite database file as its source.
Allows a read-only query to be run and returns the resulting rows as sqlite3 row objects.
"""
settings = {"query": "str"}
def __init__(self, source, query=None, **kwargs):
self.query = query
self.connection = sqlite.connect(source.dataset.get_file_name())
super().__init__(source, **kwargs)
def __iter__(self):
if (self.query is not None) and sqlite.is_read_only_query(self.query):
yield from self.connection.cursor().execute(self.query)
else:
yield
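# Example usage (illustrative sketch; assumes `hda` is a galaxy.model.DatasetInstance backed
# by a SQLite file - the provider wraps another DatasetDataProvider as its source):
#
#   source = DatasetDataProvider(hda)
#   provider = SQliteDataProvider(source, query="SELECT name FROM sqlite_master WHERE type='table'")
#   for row in provider:
#       print(row)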
class SQliteDataTableProvider(base.DataProvider):
"""
Data provider that uses a sqlite database file as its source.
Allows a read-only query to be run and returns the resulting rows as lists of column values, optionally preceded by a header row.
"""
settings = {"query": "str", "headers": "bool", "limit": "int"}
def __init__(self, source, query=None, headers=False, limit=sys.maxsize, **kwargs):
self.query = query
self.headers = headers
self.limit = limit
self.connection = sqlite.connect(source.dataset.get_file_name())
super().__init__(source, **kwargs)
def __iter__(self):
if (self.query is not None) and sqlite.is_read_only_query(self.query):
cur = self.connection.cursor()
results = cur.execute(self.query)
if self.headers:
yield [col[0] for col in cur.description]
for i, row in enumerate(results):
if i >= self.limit:
break
yield list(row)
else:
yield
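# Example usage (illustrative sketch; assumes `hda` is a SQLite-backed DatasetInstance and
# 'my_table' is a hypothetical table name):
#
#   provider = SQliteDataTableProvider(
#       DatasetDataProvider(hda), query="SELECT * FROM my_table", headers=True, limit=100
#   )
#   for row in provider:
#       # the first yielded row is the list of column names, the rest are value lists
#       print(row)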
class SQliteDataDictProvider(base.DataProvider):
"""
Data provider that uses a sqlite database file as its source.
Allows a read-only query to be run and returns the resulting rows as single-element lists of column-name/value dicts.
"""
settings = {"query": "str"}
def __init__(self, source, query=None, **kwargs):
self.query = query
self.connection = sqlite.connect(source.dataset.get_file_name())
super().__init__(source, **kwargs)
def __iter__(self):
if (self.query is not None) and sqlite.is_read_only_query(self.query):
cur = self.connection.cursor()
for row in cur.execute(self.query):
yield [{cur.description[i][0]: value for i, value in enumerate(row)}]
else:
yield
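# Example usage (illustrative sketch; assumes `hda` is a SQLite-backed DatasetInstance and
# 'my_table' is a hypothetical table name):
#
#   provider = SQliteDataDictProvider(DatasetDataProvider(hda), query="SELECT id, name FROM my_table")
#   for row in provider:
#       # each yielded item is a single-element list containing a column-name/value dict
#       print(row[0]["id"], row[0]["name"])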