Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.datatypes.converters.interval_to_bedstrict_converter

#!/usr/bin/env python
# Dan Blankenberg

import sys

import bx.intervals.io

assert sys.version_info[:2] >= (2, 6)


[docs]def stop_err(msg): sys.exit(msg)
[docs]def force_bed_field_count(fields, region_count, force_num_columns): if force_num_columns >= 4 and len(fields) < 4: fields.append("region_%i" % (region_count)) if force_num_columns >= 5 and len(fields) < 5: fields.append("0") if force_num_columns >= 6 and len(fields) < 6: fields.append("+") if force_num_columns >= 7 and len(fields) < 7: fields.append(fields[1]) if force_num_columns >= 8 and len(fields) < 8: fields.append(fields[2]) if force_num_columns >= 9 and len(fields) < 9: fields.append("0") if force_num_columns >= 10 and len(fields) < 10: fields.append("0") if force_num_columns >= 11 and len(fields) < 11: fields.append(",") if force_num_columns >= 12 and len(fields) < 12: fields.append(",") return fields[:force_num_columns]
def __main__(): output_name = sys.argv[1] input_name = sys.argv[2] try: chromCol = int(sys.argv[3]) - 1 except Exception: stop_err( f"'{str(sys.argv[3])}' is an invalid chrom column, correct the column settings before attempting to convert the data format." ) try: startCol = int(sys.argv[4]) - 1 except Exception: stop_err( f"'{str(sys.argv[4])}' is an invalid start column, correct the column settings before attempting to convert the data format." ) try: endCol = int(sys.argv[5]) - 1 except Exception: stop_err( f"'{str(sys.argv[5])}' is an invalid end column, correct the column settings before attempting to convert the data format." ) try: strandCol = int(sys.argv[6]) - 1 except Exception: strandCol = -1 try: nameCol = int(sys.argv[7]) - 1 except Exception: nameCol = -1 try: extension = sys.argv[8] except IndexError: extension = "interval" # default extension try: force_num_columns = int(sys.argv[9]) except Exception: force_num_columns = None skipped_lines = 0 first_skipped_line = None count = 0 # does file already conform to bed strict? # if so, we want to keep extended columns, otherwise we'll create a generic 6 column bed file strict_bed = True if ( extension in ["bed", "bedstrict", "bed6", "bed12"] and (chromCol, startCol, endCol) == (0, 1, 2) and (nameCol < 0 or nameCol == 3) and (strandCol < 0 or strandCol == 5) ): with open(input_name) as fh, open(output_name, "w") as out: for count, line in enumerate(fh): line = line.rstrip("\n\r") if line == "" or line.startswith("#"): skipped_lines += 1 if first_skipped_line is None: first_skipped_line = count + 1 continue fields = line.split("\t") try: assert len(fields) >= 3, "A BED file requires at least 3 columns" # we can't fix this if len(fields) > 12: strict_bed = False break # name (fields[3]) can be anything, no verification needed if len(fields) > 4: float( fields[4] ) # score - A score between 0 and 1000. If the track line useScore attribute is set to 1 for this annotation data set, the score value will determine the level of gray in which this feature is displayed (higher numbers = darker gray). if len(fields) > 5: assert fields[5] in [ "+", "-", ], "Invalid strand" # strand - Defines the strand - either '+' or '-'. if len(fields) > 6: int( fields[6] ) # thickStart - The starting position at which the feature is drawn thickly (for example, the start codon in gene displays). if len(fields) > 7: int( fields[7] ) # thickEnd - The ending position at which the feature is drawn thickly (for example, the stop codon in gene displays). if len(fields) > 8: if ( fields[8] != "0" ): # itemRgb - An RGB value of the form R,G,B (e.g. 255,0,0). If the track line itemRgb attribute is set to "On", this RBG value will determine the display color of the data contained in this BED line. NOTE: It is recommended that a simple color scheme (eight colors or less) be used with this attribute to avoid overwhelming the color resources of the Genome Browser and your Internet browser. fields2 = fields[8].split(",") assert len(fields2) == 3, "RGB value must be 0 or have length of 3" for field in fields2: int(field) # rgb values are integers if len(fields) > 9: int(fields[9]) # blockCount - The number of blocks (exons) in the BED line. if len(fields) > 10: if ( fields[10] != "," ): # blockSizes - A comma-separated list of the block sizes. The number of items in this list should correspond to blockCount. fields2 = ( fields[10].rstrip(",").split(",") ) # remove trailing comma and split on comma for field in fields2: int(field) if len(fields) > 11: if ( fields[11] != "," ): # blockStarts - A comma-separated list of block starts. All of the blockStart positions should be calculated relative to chromStart. The number of items in this list should correspond to blockCount. fields2 = ( fields[11].rstrip(",").split(",") ) # remove trailing comma and split on comma for field in fields2: int(field) except Exception: strict_bed = False break if force_num_columns is not None and len(fields) != force_num_columns: line = "\t".join(force_bed_field_count(fields, count, force_num_columns)) out.write(f"{line}\n") else: strict_bed = False if not strict_bed: skipped_lines = 0 first_skipped_line = None count = 0 with open(input_name) as fh, open(output_name, "w") as out: for count, region in enumerate( bx.intervals.io.NiceReaderWrapper( fh, chrom_col=chromCol, start_col=startCol, end_col=endCol, strand_col=strandCol, fix_strand=True, return_header=False, return_comments=False, ) ): try: if nameCol >= 0: name = region.fields[nameCol] else: raise IndexError except Exception: name = "region_%i" % count try: fields = [str(item) for item in (region.chrom, region.start, region.end, name, 0, region.strand)] if force_num_columns is not None and len(fields) != force_num_columns: fields = force_bed_field_count(fields, count, force_num_columns) out.write("{}\n".format("\t".join(fields))) except Exception: skipped_lines += 1 if first_skipped_line is None: first_skipped_line = count + 1 print("%i regions converted to BED." % (count + 1 - skipped_lines)) if skipped_lines > 0: print("Skipped %d blank or invalid lines starting with line # %d." % (skipped_lines, first_skipped_line)) if __name__ == "__main__": __main__()