Source code for galaxy.visualization.data_providers.genome

"""
Data providers for genome visualizations.
"""

import abc
import itertools
import math
import os
import random
import re
import sys
from contextlib import contextmanager
from json import loads
from typing import (
    Any,
    Dict,
    IO,
    Iterator,
    List,
    Optional,
    Tuple,
    Union,
)

import pysam
from bx.bbi.bigbed_file import BigBedFile
from bx.bbi.bigwig_file import BigWigFile
from bx.interval_index_file import Indexes

from galaxy.datatypes.interval import (
    Bed,
    Gff,
    Gtf,
)
from galaxy.datatypes.util.gff_util import (
    convert_gff_coords_to_bed,
    GFFFeature,
    GFFInterval,
    GFFReaderWrapper,
    parse_gff_attributes,
)
from galaxy.exceptions import MessageException
from galaxy.model import DatasetInstance
from galaxy.visualization.data_providers.basic import BaseDataProvider
from galaxy.visualization.data_providers.cigar import get_ref_based_read_seq_and_cigar

#
# Utility functions.
#

# pysam 0.16.0.1 emits logs containing the word 'Error', this can confuse the stdout/stderr checkers.
# Can be removed once https://github.com/pysam-developers/pysam/issues/939 is resolved.
pysam.set_verbosity(0)

PAYLOAD_LIST_TYPE = List[Optional[Union[str, int, float, List[Tuple[int, int]]]]]


def float_nan(n):
    """
    Return None instead of NaN to pass jQuery 1.4's strict JSON
    """
    if n != n:  # NaN != NaN
        return None
    else:
        return float(n)

def get_bounds(reads, start_pos_index, end_pos_index):
    """
    Returns the minimum and maximum position for a set of reads.
    """
    max_low = sys.maxsize
    max_high = -sys.maxsize
    for read in reads:
        if read[start_pos_index] < max_low:
            max_low = read[start_pos_index]
        if read[end_pos_index] > max_high:
            max_high = read[end_pos_index]
    return max_low, max_high

def _convert_between_ucsc_and_ensemble_naming(chrom):
    """
    Convert between UCSC chromosome ('chr1') naming conventions and Ensembl
    naming conventions ('1')
    """
    if chrom.startswith("chr"):
        # Convert from UCSC to Ensembl
        return chrom[3:]
    else:
        # Convert from Ensembl to UCSC
        return "chr" + chrom


def _chrom_naming_matches(chrom1, chrom2):
    return (chrom1.startswith("chr") and chrom2.startswith("chr")) or (
        not chrom1.startswith("chr") and not chrom2.startswith("chr")
    )

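# Illustrative sketch (not part of the original module): the helpers above are
# pure string transforms, so their behavior can be shown directly. Note that the
# conversion only strips or prepends the "chr" prefix, so names such as UCSC's
# "chrM" vs. Ensembl's "MT" are not fully reconciled.
#
#     >>> _convert_between_ucsc_and_ensemble_naming("chr1")
#     '1'
#     >>> _convert_between_ucsc_and_ensemble_naming("1")
#     'chr1'
#     >>> _chrom_naming_matches("chr1", "chr2"), _chrom_naming_matches("chr1", "1")
#     (True, False)
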
class FeatureLocationIndexDataProvider(BaseDataProvider):
    """
    Reads/writes/queries feature location index (FLI) datasets.
    """

    def __init__(self, converted_dataset):
        self.converted_dataset = converted_dataset

    def get_data(self, query):
        if self.converted_dataset is None or not self.converted_dataset.is_ok:
            raise MessageException("The dataset is not available or is in an error state.")

        # Init.
        result = []
        with open(self.converted_dataset.get_file_name()) as textloc_file:
            line = textloc_file.readline()
            if not line:
                raise MessageException("The dataset is empty.")
            try:
                line_len = int(line)
            except ValueError:
                raise MessageException(f"Expected an integer at first line, but found: '{line}'")
            if line_len < 1:
                raise MessageException(f"The first line must be a positive integer, but found: {line_len}")
            file_len = os.path.getsize(self.converted_dataset.get_file_name())
            query = query.lower()

            # Find query in file using binary search.
            low = 0
            high = int(file_len / line_len)
            while low < high:
                mid: int = (low + high) // 2
                position = mid * line_len
                textloc_file.seek(position)

                # Compare line with query and update low, high.
                line = textloc_file.readline()
                if line < query:
                    low = mid + 1
                else:
                    high = mid

            # Need to move back one line because last line read may be included in
            # results.
            position = low * line_len
            textloc_file.seek(position)

            # At right point in file, generate hits.
            while True:
                line = textloc_file.readline()
                if not line.startswith(query):
                    break
                if line[-1:] == "\n":
                    line = line[:-1]
                result.append(line.split()[1:])

        return result

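# Illustrative sketch (not part of the original module): the binary search above
# implies that the FLI file is a sorted, fixed-width text file -- the first line
# holds the padded line length, and each remaining line begins with a lowercase
# search key followed by the fields that get returned. Hypothetical content
# (every line padded to the declared width):
#
#     40
#     brca2   chr13:32315473-32400266
#     tp53    chr17:7668401-7687549
#
# Querying "tp5" would return the trailing fields of all lines whose key starts
# with that prefix.
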
class GenomeDataProvider(BaseDataProvider):
    """
    Base class for genome data providers. All genome providers use BED
    coordinate format (0-based, half-open coordinates) for both queries
    and returned data.
    """

    dataset_type: str

    # Mapping from column name to payload data; this mapping is used to create
    # filters. Key is column name, value is a dict with mandatory key 'index'
    # and optional key 'name'. E.g. this defines column 4:
    # col_name_data_attr_mapping = {4 : { index: 5, name: 'Score' } }
    col_name_data_attr_mapping: Dict[Union[str, int], Dict] = {}

    def __init__(
        self,
        converted_dataset=None,
        original_dataset=None,
        dependencies=None,
        error_max_vals="Only the first %i %s in this region are displayed.",
    ):
        super().__init__(
            converted_dataset=converted_dataset,
            original_dataset=original_dataset,
            dependencies=dependencies,
            error_max_vals=error_max_vals,
        )

    def write_data_to_file(self, regions, filename):
        """
        Write data in region defined by chrom, start, and end to a file.
        """
        raise Exception("Unimplemented Function")

    def valid_chroms(self):
        """
        Returns chroms/contigs that the dataset contains
        """
        return None  # by default

    def has_data(self, chrom, start, end, **kwargs):
        """
        Returns true if dataset has data in the specified genome window, false
        otherwise.
        """
        raise Exception("Unimplemented Function")

    @contextmanager
    def open_data_file(self):
        """
        Open data file for reading data.
        """
        raise Exception("Unimplemented Function")

    def get_iterator(self, data_file, chrom, start, end, **kwargs) -> Iterator[str]:
        """
        Returns an iterator that provides data in the region chrom:start-end
        """
        raise Exception("Unimplemented Function")

    def process_data(self, iterator, start_val=0, max_vals=None, **kwargs):
        """
        Process data from an iterator to a format that can be provided to client.
        """
        raise Exception("Unimplemented Function")

    def get_data(self, chrom=None, low=None, high=None, start_val=0, max_vals=sys.maxsize, **kwargs):
        """
        Returns data in region defined by chrom, start, and end. start_val and
        max_vals are used to denote the data to return: start_val is the first
        element to return and max_vals indicates the number of values to return.

        Return value must be a dictionary with the following attributes:
            dataset_type, data
        """
        start, end = int(low), int(high)
        with self.open_data_file() as data_file:
            iterator = self.get_iterator(data_file, chrom, start, end, **kwargs)
            data = self.process_data(iterator, start_val, max_vals, start=start, end=end, **kwargs)
        return data

    def get_genome_data(self, chroms_info, **kwargs):
        """
        Returns data for complete genome.
        """
        genome_data = []
        for chrom_info in chroms_info["chrom_info"]:
            chrom = chrom_info["chrom"]
            chrom_len = chrom_info["len"]
            chrom_data = self.get_data(chrom, 0, chrom_len, **kwargs)
            # FIXME: data providers probably should never return None.
            # Some data providers return None when there's no data, so
            # create a dummy dict if necessary.
            if not chrom_data:
                chrom_data = {"data": None}
            chrom_data["region"] = "%s:%i-%i" % (chrom, 0, chrom_len)
            genome_data.append(chrom_data)
        return {"data": genome_data, "dataset_type": self.dataset_type}

    def get_filters(self):
        """
        Returns filters for provider's data. Return value is a list of
        filters; each filter is a dictionary with the keys 'name', 'index', 'type'.
        NOTE: This method uses the original dataset's datatype and metadata to
        create the filters.
        """
        # Get column names.
        try:
            column_names = self.original_dataset.datatype.column_names  # type: ignore[attr-defined]
        except AttributeError:
            try:
                column_names = list(range(self.original_dataset.metadata.columns))
            except Exception:
                # Give up
                return []

        # Dataset must have column types; if not, cannot create filters.
        try:
            column_types = self.original_dataset.metadata.column_types
        except AttributeError:
            return []

        # Create and return filters.
        filters = []
        if self.original_dataset.metadata.viz_filter_cols:
            for viz_col_index in self.original_dataset.metadata.viz_filter_cols:
                # Some columns are optional, so can't assume that a filter
                # column is in dataset.
                if viz_col_index >= len(column_names):
                    continue
                col_name = column_names[viz_col_index]
                # Make sure that column has a mapped index. If not, do not add filter.
                try:
                    attrs = self.col_name_data_attr_mapping[col_name]
                except KeyError:
                    continue
                filters.append({"name": attrs["name"], "type": column_types[viz_col_index], "index": attrs["index"]})
        return filters

    def get_default_max_vals(self):
        return 5000

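# Illustrative sketch (not part of the original module): a concrete provider is
# typically instantiated with an original dataset plus any converted/index
# datasets and then queried by region. The dataset variable names below are
# hypothetical placeholders for Galaxy dataset instances.
#
#     provider = BedTabixDataProvider(
#         original_dataset=bed_dataset,            # hypothetical original dataset
#         converted_dataset=tabix_index_dataset,   # hypothetical tabix index dataset
#         dependencies={"bgzip": bgzip_dataset},   # hypothetical bgzipped data dataset
#     )
#     result = provider.get_data(chrom="chr1", low=1000, high=2000)
#     # result is a dict such as {"data": [...], "dataset_type": "interval_index", "message": None}
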
#
# -- Base mixins and providers --
#
class FilterableMixin:
    original_dataset: DatasetInstance

    def get_filters(self):
        """Returns a dataset's filters."""
        # Get filters.
        # TODOs:
        # (a) might be useful to move this into each datatype's set_meta method;
        # (b) could look at first N lines to ensure GTF attribute types are consistent.
        #
        filters = []
        # HACK: first 8 fields are for drawing, so start filter column index at 9.
        filter_col = 8
        if isinstance(self.original_dataset.datatype, Gff):
            # Can filter by score and GTF attributes.
            filters = [
                {"name": "Score", "type": "number", "index": filter_col, "tool_id": "Filter1", "tool_exp_name": "c6"}
            ]
            filter_col += 1
            if isinstance(self.original_dataset.datatype, Gtf):
                # Create filters based on dataset metadata.
                for name, a_type in self.original_dataset.metadata.attribute_types.items():
                    if a_type in ["int", "float"]:
                        filters.append(
                            {
                                "name": name,
                                "type": "number",
                                "index": filter_col,
                                "tool_id": "gff_filter_by_attribute",
                                "tool_exp_name": name,
                            }
                        )
                        filter_col += 1
        elif isinstance(self.original_dataset.datatype, Bed):
            # Can filter by score column only.
            filters = [
                {"name": "Score", "type": "number", "index": filter_col, "tool_id": "Filter1", "tool_exp_name": "c5"}
            ]
        return filters

class TabixDataProvider(GenomeDataProvider, FilterableMixin):
    """
    Tabix index data provider for the Galaxy track browser.
    """

    dataset_type = "tabix"

    col_name_data_attr_mapping: Dict[Union[str, int], Dict] = {4: {"index": 4, "name": "Score"}}

    @contextmanager
    def open_data_file(self):
        # We create a symlink to the index file. This is
        # required until https://github.com/pysam-developers/pysam/pull/586 is merged.
        index_path = self.converted_dataset.get_file_name()
        with pysam.TabixFile(self.dependencies["bgzip"].get_file_name(), index=index_path) as f:
            yield f

    def get_iterator(self, data_file, chrom, start, end, **kwargs) -> Iterator[str]:
        # chrom must be a string, start/end integers.
        # in previous versions of pysam, unicode was accepted for chrom, but not in 8.4
        chrom = str(chrom)
        start = int(start)
        end = int(end)
        if end >= (2 << 29):
            end = 2 << 29 - 1  # Tabix-enforced maximum

        # Get iterator using either naming scheme.
        iterator: Iterator[str] = iter([])
        if chrom in data_file.contigs:
            iterator = data_file.fetch(reference=chrom, start=start, end=end)
        else:
            # Try alternative naming scheme.
            chrom = _convert_between_ucsc_and_ensemble_naming(chrom)
            if chrom in data_file.contigs:
                iterator = data_file.fetch(reference=chrom, start=start, end=end)
        return iterator

    def write_data_to_file(self, regions, filename):
        with self.open_data_file() as data_file, open(filename, "w") as out:
            for region in regions:
                # Write data in region.
                iterator = self.get_iterator(data_file, region.chrom, region.start, region.end)
                for line in iterator:
                    out.write(f"{line}\n")

#
# -- Interval data providers --
#
class IntervalDataProvider(GenomeDataProvider):
    """
    Processes interval data from native format to payload format.

    Payload format: [ uid (offset), start, end, name, strand, thick_start, thick_end, blocks ]
    """

    dataset_type = "interval_index"

    def get_iterator(self, data_file, chrom, start, end, **kwargs):
        raise Exception("Unimplemented Function")

    def process_data(self, iterator, start_val=0, max_vals=None, **kwargs):
        """
        Process interval data from the iterator into the payload format described above.
        """
        # Build data to return. Payload format is:
        # [ <guid/offset>, <start>, <end>, <name>, <strand> ]
        #
        # First three entries are mandatory, others are optional.
        #
        filter_cols = loads(kwargs.get("filter_cols", "[]"))
        no_detail = "no_detail" in kwargs
        rval = []
        message = None

        # Subtract one b/c columns are 1-based but indices are 0-based.
        def col_fn(col):
            return None if col is None else col - 1

        start_col = self.original_dataset.metadata.startCol - 1
        end_col = self.original_dataset.metadata.endCol - 1
        strand_col = col_fn(self.original_dataset.metadata.strandCol)
        name_col = col_fn(self.original_dataset.metadata.nameCol)

        for count, line in enumerate(iterator):
            if count < start_val:
                continue
            if max_vals and count - start_val >= max_vals:
                message = self.error_max_vals % (max_vals, "features")
                break
            feature = line.split()
            length = len(feature)

            # Unique id is just a hash of the line
            payload: PAYLOAD_LIST_TYPE = [hash(line), int(feature[start_col]), int(feature[end_col])]

            if no_detail:
                rval.append(payload)
                continue

            # Name, strand.
            if name_col:
                payload.append(feature[name_col])
            if strand_col:
                # Put empty name as placeholder.
                if not name_col:
                    payload.append("")
                payload.append(feature[strand_col])

            # Score (filter data)
            if length >= 5 and filter_cols and filter_cols[0] == "Score":
                try:
                    payload.append(float(feature[4]))
                except Exception:
                    payload.append(feature[4])

            rval.append(payload)

        return {"data": rval, "message": message}

    def write_data_to_file(self, regions, filename):
        raise Exception("Unimplemented Function")

class IntervalTabixDataProvider(TabixDataProvider, IntervalDataProvider):
    """
    Provides data from an interval file indexed via tabix.
    """

#
# -- BED data providers --
#
class BedDataProvider(GenomeDataProvider):
    """
    Processes BED data from native format to payload format.

    Payload format: [ uid (offset), start, end, name, strand, thick_start, thick_end, blocks ]
    """

    dataset_type = "interval_index"

    def get_iterator(self, data_file, chrom, start, end, **kwargs):
        raise Exception("Unimplemented Method")

    def process_data(self, iterator, start_val=0, max_vals=None, **kwargs):
        """
        Process BED data from the iterator into the payload format described above.
        """
        # Build data to return. Payload format is:
        # [ <guid/offset>, <start>, <end>, <name>, <strand>, <thick_start>,
        #   <thick_end>, <blocks> ]
        #
        # First three entries are mandatory, others are optional.
        #
        filter_cols = loads(kwargs.get("filter_cols", "[]"))
        no_detail = "no_detail" in kwargs
        rval = []
        message = None
        for count, line in enumerate(iterator):
            if count < start_val:
                continue
            if max_vals and count - start_val >= max_vals:
                message = self.error_max_vals % (max_vals, "features")
                break
            # TODO: can we use column metadata to fill out payload?
            # TODO: use function to set payload data
            feature = line.split()
            length = len(feature)

            # Unique id is just a hash of the line
            payload: PAYLOAD_LIST_TYPE = [hash(line), int(feature[1]), int(feature[2])]

            if no_detail:
                rval.append(payload)
                continue

            # Name, strand, thick start, thick end.
            if length >= 4:
                payload.append(feature[3])
            if length >= 6:
                payload.append(feature[5])
            if length >= 8:
                payload.append(int(feature[6]))
                payload.append(int(feature[7]))

            # Blocks.
            if length >= 12:
                block_sizes = [int(n) for n in feature[10].split(",") if n != ""]
                block_starts = [int(n) for n in feature[11].split(",") if n != ""]
                blocks = list(zip(block_sizes, block_starts))
                payload.append(
                    [(int(feature[1]) + block[1], int(feature[1]) + block[1] + block[0]) for block in blocks]
                )

            # Score (filter data)
            if length >= 5 and filter_cols and filter_cols[0] == "Score":
                # If dataset doesn't have name/strand/thick start/thick end/blocks,
                # add placeholders. There should be 8 entries if all attributes
                # are present.
                payload.extend([None for i in range(8 - len(payload))])
                try:
                    payload.append(float(feature[4]))
                except Exception:
                    payload.append(feature[4])

            rval.append(payload)

        return {"data": rval, "dataset_type": self.dataset_type, "message": message}

    def write_data_to_file(self, regions, filename):
        with open(filename, "w") as out:
            for region in regions:
                # Write data in region.
                chrom = region.chrom
                start = region.start
                end = region.end
                with self.open_data_file() as data_file:
                    iterator = self.get_iterator(data_file, chrom, start, end)
                    for line in iterator:
                        out.write(f"{line}\n")

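# Illustrative sketch (not part of the original module): for a 12-column BED
# line such as
#
#     chr1  1000  5000  feat1  960  +  1200  4800  0  2  100,200  0,3900
#
# BedDataProvider.process_data emits a payload roughly like
#
#     [<hash of line>, 1000, 5000, "feat1", "+", 1200, 4800, [(1000, 1100), (4900, 5100)]]
#
# i.e. blocks are converted from size/offset pairs into absolute (start, end)
# coordinates on the chromosome.
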
class BedTabixDataProvider(TabixDataProvider, BedDataProvider):
    """
    Provides data from a BED file indexed via tabix.
    """

class RawBedDataProvider(BedDataProvider):
    """
    Provide data from BED file.

    NOTE: this data provider does not use indices, and hence will be very slow
    for large datasets.
    """

    def get_iterator(self, data_file, chrom, start, end, **kwargs):
        # Read first line in order to match chrom naming format.
        line = data_file.readline()
        dataset_chrom = line.split()[0]
        if not _chrom_naming_matches(chrom, dataset_chrom):
            chrom = _convert_between_ucsc_and_ensemble_naming(chrom)
        # Undo read.
        data_file.seek(0)

        def line_filter_iter():
            with open(self.original_dataset.get_file_name()) as data_file:
                for line in data_file:
                    if line.startswith("track") or line.startswith("browser"):
                        continue
                    feature = line.split()
                    feature_chrom = feature[0]
                    feature_start = int(feature[1])
                    feature_end = int(feature[2])
                    if (
                        (chrom is not None and feature_chrom != chrom)
                        or (start is not None and feature_start > end)
                        or (end is not None and feature_end < start)
                    ):
                        continue
                    yield line

        return line_filter_iter()

#
# -- VCF data providers --
#
class VcfDataProvider(GenomeDataProvider):
    """
    Abstract class that processes VCF data from native format to payload format.

    Payload format: An array of entries for each locus in the file. Each array
    has the following entries:
        1. GUID (unused)
        2. location (0-based)
        3. reference base(s)
        4. alternative base(s)
        5. quality score
        6. whether variant passed filter
        7. sample genotypes -- a single string with samples separated by commas;
           empty string denotes the reference genotype
        8. allele counts for each alternative
    """

    col_name_data_attr_mapping: Dict[Union[str, int], Dict] = {"Qual": {"index": 6, "name": "Qual"}}

    dataset_type = "variant"

    def process_data(self, iterator, start_val=0, max_vals=None, **kwargs):
        """
        Returns a dict with the following attributes::

            data - a list of variants with the format

            .. raw:: text

                [<guid>, <start>, <end>, <name>, cigar, seq]

            message - error/informative message
        """
        data = []
        message = None

        def get_mapping(ref, alt):
            """
            Returns ( offset, new_seq, cigar ) tuple that defines mapping of
            alt to ref. Cigar format is an array of [ op_index, length ] pairs
            where op_index is the 0-based index into the string "MIDNSHP=X"
            """
            cig_ops = "MIDNSHP=X"

            ref_len = len(ref)
            alt_len = len(alt)

            # Substitutions?
            if ref_len == alt_len:
                return 0, alt, [[cig_ops.find("M"), ref_len]]

            # Deletions?
            alt_in_ref_index = ref.find(alt)
            if alt_in_ref_index != -1:
                return alt_in_ref_index, ref[alt_in_ref_index + 1 :], [[cig_ops.find("D"), ref_len - alt_len]]

            # Insertions?
            ref_in_alt_index = alt.find(ref)
            if ref_in_alt_index != -1:
                return ref_in_alt_index, alt[ref_in_alt_index + 1 :], [[cig_ops.find("I"), alt_len - ref_len]]

        # Pack data.
        genotype_re = re.compile(r"/|\|")
        for count, line in enumerate(iterator):
            if count < start_val:
                continue
            if max_vals and count - start_val >= max_vals:
                message = self.error_max_vals % (max_vals, "features")
                break

            # Split line and aggregate data.
            feature = line.split()
            pos, c_id, ref, alt, qual, c_filter, info = feature[1:8]

            # Format and samples data are optional.
            samples_data = []
            if len(feature) > 8:
                samples_data = feature[9:]

            # VCF is 1-based but provided position is 0-based.
            pos = int(pos) - 1

            # FIXME: OK to skip?
            if alt == ".":
                count -= 1
                continue

            # Set up array to track allele counts.
            allele_counts = [0 for i in range(alt.count(",") + 1)]

            sample_gts = []
            if samples_data:
                # Process and pack samples' genotype and count alleles across samples.
                alleles_seen: Dict[int, bool] = {}
                has_alleles = False

                for sample in samples_data:
                    # Parse and count alleles.
                    genotype = sample.split(":")[0]
                    has_alleles = False
                    alleles_seen.clear()
                    for allele_str in genotype_re.split(genotype):
                        try:
                            # This may throw a ValueError if allele is missing.
                            allele = int(allele_str)

                            # Only count allele if it hasn't been seen yet.
                            if allele != 0 and allele not in alleles_seen:
                                allele_counts[allele - 1] += 1
                                alleles_seen[allele] = True
                                has_alleles = True
                        except ValueError:
                            pass

                    # If no alleles, use empty string as proxy.
                    if not has_alleles:
                        genotype = ""

                    sample_gts.append(genotype)
            else:
                # No samples, so set allele count and sample genotype manually.
                allele_counts = [1]
                sample_gts = ["1/1"]

            # Add locus data.
            locus_data = [-1, pos, c_id, ref, alt, qual, c_filter, ",".join(sample_gts)]
            locus_data.extend(allele_counts)
            data.append(locus_data)

        return {"data": data, "message": message}

    def write_data_to_file(self, regions, filename):
        with open(filename, "w") as out, self.open_data_file() as data_file:
            for region in regions:
                # Write data in region.
                iterator = self.get_iterator(data_file, region.chrom, region.start, region.end)
                for line in iterator:
                    out.write(f"{line}\n")

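# Illustrative sketch (not part of the original module): for a VCF record such as
#
#     chr1  101  rs123  A  G  29  PASS  .  GT  0|1  1/1
#
# VcfDataProvider.process_data emits a locus entry roughly like
#
#     [-1, 100, "rs123", "A", "G", "29", "PASS", "0|1,1/1", 2]
#
# i.e. the 1-based VCF position becomes 0-based, sample genotypes are joined
# with commas ("" would stand for a homozygous-reference sample), and the
# trailing value counts samples carrying each alternate allele.
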
class VcfTabixDataProvider(TabixDataProvider, VcfDataProvider):
    """
    Provides data from a VCF file indexed via tabix.
    """

    dataset_type = "variant"

class RawVcfDataProvider(VcfDataProvider):
    """
    Provide data from VCF file.

    NOTE: this data provider does not use indices, and hence will be very slow
    for large datasets.
    """

    @contextmanager
    def open_data_file(self):
        with open(self.original_dataset.get_file_name()) as f:
            yield f

    def get_iterator(self, data_file, chrom, start, end, **kwargs):
        # Skip comments.
        line = None
        for line in data_file:
            if not line.startswith("#"):
                break

        # If the file is empty or only contains comments, there are no data lines.
        if line is None or line.startswith("#"):
            return []

        # Match chrom naming format.
        if line:
            dataset_chrom = line.split()[0]
            if not _chrom_naming_matches(chrom, dataset_chrom):
                chrom = _convert_between_ucsc_and_ensemble_naming(chrom)

        def line_in_region(vcf_line, chrom, start, end):
            """Returns true if line is in region."""
            variant_chrom, variant_start = vcf_line.split()[0:2]
            # VCF format is 1-based.
            variant_start = int(variant_start) - 1
            return variant_chrom == chrom and variant_start >= start and variant_start <= end

        def line_filter_iter():
            """Yields lines in data that are in region chrom:start-end"""
            # Yield data line read above.
            if line_in_region(line, chrom, start, end):
                yield line

            # Search for and yield other data lines.
            for data_line in data_file:
                if line_in_region(data_line, chrom, start, end):
                    yield data_line

        return line_filter_iter()

class BamDataProvider(GenomeDataProvider, FilterableMixin):
    """
    Provides access to intervals from a sorted indexed BAM file. Coordinate
    data is reported in BED format: 0-based, half-open.
    """

    dataset_type = "bai"

    def get_filters(self):
        """
        Returns filters for dataset.
        """
        # HACK: first 7 fields are for drawing, so start filter column index at 7.
        filter_col = 7
        filters = []
        filters.append({"name": "Mapping Quality", "type": "number", "index": filter_col})
        return filters

    def write_data_to_file(self, regions, filename):
        """
        Write reads in regions to file.
        """
        # Open current BAM file using index.
        bamfile = pysam.AlignmentFile(
            self.original_dataset.get_file_name(), mode="rb", index_filename=self.converted_dataset.get_file_name()
        )

        # TODO: write headers as well?
        new_bamfile = pysam.AlignmentFile(filename, template=bamfile, mode="wb")

        for region in regions:
            # Write data from region.
            chrom = region.chrom
            start = region.start
            end = region.end

            try:
                data = bamfile.fetch(start=start, end=end, reference=chrom)
            except ValueError:
                # Try alternative chrom naming.
                chrom = _convert_between_ucsc_and_ensemble_naming(chrom)
                try:
                    data = bamfile.fetch(start=start, end=end, reference=chrom)
                except ValueError:
                    return None

            # Write reads in region.
            for read in data:
                new_bamfile.write(read)

        # Cleanup.
        new_bamfile.close()
        bamfile.close()

    @contextmanager
    def open_data_file(self):
        # Attempt to open the BAM file with index
        with pysam.AlignmentFile(
            self.original_dataset.get_file_name(), mode="rb", index_filename=self.converted_dataset.get_file_name()
        ) as f:
            yield f

    def get_iterator(self, data_file, chrom, start, end, **kwargs) -> Iterator[str]:
        """
        Returns an iterator that provides data in the region chrom:start-end
        """
        # Fetch and return data.
        chrom = str(chrom)
        start = int(start)
        end = int(end)
        try:
            data = data_file.fetch(start=start, end=end, reference=chrom)
        except ValueError:
            # Try alternative chrom naming.
            chrom = _convert_between_ucsc_and_ensemble_naming(chrom)
            try:
                data = data_file.fetch(start=start, end=end, reference=chrom)
            except ValueError:
                return iter([])
        return data
    def process_data(
        self,
        iterator,
        start_val=0,
        max_vals=None,
        ref_seq=None,
        iterator_type="nth",
        mean_depth=None,
        start=0,
        end=0,
        **kwargs,
    ):
        """
        Returns a dict with the following attributes::

            data - a list of reads with the format
                [<guid>, <start>, <end>, <name>, <read_1>, <read_2>, [empty], <mapq_scores>]

                where <read_1> has the format
                    [<start>, <end>, <cigar>, <strand>, <read_seq>]

                and <read_2> has the format
                    [<start>, <end>, <cigar>, <strand>, <read_seq>]

                Field 7 is empty so that mapq scores' location matches that in
                single-end reads. For single-end reads, read has format:
                    [<guid>, <start>, <end>, <name>, <cigar>, <strand>, <seq>, <mapq_score>]

                NOTE: read end and sequence data are not valid for reads outside
                of requested region and should not be used.

            max_low - lowest coordinate for the returned reads
            max_high - highest coordinate for the returned reads
            message - error/informative message
        """
        # No iterator indicates no reads.
        if iterator is None:
            return {"data": [], "message": None}

        #
        # Helper functions.
        #
        def decode_strand(read_flag, mask):
            """Decode strand from read flag."""
            strand_flag = read_flag & mask == 0
            if strand_flag:
                return "+"
            else:
                return "-"

        def _random_read_iterator(read_iterator, threshold):
            """
            An iterator that returns a random stream of reads from the
            read_iterator as well as corresponding pairs for returned reads.
            threshold is a value in [0,1] that denotes the percentage of reads
            to return.
            """
            for e in read_iterator:
                if e.qname in paired_pending or random.uniform(0, 1) <= threshold:
                    yield e

        def _nth_read_iterator(read_iterator, threshold):
            """
            An iterator that returns every nth read.
            """
            # Convert threshold to N for stepping through iterator.
            n = int(1 / threshold)
            return itertools.islice(read_iterator, None, None, n)

        # -- Choose iterator. --

        # Calculate threshold for non-sequential iterators based on mean_depth and read length.
        try:
            first_read = next(iterator)
        except StopIteration:
            # no reads.
            return {"data": [], "message": None, "max_low": start, "max_high": start}

        read_len = len(first_read.seq)
        num_reads = max((end - start) * mean_depth / float(read_len), 1)
        threshold = float(max_vals) / num_reads
        iterator = itertools.chain(iter([first_read]), iterator)

        # Use specified iterator type, save for when threshold is >= 1.
        # A threshold of >= 1 indicates all reads are to be returned, so no
        # sampling is needed and the sequential iterator will be used.
        if iterator_type == "sequential" or threshold >= 1:
            read_iterator = iterator
        elif iterator_type == "random":
            read_iterator = _random_read_iterator(iterator, threshold)
        elif iterator_type == "nth":
            read_iterator = _nth_read_iterator(iterator, threshold)

        #
        # Encode reads as list of lists.
        #
        results = []
        paired_pending: Dict[str, Dict[str, Any]] = {}
        unmapped = 0
        message = None
        count = 0
        for read in read_iterator:
            if count < start_val:
                continue
            if (count - start_val - unmapped) >= max_vals:
                message = self.error_max_vals % (max_vals, "reads")
                break

            # If not mapped, skip read.
            is_mapped = read.flag & 0x0004 == 0
            if not is_mapped:
                unmapped += 1
                continue

            qname = read.qname
            seq = read.seq
            strand = decode_strand(read.flag, 0x0010)
            if read.cigar is not None:
                read_len = sum(cig[1] for cig in read.cigar)  # Use cigar to determine length
            else:
                read_len = len(seq)  # If no cigar, just use sequence length

            if read.is_proper_pair:
                if qname in paired_pending:
                    # Found pair.
                    pair = paired_pending[qname]
                    results.append(
                        [
                            hash("%i_%s" % (pair["start"], qname)),
                            pair["start"],
                            read.pos + read_len,
                            qname,
                            [pair["start"], pair["end"], pair["cigar"], pair["strand"], pair["seq"]],
                            [read.pos, read.pos + read_len, read.cigar, strand, seq],
                            None,
                            [pair["mapq"], read.mapq],
                        ]
                    )
                    del paired_pending[qname]
                else:
                    # Insert first of pair.
                    paired_pending[qname] = {
                        "start": read.pos,
                        "end": read.pos + read_len,
                        "seq": seq,
                        "mate_start": read.mpos,
                        "rlen": read_len,
                        "strand": strand,
                        "cigar": read.cigar,
                        "mapq": read.mapq,
                    }
                count += 1
            else:
                results.append(
                    [
                        hash("%i_%s" % (read.pos, qname)),
                        read.pos,
                        read.pos + read_len,
                        qname,
                        read.cigar,
                        strand,
                        read.seq,
                        read.mapq,
                    ]
                )
                count += 1

        # Take care of reads whose mates are out of range.
        for qname, read in paired_pending.items():
            if read["mate_start"] < read["start"]:
                # Mate is before read.
                read_start = read["mate_start"]
                read_end = read["end"]
                # Make read_1 start=end so that length is 0 b/c we don't know
                # read length.
                r1 = [read["mate_start"], read["mate_start"]]
                r2 = [read["start"], read["end"], read["cigar"], read["strand"], read["seq"]]
            else:
                # Mate is after read.
                read_start = read["start"]
                # Make read_2 start=end so that length is 0 b/c we don't know
                # read length. Hence, end of read is start of read_2.
                read_end = read["mate_start"]
                r1 = [read["start"], read["end"], read["cigar"], read["strand"], read["seq"]]
                r2 = [read["mate_start"], read["mate_start"]]

            results.append(
                [hash("%i_%s" % (read_start, qname)), read_start, read_end, qname, r1, r2, [read["mapq"], 125]]
            )

        # Clean up. TODO: is this needed? If so, we'll need a cleanup function after processing the data.
        # bamfile.close()

        def compress_seq_and_cigar(read, start_field, cigar_field, seq_field):
            """
            Use reference-based compression to compress read sequence and cigar.
            """
            read_seq, read_cigar = get_ref_based_read_seq_and_cigar(
                read[seq_field].upper(), read[start_field], ref_seq.sequence, ref_seq.start, read[cigar_field]
            )
            read[seq_field] = read_seq
            read[cigar_field] = read_cigar

        def convert_cigar(read, start_field, cigar_field, seq_field):
            """
            Convert read cigar from pysam format to string format.
            """
            cigar_ops = "MIDNSHP=X"
            read_cigar = ""
            for op_tuple in read[cigar_field]:
                read_cigar += "%i%s" % (op_tuple[1], cigar_ops[op_tuple[0]])
            read[cigar_field] = read_cigar

        # Choose method for processing reads. Use reference-based compression
        # if possible. Otherwise, convert cigar.
        if ref_seq:
            # Uppercase for easy comparison.
            ref_seq.sequence = ref_seq.sequence.upper()
            process_read = compress_seq_and_cigar
        else:
            process_read = convert_cigar

        # Process reads.
        for read in results:
            if isinstance(read[5], list):
                # Paired-end read.
                if len(read[4]) > 2:
                    process_read(read[4], 0, 2, 4)
                if len(read[5]) > 2:
                    process_read(read[5], 0, 2, 4)
            else:
                # Single-end read.
                process_read(read, 1, 4, 6)

        max_low, max_high = get_bounds(results, 1, 2)

        return {"data": results, "message": message, "max_low": max_low, "max_high": max_high}

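# Illustrative sketch (not part of the original module): after process_data, a
# single-end read is encoded roughly as
#
#     [<hash>, 9990, 10090, "read_1", "100M", "+", "ACGT...", 60]
#
# while a properly paired read becomes
#
#     [<hash>, 9990, 10290, "read_2",
#      [9990, 10090, "100M", "+", "ACGT..."],
#      [10190, 10290, "100M", "-", "TTGC..."],
#      None, [60, 60]]
#
# (coordinates and names here are hypothetical). CIGARs are either converted to
# strings (convert_cigar) or rewritten against the reference
# (compress_seq_and_cigar) when a reference sequence is supplied.
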
class SamDataProvider(BamDataProvider):
    dataset_type = "bai"

    def __init__(self, converted_dataset=None, original_dataset=None, dependencies=None):
        """Create SamDataProvider."""
        super().__init__(
            converted_dataset=converted_dataset, original_dataset=original_dataset, dependencies=dependencies
        )

        # To use BamDataProvider, original dataset must be BAM and
        # converted dataset must be BAI. Use BAI from BAM metadata.
        if converted_dataset:
            self.original_dataset = converted_dataset
            self.converted_dataset = converted_dataset.metadata.bam_index

class BBIDataProvider(GenomeDataProvider):
    """
    BBI data provider for the Galaxy track browser.
    """

    dataset_type = "bigwig"

    @abc.abstractmethod
    def _get_dataset(self) -> Tuple[IO[bytes], Union[BigBedFile, BigWigFile]]: ...

    def valid_chroms(self):
        # No way to return this info as of now
        return None

    def has_data(self, chrom):
        f, bbi = self._get_dataset()
        all_dat = bbi.query(chrom, 0, 2147483647, 1) or bbi.query(
            _convert_between_ucsc_and_ensemble_naming(chrom), 0, 2147483647, 1
        )
        f.close()
        return all_dat is not None
    def get_data(self, chrom, start, end, start_val=0, max_vals=None, num_samples=1000, **kwargs):
        start = int(start)
        end = int(end)

        # Helper function for getting summary data regardless of chromosome
        # naming convention.
        def _summarize_bbi(bbi, chrom, start, end, num_points):
            return bbi.summarize(chrom, start, end, num_points) or bbi.summarize(
                _convert_between_ucsc_and_ensemble_naming(chrom), start, end, num_points
            )

        # Bigwig can be a standalone bigwig file, in which case we use
        # original_dataset, or coming from wig->bigwig conversion in
        # which we use converted_dataset
        f, bbi = self._get_dataset()

        # If stats requested, compute overall summary data for the range
        # start:end but no reduced data. This is currently used by client
        # to determine the default range.
        if "stats" in kwargs:
            summary = _summarize_bbi(bbi, chrom, start, end, 1)
            f.close()

            min_val = 0
            max_val = 0
            mean = 0
            sd = 0.0
            if summary is not None:
                # Does the summary contain any defined values?
                valid_count = summary.valid_count[0]
                if summary.valid_count > 0:
                    # Compute $\mu \pm 2\sigma$ to provide an estimate for upper and lower
                    # bounds that contain ~95% of the data.
                    mean = summary.sum_data[0] / valid_count
                    var = max(summary.sum_squares[0] - mean, 0)  # Prevent variance underflow.
                    if valid_count > 1:
                        var /= valid_count - 1
                    sd = math.sqrt(var)
                    min_val = summary.min_val[0]
                    max_val = summary.max_val[0]

            return dict(data=dict(min=min_val, max=max_val, mean=mean, sd=sd))

        def summarize_region(bbi, chrom, start, end, num_points):
            """
            Returns results from summarizing a region using num_points.
            NOTE: num_points cannot be greater than end - start or BBI
            will return None for all positions.
            """
            result = []

            # Get summary; this samples at intervals of length
            # (end - start)/num_points -- i.e. drops any fractional component
            # of interval length.
            if summary := _summarize_bbi(bbi, chrom, start, end, num_points):
                # mean = summary.sum_data / summary.valid_count

                # Standard deviation by bin, not yet used
                # var = summary.sum_squares - mean
                # var /= minimum( valid_count - 1, 1 )
                # sd = sqrt( var )

                pos = start
                step_size = (end - start) / num_points

                for i in range(num_points):
                    result.append((pos, float_nan(summary.sum_data[i] / summary.valid_count[i])))
                    pos += step_size

            return result

        # Approach is different depending on region size.
        num_samples = int(num_samples)
        if end - start < num_samples:
            # Get values for individual bases in region, including start and end.
            # To do this, need to increase end to next base and request number of points.
            num_points = end - start + 1
            end += 1
        else:
            #
            # The goal is to sample the region between start and end uniformly
            # using ~N (num_samples) data points. The challenge is that the size of
            # sampled intervals rarely is full bases, so sampling using N points
            # will leave the end of the region unsampled due to remainders for
            # each interval. To rectify this, a new N is calculated based on the
            # step size that covers as much of the region as possible.
            #
            # However, this still leaves some of the region unsampled. This
            # could be addressed by repeatedly sampling remainder using a
            # smaller and smaller step_size, but that would require iteratively
            # going to BBI, which could be time consuming.
            #

            # Start with N samples.
            num_points = num_samples
            step_size = (end - start) // num_points

            # Add additional points to sample in the remainder not covered by
            # the initial N samples.
            remainder_start = start + step_size * num_points
            additional_points = (end - remainder_start) // step_size
            num_points += additional_points

        result = summarize_region(bbi, chrom, start, end, num_points)

        # Cleanup and return.
        f.close()
        return {"data": result, "dataset_type": self.dataset_type}

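# Illustrative sketch (not part of the original module): the remainder logic
# above works on integer step sizes. For a 10,500 bp window with
# num_samples=1000, step_size = 10500 // 1000 = 10, so the first 1000 points
# cover only 10,000 bp; the 500 bp remainder yields 500 // 10 = 50 additional
# points, giving 1050 points that tile the whole window.
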
class BigBedDataProvider(BBIDataProvider):
    def _get_dataset(self):
        # Nothing converts to bigBed so we don't consider converted dataset
        f = open(self.original_dataset.get_file_name(), "rb")
        return f, BigBedFile(file=f)

class BigWigDataProvider(BBIDataProvider):
    """
    Provides data from BigWig files; position data is reported in 1-based
    coordinate system, i.e. wiggle format.
    """

    def _get_dataset(self):
        if self.converted_dataset is not None:
            f = open(self.converted_dataset.get_file_name(), "rb")
        else:
            f = open(self.original_dataset.get_file_name(), "rb")
        return f, BigWigFile(file=f)

class IntervalIndexDataProvider(GenomeDataProvider, FilterableMixin):
    """
    Interval index files used for GFF, Pileup files.
    """

    col_name_data_attr_mapping = {4: {"index": 4, "name": "Score"}}

    dataset_type = "interval_index"

    def write_data_to_file(self, regions, filename):
        index = Indexes(self.converted_dataset.get_file_name())
        with open(self.original_dataset.get_file_name()) as source, open(filename, "w") as out:
            for region in regions:
                # Write data from region.
                chrom = region.chrom
                start = region.start
                end = region.end
                for _start, _end, offset in index.find(chrom, start, end):
                    source.seek(offset)

                    # HACK: write differently depending on original dataset format.
                    if self.original_dataset.ext not in ["gff", "gff3", "gtf"]:
                        line = source.readline()
                        out.write(line)
                    else:
                        reader = GFFReaderWrapper(source, fix_strand=True)
                        feature = next(reader)
                        for interval in feature.intervals:
                            out.write("\t".join(interval.fields) + "\n")

    @contextmanager
    def open_data_file(self):
        i = Indexes(self.converted_dataset.get_file_name())
        yield i

    def get_iterator(self, data_file, chrom, start, end, **kwargs) -> Iterator[str]:
        """
        Returns an iterator for data in data_file in chrom:start-end
        """
        if chrom not in data_file.indexes:
            # Try alternative naming.
            chrom = _convert_between_ucsc_and_ensemble_naming(chrom)

        return data_file.find(chrom, start, end)

    def process_data(self, iterator, start_val=0, max_vals=None, **kwargs):
        results = []
        message = None
        with open(self.original_dataset.get_file_name()) as source:
            # Build data to return. Payload format is:
            # [ <guid/offset>, <start>, <end>, <name>, <score>, <strand>, <thick_start>,
            #   <thick_end>, <blocks> ]
            #
            # First three entries are mandatory, others are optional.
            filter_cols = loads(kwargs.get("filter_cols", "[]"))
            no_detail = "no_detail" in kwargs
            for count, val in enumerate(iterator):
                offset = val[2]
                if count < start_val:
                    continue
                if count - start_val >= max_vals:
                    message = self.error_max_vals % (max_vals, "features")
                    break
                source.seek(offset)
                # TODO: can we use column metadata to fill out payload?

                # GFF dataset.
                reader = GFFReaderWrapper(source, fix_strand=True)
                feature = next(reader)
                payload = package_gff_feature(feature, no_detail, filter_cols)
                payload.insert(0, offset)

                results.append(payload)

        return {"data": results, "message": message}

class RawGFFDataProvider(GenomeDataProvider):
    """
    Provide data from GFF file that has not been indexed.

    NOTE: this data provider does not use indices, and hence will be very slow
    for large datasets.
    """

    dataset_type = "interval_index"

    def get_iterator(self, data_file, chrom, start, end, **kwargs):
        """
        Returns an iterator that provides data in the region chrom:start-end as well as
        a file offset.
        """
        source = open(self.original_dataset.get_file_name())

        # Read first line in order to match chrom naming format.
        line = source.readline()

        # If line empty, assume file is empty and return empty iterator.
        if len(line) == 0:
            return iter([])

        # Determine chromosome naming format.
        dataset_chrom = line.split()[0]
        if not _chrom_naming_matches(chrom, dataset_chrom):
            chrom = _convert_between_ucsc_and_ensemble_naming(chrom)
        # Undo read.
        source.seek(0)

        def features_in_region_iter():
            offset = 0
            for feature in GFFReaderWrapper(source, fix_strand=True):
                # Only provide features that are in region.
                feature_start, feature_end = convert_gff_coords_to_bed([feature.start, feature.end])
                if feature.chrom == chrom and feature_end > start and feature_start < end:
                    yield feature, offset
                offset += feature.raw_size

        return features_in_region_iter()

    def process_data(self, iterator, start_val=0, max_vals=None, **kwargs):
        """
        Process data from an iterator to a format that can be provided to client.
        """
        filter_cols = loads(kwargs.get("filter_cols", "[]"))
        no_detail = "no_detail" in kwargs
        results = []
        message = None

        for count, (feature, offset) in enumerate(iterator):
            if count < start_val:
                continue
            if count - start_val >= max_vals:
                message = self.error_max_vals % (max_vals, "reads")
                break

            payload = package_gff_feature(feature, no_detail=no_detail, filter_cols=filter_cols)
            payload.insert(0, offset)
            results.append(payload)

        return {"data": results, "dataset_type": self.dataset_type, "message": message}

class GtfTabixDataProvider(TabixDataProvider):
    """
    Returns data from GTF datasets that are indexed via tabix.
    """

    def process_data(self, iterator, start_val=0, max_vals=None, **kwargs):
        # Loop through lines and group by transcript_id; each group is a feature.

        # TODO: extend this code or use code in gff_util to process GFF/3 as well
        # and then create a generic GFFDataProvider that can be used with both
        # raw and tabix datasets.
        features: Dict[str, List[GFFInterval]] = {}
        for line in iterator:
            line_attrs = parse_gff_attributes(line.split("\t")[8])
            transcript_id = line_attrs["transcript_id"]
            feature_list: List[GFFInterval]
            if transcript_id in features:
                feature_list = features[transcript_id]
            else:
                feature_list = []
                features[transcript_id] = feature_list
            feature_list.append(GFFInterval(None, line.split("\t")))

        # Process data.
        filter_cols = loads(kwargs.get("filter_cols", "[]"))
        no_detail = "no_detail" in kwargs
        results = []
        message = None

        for count, intervals in enumerate(features.values()):
            if count < start_val:
                continue
            if count - start_val >= max_vals:
                message = self.error_max_vals % (max_vals, "reads")
                break

            feature = GFFFeature(None, intervals=intervals)
            payload = package_gff_feature(feature, no_detail=no_detail, filter_cols=filter_cols)
            payload.insert(0, feature.intervals[0].attributes["transcript_id"])
            results.append(payload)

        return {"data": results, "message": message}

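# Illustrative sketch (not part of the original module): given GTF lines such as
#
#     chr1  havana  exon  1300  1500  .  +  .  gene_id "g1"; transcript_id "t1";
#     chr1  havana  exon  2000  2200  .  +  .  gene_id "g1"; transcript_id "t1";
#     chr1  havana  exon  1300  1400  .  +  .  gene_id "g1"; transcript_id "t2";
#
# GtfTabixDataProvider.process_data groups the first two lines into one
# GFFFeature keyed by "t1" and the third into a feature keyed by "t2"; each
# feature is then packaged via package_gff_feature with its transcript_id
# prepended as the payload's identifier.
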
#
# -- ENCODE Peak data providers. --
#
class ENCODEPeakDataProvider(GenomeDataProvider):
    """
    Abstract class that processes ENCODEPeak data from native format to payload format.

    Payload format: [ uid (offset), start, end, name, strand, thick_start, thick_end, blocks ]
    """

    def get_iterator(self, data_file, chrom, start, end, **kwargs):
        raise Exception("Unimplemented Method")

    def process_data(self, iterator, start_val=0, max_vals=None, **kwargs):
        """
        Process ENCODEPeak data from the iterator into the payload format described above.
        """
        # FIXMEs:
        # (1) should be able to unify some of this code with BedDataProvider.process_data
        # (2) are optional number of parameters supported?

        # Build data to return. Payload format is:
        # [ <guid/offset>, <start>, <end>, <name>, <strand>, <thick_start>,
        #   <thick_end>, <blocks> ]
        #
        # First three entries are mandatory, others are optional.
        #
        no_detail = "no_detail" in kwargs
        rval = []
        message = None
        for count, line in enumerate(iterator):
            if count < start_val:
                continue
            if max_vals and count - start_val >= max_vals:
                message = self.error_max_vals % (max_vals, "features")
                break

            feature = line.split()

            # Feature initialization.
            payload: PAYLOAD_LIST_TYPE = [
                # GUID is just a hash of the line
                hash(line),
                # Add start, end.
                int(feature[1]),
                int(feature[2]),
            ]

            if no_detail:
                rval.append(payload)
                continue

            # Extend with additional data.
            payload.extend(
                [
                    # Add name, strand.
                    feature[3],
                    feature[5],
                    # Thick start, end are feature start, end for now.
                    int(feature[1]),
                    int(feature[2]),
                    # No blocks.
                    None,
                    # Filtering data: Score, signalValue, pValue, qValue.
                    float(feature[4]),
                    float(feature[6]),
                    float(feature[7]),
                    float(feature[8]),
                ]
            )

            rval.append(payload)

        return {"data": rval, "message": message}

class ENCODEPeakTabixDataProvider(TabixDataProvider, ENCODEPeakDataProvider):
    """
    Provides data from an ENCODEPeak dataset indexed via tabix.
    """

    def get_filters(self):
        """
        Returns filters for dataset.
        """
        # HACK: first 8 fields are for drawing, so start filter column index at 9.
        filter_col = 8
        filters = []
        filters.append(
            {"name": "Score", "type": "number", "index": filter_col, "tool_id": "Filter1", "tool_exp_name": "c6"}
        )
        filter_col += 1
        filters.append(
            {"name": "Signal Value", "type": "number", "index": filter_col, "tool_id": "Filter1", "tool_exp_name": "c7"}
        )
        filter_col += 1
        filters.append(
            {"name": "pValue", "type": "number", "index": filter_col, "tool_id": "Filter1", "tool_exp_name": "c8"}
        )
        filter_col += 1
        filters.append(
            {"name": "qValue", "type": "number", "index": filter_col, "tool_id": "Filter1", "tool_exp_name": "c9"}
        )
        return filters

#
# -- ChromatinInteraction data providers --
#
class ChromatinInteractionsDataProvider(GenomeDataProvider):
    def process_data(self, iterator, start_val=0, max_vals=None, **kwargs):
        """
        Process chromatin interaction data from the iterator into payload format.
        """
        rval = []
        message = None
        for count, line in enumerate(iterator):
            if count < start_val:
                continue
            if max_vals and count - start_val >= max_vals:
                message = self.error_max_vals % (max_vals, "interactions")
                break

            feature = line.split()

            s1 = int(feature[1])
            e1 = int(feature[2])
            c = feature[3]
            s2 = int(feature[4])
            e2 = int(feature[5])
            v = float(feature[6])

            # Feature initialization.
            payload = [
                # GUID is just a hash of the line
                hash(line),
                # Add start1, end1, chr2, start2, end2, value.
                s1,
                e1,
                c,
                s2,
                e2,
                v,
            ]

            rval.append(payload)

        return {"data": rval, "message": message}

    def get_default_max_vals(self):
        return 100000

class ChromatinInteractionsTabixDataProvider(TabixDataProvider, ChromatinInteractionsDataProvider):
    def get_iterator(
        self, data_file, chrom, start=0, end=sys.maxsize, interchromosomal=False, **kwargs
    ) -> Iterator[str]:
        """
        Returns an iterator over interactions overlapping chrom:start-end and,
        optionally, interchromosomal interactions involving chrom.
        """
        # Modify start as needed to get earlier interactions with start region.
        span = int(end) - int(start)
        filter_start = max(0, int(start) - span - span / 2)

        def filter(iter):
            for line in iter:
                feature = line.split()
                s1 = int(feature[1])
                e1 = int(feature[2])
                c = feature[3]
                s2 = int(feature[4])
                e2 = int(feature[5])
                # Check for intrachromosomal interactions.
                if ((s1 + s2) / 2 <= end) and ((e1 + e2) / 2 >= start) and (c == chrom):
                    yield line
                # Check for interchromosomal interactions.
                if interchromosomal and c != chrom:
                    yield line

        return filter(TabixDataProvider.get_iterator(self, data_file, chrom, filter_start, end))

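# Illustrative sketch (not part of the original module): the widened fetch above
# is plain arithmetic. For a viewed window of 100,000-120,000 the span is
# 20,000, so filter_start = max(0, 100000 - 20000 - 10000) = 70,000; the tabix
# fetch therefore runs over 70,000-120,000 so that interactions anchored
# upstream of the window, but whose midpoints fall inside it, are still seen by
# the filter generator.
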
#
# -- Helper methods. --
#
def package_gff_feature(feature, no_detail=False, filter_cols=None) -> PAYLOAD_LIST_TYPE:
    """Package a GFF feature in an array for data providers."""
    filter_cols = filter_cols or []
    feature = convert_gff_coords_to_bed(feature)

    # No detail means only start, end.
    if no_detail:
        return [feature.start, feature.end]

    # Return full feature.
    payload = [
        feature.start,
        feature.end,
        feature.name(),
        feature.strand,
        # No notion of thick start, end in GFF, so make everything
        # thick.
        feature.start,
        feature.end,
    ]

    # HACK: ignore interval with name 'transcript' from feature.
    # Cufflinks puts this interval in each of its transcripts,
    # and they mess up trackster by covering the feature's blocks.
    # This interval will always be a feature's first interval,
    # and the GFF's third column is its feature name.
    feature_intervals = feature.intervals
    if feature.intervals[0].fields[2] == "transcript":
        feature_intervals = feature.intervals[1:]

    # Add blocks.
    block_sizes = [(interval.end - interval.start) for interval in feature_intervals]
    block_starts = [(interval.start - feature.start) for interval in feature_intervals]
    blocks = list(zip(block_sizes, block_starts))
    payload.append([(feature.start + block[1], feature.start + block[1] + block[0]) for block in blocks])

    # Add filter data to payload.
    for col in filter_cols:
        if col == "Score":
            if feature.score == "nan":
                payload.append(feature.score)
            else:
                try:
                    f = float(feature.score)
                    payload.append(f)
                except Exception:
                    payload.append(feature.score)
        elif col in feature.attributes:
            if feature.attributes[col] == "nan":
                payload.append(feature.attributes[col])
            else:
                try:
                    f = float(feature.attributes[col])
                    payload.append(f)
                except Exception:
                    payload.append(feature.attributes[col])
        else:
            # Dummy value.
            payload.append(0)
    return payload

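# Illustrative sketch (not part of the original module): for a single-interval
# GFF feature spanning 1-based coordinates 1001-2000 on the "+" strand with
# name "gene1" and no filter columns, package_gff_feature returns roughly
#
#     [1000, 2000, "gene1", "+", 1000, 2000, [(1000, 2000)]]
#
# start/end are converted to BED (0-based, half-open) coordinates, thick
# start/end simply repeat the feature bounds, and each interval becomes an
# absolute (start, end) block.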