Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.datatypes.msa

import abc
import logging
import os
import re
from typing import (
    Callable,
    Dict,
    List,
    Optional,
)

from galaxy.datatypes.binary import Binary
from galaxy.datatypes.data import (
    get_file_peek,
    Text,
)
from galaxy.datatypes.metadata import MetadataElement
from galaxy.datatypes.protocols import DatasetProtocol
from galaxy.datatypes.sniff import (
    build_sniff_from_prefix,
    FilePrefix,
)
from galaxy.datatypes.util import generic_util
from galaxy.util import (
    nice_size,
    unicodify,
)

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Pre-compiled pattern for the "# STOCKHOLM 1.0" marker. Any run of whitespace
# is accepted between the tokens and the pattern is not anchored to the start
# of a line. Used by Stockholm_1_0.sniff_prefix.
STOCKHOLM_SEARCH_PATTERN = re.compile(r"#\s+STOCKHOLM\s+1\.0")


@build_sniff_from_prefix
class InfernalCM(Text):
    """Text datatype for Infernal covariance model files (extension ``cm``)."""

    file_ext = "cm"

    MetadataElement(
        name="number_of_models",
        default=0,
        desc="Number of covariance models",
        readonly=True,
        visible=True,
        optional=True,
        no_value=0,
    )

    MetadataElement(
        name="cm_version",
        default="1/a",
        desc="Infernal Covariance Model version",
        readonly=True,
        visible=True,
        optional=True,
        no_value=0,
    )

    def set_peek(self, dataset: DatasetProtocol, **kwd) -> None:
        """
        Set the peek and blurb of ``dataset``.

        For a purged dataset both fields are set to fixed placeholder texts.
        Otherwise the blurb reports the ``number_of_models`` metadata value and
        the peek is taken from the head of the dataset file.
        """
        if dataset.dataset.purged:
            dataset.peek = "file does not exist"
            dataset.blurb = "file purged from disc"
            return

        model_count = dataset.metadata.number_of_models
        if model_count == 1:
            dataset.blurb = "1 model"
        else:
            dataset.blurb = f"{model_count} models"
        # The peek is computed exactly once; reading the file a second time
        # would only reproduce the same value.
        dataset.peek = get_file_peek(dataset.get_file_name())

    def sniff_prefix(self, file_prefix: FilePrefix) -> bool:
        """
        Return True when the file prefix starts with ``INFERNAL``.

        >>> from galaxy.datatypes.sniff import get_test_fname
        >>> fname = get_test_fname( 'infernal_model.cm' )
        >>> InfernalCM().sniff( fname )
        True
        >>> fname = get_test_fname( '2.txt' )
        >>> InfernalCM().sniff( fname )
        False
        """
        return file_prefix.startswith("INFERNAL")

    def set_meta(self, dataset: DatasetProtocol, overwrite: bool = True, **kwd) -> None:
        """
        Set the number of models and the version of CM file in dataset.

        ``number_of_models`` is the count reported by
        ``generic_util.count_special_lines`` for the ``^INFERNAL`` pattern.
        ``cm_version`` is derived from the first whitespace-separated token of
        the first line with the ``INFERNAL`` text removed; it is left untouched
        when the first line does not start with ``INFERNAL``.
        """
        file_name = dataset.get_file_name()
        dataset.metadata.number_of_models = generic_util.count_special_lines("^INFERNAL", file_name)
        with open(file_name) as f:
            first_line = f.readline()
        if first_line.startswith("INFERNAL"):
            dataset.metadata.cm_version = (first_line.split()[0]).replace("INFERNAL", "")
@build_sniff_from_prefix
class Hmmer(Text):
    """Shared base for the HMMER text datatypes; subclasses supply ``sniff_prefix``."""

    edam_data = "data_1364"
    edam_format = "format_1370"

    def set_peek(self, dataset: DatasetProtocol, **kwd) -> None:
        """Fill in the peek and blurb of ``dataset``, or placeholders when it is purged."""
        if dataset.dataset.purged:
            dataset.peek = "file does not exist"
            dataset.blurb = "file purged from disc"
            return
        dataset.peek = get_file_peek(dataset.get_file_name())
        dataset.blurb = "HMMER Database"

    def display_peek(self, dataset: DatasetProtocol) -> str:
        """Return the stored peek, or a size-based description if reading it fails."""
        try:
            peek_text = dataset.peek
        except Exception:
            return f"HMMER database ({nice_size(dataset.get_size())})"
        return peek_text

    @abc.abstractmethod
    def sniff_prefix(self, file_prefix: FilePrefix) -> bool:
        """Decide whether ``file_prefix`` belongs to this format; implemented by subclasses."""
        raise NotImplementedError
class Hmmer2(Hmmer):
    """HMMER datatype variant registered with the ``hmm2`` extension."""

    edam_format = "format_3328"
    file_ext = "hmm2"

    def sniff_prefix(self, file_prefix: FilePrefix) -> bool:
        """HMMER2 files start with HMMER2.0"""
        magic = "HMMER2.0"
        return file_prefix.startswith(magic)
class Hmmer3(Hmmer):
    """HMMER datatype variant registered with the ``hmm3`` extension."""

    edam_format = "format_3329"
    file_ext = "hmm3"

    def sniff_prefix(self, file_prefix: FilePrefix) -> bool:
        """HMMER3 files start with HMMER3/f"""
        magic = "HMMER3/f"
        return file_prefix.startswith(magic)
class HmmerPress(Binary):
    """Class for hmmpress database files."""

    file_ext = "hmmpress"
    composite_type = "basic"

    def set_peek(self, dataset: DatasetProtocol, **kwd) -> None:
        """Set the peek and blurb text."""
        if dataset.dataset.purged:
            dataset.peek = "file does not exist"
            dataset.blurb = "file purged from disk"
            return
        dataset.peek = "HMMER Binary database"
        dataset.blurb = "HMMER Binary database"

    def display_peek(self, dataset: DatasetProtocol) -> str:
        """Create HTML content, used for displaying peek."""
        try:
            peek_text = dataset.peek
        except Exception:
            return "HMMER3 database (multiple files)"
        return peek_text

    def __init__(self, **kwd):
        """Initialise the datatype and register its four binary composite files."""
        super().__init__(**kwd)
        composite_names = (
            "model.hmm.h3m",  # Binary model
            "model.hmm.h3i",  # SSI index for binary model
            "model.hmm.h3f",  # Profiles (MSV part)
            "model.hmm.h3p",  # Profiles (remained)
        )
        for composite_name in composite_names:
            self.add_composite_file(composite_name, is_binary=True)
@build_sniff_from_prefix
class Stockholm_1_0(Text):
    """Text datatype for Stockholm 1.0 multiple alignment files (extension ``stockholm``)."""

    edam_data = "data_0863"
    edam_format = "format_1961"
    file_ext = "stockholm"

    MetadataElement(
        name="number_of_models",
        default=0,
        desc="Number of multiple alignments",
        readonly=True,
        visible=True,
        optional=True,
        no_value=0,
    )

    def set_peek(self, dataset: DatasetProtocol, **kwd) -> None:
        """
        Set the peek and blurb of ``dataset``.

        For a purged dataset both fields are set to fixed placeholder texts.
        Otherwise the blurb reports the ``number_of_models`` metadata value and
        the peek is taken from the head of the dataset file.
        """
        if not dataset.dataset.purged:
            if dataset.metadata.number_of_models == 1:
                dataset.blurb = "1 alignment"
            else:
                dataset.blurb = f"{dataset.metadata.number_of_models} alignments"
            dataset.peek = get_file_peek(dataset.get_file_name())
        else:
            dataset.peek = "file does not exist"
            dataset.blurb = "file purged from disc"

    def sniff_prefix(self, file_prefix: FilePrefix) -> bool:
        """Return True when the file prefix contains a ``# STOCKHOLM 1.0`` marker."""
        # Coerce to a real bool so the result matches the annotation even if
        # the search hands back a match object (or None).
        return bool(file_prefix.search(STOCKHOLM_SEARCH_PATTERN))

    def set_meta(self, dataset: DatasetProtocol, overwrite: bool = True, **kwd) -> None:
        """
        Set the number of models in dataset.

        The value is the count reported by ``generic_util.count_special_lines``
        for lines beginning with a ``# STOCKHOLM 1.0`` header.
        """
        # The quantifier belongs outside the bracket expression: "[[:space:]+]"
        # matches a single character that is whitespace or a literal "+",
        # whereas "[[:space:]]+" matches a run of whitespace, in line with
        # STOCKHOLM_SEARCH_PATTERN. The dot is escaped so only "1.0" matches.
        # NOTE(review): assumes count_special_lines evaluates POSIX extended
        # regular expressions (as the character classes imply) - confirm.
        dataset.metadata.number_of_models = generic_util.count_special_lines(
            r"^#[[:space:]]+STOCKHOLM[[:space:]]+1\.0", dataset.get_file_name()
        )

    @classmethod
    def split(cls, input_datasets: List, subdir_generator_function: Callable, split_params: Optional[Dict]) -> None:
        """
        Split the input files by model records.

        :param input_datasets: datasets to split; at most one is supported.
        :param subdir_generator_function: called once per output part, returns
            the directory the part file is written into.
        :param split_params: splitting options. ``None`` disables splitting.
            Only ``split_mode == "to_size"`` is supported, in which case
            ``split_size`` is the number of records per part.
        :raises Exception: for multiple input datasets or an unsupported or
            unimplemented split mode; errors raised while reading or writing
            are logged and re-raised.

        Records are delimited by a line consisting of ``//``. Lines that follow
        the last ``//`` terminator are not written to any part.
        """
        if split_params is None:
            return None

        if len(input_datasets) > 1:
            raise Exception("STOCKHOLM-file splitting does not support multiple files")
        input_files = [ds.get_file_name() for ds in input_datasets]

        chunk_size = None
        if split_params["split_mode"] == "number_of_parts":
            raise Exception(
                f"Split mode \"{split_params['split_mode']}\" is currently not implemented for STOCKHOLM-files."
            )
        elif split_params["split_mode"] == "to_size":
            chunk_size = int(split_params["split_size"])
        else:
            raise Exception(f"Unsupported split mode {split_params['split_mode']}")

        def _read_stockholm_records(filename):
            # Yield the lines of one record at a time, terminator included.
            lines = []
            with open(filename) as handle:
                for line in handle:
                    lines.append(line)
                    if line.strip() == "//":
                        yield lines
                        lines = []

        def _write_part_stockholm_file(accumulated_lines):
            # Each part keeps the input's base name inside a fresh directory.
            part_dir = subdir_generator_function()
            part_path = os.path.join(part_dir, os.path.basename(input_files[0]))
            with open(part_path, "w") as part_file:
                part_file.writelines(accumulated_lines)

        try:
            stockholm_records = _read_stockholm_records(input_files[0])
            stockholm_lines_accumulated = []
            for counter, stockholm_record in enumerate(stockholm_records, start=1):
                stockholm_lines_accumulated.extend(stockholm_record)
                # Flush a part after every chunk_size records.
                if counter % chunk_size == 0:
                    _write_part_stockholm_file(stockholm_lines_accumulated)
                    stockholm_lines_accumulated = []
            # Write whatever is left as a final, smaller part.
            if stockholm_lines_accumulated:
                _write_part_stockholm_file(stockholm_lines_accumulated)
        except Exception as e:
            log.error("Unable to split files: %s", unicodify(e))
            raise
@build_sniff_from_prefix
class MauveXmfa(Text):
    """Text datatype for Mauve XMFA alignment files (extension ``xmfa``)."""

    file_ext = "xmfa"

    # The description is user-facing text; the misspelling "alignmened" is
    # corrected here.
    MetadataElement(
        name="number_of_models",
        default=0,
        desc="Number of aligned sequences",
        readonly=True,
        visible=True,
        optional=True,
        no_value=0,
    )

    def set_peek(self, dataset: DatasetProtocol, **kwd) -> None:
        """
        Set the peek and blurb of ``dataset``.

        For a purged dataset both fields are set to fixed placeholder texts.
        Otherwise the blurb reports the ``number_of_models`` metadata value and
        the peek is taken from the head of the dataset file.
        """
        if not dataset.dataset.purged:
            if dataset.metadata.number_of_models == 1:
                dataset.blurb = "1 alignment"
            else:
                dataset.blurb = f"{dataset.metadata.number_of_models} alignments"
            dataset.peek = get_file_peek(dataset.get_file_name())
        else:
            dataset.peek = "file does not exist"
            dataset.blurb = "file purged from disc"

    def sniff_prefix(self, file_prefix: FilePrefix) -> bool:
        """Return True when the file prefix starts with ``#FormatVersion Mauve1``."""
        return file_prefix.startswith("#FormatVersion Mauve1")

    def set_meta(self, dataset: DatasetProtocol, overwrite: bool = True, **kwd) -> None:
        """
        Set ``number_of_models`` to the count reported by
        ``generic_util.count_special_lines`` for ``#Sequence<N>Entry`` lines.
        """
        dataset.metadata.number_of_models = generic_util.count_special_lines(
            "^#Sequence([[:digit:]]+)Entry", dataset.get_file_name()
        )
[docs]class Msf(Text): """ Multiple sequence alignment format produced by the Accelrys GCG suite and other programs. """ edam_data = "data_0863" edam_format = "format_1947" file_ext = "msf"