Source code for galaxy.datatypes.msa

import abc
import logging
import os
import re

from galaxy.datatypes.binary import Binary
from galaxy.datatypes.data import (
    get_file_peek,
    Text,
)
from galaxy.datatypes.metadata import MetadataElement
from galaxy.datatypes.sniff import (
    build_sniff_from_prefix,
    FilePrefix,
)
from galaxy.datatypes.util import generic_util
from galaxy.util import (
    nice_size,
    unicodify,
)

log = logging.getLogger(__name__)

STOCKHOLM_SEARCH_PATTERN = re.compile(r"#\s+STOCKHOLM\s+1\.0")


[docs]@build_sniff_from_prefix class InfernalCM(Text): file_ext = "cm" MetadataElement( name="number_of_models", default=0, desc="Number of covariance models", readonly=True, visible=True, optional=True, no_value=0, ) MetadataElement( name="cm_version", default="1/a", desc="Infernal Covariance Model version", readonly=True, visible=True, optional=True, no_value=0, )
[docs] def set_peek(self, dataset): if not dataset.dataset.purged: dataset.peek = get_file_peek(dataset.file_name) if dataset.metadata.number_of_models == 1: dataset.blurb = "1 model" else: dataset.blurb = f"{dataset.metadata.number_of_models} models" dataset.peek = get_file_peek(dataset.file_name) else: dataset.peek = "file does not exist" dataset.blurb = "file purged from disc"
[docs] def sniff_prefix(self, file_prefix: FilePrefix): """ >>> from galaxy.datatypes.sniff import get_test_fname >>> fname = get_test_fname( 'infernal_model.cm' ) >>> InfernalCM().sniff( fname ) True >>> fname = get_test_fname( '2.txt' ) >>> InfernalCM().sniff( fname ) False """ return file_prefix.startswith("INFERNAL")
[docs] def set_meta(self, dataset, **kwd): """ Set the number of models and the version of CM file in dataset. """ dataset.metadata.number_of_models = generic_util.count_special_lines("^INFERNAL", dataset.file_name) with open(dataset.file_name) as f: first_line = f.readline() if first_line.startswith("INFERNAL"): dataset.metadata.cm_version = (first_line.split()[0]).replace("INFERNAL", "")
[docs]@build_sniff_from_prefix class Hmmer(Text): edam_data = "data_1364" edam_format = "format_1370"
[docs] def set_peek(self, dataset): if not dataset.dataset.purged: dataset.peek = get_file_peek(dataset.file_name) dataset.blurb = "HMMER Database" else: dataset.peek = "file does not exist" dataset.blurb = "file purged from disc"
[docs] def display_peek(self, dataset): try: return dataset.peek except Exception: return f"HMMER database ({nice_size(dataset.get_size())})"
[docs] @abc.abstractmethod def sniff_prefix(self, filename): raise NotImplementedError
[docs]class Hmmer2(Hmmer): edam_format = "format_3328" file_ext = "hmm2"
[docs] def sniff_prefix(self, file_prefix: FilePrefix): """HMMER2 files start with HMMER2.0""" return file_prefix.startswith("HMMER2.0")
[docs]class Hmmer3(Hmmer): edam_format = "format_3329" file_ext = "hmm3"
[docs] def sniff_prefix(self, file_prefix: FilePrefix): """HMMER3 files start with HMMER3/f""" return file_prefix.startswith("HMMER3/f")
[docs]class HmmerPress(Binary): """Class for hmmpress database files.""" file_ext = "hmmpress" composite_type = "basic"
[docs] def set_peek(self, dataset): """Set the peek and blurb text.""" if not dataset.dataset.purged: dataset.peek = "HMMER Binary database" dataset.blurb = "HMMER Binary database" else: dataset.peek = "file does not exist" dataset.blurb = "file purged from disk"
[docs] def display_peek(self, dataset): """Create HTML content, used for displaying peek.""" try: return dataset.peek except Exception: return "HMMER3 database (multiple files)"
[docs] def __init__(self, **kwd): super().__init__(**kwd) # Binary model self.add_composite_file("model.hmm.h3m", is_binary=True) # SSI index for binary model self.add_composite_file("model.hmm.h3i", is_binary=True) # Profiles (MSV part) self.add_composite_file("model.hmm.h3f", is_binary=True) # Profiles (remained) self.add_composite_file("model.hmm.h3p", is_binary=True)
[docs]@build_sniff_from_prefix class Stockholm_1_0(Text): edam_data = "data_0863" edam_format = "format_1961" file_ext = "stockholm" MetadataElement( name="number_of_models", default=0, desc="Number of multiple alignments", readonly=True, visible=True, optional=True, no_value=0, )
[docs] def set_peek(self, dataset): if not dataset.dataset.purged: if dataset.metadata.number_of_models == 1: dataset.blurb = "1 alignment" else: dataset.blurb = f"{dataset.metadata.number_of_models} alignments" dataset.peek = get_file_peek(dataset.file_name) else: dataset.peek = "file does not exist" dataset.blurb = "file purged from disc"
[docs] def sniff_prefix(self, file_prefix: FilePrefix): return file_prefix.search(STOCKHOLM_SEARCH_PATTERN)
[docs] def set_meta(self, dataset, **kwd): """ Set the number of models in dataset. """ dataset.metadata.number_of_models = generic_util.count_special_lines( "^#[[:space:]+]STOCKHOLM[[:space:]+]1.0", dataset.file_name )
[docs] @classmethod def split(cls, input_datasets, subdir_generator_function, split_params): """ Split the input files by model records. """ if split_params is None: return None if len(input_datasets) > 1: raise Exception("STOCKHOLM-file splitting does not support multiple files") input_files = [ds.file_name for ds in input_datasets] chunk_size = None if split_params["split_mode"] == "number_of_parts": raise Exception( f"Split mode \"{split_params['split_mode']}\" is currently not implemented for STOCKHOLM-files." ) elif split_params["split_mode"] == "to_size": chunk_size = int(split_params["split_size"]) else: raise Exception(f"Unsupported split mode {split_params['split_mode']}") def _read_stockholm_records(filename): lines = [] with open(filename) as handle: for line in handle: lines.append(line) if line.strip() == "//": yield lines lines = [] def _write_part_stockholm_file(accumulated_lines): part_dir = subdir_generator_function() part_path = os.path.join(part_dir, os.path.basename(input_files[0])) with open(part_path, "w") as part_file: part_file.writelines(accumulated_lines) try: stockholm_records = _read_stockholm_records(input_files[0]) stockholm_lines_accumulated = [] for counter, stockholm_record in enumerate(stockholm_records, start=1): stockholm_lines_accumulated.extend(stockholm_record) if counter % chunk_size == 0: _write_part_stockholm_file(stockholm_lines_accumulated) stockholm_lines_accumulated = [] if stockholm_lines_accumulated: _write_part_stockholm_file(stockholm_lines_accumulated) except Exception as e: log.error("Unable to split files: %s", unicodify(e)) raise
[docs]@build_sniff_from_prefix class MauveXmfa(Text): file_ext = "xmfa" MetadataElement( name="number_of_models", default=0, desc="Number of alignmened sequences", readonly=True, visible=True, optional=True, no_value=0, )
[docs] def set_peek(self, dataset): if not dataset.dataset.purged: if dataset.metadata.number_of_models == 1: dataset.blurb = "1 alignment" else: dataset.blurb = f"{dataset.metadata.number_of_models} alignments" dataset.peek = get_file_peek(dataset.file_name) else: dataset.peek = "file does not exist" dataset.blurb = "file purged from disc"
[docs] def sniff_prefix(self, file_prefix: FilePrefix): return file_prefix.startswith("#FormatVersion Mauve1")
[docs] def set_meta(self, dataset, **kwd): dataset.metadata.number_of_models = generic_util.count_special_lines( "^#Sequence([[:digit:]]+)Entry", dataset.file_name )
[docs]class Msf(Text): """ Multiple sequence alignment format produced by the Accelrys GCG suite and other programs. """ edam_data = "data_0863" edam_format = "format_1947" file_ext = "msf"