Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.visualization.genomes

import logging
import os
import re
import sys
from json import loads
from typing import (
    Dict,
    Optional,
)

from bx.seq.twobit import TwoBitFile

from galaxy.exceptions import (
    ObjectNotFound,
    ReferenceDataError,
)
from galaxy.managers.users import get_user_by_username
from galaxy.model import (
    HistoryDatasetAssociation,
    User,
)
from galaxy.structured_app import StructuredApp
from galaxy.util.bunch import Bunch

log = logging.getLogger(__name__)

# FIXME: copied from tracks.py
# Message strings returned to browser
messages = Bunch(
    PENDING="pending",
    NO_DATA="no data",
    NO_CHROMOSOME="no chromosome",
    NO_CONVERTER="no converter",
    NO_TOOL="no tool",
    DATA="data",
    ERROR="error",
    OK="ok",
)


[docs]def decode_dbkey(dbkey): """Decodes dbkey and returns tuple ( username, dbkey )""" if isinstance(dbkey, str) and ":" in dbkey: return dbkey.split(":") else: return None, dbkey
[docs]class GenomeRegion: """ A genomic region on an individual chromosome. """
[docs] def __init__(self, chrom=None, start=0, end=0, sequence=None): self.chrom = chrom self.start = int(start) self.end = int(end) self.sequence = sequence
def __str__(self): return f"{self.chrom}:{str(self.start)}-{str(self.end)}"
[docs] @staticmethod def from_dict(obj_dict): return GenomeRegion(chrom=obj_dict["chrom"], start=obj_dict["start"], end=obj_dict["end"])
[docs] @staticmethod def from_str(obj_str): # check for gene region gene_region = obj_str.split(":") # split gene region into components if len(gene_region) == 2: gene_interval = gene_region[1].split("-") # check length if len(gene_interval) == 2: return GenomeRegion(chrom=gene_region[0], start=gene_interval[0], end=gene_interval[1]) # return genome region instance return GenomeRegion()
[docs]class Genome: """ Encapsulates information about a known genome/dbkey. """
[docs] def __init__(self, key, description, len_file=None, twobit_file=None): self.key = key self.description = description self.len_file = len_file self.twobit_file = twobit_file
[docs] def to_dict(self, num=None, chrom=None, low=None): """ Returns representation of self as a dictionary. """ # if there's no len_file, there's nothing to return if not self.len_file: raise ReferenceDataError(f"len_file not set for {self.key}") def check_int(s): if s.isdigit(): return int(s) else: return s def split_by_number(s): return [check_int(c) for c in re.split("([0-9]+)", s)] # # Parameter check, setting. # if num: num = int(num) else: num = sys.maxsize # just a big number if low: low = int(low) if low < 0: low = 0 else: low = 0 # # Get chroms data: # (a) chrom name, len; # (b) whether there are previous, next chroms; # (c) index of start chrom. # with open(self.len_file) as f: len_file_enumerate = enumerate(f) chroms = {} prev_chroms = False start_index = 0 if chrom: # Use starting chrom to start list. found = False count = 0 for line_num, line in len_file_enumerate: if line.startswith("#"): continue name, len = line.split("\t") if found: chroms[name] = int(len) count += 1 elif name == chrom: # Found starting chrom. chroms[name] = int(len) count += 1 found = True start_index = line_num if line_num != 0: prev_chroms = True if count >= num: break else: # Use low to start list. high = low + int(num) prev_chroms = low != 0 start_index = low # Read chrom data from len file. for line_num, line in len_file_enumerate: if line_num < low: continue if line_num >= high: break if line.startswith("#"): continue # LEN files have format: # <chrom_name><tab><chrom_length> fields = line.split("\t") chroms[fields[0]] = int(fields[1]) # Set flag to indicate whether there are more chroms after list. next_chroms = False try: next(len_file_enumerate) next_chroms = True except StopIteration: # No more chroms to read. pass to_sort = [{"chrom": chrm, "len": length} for chrm, length in chroms.items()] to_sort.sort(key=lambda _: split_by_number(_["chrom"])) return { "id": self.key, "reference": self.twobit_file is not None, "chrom_info": to_sort, "prev_chroms": prev_chroms, "next_chroms": next_chroms, "start_index": start_index, }
[docs]class Genomes: """ Provides information about available genome data and methods for manipulating that data. """
[docs] def __init__(self, app: StructuredApp): self.app = app # Create list of genomes from app.genome_builds self.genomes: Dict[str, Genome] = {} # Store internal versions of data tables for twobit and __dbkey__ self._table_versions = {"twobit": None, "__dbkeys__": None} self.reload_genomes()
[docs] def reload_genomes(self): self.genomes = {} # Store table versions for later for table_name in self._table_versions.keys(): table = self.app.tool_data_tables.get(table_name, None) if table is not None: self._table_versions[table_name] = table._loaded_content_version twobit_table = self.app.tool_data_tables.get("twobit", None) twobit_fields = {} if twobit_table is None: # Add genome data (twobit files) to genomes, directly from twobit.loc try: twobit_path = os.path.join(self.app.config.tool_data_path, "twobit.loc") with open(twobit_path) as f: for line in f: if line.startswith("#"): continue val = line.split() if len(val) == 2: key, path = val twobit_fields[key] = path except OSError: # Thrown if twobit.loc does not exist. log.exception("Error reading twobit.loc") for key, description in self.app.genome_builds.get_genome_build_names(): self.genomes[key] = Genome(key, description) # Add len files to genomes. self.genomes[key].len_file = self.app.genome_builds.get_chrom_info(key)[0] if self.genomes[key].len_file: if not os.path.exists(self.genomes[key].len_file): self.genomes[key].len_file = None # Add genome data (twobit files) to genomes. if twobit_table is not None: self.genomes[key].twobit_file = twobit_table.get_entry("value", key, "path", default=None) elif key in twobit_fields: self.genomes[key].twobit_file = twobit_fields[key]
[docs] def check_and_reload(self): # Check if tables have been modified, if so reload for table_name, table_version in self._table_versions.items(): table = self.app.tool_data_tables.get(table_name, None) if table is not None and not table.is_current_version(table_version): return self.reload_genomes()
[docs] def get_build(self, dbkey): """Returns build for the given key.""" self.check_and_reload() rval = None if dbkey in self.genomes: rval = self.genomes[dbkey] return rval
[docs] def get_dbkeys(self, user: Optional[User], chrom_info=False): """Returns all known dbkeys. If chrom_info is True, only dbkeys with chromosome lengths are returned.""" self.check_and_reload() dbkeys = [] # Add user's custom keys to dbkeys. if user and "dbkeys" in user.preferences: user_keys_dict = loads(user.preferences["dbkeys"]) dbkeys.extend([(attributes["name"], key) for key, attributes in user_keys_dict.items()]) # Add app keys to dbkeys. # If chrom_info is True, only include keys with len files (which contain chromosome info). if chrom_info: def filter_fn(b): return b.len_file is not None else: def filter_fn(b): return True dbkeys.extend([(genome.description, genome.key) for key, genome in self.genomes.items() if filter_fn(genome)]) return dbkeys
[docs] def chroms(self, trans, dbkey=None, num=None, chrom=None, low=None): """ Returns a naturally sorted list of chroms/contigs for a given dbkey. Use either chrom or low to specify the starting chrom in the return list. """ session = trans.sa_session self.check_and_reload() # If there is no dbkey owner, default to current user. dbkey_owner, dbkey = decode_dbkey(dbkey) if dbkey_owner: dbkey_user = get_user_by_username(session, dbkey_owner) else: dbkey_user = trans.user # # Get/create genome object. # genome = None twobit_file = None # Look first in user's custom builds. if dbkey_user and "dbkeys" in dbkey_user.preferences: user_keys = loads(dbkey_user.preferences["dbkeys"]) if dbkey in user_keys: dbkey_attributes = user_keys[dbkey] dbkey_name = dbkey_attributes["name"] # If there's a fasta for genome, convert to 2bit for later use. if "fasta" in dbkey_attributes: build_fasta = session.get(HistoryDatasetAssociation, dbkey_attributes["fasta"]) len_file = build_fasta.get_converted_dataset(trans, "len").get_file_name() build_fasta.get_converted_dataset(trans, "twobit") # HACK: set twobit_file to True rather than a file name because # get_converted_dataset returns null during conversion even though # there will eventually be a twobit file available for genome. twobit_file = True # Backwards compatibility: look for len file directly. elif "len" in dbkey_attributes: len_file = session.get(HistoryDatasetAssociation, user_keys[dbkey]["len"]).get_file_name() if len_file: genome = Genome(dbkey, dbkey_name, len_file=len_file, twobit_file=twobit_file) # Look in history and system builds. if not genome: # Look in history for chromosome len file. len_ds = trans.db_dataset_for(dbkey) if len_ds: genome = Genome(dbkey, dbkey_name, len_file=len_ds.get_file_name()) # Look in system builds. elif dbkey in self.genomes: genome = self.genomes[dbkey] if not genome: raise ObjectNotFound(f"genome not found for key {dbkey}") return genome.to_dict(num=num, chrom=chrom, low=low)
[docs] def has_reference_data(self, dbkey, dbkey_owner=None): """ Returns true if there is reference data for the specified dbkey. If dbkey is custom, dbkey_owner is needed to determine if there is reference data. """ self.check_and_reload() # Look for key in built-in builds. if dbkey in self.genomes and self.genomes[dbkey].twobit_file: # There is built-in reference data. return True # Look for key in owner's custom builds. if dbkey_owner and "dbkeys" in dbkey_owner.preferences: user_keys = loads(dbkey_owner.preferences["dbkeys"]) if dbkey in user_keys: dbkey_attributes = user_keys[dbkey] if "fasta" in dbkey_attributes: # Fasta + converted datasets can provide reference data. return True return False
[docs] def reference(self, trans, dbkey, chrom, low, high): """ Return reference data for a build. """ self.check_and_reload() # If there is no dbkey owner, default to current user. dbkey_owner, dbkey = decode_dbkey(dbkey) if dbkey_owner: dbkey_user = get_user_by_username(trans.sa_session, dbkey_owner) else: dbkey_user = trans.user if not self.has_reference_data(dbkey, dbkey_user): raise ReferenceDataError(f"No reference data for {dbkey}") # # Get twobit file with reference data. # twobit_file_name = None if dbkey in self.genomes: # Built-in twobit. twobit_file_name = self.genomes[dbkey].twobit_file else: user_keys = loads(dbkey_user.preferences["dbkeys"]) dbkey_attributes = user_keys[dbkey] fasta_dataset = trans.sa_session.get(HistoryDatasetAssociation, dbkey_attributes["fasta"]) msg = fasta_dataset.convert_dataset(trans, "twobit") if msg: return msg else: twobit_dataset = fasta_dataset.get_converted_dataset(trans, "twobit") twobit_file_name = twobit_dataset.get_file_name() return self._get_reference_data(twobit_file_name, chrom, low, high)
@staticmethod def _get_reference_data(twobit_file_name, chrom, low, high): # Read and return reference data. with open(twobit_file_name, "rb") as f: twobit = TwoBitFile(f) if chrom in twobit: seq_data = twobit[chrom].get(int(low), int(high)) return GenomeRegion(chrom=chrom, start=low, end=high, sequence=seq_data)