Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.visualization.data_providers.basic
import sys
from json import loads
from typing import Iterator
from galaxy.datatypes.tabular import Tabular
from galaxy.model import DatasetInstance
[docs]class BaseDataProvider:
"""
Base class for data providers. Data providers both:
- read and package data from datasets
- write subsets of data to new datasets
"""
original_dataset: DatasetInstance
[docs] def __init__(
self,
converted_dataset=None,
original_dataset=None,
dependencies=None,
error_max_vals="Only the first %i values are returned.",
):
"""Create basic data provider."""
self.converted_dataset = converted_dataset
self.original_dataset = original_dataset
self.dependencies = dependencies
self.error_max_vals = error_max_vals
[docs] def has_data(self, **kwargs):
"""
Returns true if dataset has data in the specified genome window, false
otherwise.
"""
raise Exception("Unimplemented Function")
[docs] def get_iterator(self, data_file, chrom, start, end, **kwargs) -> Iterator[str]:
"""
Returns an iterator that provides data in the region chrom:start-end
"""
raise Exception("Unimplemented Function")
[docs] def process_data(self, iterator, start_val=0, max_vals=None, **kwargs):
"""
Process data from an iterator to a format that can be provided to client.
"""
raise Exception("Unimplemented Function")
[docs] def get_data(self, chrom, start, end, start_val=0, max_vals=sys.maxsize, **kwargs):
"""
Returns data as specified by kwargs. start_val is the first element to
return and max_vals indicates the number of values to return.
Return value must be a dictionary with the following attributes:
dataset_type, data
"""
iterator = self.get_iterator(chrom, start, end)
return self.process_data(iterator, start_val, max_vals, **kwargs)
[docs] def write_data_to_file(self, filename, **kwargs):
"""
Write data in region defined by chrom, start, and end to a file.
"""
raise Exception("Unimplemented Function")
[docs]class ColumnDataProvider(BaseDataProvider):
"""Data provider for columnar data"""
MAX_LINES_RETURNED = 30000
[docs] def __init__(self, original_dataset, max_lines_returned=MAX_LINES_RETURNED):
# Compatibility check.
if not isinstance(original_dataset.datatype, Tabular):
raise Exception("Data provider can only be used with tabular data")
# Attribute init.
self.original_dataset = original_dataset
# allow throttling
self.max_lines_returned = max_lines_returned
[docs] def get_data(self, columns=None, start_val=0, max_vals=None, skip_comments=True, **kwargs):
"""
Returns data from specified columns in dataset. Format is list of lists
where each list is a line of data.
"""
if not columns:
raise TypeError("parameter required: columns")
# TODO: validate kwargs
try:
max_vals = int(max_vals)
max_vals = min([max_vals, self.max_lines_returned])
except (ValueError, TypeError):
max_vals = self.max_lines_returned
try:
start_val = int(start_val)
start_val = max([start_val, 0])
except (ValueError, TypeError):
start_val = 0
# skip comment lines (if any/avail)
# pre: should have original_dataset and
if (
skip_comments
and self.original_dataset.metadata.comment_lines
and start_val < self.original_dataset.metadata.comment_lines
):
start_val = int(self.original_dataset.metadata.comment_lines)
# columns is an array of ints for now (should handle column names later)
columns = loads(columns)
for column in columns:
assert (column < self.original_dataset.metadata.columns) and (
column >= 0
), "column index (%d) must be positive and less" % (column) + " than the number of columns: %d" % (
self.original_dataset.metadata.columns
)
# set up the response, column lists
response = {}
response["data"] = data = [[] for column in columns]
response["meta"] = meta = [{"min": None, "max": None, "count": 0, "sum": 0} for column in columns]
column_types = [self.original_dataset.metadata.column_types[column] for column in columns]
# function for casting by column_types
def cast_val(val, type):
"""Cast value based on type. Return None if can't be cast"""
if type == "int":
try:
val = int(val)
except ValueError:
return None
elif type == "float":
try:
val = float(val)
except ValueError:
return None
return val
returning_data = False
f = open(self.original_dataset.file_name)
# TODO: add f.seek if given fptr in kwargs
for count, line in enumerate(f):
# check line v. desired start, end
if count < start_val:
continue
if (count - start_val) >= max_vals:
break
returning_data = True
fields = line.split()
fields_len = len(fields)
# NOTE: this will return None/null for abberrant column values (including bad indeces)
for index, column in enumerate(columns):
column_val = None
column_type = column_types[index]
if column < fields_len:
column_val = cast_val(fields[column], column_type)
if column_val is not None:
# if numeric, maintain min, max, sum
if column_type == "float" or column_type == "int":
if (meta[index]["min"] is None) or (column_val < meta[index]["min"]):
meta[index]["min"] = column_val
if (meta[index]["max"] is None) or (column_val > meta[index]["max"]):
meta[index]["max"] = column_val
meta[index]["sum"] += column_val
# maintain a count - for other stats
meta[index]["count"] += 1
data[index].append(column_val)
response["endpoint"] = dict(last_line=(count - 1), file_ptr=f.tell())
f.close()
if not returning_data:
return None
for index, meta in enumerate(response["meta"]):
column_type = column_types[index]
count = meta["count"]
if (column_type == "float" or column_type == "int") and count:
meta["mean"] = float(meta["sum"]) / count
sorted_data = sorted(response["data"][index])
middle_index = (count / 2) - 1
if count % 2 == 0:
meta["median"] = (sorted_data[middle_index] + sorted_data[(middle_index + 1)]) / 2.0
else:
meta["median"] = sorted_data[middle_index]
# ugh ... metadata_data_lines is not a reliable source; hafta have an EOF
return response