Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.visualization.data_providers.basic

import sys
from json import loads
from typing import Iterator

from galaxy.datatypes.tabular import Tabular
from galaxy.model import DatasetInstance


class BaseDataProvider:
    """
    Base class for data providers. Data providers both:
        - read and package data from datasets
        - write subsets of data to new datasets

    The methods ``has_data``, ``get_iterator``, ``process_data`` and
    ``write_data_to_file`` are stubs that raise ``NotImplementedError``;
    ``get_data`` is implemented in terms of ``get_iterator`` and
    ``process_data``.
    """

    # Quoted so the annotation is not evaluated when the class is created.
    original_dataset: "DatasetInstance"

    def __init__(
        self,
        converted_dataset=None,
        original_dataset=None,
        dependencies=None,
        error_max_vals="Only the first %i values are returned.",
    ):
        """
        Create basic data provider.

        All four arguments are stored unchanged as instance attributes of the
        same name.
        """
        self.converted_dataset = converted_dataset
        self.original_dataset = original_dataset
        self.dependencies = dependencies
        self.error_max_vals = error_max_vals

    def has_data(self, **kwargs):
        """
        Returns true if dataset has data in the specified genome window, false
        otherwise.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Unimplemented Function")

    def get_iterator(self, data_file, chrom, start, end, **kwargs) -> Iterator[str]:
        """
        Returns an iterator that provides data in the region chrom:start-end

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Unimplemented Function")

    def process_data(self, iterator, start_val=0, max_vals=None, **kwargs):
        """
        Process data from an iterator to a format that can be provided to client.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Unimplemented Function")

    def get_data(self, chrom, start, end, start_val=0, max_vals=sys.maxsize, **kwargs):
        """
        Returns data as specified by kwargs. start_val is the first element to
        return and max_vals indicates the number of values to return.

        The region iterator comes from ``get_iterator`` and is handed to
        ``process_data`` together with ``start_val``, ``max_vals`` and all
        keyword arguments. An optional ``data_file`` keyword argument is passed
        to ``get_iterator`` as its first argument (``None`` when absent).

        Return value must be a dictionary with the following attributes:
            dataset_type, data
        """
        # get_iterator() declares data_file as its first positional parameter;
        # calling it with only (chrom, start, end) shifts every argument by one
        # and fails with a TypeError for a missing ``end``.
        data_file = kwargs.get("data_file")
        iterator = self.get_iterator(data_file, chrom, start, end)
        return self.process_data(iterator, start_val, max_vals, **kwargs)

    def write_data_to_file(self, filename, **kwargs):
        """
        Write data in region defined by chrom, start, and end to a file.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Unimplemented Function")
class ColumnDataProvider(BaseDataProvider):
    """
    Data provider for columnar data
    """

    MAX_LINES_RETURNED = 30000

    def __init__(self, original_dataset, max_lines_returned=MAX_LINES_RETURNED):
        """
        Create a provider for ``original_dataset``.

        :param original_dataset: dataset whose ``datatype`` must be a ``Tabular`` instance.
        :param max_lines_returned: upper bound on the number of lines a single
            ``get_data`` call reads (allows throttling).
        :raises Exception: if the dataset's datatype is not tabular.
        """
        # Compatibility check.
        if not isinstance(original_dataset.datatype, Tabular):
            raise Exception("Data provider can only be used with tabular data")

        # Attribute init. Going through the base initializer also sets the
        # attributes the base class defines (converted_dataset, dependencies,
        # error_max_vals) to their defaults.
        super().__init__(original_dataset=original_dataset)

        # allow throttling
        self.max_lines_returned = max_lines_returned

    @staticmethod
    def _cast_val(val, type_):
        """Cast ``val`` to 'int' or 'float' per ``type_``; return None if it can't be cast. Other types pass through."""
        if type_ == "int":
            try:
                return int(val)
            except ValueError:
                return None
        if type_ == "float":
            try:
                return float(val)
            except ValueError:
                return None
        return val

    @staticmethod
    def _median(sorted_vals):
        """Return the median of a non-empty, already sorted list of numbers."""
        middle = len(sorted_vals) // 2
        if len(sorted_vals) % 2 == 0:
            return (sorted_vals[middle - 1] + sorted_vals[middle]) / 2.0
        return sorted_vals[middle]

    def get_data(self, columns=None, start_val=0, max_vals=None, skip_comments=True, **kwargs):
        """
        Returns data from specified columns in dataset. Format is list of lists
        where each list is a line of data.

        :param columns: JSON-encoded list of zero-based column indices.
        :param start_val: index of the first line to read; invalid or negative
            values fall back to 0.
        :param max_vals: maximum number of lines to read; capped at
            ``self.max_lines_returned``, which is also used when the value is
            missing or not an integer.
        :param skip_comments: when true, start no earlier than the line index
            given by the dataset's ``comment_lines`` metadata.
        :returns: ``None`` if no line fell inside the requested window,
            otherwise a dict with
            ``data`` (one list of values per requested column, ``None`` where a
            value was missing or could not be cast),
            ``meta`` (per column: ``min``, ``max``, ``count``, ``sum`` and, for
            numeric columns with at least one value, ``mean`` and ``median``) and
            ``endpoint`` (``last_line`` and ``file_ptr``).
        :raises TypeError: if ``columns`` is empty or missing.
        :raises AssertionError: if a column index is outside the dataset's columns.
        """
        if not columns:
            raise TypeError("parameter required: columns")

        # TODO: validate kwargs
        try:
            max_vals = min(int(max_vals), self.max_lines_returned)
        except (ValueError, TypeError):
            max_vals = self.max_lines_returned

        try:
            start_val = max(int(start_val), 0)
        except (ValueError, TypeError):
            start_val = 0

        metadata = self.original_dataset.metadata

        # skip comment lines (if any/avail)
        if skip_comments and metadata.comment_lines and start_val < metadata.comment_lines:
            start_val = int(metadata.comment_lines)

        # columns is an array of ints for now (should handle column names later)
        columns = loads(columns)
        for column in columns:
            if not ((column < metadata.columns) and (column >= 0)):
                # Raised explicitly (instead of via ``assert``) so the check
                # still runs under ``python -O``; the exception type is kept.
                raise AssertionError(
                    "column index (%d) must be positive and less" % (column)
                    + " than the number of columns: %d" % (metadata.columns)
                )

        # set up the response, column lists
        response = {}
        response["data"] = data = [[] for _ in columns]
        response["meta"] = meta = [{"min": None, "max": None, "count": 0, "sum": 0} for _ in columns]

        column_types = [metadata.column_types[column] for column in columns]
        numeric_types = ("int", "float")

        returning_data = False
        # Pre-bound so an empty file does not leave ``count`` undefined below.
        count = 0
        with open(self.original_dataset.file_name) as f:
            # TODO: add f.seek if given fptr in kwargs
            # Lines are pulled with readline() rather than by iterating the file
            # object: iterating a text file disables tell() until EOF, and
            # tell() is needed after leaving the loop early.
            for count, line in enumerate(iter(f.readline, "")):
                # check line v. desired start, end
                if count < start_val:
                    continue
                if (count - start_val) >= max_vals:
                    break

                returning_data = True

                fields = line.split()
                fields_len = len(fields)
                # NOTE: this will return None/null for aberrant column values (including bad indices)
                for index, column in enumerate(columns):
                    column_val = None
                    column_type = column_types[index]
                    if column < fields_len:
                        column_val = self._cast_val(fields[column], column_type)
                        if column_val is not None:
                            stats = meta[index]
                            # if numeric, maintain min, max, sum
                            if column_type in numeric_types:
                                if (stats["min"] is None) or (column_val < stats["min"]):
                                    stats["min"] = column_val
                                if (stats["max"] is None) or (column_val > stats["max"]):
                                    stats["max"] = column_val
                                stats["sum"] += column_val
                            # maintain a count - for other stats
                            # NOTE(review): counts only values that were cast
                            # successfully, so that mean = sum / count -
                            # confirm this matches the intended nesting.
                            stats["count"] += 1
                    data[index].append(column_val)

            # ``count`` is the index of the line that ended the scan: the first
            # line past the window after a break, or the last line of the file
            # at EOF. ``file_ptr`` is the read position at that moment.
            response["endpoint"] = dict(last_line=(count - 1), file_ptr=f.tell())

        if not returning_data:
            return None

        for index, column_meta in enumerate(response["meta"]):
            value_count = column_meta["count"]
            if column_types[index] in numeric_types and value_count:
                column_meta["mean"] = float(column_meta["sum"]) / value_count
                # Drop the None placeholders: they are not part of ``count``
                # and cannot be ordered against numbers.
                sorted_data = sorted(val for val in response["data"][index] if val is not None)
                column_meta["median"] = self._median(sorted_data)

        # ugh ... metadata_data_lines is not a reliable source; hafta have an EOF
        return response