Source code for galaxy.datatypes.converters.interval_to_coverage

#!/usr/bin/env python
Converter to generate 3 (or 4) column base-pair coverage from an interval file.

usage: %prog bed_file out_file
    -1, --cols1=N,N,N,N: Columns for chrom, start, end, strand in interval file
    -2, --cols2=N,N,N,N: Columns for chrom, start, end, strand in coverage file
import subprocess
import tempfile
from bisect import bisect
from os import environ

from bx.cookbook import doc_optparse
from bx.intervals import io



[docs]def main(interval, coverage): """ Uses a sliding window of partitions to count coverages. Every interval record adds its start and end to the partitions. The result is a list of partitions, or every position that has a (maybe) different number of basepairs covered. We don't worry about merging because we pop as the sorted intervals are read in. As the input start positions exceed the partition positions in partitions, coverages are kicked out in bulk. """ partitions = [] forward_covs = [] reverse_covs = [] chrom = None lastchrom = None for record in interval: chrom = record.chrom if lastchrom and not lastchrom == chrom and partitions: for partition in range(0, len(partitions) - 1): forward = forward_covs[partition] reverse = reverse_covs[partition] if forward + reverse > 0: coverage.write(chrom=chrom, position=range(partitions[partition], partitions[partition + 1]), forward=forward, reverse=reverse) partitions = [] forward_covs = [] reverse_covs = [] start_index = bisect(partitions, record.start) forward = int(record.strand == "+") reverse = int(record.strand == "-") forward_base = 0 reverse_base = 0 if start_index > 0: forward_base = forward_covs[start_index - 1] reverse_base = reverse_covs[start_index - 1] partitions.insert(start_index, record.start) forward_covs.insert(start_index, forward_base) reverse_covs.insert(start_index, reverse_base) end_index = bisect(partitions, record.end) for index in range(start_index, end_index): forward_covs[index] += forward reverse_covs[index] += reverse partitions.insert(end_index, record.end) forward_covs.insert(end_index, forward_covs[end_index - 1] - forward) reverse_covs.insert(end_index, reverse_covs[end_index - 1] - reverse) if partitions: for partition in range(0, start_index): forward = forward_covs[partition] reverse = reverse_covs[partition] if forward + reverse > 0: coverage.write(chrom=chrom, position=range(partitions[partition], partitions[partition + 1]), forward=forward, reverse=reverse) partitions = partitions[start_index:] forward_covs = forward_covs[start_index:] reverse_covs = reverse_covs[start_index:] lastchrom = chrom # Finish the last chromosome if partitions: for partition in range(0, len(partitions) - 1): forward = forward_covs[partition] reverse = reverse_covs[partition] if forward + reverse > 0: coverage.write(chrom=chrom, position=range(partitions[partition], partitions[partition + 1]), forward=forward, reverse=reverse)
[docs]class CoverageWriter(object):
[docs] def __init__(self, out_stream=None, chromCol=0, positionCol=1, forwardCol=2, reverseCol=3): self.out_stream = out_stream self.reverseCol = reverseCol self.nlines = 0 positions = {str(chromCol): '%(chrom)s', str(positionCol): '%(position)d', str(forwardCol): '%(forward)d', str(reverseCol): '%(reverse)d'} if reverseCol < 0: self.template = "%(0)s\t%(1)s\t%(2)s\n" % positions else: self.template = "%(0)s\t%(1)s\t%(2)s\t%(3)s\n" % positions
[docs] def write(self, **kwargs): if self.reverseCol < 0: kwargs['forward'] += kwargs['reverse'] posgen = kwargs['position'] for position in posgen: kwargs['position'] = position self.out_stream.write(self.template % kwargs)
[docs] def close(self): self.out_stream.flush() self.out_stream.close()
if __name__ == "__main__": options, args = doc_optparse.parse(__doc__) try: chr_col_1, start_col_1, end_col_1, strand_col_1 = [int(x) - 1 for x in options.cols1.split(',')] chr_col_2, position_col_2, forward_col_2, reverse_col_2 = [int(x) - 1 for x in options.cols2.split(',')] in_fname, out_fname = args except Exception: doc_optparse.exception() # Sort through a tempfile first with tempfile.NamedTemporaryFile(mode="r") as temp_file: environ['LC_ALL'] = 'POSIX' subprocess.check_call([ 'sort', '-f', '-n', '-k', chr_col_1 + 1, '-k', start_col_1 + 1, '-k', end_col_1 + 1, '-o', temp_file.name, in_fname ]) coverage = CoverageWriter(out_stream=open(out_fname, "a"), chromCol=chr_col_2, positionCol=position_col_2, forwardCol=forward_col_2, reverseCol=reverse_col_2, ) temp_file.seek(0) interval = io.NiceReaderWrapper(temp_file, chrom_col=chr_col_1, start_col=start_col_1, end_col=end_col_1, strand_col=strand_col_1, fix_strand=True) main(interval, coverage) coverage.close()