This document is for an in-development version of Galaxy. Alternatively, you can view this page in the latest release (if it exists there) or go to the top of the latest release's documentation.

Source code for galaxy.managers.genomes

from typing import (

from sqlalchemy import func

from galaxy import model as m
from galaxy.exceptions import (
from galaxy.managers.context import ProvidesUserContext
from galaxy.structured_app import StructuredApp
from .base import raise_filter_err

    from galaxy.managers.base import OrmFilterParsersType

class GenomesManager:
    """Manager exposing genome (dbkey) lookups, reference sequence and index files.

    Genome queries are delegated to ``app.genomes``; index files are located
    through the entries of ``app.tool_data_tables``.
    """

    def __init__(self, app: StructuredApp):
        """Keep a reference to the application and to its ``genomes`` object."""
        self._app = app
        self.genomes = app.genomes

    def get_dbkeys(self, user: m.User, chrom_info: bool) -> List[List[str]]:
        """Return the dbkey entries for ``user`` as produced by ``app.genomes.get_dbkeys``."""
        return self.genomes.get_dbkeys(user, chrom_info)

    def is_registered_dbkey(self, dbkey: str, user: m.User) -> bool:
        """Return ``True`` if ``dbkey`` equals the second field of any dbkey entry for ``user``."""
        dbkeys = self.get_dbkeys(user, chrom_info=False)
        return any(dbkey == key for _, key in dbkeys)

    def get_genome(
        self, trans: ProvidesUserContext, id: str, num: int, chrom: str, low: int, high: int, reference: bool
    ) -> Any:
        """Return reference sequence data or chromosome information for dbkey ``id``.

        When ``reference`` is true the requested region is fetched and returned
        as ``{"dataset_type": "refseq", "data": <sequence>}``; otherwise the
        result of ``app.genomes.chroms`` is returned unchanged.
        """
        if reference:
            region = self.genomes.reference(trans, dbkey=id, chrom=chrom, low=low, high=high)
            return {"dataset_type": "refseq", "data": region.sequence}
        # ``high`` is only used by the reference lookup; it is not forwarded to ``chroms``.
        return self.genomes.chroms(trans, dbkey=id, num=num, chrom=chrom, low=low)

    def get_sequence(self, trans: ProvidesUserContext, id: str, chrom: str, low: int, high: int) -> Any:
        """Return the ``sequence`` attribute of the reference region for dbkey ``id``."""
        region = self.genomes.reference(trans, dbkey=id, chrom=chrom, low=low, high=high)
        return region.sequence

    def get_indexes(self, id: str, index_type: str) -> Any:
        """Return the text content of the index file of type ``index_type`` for ``id``.

        :raises RequestParameterInvalidException: if ``index_type`` is not a supported type.
        :raises ReferenceDataError: if no index file can be resolved or it cannot be read.
        """
        # Supported index types mapped to the extension appended to the table's path.
        index_extensions = {"fasta_indexes": ".fai"}
        if index_type not in index_extensions:
            raise RequestParameterInvalidException(f"Invalid index type: {index_type}")
        # NOTE(review): a missing ``index_type`` table would surface here as whatever
        # ``data_tables[...]`` raises, not as ReferenceDataError - confirm that is intended.
        tbl_entries = self._app.tool_data_tables.data_tables[index_type].data
        ext = index_extensions[index_type]
        index_filename = self._get_index_filename(id, tbl_entries, ext, index_type)
        try:
            with open(index_filename) as f:
                return f.read()
        except OSError as err:
            # Chain the I/O error so the real cause (missing file, permissions, ...) is kept.
            raise ReferenceDataError(f"Failed to load index file for {id}") from err

    def _get_index_filename(self, id, tbl_entries, ext, index_type):
        """Return the path stored for ``id`` in ``tbl_entries`` with ``ext`` appended.

        The last field of every entry containing ``id`` is collected; when several
        entries match, the last one is used.

        :raises ReferenceDataError: if the entries cannot be searched or none matches.
        """
        try:
            paths = [x[-1] for x in tbl_entries if id in x]
            file_name = paths.pop()
        except TypeError as err:
            # Raised e.g. when ``tbl_entries`` is not iterable.
            raise ReferenceDataError(f"Data tables not found for {index_type}") from err
        except IndexError as err:
            # ``pop`` on an empty list: no entry mentions ``id``.
            raise ReferenceDataError(f"Data tables not found for {index_type} for {id}") from err
        else:
            return f"{file_name}{ext}"
class GenomeFilterMixin:
    """Mixin that registers a ``genome_build`` filter parser on ``orm_filter_parsers``."""

    orm_filter_parsers: "OrmFilterParsersType"
    # Operators accepted by the genome filter; anything else is reported via raise_filter_err.
    valid_ops = ("eq", "contains", "has")

    def create_genome_filter(self, attr, op, val):
        """Return a callable that builds the dbkey filter for a given model class.

        The returned callable takes an optional ``model_class`` and yields:

        * ``True`` when no model class is supplied,
        * ``False`` for ``HistoryDatasetCollectionAssociation`` (collections are
          not filtered by genome build),
        * otherwise a case-insensitive condition on the ``$.dbkey`` value
          extracted from the model table's ``_metadata`` column: equality for
          ``eq``, substring containment for every other accepted operator.
        """

        def _create_genome_filter(model_class=None):
            # The operator is validated first, before any model class handling.
            if op not in GenomeFilterMixin.valid_ops:
                raise_filter_err(attr, op, val, "bad op in filter")
            if model_class is None:
                return True
            # Doesn't filter genome_build for collections
            if model_class.__name__ == "HistoryDatasetCollectionAssociation":
                return False
            dbkey_column = func.json_extract(model_class.table.c._metadata, "$.dbkey")
            # Both sides are lowercased so the comparison ignores case.
            needle = val.lower()
            lowered_dbkey = func.lower(dbkey_column)
            if op == "eq":
                return lowered_dbkey == needle
            return lowered_dbkey.contains(needle, autoescape=True)

        return _create_genome_filter

    def _add_parsers(self):
        """Register ``create_genome_filter`` under the ``genome_build`` key."""
        self.orm_filter_parsers.update(genome_build=self.create_genome_filter)