Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.datatypes.util.maf_utilities
#!/usr/bin/env python
"""
Provides wrappers and utilities for working with MAF files and alignments.
"""
# Dan Blankenberg
import functools
import logging
import os
import resource
import sys
import tempfile
from copy import deepcopy
from errno import EMFILE
import bx.align.maf
import bx.interval_index_file
import bx.intervals
maketrans = str.maketrans
log = logging.getLogger(__name__)
GAP_CHARS = ["-"]
SRC_SPLIT_CHAR = "."
[docs]def src_split(src):
fields = src.split(SRC_SPLIT_CHAR, 1)
spec = fields.pop(0)
if fields:
chrom = fields.pop(0)
else:
chrom = spec
return spec, chrom
[docs]def src_merge(spec, chrom, contig=None):
if None in [spec, chrom]:
spec = chrom = spec or chrom
return bx.align.src_merge(spec, chrom, contig)
[docs]def get_species_in_block(block):
species = []
for c in block.components:
spec, chrom = src_split(c.src)
if spec not in species:
species.append(spec)
return species
[docs]class TempFileHandler:
"""
Handles creating, opening, closing, and deleting of Temp files, with a
maximum number of files open at one time.
"""
DEFAULT_MAX_OPEN_FILES = max(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2, 1)
[docs] def __init__(self, max_open_files=None, **kwds):
if max_open_files is None:
max_open_files = self.DEFAULT_MAX_OPEN_FILES
self.max_open_files = max_open_files
self.files = []
self.open_file_indexes = []
self.kwds = kwds
[docs] def get_open_tempfile(self, index=None, **kwds):
if index is not None and index in self.open_file_indexes:
self.open_file_indexes.remove(index)
else:
if self.max_open_files:
while len(self.open_file_indexes) >= self.max_open_files:
self.close(self.open_file_indexes[0])
if index is None:
index = len(self.files)
temp_kwds = dict(self.kwds)
temp_kwds["delete"] = False
temp_kwds.update(kwds)
# Being able to use delete=True here, would simplify a bit,
# but we support python2.4 in these tools
while True:
try:
tmp_file = tempfile.NamedTemporaryFile(**temp_kwds)
filename = tmp_file.name
break
except OSError as e:
if self.open_file_indexes and e.errno == EMFILE:
self.max_open_files = len(self.open_file_indexes)
self.close(self.open_file_indexes[0])
else:
raise e
tmp_file.close()
self.files.append(open(filename, "r+"))
else:
while True:
try:
self.files[index] = open(self.files[index].name, "r+")
break
except OSError as e:
if self.open_file_indexes and e.errno == EMFILE:
self.max_open_files = len(self.open_file_indexes)
self.close(self.open_file_indexes[0])
else:
raise e
self.files[index].seek(0, 2)
self.open_file_indexes.append(index)
return index, self.files[index]
[docs] def close(self, index, delete=False):
if index in self.open_file_indexes:
self.open_file_indexes.remove(index)
rval = self.files[index].close()
if delete:
try:
os.unlink(self.files[index].name)
except OSError:
pass
return rval
def __del__(self):
for i in range(len(self.files)):
self.close(i, delete=True)
# an object corresponding to a reference layered alignment
[docs]class RegionAlignment:
DNA_COMPLEMENT = maketrans("ACGTacgt", "TGCAtgca")
MAX_SEQUENCE_SIZE = sys.maxsize # Maximum length of sequence allowed
[docs] def __init__(self, size, species=None, temp_file_handler=None):
assert (
size <= self.MAX_SEQUENCE_SIZE
), "Maximum length allowed for an individual sequence has been exceeded (%i > %i)." % (
size,
self.MAX_SEQUENCE_SIZE,
)
species = species or []
self.size = size
if not temp_file_handler:
temp_file_handler = TempFileHandler()
self.temp_file_handler = temp_file_handler
self.sequences = {}
if not isinstance(species, list):
species = [species]
for spec in species:
self.add_species(spec)
# add a species to the alignment
[docs] def add_species(self, species):
# make temporary sequence files
file_index, fh = self.temp_file_handler.get_open_tempfile()
self.sequences[species] = file_index
fh.write("-" * self.size)
# returns the names for species found in alignment, skipping names as requested
[docs] def get_species_names(self, skip=None):
skip = skip or []
if not isinstance(skip, list):
skip = [skip]
names = list(self.sequences.keys())
for name in skip:
try:
names.remove(name)
except ValueError:
pass
return names
# returns the sequence for a species
[docs] def get_sequence(self, species):
file_index, fh = self.temp_file_handler.get_open_tempfile(self.sequences[species])
fh.seek(0)
return fh.read()
# returns the reverse complement of the sequence for a species
[docs] def get_sequence_reverse_complement(self, species):
complement = [base for base in self.get_sequence(species).translate(self.DNA_COMPLEMENT)]
complement.reverse()
return "".join(complement)
# sets a position for a species
[docs] def set_position(self, index, species, base):
if len(base) != 1:
raise Exception("A genomic position can only have a length of 1.")
return self.set_range(index, species, base)
# sets a range for a species
[docs] def set_range(self, index, species, bases):
if index >= self.size or index < 0:
raise Exception("Your index (%i) is out of range (0 - %i)." % (index, self.size - 1))
if len(bases) == 0:
raise Exception("A set of genomic positions can only have a positive length.")
if species not in self.sequences.keys():
self.add_species(species)
file_index, fh = self.temp_file_handler.get_open_tempfile(self.sequences[species])
fh.seek(index)
fh.write(bases)
# Flush temp file of specified species, or all species
[docs] def flush(self, species=None):
if species is None:
species = self.sequences.keys()
elif not isinstance(species, list):
species = [species]
for spec in species:
self.temp_file_handler.flush(self.sequences[spec])
[docs]class GenomicRegionAlignment(RegionAlignment):
[docs] def __init__(self, start, end, species=None, temp_file_handler=None):
species = species or []
RegionAlignment.__init__(self, end - start, species, temp_file_handler=temp_file_handler)
self.start = start
self.end = end
[docs]class SplicedAlignment:
DNA_COMPLEMENT = maketrans("ACGTacgt", "TGCAtgca")
[docs] def __init__(self, exon_starts, exon_ends, species=None, temp_file_handler=None):
species = species or []
if not isinstance(exon_starts, list):
exon_starts = [exon_starts]
if not isinstance(exon_ends, list):
exon_ends = [exon_ends]
assert len(exon_starts) == len(exon_ends), "The number of starts does not match the number of sizes."
self.exons = []
if not temp_file_handler:
temp_file_handler = TempFileHandler()
self.temp_file_handler = temp_file_handler
for i in range(len(exon_starts)):
self.exons.append(
GenomicRegionAlignment(exon_starts[i], exon_ends[i], species, temp_file_handler=temp_file_handler)
)
# returns the names for species found in alignment, skipping names as requested
[docs] def get_species_names(self, skip=None):
skip = skip or []
if not isinstance(skip, list):
skip = [skip]
names = []
for exon in self.exons:
for name in exon.get_species_names(skip=skip):
if name not in names:
names.append(name)
return names
# returns the sequence for a species
[docs] def get_sequence(self, species):
index, fh = self.temp_file_handler.get_open_tempfile()
for exon in self.exons:
if species in exon.get_species_names():
seq = exon.get_sequence(species)
# we need to refetch fh here, since exon.get_sequence( species ) uses a tempfile
# and if max==1, it would close fh
index, fh = self.temp_file_handler.get_open_tempfile(index)
fh.write(seq)
else:
fh.write("-" * exon.size)
fh.seek(0)
rval = fh.read()
self.temp_file_handler.close(index, delete=True)
return rval
# returns the reverse complement of the sequence for a species
[docs] def get_sequence_reverse_complement(self, species):
complement = [base for base in self.get_sequence(species).translate(self.DNA_COMPLEMENT)]
complement.reverse()
return "".join(complement)
# Start and end of coding region
@property
def start(self):
return self.exons[0].start
@property
def end(self):
return self.exons[-1].end
# Open a MAF index using a UID
[docs]def maf_index_by_uid(maf_uid, index_location_file):
for line in open(index_location_file):
try:
# read each line, if not enough fields, go to next line
if line[0:1] == "#":
continue
fields = line.split("\t")
if maf_uid == fields[1]:
try:
maf_files = fields[4].replace("\n", "").replace("\r", "").split(",")
return bx.align.maf.MultiIndexed(maf_files, keep_open=True, parse_e_rows=False)
except Exception as e:
raise Exception(f"MAF UID ({maf_uid}) found, but configuration appears to be malformed: {e}")
except Exception:
pass
return None
# return ( index, temp_index_filename ) for user maf, if available, or build one and return it, return None when no tempfile is created
[docs]def open_or_build_maf_index(maf_file, index_filename, species=None):
try:
return (bx.align.maf.Indexed(maf_file, index_filename=index_filename, keep_open=True, parse_e_rows=False), None)
except Exception:
return build_maf_index(maf_file, species=species)
[docs]def build_maf_index_species_chromosomes(filename, index_species=None):
species = []
species_chromosomes = {}
indexes = bx.interval_index_file.Indexes()
blocks = 0
try:
maf_reader = bx.align.maf.Reader(open(filename))
while True:
pos = maf_reader.file.tell()
block = next(maf_reader)
if block is None:
break
blocks += 1
for c in block.components:
spec = c.src
chrom = None
if "." in spec:
spec, chrom = spec.split(".", 1)
if spec not in species:
species.append(spec)
species_chromosomes[spec] = []
if chrom and chrom not in species_chromosomes[spec]:
species_chromosomes[spec].append(chrom)
if index_species is None or spec in index_species:
forward_strand_start = c.forward_strand_start
forward_strand_end = c.forward_strand_end
try:
forward_strand_start = int(forward_strand_start)
forward_strand_end = int(forward_strand_end)
except ValueError:
continue # start and end are not integers, can't add component to index, goto next component
# this likely only occurs when parse_e_rows is True?
# could a species exist as only e rows? should the
if forward_strand_end > forward_strand_start:
# require positive length; i.e. certain lines have start = end = 0 and cannot be indexed
indexes.add(c.src, forward_strand_start, forward_strand_end, pos, max=c.src_size)
except Exception as e:
# most likely a bad MAF
log.debug(f"Building MAF index on {filename} failed: {e}")
return (None, [], {}, 0)
return (indexes, species, species_chromosomes, blocks)
# builds and returns ( index, index_filename ) for specified maf_file
[docs]def build_maf_index(maf_file, species=None):
indexes, *_ = build_maf_index_species_chromosomes(maf_file, species)
if indexes is not None:
with tempfile.NamedTemporaryFile(mode="w", delete=False) as index:
indexes.write(index)
return (
bx.align.maf.Indexed(maf_file, index_filename=index.name, keep_open=True, parse_e_rows=False),
index.name,
)
return (None, None)
[docs]def component_overlaps_region(c, region):
if c is None:
return False
start, end = c.get_forward_strand_start(), c.get_forward_strand_end()
if region.start >= end or region.end <= start:
return False
return True
[docs]def chop_block_by_region(block, src, region, species=None, mincols=0):
# This chopping method was designed to maintain consistency with how start/end padding gaps have been working in Galaxy thus far:
# behavior as seen when forcing blocks to be '+' relative to src sequence (ref) and using block.slice_by_component( ref, slice_start, slice_end )
# whether-or-not this is the 'correct' behavior is questionable, but this will at least maintain consistency
# comments welcome
slice_start = block.text_size # max for the min()
slice_end = 0 # min for the max()
old_score = block.score # save old score for later use
# We no longer assume only one occurance of src per block, so we need to check them all
for c in iter_components_by_src(block, src):
if component_overlaps_region(c, region):
if c.text is not None:
rev_strand = False
if c.strand == "-":
# We want our coord_to_col coordinates to be returned from positive stranded component
rev_strand = True
c = c.reverse_complement()
start = max(region.start, c.start)
end = min(region.end, c.end)
start = c.coord_to_col(start)
end = c.coord_to_col(end)
if rev_strand:
# need to orient slice coordinates to the original block direction
slice_len = end - start
end = len(c.text) - start
start = end - slice_len
slice_start = min(start, slice_start)
slice_end = max(end, slice_end)
if slice_start < slice_end:
block = block.slice(slice_start, slice_end)
if block.text_size > mincols:
# restore old score, may not be accurate, but it is better than 0 for everything?
block.score = old_score
if species is not None:
block = block.limit_to_species(species)
block.remove_all_gap_columns()
return block
return None
[docs]def orient_block_by_region(block, src, region, force_strand=None):
# loop through components matching src,
# make sure each of these components overlap region
# cache strand for each of overlaping regions
# if force_strand / region.strand not in strand cache, reverse complement
# we could have 2 sequences with same src, overlapping region, on different strands, this would cause no reverse_complementing
strands = [c.strand for c in iter_components_by_src(block, src) if component_overlaps_region(c, region)]
if (
strands
and (force_strand is None and region.strand not in strands)
or (force_strand is not None and force_strand not in strands)
):
block = block.reverse_complement()
return block
[docs]def get_oriented_chopped_blocks_for_region(index, src, region, species=None, mincols=0, force_strand=None):
for block, _, _ in get_oriented_chopped_blocks_with_index_offset_for_region(
index, src, region, species, mincols, force_strand
):
yield block
[docs]def get_oriented_chopped_blocks_with_index_offset_for_region(
index, src, region, species=None, mincols=0, force_strand=None
):
for block, idx, offset in get_chopped_blocks_with_index_offset_for_region(index, src, region, species, mincols):
yield orient_block_by_region(block, src, region, force_strand), idx, offset
# split a block with multiple occurances of src into one block per src
[docs]def iter_blocks_split_by_src(block, src):
for src_c in iter_components_by_src(block, src):
new_block = bx.align.Alignment(score=block.score, attributes=deepcopy(block.attributes))
new_block.text_size = block.text_size
for c in block.components:
if c == src_c or c.src != src:
new_block.add_component(
deepcopy(c)
) # components have reference to alignment, don't want to lose reference to original alignment block in original components
yield new_block
# split a block into multiple blocks with all combinations of a species appearing only once per block
[docs]def iter_blocks_split_by_species(block, species=None):
def __split_components_by_species(components_by_species, new_block):
if components_by_species:
# more species with components to add to this block
components_by_species = deepcopy(components_by_species)
spec_comps = components_by_species.pop(0)
for c in spec_comps:
newer_block = deepcopy(new_block)
newer_block.add_component(deepcopy(c))
yield from __split_components_by_species(components_by_species, newer_block)
else:
# no more components to add, yield this block
yield new_block
# divide components by species
spec_dict = {}
if not species:
species = []
for c in block.components:
spec, chrom = src_split(c.src)
if spec not in spec_dict:
spec_dict[spec] = []
species.append(spec)
spec_dict[spec].append(c)
else:
for spec in species:
spec_dict[spec] = []
for c in iter_components_by_src_start(block, spec):
spec_dict[spec].append(c)
empty_block = bx.align.Alignment(
score=block.score, attributes=deepcopy(block.attributes)
) # should we copy attributes?
empty_block.text_size = block.text_size
# call recursive function to split into each combo of spec/blocks
for value in __split_components_by_species(list(spec_dict.values()), empty_block):
sort_block_components_by_block(value, block) # restore original component order
yield value
# generator yielding only chopped and valid blocks for a specified region
[docs]def get_chopped_blocks_for_region(index, src, region, species=None, mincols=0):
for block, _, _ in get_chopped_blocks_with_index_offset_for_region(index, src, region, species, mincols):
yield block
[docs]def get_chopped_blocks_with_index_offset_for_region(index, src, region, species=None, mincols=0):
for block, idx, offset in index.get_as_iterator_with_index_and_offset(src, region.start, region.end):
block = chop_block_by_region(block, src, region, species, mincols)
if block is not None:
yield block, idx, offset
# returns a filled region alignment for specified regions
[docs]def get_region_alignment(
index,
primary_species,
chrom,
start,
end,
strand="+",
species=None,
mincols=0,
overwrite_with_gaps=True,
temp_file_handler=None,
):
if species is not None:
alignment = RegionAlignment(end - start, species, temp_file_handler=temp_file_handler)
else:
alignment = RegionAlignment(end - start, primary_species, temp_file_handler=temp_file_handler)
return fill_region_alignment(
alignment, index, primary_species, chrom, start, end, strand, species, mincols, overwrite_with_gaps
)
# reduces a block to only positions exisiting in the src provided
[docs]def reduce_block_by_primary_genome(block, species, chromosome, region_start):
# returns ( startIndex, {species:texts}
# where texts' contents are reduced to only positions existing in the primary genome
src = f"{species}.{chromosome}"
ref = block.get_component_by_src(src)
start_offset = ref.start - region_start
species_texts = {}
for c in block.components:
species_texts[c.src.split(".")[0]] = list(c.text)
# remove locations which are gaps in the primary species, starting from the downstream end
for i in range(len(species_texts[species]) - 1, -1, -1):
if species_texts[species][i] == "-":
for text in species_texts.values():
text.pop(i)
for spec, text in species_texts.items():
species_texts[spec] = "".join(text)
return (start_offset, species_texts)
# fills a region alignment
[docs]def fill_region_alignment(
alignment, index, primary_species, chrom, start, end, strand="+", species=None, mincols=0, overwrite_with_gaps=True
):
region = bx.intervals.Interval(start, end)
region.chrom = chrom
region.strand = strand
primary_src = f"{primary_species}.{chrom}"
# Order blocks overlaping this position by score, lowest first
blocks = []
for block, idx, offset in index.get_as_iterator_with_index_and_offset(primary_src, start, end):
score = float(block.score)
for i in range(0, len(blocks)):
if score < blocks[i][0]:
blocks.insert(i, (score, idx, offset))
break
else:
blocks.append((score, idx, offset))
# gap_chars_tuple = tuple( GAP_CHARS )
gap_chars_str = "".join(GAP_CHARS)
# Loop through ordered blocks and layer by increasing score
for block_dict in blocks:
for block in iter_blocks_split_by_species(
block_dict[1].get_at_offset(block_dict[2])
): # need to handle each occurance of sequence in block seperately
if component_overlaps_region(block.get_component_by_src(primary_src), region):
block = chop_block_by_region(block, primary_src, region, species, mincols) # chop block
block = orient_block_by_region(block, primary_src, region) # orient block
start_offset, species_texts = reduce_block_by_primary_genome(block, primary_species, chrom, start)
for spec, text in species_texts.items():
# we should trim gaps from both sides, since these are not positions in this species genome (sequence)
text = text.rstrip(gap_chars_str)
gap_offset = 0
# while text.startswith( gap_chars_tuple ):
while True in [
text.startswith(gap_char) for gap_char in GAP_CHARS
]: # python2.4 doesn't accept a tuple for .startswith()
gap_offset += 1
text = text[1:]
if not text:
break
if text:
if overwrite_with_gaps:
alignment.set_range(start_offset + gap_offset, spec, text)
else:
for i, char in enumerate(text):
if char not in GAP_CHARS:
alignment.set_position(start_offset + gap_offset + i, spec, char)
return alignment
# returns a filled spliced region alignment for specified region with start and end lists
[docs]def get_spliced_region_alignment(
index,
primary_species,
chrom,
starts,
ends,
strand="+",
species=None,
mincols=0,
overwrite_with_gaps=True,
temp_file_handler=None,
):
# create spliced alignment object
if species is not None:
alignment = SplicedAlignment(starts, ends, species, temp_file_handler=temp_file_handler)
else:
alignment = SplicedAlignment(starts, ends, [primary_species], temp_file_handler=temp_file_handler)
for exon in alignment.exons:
fill_region_alignment(
exon, index, primary_species, chrom, exon.start, exon.end, strand, species, mincols, overwrite_with_gaps
)
return alignment
# loop through string array, only return non-commented lines
[docs]def line_enumerator(lines, comment_start="#"):
i = 0
for line in lines:
if not line.startswith(comment_start):
i += 1
yield (i, line)
# read a GeneBed file, return list of starts, ends, raw fields
[docs]def get_starts_ends_fields_from_gene_bed(line):
# Starts and ends for exons
starts = []
ends = []
fields = line.split()
# Requires atleast 12 BED columns
if len(fields) < 12:
raise Exception(f"Not a proper 12 column BED line ({line}).")
tx_start = int(fields[1])
cds_start = int(fields[6])
cds_end = int(fields[7])
# Calculate and store starts and ends of coding exons
region_start, region_end = cds_start, cds_end
exon_starts = list(map(int, fields[11].rstrip(",\n").split(",")))
exon_starts = [x + tx_start for x in exon_starts]
exon_ends = list(map(int, fields[10].rstrip(",").split(",")))
exon_ends = [x + y for x, y in zip(exon_starts, exon_ends)]
for start, end in zip(exon_starts, exon_ends):
start = max(start, region_start)
end = min(end, region_end)
if start < end:
starts.append(start)
ends.append(end)
return (starts, ends, fields)
[docs]def get_components_by_src(block, src):
return [value for value in iter_components_by_src(block, src)]
[docs]def iter_components_by_src_start(block, src):
for c in block.components:
if c.src.startswith(src):
yield c
[docs]def get_components_by_src_start(block, src):
return [value for value in iter_components_by_src_start(block, src)]
[docs]def sort_block_components_by_block(block1, block2):
# orders the components in block1 by the index of the component in block2
# block1 must be a subset of block2
# occurs in-place
return block1.components.sort(
key=functools.cmp_to_key(lambda x, y: block2.components.index(x) - block2.components.index(y))
)
[docs]def get_species_in_maf(maf_filename):
species = []
for block in bx.align.maf.Reader(open(maf_filename)):
for spec in get_species_in_block(block):
if spec not in species:
species.append(spec)
return species
[docs]def parse_species_option(species):
if species:
species = species.split(",")
if "None" not in species:
return species
return None # provided species was '', None, or had 'None' in it
[docs]def remove_temp_index_file(index_filename):
try:
os.unlink(index_filename)
except Exception:
pass
# Below are methods to deal with FASTA files
[docs]def get_fasta_header(component, attributes=None, suffix=None):
attributes = attributes or {}
header = ">%s(%s):%i-%i|" % (
component.src,
component.strand,
component.get_forward_strand_start(),
component.get_forward_strand_end(),
)
for key, value in attributes.items():
header = f"{header}{key}={value}|"
if suffix:
header = f"{header}{suffix}"
else:
header = f"{header}{src_split(component.src)[0]}"
return header
[docs]def get_attributes_from_fasta_header(header):
if not header:
return {}
attributes = {}
header = header.lstrip(">")
header = header.strip()
fields = header.split("|")
try:
region = fields[0]
region = region.split("(", 1)
temp = region[0].split(".", 1)
attributes["species"] = temp[0]
if len(temp) == 2:
attributes["chrom"] = temp[1]
else:
attributes["chrom"] = temp[0]
region = region[1].split(")", 1)
attributes["strand"] = region[0]
region = region[1].lstrip(":").split("-")
attributes["start"] = int(region[0])
attributes["end"] = int(region[1])
except Exception:
# fields 0 is not a region coordinate
pass
if len(fields) > 2:
for i in range(1, len(fields) - 1):
prop = fields[i].split("=", 1)
if len(prop) == 2:
attributes[prop[0]] = prop[1]
if len(fields) > 1:
attributes["__suffix__"] = fields[-1]
return attributes
[docs]def iter_fasta_alignment(filename):
class fastaComponent:
def __init__(self, species, text=""):
self.species = species
self.text = text
def extend(self, text):
self.text = self.text + text.replace("\n", "").replace("\r", "").strip()
# yields a list of fastaComponents for a FASTA file
f = open(filename, "rb")
components = []
# cur_component = None
while True:
line = f.readline()
if not line:
if components:
yield components
return
line = line.strip()
if not line:
if components:
yield components
components = []
elif line.startswith(">"):
attributes = get_attributes_from_fasta_header(line)
components.append(fastaComponent(attributes["species"]))
elif components:
components[-1].extend(line)