Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.datatypes.converters.interval_to_coverage
#!/usr/bin/env python
"""
Converter to generate 3 (or 4) column base-pair coverage from an interval file.
usage: %prog bed_file out_file
-1, --cols1=N,N,N,N: Columns for chrom, start, end, strand in interval file
-2, --cols2=N,N,N,N: Columns for chrom, start, end, strand in coverage file
"""
import subprocess
import tempfile
from bisect import bisect
from os import environ
from bx.cookbook import doc_optparse
from bx.intervals import io
INTERVAL_METADATA = ('chromCol',
'startCol',
'endCol',
'strandCol',)
COVERAGE_METADATA = ('chromCol',
'positionCol',
'forwardCol',
'reverseCol',)
[docs]def main(interval, coverage):
"""
Uses a sliding window of partitions to count coverages.
Every interval record adds its start and end to the partitions. The result
is a list of partitions, or every position that has a (maybe) different
number of basepairs covered. We don't worry about merging because we pop
as the sorted intervals are read in. As the input start positions exceed
the partition positions in partitions, coverages are kicked out in bulk.
"""
partitions = []
forward_covs = []
reverse_covs = []
chrom = None
lastchrom = None
for record in interval:
chrom = record.chrom
if lastchrom and not lastchrom == chrom and partitions:
for partition in range(0, len(partitions) - 1):
forward = forward_covs[partition]
reverse = reverse_covs[partition]
if forward + reverse > 0:
coverage.write(chrom=chrom, position=range(partitions[partition], partitions[partition + 1]),
forward=forward, reverse=reverse)
partitions = []
forward_covs = []
reverse_covs = []
start_index = bisect(partitions, record.start)
forward = int(record.strand == "+")
reverse = int(record.strand == "-")
forward_base = 0
reverse_base = 0
if start_index > 0:
forward_base = forward_covs[start_index - 1]
reverse_base = reverse_covs[start_index - 1]
partitions.insert(start_index, record.start)
forward_covs.insert(start_index, forward_base)
reverse_covs.insert(start_index, reverse_base)
end_index = bisect(partitions, record.end)
for index in range(start_index, end_index):
forward_covs[index] += forward
reverse_covs[index] += reverse
partitions.insert(end_index, record.end)
forward_covs.insert(end_index, forward_covs[end_index - 1] - forward)
reverse_covs.insert(end_index, reverse_covs[end_index - 1] - reverse)
if partitions:
for partition in range(0, start_index):
forward = forward_covs[partition]
reverse = reverse_covs[partition]
if forward + reverse > 0:
coverage.write(chrom=chrom, position=range(partitions[partition], partitions[partition + 1]),
forward=forward, reverse=reverse)
partitions = partitions[start_index:]
forward_covs = forward_covs[start_index:]
reverse_covs = reverse_covs[start_index:]
lastchrom = chrom
# Finish the last chromosome
if partitions:
for partition in range(0, len(partitions) - 1):
forward = forward_covs[partition]
reverse = reverse_covs[partition]
if forward + reverse > 0:
coverage.write(chrom=chrom, position=range(partitions[partition], partitions[partition + 1]),
forward=forward, reverse=reverse)
[docs]class CoverageWriter(object):
[docs] def __init__(self, out_stream=None, chromCol=0, positionCol=1, forwardCol=2, reverseCol=3):
self.out_stream = out_stream
self.reverseCol = reverseCol
self.nlines = 0
positions = {str(chromCol): '%(chrom)s',
str(positionCol): '%(position)d',
str(forwardCol): '%(forward)d',
str(reverseCol): '%(reverse)d'}
if reverseCol < 0:
self.template = "%(0)s\t%(1)s\t%(2)s\n" % positions
else:
self.template = "%(0)s\t%(1)s\t%(2)s\t%(3)s\n" % positions
[docs] def write(self, **kwargs):
if self.reverseCol < 0:
kwargs['forward'] += kwargs['reverse']
posgen = kwargs['position']
for position in posgen:
kwargs['position'] = position
self.out_stream.write(self.template % kwargs)
if __name__ == "__main__":
options, args = doc_optparse.parse(__doc__)
try:
chr_col_1, start_col_1, end_col_1, strand_col_1 = [int(x) - 1 for x in options.cols1.split(',')]
chr_col_2, position_col_2, forward_col_2, reverse_col_2 = [int(x) - 1 for x in options.cols2.split(',')]
in_fname, out_fname = args
except Exception:
doc_optparse.exception()
# Sort through a tempfile first
with tempfile.NamedTemporaryFile(mode="r") as temp_file:
environ['LC_ALL'] = 'POSIX'
subprocess.check_call([
'sort', '-f', '-n', '-k', chr_col_1 + 1, '-k', start_col_1 + 1, '-k', end_col_1 + 1, '-o', temp_file.name, in_fname
])
coverage = CoverageWriter(out_stream=open(out_fname, "a"),
chromCol=chr_col_2, positionCol=position_col_2,
forwardCol=forward_col_2, reverseCol=reverse_col_2, )
temp_file.seek(0)
interval = io.NiceReaderWrapper(temp_file,
chrom_col=chr_col_1,
start_col=start_col_1,
end_col=end_col_1,
strand_col=strand_col_1,
fix_strand=True)
main(interval, coverage)
coverage.close()