Source code for galaxy.datatypes.util.maf_utilities

#!/usr/bin/env python
"""
Provides wrappers and utilities for working with MAF files and alignments.
"""
# Dan Blankenberg

import functools
import logging
import os
import resource
import sys
import tempfile
from copy import deepcopy
from errno import EMFILE

import bx.align.maf
import bx.interval_index_file
import bx.intervals

maketrans = str.maketrans

log = logging.getLogger(__name__)

GAP_CHARS = ["-"]
SRC_SPLIT_CHAR = "."


[docs]def src_split(src): fields = src.split(SRC_SPLIT_CHAR, 1) spec = fields.pop(0) if fields: chrom = fields.pop(0) else: chrom = spec return spec, chrom
[docs]def src_merge(spec, chrom, contig=None): if None in [spec, chrom]: spec = chrom = spec or chrom return bx.align.src_merge(spec, chrom, contig)
[docs]def get_species_in_block(block): species = [] for c in block.components: spec, chrom = src_split(c.src) if spec not in species: species.append(spec) return species
[docs]def tool_fail(msg="Unknown Error"): sys.exit(f"Fatal Error: {msg}")
[docs]class TempFileHandler: """ Handles creating, opening, closing, and deleting of Temp files, with a maximum number of files open at one time. """ DEFAULT_MAX_OPEN_FILES = max(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2, 1)
[docs] def __init__(self, max_open_files=None, **kwds): if max_open_files is None: max_open_files = self.DEFAULT_MAX_OPEN_FILES self.max_open_files = max_open_files self.files = [] self.open_file_indexes = [] self.kwds = kwds
[docs] def get_open_tempfile(self, index=None, **kwds): if index is not None and index in self.open_file_indexes: self.open_file_indexes.remove(index) else: if self.max_open_files: while len(self.open_file_indexes) >= self.max_open_files: self.close(self.open_file_indexes[0]) if index is None: index = len(self.files) temp_kwds = dict(self.kwds) temp_kwds["delete"] = False temp_kwds.update(kwds) # Being able to use delete=True here, would simplify a bit, # but we support python2.4 in these tools while True: try: tmp_file = tempfile.NamedTemporaryFile(**temp_kwds) filename = tmp_file.name break except OSError as e: if self.open_file_indexes and e.errno == EMFILE: self.max_open_files = len(self.open_file_indexes) self.close(self.open_file_indexes[0]) else: raise e tmp_file.close() self.files.append(open(filename, "r+")) else: while True: try: self.files[index] = open(self.files[index].name, "r+") break except OSError as e: if self.open_file_indexes and e.errno == EMFILE: self.max_open_files = len(self.open_file_indexes) self.close(self.open_file_indexes[0]) else: raise e self.files[index].seek(0, 2) self.open_file_indexes.append(index) return index, self.files[index]
[docs] def close(self, index, delete=False): if index in self.open_file_indexes: self.open_file_indexes.remove(index) rval = self.files[index].close() if delete: try: os.unlink(self.files[index].name) except OSError: pass return rval
[docs] def flush(self, index): if index in self.open_file_indexes: self.files[index].flush()
def __del__(self): for i in range(len(self.files)): self.close(i, delete=True)
# an object corresponding to a reference layered alignment
[docs]class RegionAlignment: DNA_COMPLEMENT = maketrans("ACGTacgt", "TGCAtgca") MAX_SEQUENCE_SIZE = sys.maxsize # Maximum length of sequence allowed
[docs] def __init__(self, size, species=None, temp_file_handler=None): assert ( size <= self.MAX_SEQUENCE_SIZE ), "Maximum length allowed for an individual sequence has been exceeded (%i > %i)." % ( size, self.MAX_SEQUENCE_SIZE, ) species = species or [] self.size = size if not temp_file_handler: temp_file_handler = TempFileHandler() self.temp_file_handler = temp_file_handler self.sequences = {} if not isinstance(species, list): species = [species] for spec in species: self.add_species(spec)
# add a species to the alignment
[docs] def add_species(self, species): # make temporary sequence files file_index, fh = self.temp_file_handler.get_open_tempfile() self.sequences[species] = file_index fh.write("-" * self.size)
# returns the names for species found in alignment, skipping names as requested
[docs] def get_species_names(self, skip=None): skip = skip or [] if not isinstance(skip, list): skip = [skip] names = list(self.sequences.keys()) for name in skip: try: names.remove(name) except ValueError: pass return names
# returns the sequence for a species
[docs] def get_sequence(self, species): file_index, fh = self.temp_file_handler.get_open_tempfile(self.sequences[species]) fh.seek(0) return fh.read()
# returns the reverse complement of the sequence for a species
[docs] def get_sequence_reverse_complement(self, species): complement = list(self.get_sequence(species).translate(self.DNA_COMPLEMENT)) complement.reverse() return "".join(complement)
# sets a position for a species
[docs] def set_position(self, index, species, base): if len(base) != 1: raise Exception("A genomic position can only have a length of 1.") return self.set_range(index, species, base)
# sets a range for a species
[docs] def set_range(self, index, species, bases): if index >= self.size or index < 0: raise Exception("Your index (%i) is out of range (0 - %i)." % (index, self.size - 1)) if len(bases) == 0: raise Exception("A set of genomic positions can only have a positive length.") if species not in self.sequences.keys(): self.add_species(species) file_index, fh = self.temp_file_handler.get_open_tempfile(self.sequences[species]) fh.seek(index) fh.write(bases)
# Flush temp file of specified species, or all species
[docs] def flush(self, species=None): if species is None: species = self.sequences.keys() elif not isinstance(species, list): species = [species] for spec in species: self.temp_file_handler.flush(self.sequences[spec])
[docs]class GenomicRegionAlignment(RegionAlignment):
[docs] def __init__(self, start, end, species=None, temp_file_handler=None): species = species or [] RegionAlignment.__init__(self, end - start, species, temp_file_handler=temp_file_handler) self.start = start self.end = end
[docs]class SplicedAlignment: DNA_COMPLEMENT = maketrans("ACGTacgt", "TGCAtgca")
[docs] def __init__(self, exon_starts, exon_ends, species=None, temp_file_handler=None): species = species or [] if not isinstance(exon_starts, list): exon_starts = [exon_starts] if not isinstance(exon_ends, list): exon_ends = [exon_ends] assert len(exon_starts) == len(exon_ends), "The number of starts does not match the number of sizes." self.exons = [] if not temp_file_handler: temp_file_handler = TempFileHandler() self.temp_file_handler = temp_file_handler for i in range(len(exon_starts)): self.exons.append( GenomicRegionAlignment(exon_starts[i], exon_ends[i], species, temp_file_handler=temp_file_handler) )
# returns the names for species found in alignment, skipping names as requested
[docs] def get_species_names(self, skip=None): skip = skip or [] if not isinstance(skip, list): skip = [skip] names = [] for exon in self.exons: for name in exon.get_species_names(skip=skip): if name not in names: names.append(name) return names
# returns the sequence for a species
[docs] def get_sequence(self, species): index, fh = self.temp_file_handler.get_open_tempfile() for exon in self.exons: if species in exon.get_species_names(): seq = exon.get_sequence(species) # we need to refetch fh here, since exon.get_sequence( species ) uses a tempfile # and if max==1, it would close fh index, fh = self.temp_file_handler.get_open_tempfile(index) fh.write(seq) else: fh.write("-" * exon.size) fh.seek(0) rval = fh.read() self.temp_file_handler.close(index, delete=True) return rval
# returns the reverse complement of the sequence for a species
[docs] def get_sequence_reverse_complement(self, species): complement = list(self.get_sequence(species).translate(self.DNA_COMPLEMENT)) complement.reverse() return "".join(complement)
# Start and end of coding region @property def start(self): return self.exons[0].start @property def end(self): return self.exons[-1].end
# Open a MAF index using a UID
[docs]def maf_index_by_uid(maf_uid, index_location_file): for line in open(index_location_file): try: # read each line, if not enough fields, go to next line if line[0:1] == "#": continue fields = line.split("\t") if maf_uid == fields[1]: try: maf_files = fields[4].replace("\n", "").replace("\r", "").split(",") return bx.align.maf.MultiIndexed(maf_files, keep_open=True, parse_e_rows=False) except Exception as e: raise Exception(f"MAF UID ({maf_uid}) found, but configuration appears to be malformed: {e}") except Exception: pass return None
# return ( index, temp_index_filename ) for user maf, if available, or build one and return it, return None when no tempfile is created
[docs]def open_or_build_maf_index(maf_file, index_filename, species=None): try: return (bx.align.maf.Indexed(maf_file, index_filename=index_filename, keep_open=True, parse_e_rows=False), None) except Exception: return build_maf_index(maf_file, species=species)
[docs]def build_maf_index_species_chromosomes(filename, index_species=None): species = [] species_chromosomes = {} indexes = bx.interval_index_file.Indexes() blocks = 0 try: maf_reader = bx.align.maf.Reader(open(filename)) while True: pos = maf_reader.file.tell() block = next(maf_reader) if block is None: break blocks += 1 for c in block.components: spec = c.src chrom = None if "." in spec: spec, chrom = spec.split(".", 1) if spec not in species: species.append(spec) species_chromosomes[spec] = [] if chrom and chrom not in species_chromosomes[spec]: species_chromosomes[spec].append(chrom) if index_species is None or spec in index_species: forward_strand_start = c.forward_strand_start forward_strand_end = c.forward_strand_end try: forward_strand_start = int(forward_strand_start) forward_strand_end = int(forward_strand_end) except ValueError: continue # start and end are not integers, can't add component to index, goto next component # this likely only occurs when parse_e_rows is True? # could a species exist as only e rows? should the if forward_strand_end > forward_strand_start: # require positive length; i.e. certain lines have start = end = 0 and cannot be indexed indexes.add(c.src, forward_strand_start, forward_strand_end, pos, max=c.src_size) except Exception as e: # most likely a bad MAF log.debug(f"Building MAF index on {filename} failed: {e}") return (None, [], {}, 0) return (indexes, species, species_chromosomes, blocks)
# builds and returns ( index, index_filename ) for specified maf_file
[docs]def build_maf_index(maf_file, species=None): indexes, *_ = build_maf_index_species_chromosomes(maf_file, species) if indexes is not None: with tempfile.NamedTemporaryFile(mode="w", delete=False) as index: indexes.write(index) return ( bx.align.maf.Indexed(maf_file, index_filename=index.name, keep_open=True, parse_e_rows=False), index.name, ) return (None, None)
[docs]def component_overlaps_region(c, region): if c is None: return False start, end = c.get_forward_strand_start(), c.get_forward_strand_end() if region.start >= end or region.end <= start: return False return True
[docs]def chop_block_by_region(block, src, region, species=None, mincols=0): # This chopping method was designed to maintain consistency with how start/end padding gaps have been working in Galaxy thus far: # behavior as seen when forcing blocks to be '+' relative to src sequence (ref) and using block.slice_by_component( ref, slice_start, slice_end ) # whether-or-not this is the 'correct' behavior is questionable, but this will at least maintain consistency # comments welcome slice_start = block.text_size # max for the min() slice_end = 0 # min for the max() old_score = block.score # save old score for later use # We no longer assume only one occurance of src per block, so we need to check them all for c in iter_components_by_src(block, src): if component_overlaps_region(c, region): if c.text is not None: rev_strand = False if c.strand == "-": # We want our coord_to_col coordinates to be returned from positive stranded component rev_strand = True c = c.reverse_complement() start = max(region.start, c.start) end = min(region.end, c.end) start = c.coord_to_col(start) end = c.coord_to_col(end) if rev_strand: # need to orient slice coordinates to the original block direction slice_len = end - start end = len(c.text) - start start = end - slice_len slice_start = min(start, slice_start) slice_end = max(end, slice_end) if slice_start < slice_end: block = block.slice(slice_start, slice_end) if block.text_size > mincols: # restore old score, may not be accurate, but it is better than 0 for everything? block.score = old_score if species is not None: block = block.limit_to_species(species) block.remove_all_gap_columns() return block return None
[docs]def orient_block_by_region(block, src, region, force_strand=None): # loop through components matching src, # make sure each of these components overlap region # cache strand for each of overlaping regions # if force_strand / region.strand not in strand cache, reverse complement # we could have 2 sequences with same src, overlapping region, on different strands, this would cause no reverse_complementing strands = [c.strand for c in iter_components_by_src(block, src) if component_overlaps_region(c, region)] if ( strands and (force_strand is None and region.strand not in strands) or (force_strand is not None and force_strand not in strands) ): block = block.reverse_complement() return block
[docs]def get_oriented_chopped_blocks_for_region(index, src, region, species=None, mincols=0, force_strand=None): for block, _, _ in get_oriented_chopped_blocks_with_index_offset_for_region( index, src, region, species, mincols, force_strand ): yield block
[docs]def get_oriented_chopped_blocks_with_index_offset_for_region( index, src, region, species=None, mincols=0, force_strand=None ): for block, idx, offset in get_chopped_blocks_with_index_offset_for_region(index, src, region, species, mincols): yield orient_block_by_region(block, src, region, force_strand), idx, offset
# split a block with multiple occurances of src into one block per src
[docs]def iter_blocks_split_by_src(block, src): for src_c in iter_components_by_src(block, src): new_block = bx.align.Alignment(score=block.score, attributes=deepcopy(block.attributes)) new_block.text_size = block.text_size for c in block.components: if c == src_c or c.src != src: new_block.add_component( deepcopy(c) ) # components have reference to alignment, don't want to lose reference to original alignment block in original components yield new_block
# split a block into multiple blocks with all combinations of a species appearing only once per block
[docs]def iter_blocks_split_by_species(block, species=None): def __split_components_by_species(components_by_species, new_block): if components_by_species: # more species with components to add to this block components_by_species = deepcopy(components_by_species) spec_comps = components_by_species.pop(0) for c in spec_comps: newer_block = deepcopy(new_block) newer_block.add_component(deepcopy(c)) yield from __split_components_by_species(components_by_species, newer_block) else: # no more components to add, yield this block yield new_block # divide components by species spec_dict = {} if not species: species = [] for c in block.components: spec, chrom = src_split(c.src) if spec not in spec_dict: spec_dict[spec] = [] species.append(spec) spec_dict[spec].append(c) else: for spec in species: spec_dict[spec] = [] for c in iter_components_by_src_start(block, spec): spec_dict[spec].append(c) empty_block = bx.align.Alignment( score=block.score, attributes=deepcopy(block.attributes) ) # should we copy attributes? empty_block.text_size = block.text_size # call recursive function to split into each combo of spec/blocks for value in __split_components_by_species(list(spec_dict.values()), empty_block): sort_block_components_by_block(value, block) # restore original component order yield value
# generator yielding only chopped and valid blocks for a specified region
[docs]def get_chopped_blocks_for_region(index, src, region, species=None, mincols=0): for block, _, _ in get_chopped_blocks_with_index_offset_for_region(index, src, region, species, mincols): yield block
[docs]def get_chopped_blocks_with_index_offset_for_region(index, src, region, species=None, mincols=0): for block, idx, offset in index.get_as_iterator_with_index_and_offset(src, region.start, region.end): block = chop_block_by_region(block, src, region, species, mincols) if block is not None: yield block, idx, offset
# returns a filled region alignment for specified regions
[docs]def get_region_alignment( index, primary_species, chrom, start, end, strand="+", species=None, mincols=0, overwrite_with_gaps=True, temp_file_handler=None, ): if species is not None: alignment = RegionAlignment(end - start, species, temp_file_handler=temp_file_handler) else: alignment = RegionAlignment(end - start, primary_species, temp_file_handler=temp_file_handler) return fill_region_alignment( alignment, index, primary_species, chrom, start, end, strand, species, mincols, overwrite_with_gaps )
# reduces a block to only positions exisiting in the src provided
[docs]def reduce_block_by_primary_genome(block, species, chromosome, region_start): # returns ( startIndex, {species:texts} # where texts' contents are reduced to only positions existing in the primary genome src = f"{species}.{chromosome}" ref = block.get_component_by_src(src) start_offset = ref.start - region_start species_texts = {} for c in block.components: species_texts[c.src.split(".")[0]] = list(c.text) # remove locations which are gaps in the primary species, starting from the downstream end for i in range(len(species_texts[species]) - 1, -1, -1): if species_texts[species][i] == "-": for text in species_texts.values(): text.pop(i) for spec, text in species_texts.items(): species_texts[spec] = "".join(text) return (start_offset, species_texts)
# fills a region alignment
[docs]def fill_region_alignment( alignment, index, primary_species, chrom, start, end, strand="+", species=None, mincols=0, overwrite_with_gaps=True ): region = bx.intervals.Interval(start, end) region.chrom = chrom region.strand = strand primary_src = f"{primary_species}.{chrom}" # Order blocks overlaping this position by score, lowest first blocks = [] for block, idx, offset in index.get_as_iterator_with_index_and_offset(primary_src, start, end): score = float(block.score) for i in range(0, len(blocks)): if score < blocks[i][0]: blocks.insert(i, (score, idx, offset)) break else: blocks.append((score, idx, offset)) # gap_chars_tuple = tuple( GAP_CHARS ) gap_chars_str = "".join(GAP_CHARS) # Loop through ordered blocks and layer by increasing score for block_dict in blocks: for block in iter_blocks_split_by_species( block_dict[1].get_at_offset(block_dict[2]) ): # need to handle each occurance of sequence in block seperately if component_overlaps_region(block.get_component_by_src(primary_src), region): block = chop_block_by_region(block, primary_src, region, species, mincols) # chop block block = orient_block_by_region(block, primary_src, region) # orient block start_offset, species_texts = reduce_block_by_primary_genome(block, primary_species, chrom, start) for spec, text in species_texts.items(): # we should trim gaps from both sides, since these are not positions in this species genome (sequence) text = text.rstrip(gap_chars_str) gap_offset = 0 # while text.startswith( gap_chars_tuple ): while True in [ text.startswith(gap_char) for gap_char in GAP_CHARS ]: # python2.4 doesn't accept a tuple for .startswith() gap_offset += 1 text = text[1:] if not text: break if text: if overwrite_with_gaps: alignment.set_range(start_offset + gap_offset, spec, text) else: for i, char in enumerate(text): if char not in GAP_CHARS: alignment.set_position(start_offset + gap_offset + i, spec, char) return alignment
# returns a filled spliced region alignment for specified region with start and end lists
[docs]def get_spliced_region_alignment( index, primary_species, chrom, starts, ends, strand="+", species=None, mincols=0, overwrite_with_gaps=True, temp_file_handler=None, ): # create spliced alignment object if species is not None: alignment = SplicedAlignment(starts, ends, species, temp_file_handler=temp_file_handler) else: alignment = SplicedAlignment(starts, ends, [primary_species], temp_file_handler=temp_file_handler) for exon in alignment.exons: fill_region_alignment( exon, index, primary_species, chrom, exon.start, exon.end, strand, species, mincols, overwrite_with_gaps ) return alignment
# loop through string array, only return non-commented lines
[docs]def line_enumerator(lines, comment_start="#"): i = 0 for line in lines: if not line.startswith(comment_start): i += 1 yield (i, line)
# read a GeneBed file, return list of starts, ends, raw fields
[docs]def get_starts_ends_fields_from_gene_bed(line): # Starts and ends for exons starts = [] ends = [] fields = line.split() # Requires atleast 12 BED columns if len(fields) < 12: raise Exception(f"Not a proper 12 column BED line ({line}).") tx_start = int(fields[1]) cds_start = int(fields[6]) cds_end = int(fields[7]) # Calculate and store starts and ends of coding exons region_start, region_end = cds_start, cds_end exon_starts = list(map(int, fields[11].rstrip(",\n").split(","))) exon_starts = [x + tx_start for x in exon_starts] exon_ends = list(map(int, fields[10].rstrip(",").split(","))) exon_ends = [x + y for x, y in zip(exon_starts, exon_ends)] for start, end in zip(exon_starts, exon_ends): start = max(start, region_start) end = min(end, region_end) if start < end: starts.append(start) ends.append(end) return (starts, ends, fields)
[docs]def iter_components_by_src(block, src): for c in block.components: if c.src == src: yield c
[docs]def get_components_by_src(block, src): return list(iter_components_by_src(block, src))
[docs]def iter_components_by_src_start(block, src): for c in block.components: if c.src.startswith(src): yield c
[docs]def get_components_by_src_start(block, src): return list(iter_components_by_src_start(block, src))
[docs]def sort_block_components_by_block(block1, block2): # orders the components in block1 by the index of the component in block2 # block1 must be a subset of block2 # occurs in-place return block1.components.sort( key=functools.cmp_to_key(lambda x, y: block2.components.index(x) - block2.components.index(y)) )
[docs]def get_species_in_maf(maf_filename): species = [] for block in bx.align.maf.Reader(open(maf_filename)): for spec in get_species_in_block(block): if spec not in species: species.append(spec) return species
[docs]def parse_species_option(species): if species: species = species.split(",") if "None" not in species: return species return None # provided species was '', None, or had 'None' in it
[docs]def remove_temp_index_file(index_filename): try: os.unlink(index_filename) except Exception: pass
# Below are methods to deal with FASTA files
[docs]def get_fasta_header(component, attributes=None, suffix=None): attributes = attributes or {} header = ">%s(%s):%i-%i|" % ( component.src, component.strand, component.get_forward_strand_start(), component.get_forward_strand_end(), ) for key, value in attributes.items(): header = f"{header}{key}={value}|" if suffix: header = f"{header}{suffix}" else: header = f"{header}{src_split(component.src)[0]}" return header
[docs]def get_attributes_from_fasta_header(header): if not header: return {} attributes = {} header = header.lstrip(">") header = header.strip() fields = header.split("|") try: region = fields[0] region = region.split("(", 1) temp = region[0].split(".", 1) attributes["species"] = temp[0] if len(temp) == 2: attributes["chrom"] = temp[1] else: attributes["chrom"] = temp[0] region = region[1].split(")", 1) attributes["strand"] = region[0] region = region[1].lstrip(":").split("-") attributes["start"] = int(region[0]) attributes["end"] = int(region[1]) except Exception: # fields 0 is not a region coordinate pass if len(fields) > 2: for i in range(1, len(fields) - 1): prop = fields[i].split("=", 1) if len(prop) == 2: attributes[prop[0]] = prop[1] if len(fields) > 1: attributes["__suffix__"] = fields[-1] return attributes
[docs]def iter_fasta_alignment(filename): class fastaComponent: def __init__(self, species, text=""): self.species = species self.text = text def extend(self, text): self.text = self.text + text.replace("\n", "").replace("\r", "").strip() # yields a list of fastaComponents for a FASTA file f = open(filename, "rb") components = [] # cur_component = None while True: line = f.readline() if not line: if components: yield components return line = line.strip() if not line: if components: yield components components = [] elif line.startswith(">"): attributes = get_attributes_from_fasta_header(line) components.append(fastaComponent(attributes["species"])) elif components: components[-1].extend(line)