Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.visualization.genomes
import logging
import os
import re
import sys
from json import loads
from typing import (
Dict,
Optional,
)
from bx.seq.twobit import TwoBitFile
from galaxy.exceptions import (
ObjectNotFound,
ReferenceDataError,
)
from galaxy.managers.users import get_user_by_username
from galaxy.model import (
HistoryDatasetAssociation,
User,
)
from galaxy.structured_app import StructuredApp
from galaxy.util.bunch import Bunch
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# FIXME: copied from tracks.py
# Message strings returned to browser.
# Each attribute pairs a symbolic name with the literal string value;
# code should reference the attribute (e.g. ``messages.PENDING``) rather
# than repeating the raw string.
messages = Bunch(
    PENDING="pending",
    NO_DATA="no data",
    NO_CHROMOSOME="no chromosome",
    NO_CONVERTER="no converter",
    NO_TOOL="no tool",
    DATA="data",
    ERROR="error",
    OK="ok",
)
def decode_dbkey(dbkey):
    """
    Decode a dbkey and return the tuple ``(username, dbkey)``.

    A string of the form ``"<username>:<dbkey>"`` is split at its first
    colon. Any other value (a plain key, ``None`` or a non-string) is
    returned unchanged with ``None`` in the username position.

    :param dbkey: raw dbkey, optionally prefixed with ``"<username>:"``
    :returns: two-item tuple ``(username or None, dbkey)``
    """
    if isinstance(dbkey, str) and ":" in dbkey:
        # Split only once: callers unpack exactly two values, so a key that
        # itself contains a colon must not yield a third element.
        username, _, key = dbkey.partition(":")
        return username, key
    return None, dbkey
class GenomeRegion:
    """
    A genomic region on an individual chromosome.

    Attributes set by the constructor: ``chrom`` (as given), ``start`` and
    ``end`` (coerced with ``int``), and ``sequence`` (as given).
    """

    def __init__(self, chrom=None, start=0, end=0, sequence=None):
        self.chrom = chrom
        # Coordinates may arrive as strings (e.g. from a parsed region string).
        self.start = int(start)
        self.end = int(end)
        self.sequence = sequence

    def __str__(self):
        """Return the region formatted as ``<chrom>:<start>-<end>``."""
        return f"{self.chrom}:{self.start}-{self.end}"

    @staticmethod
    def from_dict(obj_dict):
        """Build a region from a mapping with ``chrom``, ``start`` and ``end`` keys."""
        return GenomeRegion(chrom=obj_dict["chrom"], start=obj_dict["start"], end=obj_dict["end"])

    @staticmethod
    def from_str(obj_str):
        """
        Parse a ``<chrom>:<start>-<end>`` string into a region.

        The string is split at its *last* colon so that chromosome/contig
        names which themselves contain colons are kept intact. If the string
        has no colon, or the part after the last colon is not exactly two
        ``-``-separated fields, an empty ``GenomeRegion()`` is returned.

        :raises ValueError: if start or end cannot be converted to ``int``
        """
        chrom, colon, interval = obj_str.rpartition(":")
        if colon:
            bounds = interval.split("-")
            if len(bounds) == 2:
                return GenomeRegion(chrom=chrom, start=bounds[0], end=bounds[1])
        # Not a recognisable region string: fall back to the default region.
        return GenomeRegion()
class Genome:
    """
    Encapsulates information about a known genome/dbkey.
    """

    def __init__(self, key, description, len_file=None, twobit_file=None):
        self.key = key
        self.description = description
        # Path to a tab-separated ``<chrom_name><tab><chrom_length>`` file.
        self.len_file = len_file
        # Any non-None value marks the genome as having reference data.
        self.twobit_file = twobit_file

    def to_dict(self, num=None, chrom=None, low=None):
        """
        Returns representation of self as a dictionary.

        Reads chromosome names and lengths from ``self.len_file`` and returns
        a window of them. The window starts either at the chromosome named
        ``chrom`` or, when ``chrom`` is not given, at line index ``low``.

        :param num: maximum number of chroms to return; a falsy value means
            no limit
        :param chrom: name of the chromosome at which the window starts
        :param low: zero-based line index at which the window starts when
            ``chrom`` is not given; falsy or negative values are treated as 0
        :returns: dict with keys ``id``, ``reference``, ``chrom_info``
            (naturally sorted list of ``{"chrom", "len"}`` dicts),
            ``prev_chroms``, ``next_chroms`` and ``start_index``
        :raises ReferenceDataError: if ``self.len_file`` is not set
        """
        # if there's no len_file, there's nothing to return
        if not self.len_file:
            raise ReferenceDataError(f"len_file not set for {self.key}")

        def natural_key(name):
            # re.split with a capture group alternates text, digits, text, ...
            # so two keys always compare str with str and int with int.
            return [int(part) if part.isdigit() else part for part in re.split("([0-9]+)", name)]

        def is_chrom_line(line):
            # Comment lines and blank lines carry no chrom data.
            return bool(line.strip()) and not line.startswith("#")

        #
        # Parameter check, setting.
        #
        num = int(num) if num else sys.maxsize  # just a big number
        low = max(int(low), 0) if low else 0

        #
        # Get chroms data:
        #   (a) chrom name, len;
        #   (b) whether there are previous, next chroms;
        #   (c) index of start chrom.
        #
        chroms = {}
        prev_chroms = False
        next_chroms = False
        start_index = 0
        with open(self.len_file) as f:
            numbered_lines = enumerate(f)
            if chrom:
                # Use starting chrom to start list.
                found = False
                count = 0
                for line_num, line in numbered_lines:
                    if not is_chrom_line(line):
                        continue
                    # LEN files have format:
                    #   <chrom_name><tab><chrom_length>
                    # Extra columns are ignored, as in the ``low`` branch.
                    fields = line.split("\t")
                    if found or fields[0] == chrom:
                        if not found:
                            # Found starting chrom.
                            found = True
                            start_index = line_num
                            prev_chroms = line_num != 0
                        chroms[fields[0]] = int(fields[1])
                        count += 1
                    if count >= num:
                        break
            else:
                # Use low to start list.
                high = low + num
                prev_chroms = low != 0
                start_index = low
                for line_num, line in numbered_lines:
                    if line_num < low or not is_chrom_line(line):
                        continue
                    if line_num >= high:
                        # This line has already been consumed from the
                        # iterator, so record here that a chrom exists past
                        # the window instead of probing the iterator later.
                        next_chroms = True
                        break
                    fields = line.split("\t")
                    chroms[fields[0]] = int(fields[1])

            # Set flag to indicate whether there are more chroms after list.
            # Only real chrom lines count; trailing comments/blank lines do not.
            if not next_chroms:
                next_chroms = any(is_chrom_line(line) for _, line in numbered_lines)

        chrom_info = [{"chrom": name, "len": length} for name, length in chroms.items()]
        chrom_info.sort(key=lambda entry: natural_key(entry["chrom"]))
        return {
            "id": self.key,
            "reference": self.twobit_file is not None,
            "chrom_info": chrom_info,
            "prev_chroms": prev_chroms,
            "next_chroms": next_chroms,
            "start_index": start_index,
        }
class Genomes:
    """
    Provides information about available genome data and methods for manipulating that data.
    """

    def __init__(self, app: StructuredApp):
        self.app = app
        # Create list of genomes from app.genome_builds
        self.genomes: Dict[str, Genome] = {}
        # Store internal versions of data tables for twobit and __dbkey__
        self._table_versions = {"twobit": None, "__dbkeys__": None}
        self.reload_genomes()

    def reload_genomes(self):
        """
        Rebuild ``self.genomes`` from ``app.genome_builds``.

        Also records the loaded content version of each tracked data table so
        that ``check_and_reload`` can later detect changes. Twobit paths come
        from the ``twobit`` data table when it exists, otherwise from a
        ``twobit.loc`` file in ``app.config.tool_data_path``.
        """
        self.genomes = {}
        # Store table versions for later
        for table_name in self._table_versions:
            table = self.app.tool_data_tables.get(table_name, None)
            if table is not None:
                self._table_versions[table_name] = table._loaded_content_version

        twobit_table = self.app.tool_data_tables.get("twobit", None)
        twobit_fields = {}
        if twobit_table is None:
            # Add genome data (twobit files) to genomes, directly from twobit.loc
            try:
                twobit_path = os.path.join(self.app.config.tool_data_path, "twobit.loc")
                with open(twobit_path) as f:
                    for line in f:
                        if line.startswith("#"):
                            continue
                        val = line.split()
                        # Only well-formed "<key> <path>" lines are used.
                        if len(val) == 2:
                            key, path = val
                            twobit_fields[key] = path
            except OSError:
                # Thrown if twobit.loc does not exist.
                log.exception("Error reading twobit.loc")

        for key, description in self.app.genome_builds.get_genome_build_names():
            genome = Genome(key, description)
            self.genomes[key] = genome
            # Add len files to genomes; drop paths that do not exist on disk.
            genome.len_file = self.app.genome_builds.get_chrom_info(key)[0]
            if genome.len_file and not os.path.exists(genome.len_file):
                genome.len_file = None
            # Add genome data (twobit files) to genomes.
            if twobit_table is not None:
                genome.twobit_file = twobit_table.get_entry("value", key, "path", default=None)
            elif key in twobit_fields:
                genome.twobit_file = twobit_fields[key]

    def check_and_reload(self):
        """Reload genomes if any tracked data table is no longer at the recorded version."""
        # Check if tables have been modified, if so reload
        for table_name, table_version in self._table_versions.items():
            table = self.app.tool_data_tables.get(table_name, None)
            if table is not None and not table.is_current_version(table_version):
                return self.reload_genomes()

    def get_build(self, dbkey):
        """Returns build for the given key, or None if the key is unknown."""
        self.check_and_reload()
        return self.genomes.get(dbkey)

    def get_dbkeys(self, user: Optional[User], chrom_info=False):
        """Returns all known dbkeys. If chrom_info is True, only dbkeys with
        chromosome lengths are returned.

        The result is a list of ``(description, key)`` tuples: the user's
        custom keys first (these are not filtered by ``chrom_info``), followed
        by the app's keys.
        """
        self.check_and_reload()
        dbkeys = []

        # Add user's custom keys to dbkeys.
        if user and "dbkeys" in user.preferences:
            user_keys_dict = loads(user.preferences["dbkeys"])
            dbkeys.extend((attributes["name"], key) for key, attributes in user_keys_dict.items())

        # Add app keys to dbkeys.
        # If chrom_info is True, only include keys with len files (which contain chromosome info).
        dbkeys.extend(
            (genome.description, genome.key)
            for genome in self.genomes.values()
            if not chrom_info or genome.len_file is not None
        )
        return dbkeys

    def chroms(self, trans, dbkey=None, num=None, chrom=None, low=None):
        """
        Returns a naturally sorted list of chroms/contigs for a given dbkey.
        Use either chrom or low to specify the starting chrom in the return list.

        The genome is looked up, in order, in the dbkey owner's custom builds,
        in the history (``trans.db_dataset_for``) and in the system builds.

        :raises ObjectNotFound: if no genome can be found for ``dbkey``
        """
        session = trans.sa_session
        self.check_and_reload()

        # If there is no dbkey owner, default to current user.
        dbkey_owner, dbkey = decode_dbkey(dbkey)
        if dbkey_owner:
            dbkey_user = get_user_by_username(session, dbkey_owner)
        else:
            dbkey_user = trans.user

        #
        # Get/create genome object.
        #
        genome = None
        twobit_file = None
        # Defaults so the later branches never read an unbound name: the key
        # doubles as the description when no custom build supplies one, and
        # a custom build without "fasta" or "len" simply has no len file.
        dbkey_name = dbkey
        len_file = None

        # Look first in user's custom builds.
        if dbkey_user and "dbkeys" in dbkey_user.preferences:
            user_keys = loads(dbkey_user.preferences["dbkeys"])
            if dbkey in user_keys:
                dbkey_attributes = user_keys[dbkey]
                dbkey_name = dbkey_attributes["name"]

                # If there's a fasta for genome, convert to 2bit for later use.
                if "fasta" in dbkey_attributes:
                    build_fasta = session.get(HistoryDatasetAssociation, dbkey_attributes["fasta"])
                    len_file = build_fasta.get_converted_dataset(trans, "len").get_file_name()
                    build_fasta.get_converted_dataset(trans, "twobit")
                    # HACK: set twobit_file to True rather than a file name because
                    # get_converted_dataset returns null during conversion even though
                    # there will eventually be a twobit file available for genome.
                    twobit_file = True
                # Backwards compatibility: look for len file directly.
                elif "len" in dbkey_attributes:
                    len_file = session.get(HistoryDatasetAssociation, dbkey_attributes["len"]).get_file_name()
                if len_file:
                    genome = Genome(dbkey, dbkey_name, len_file=len_file, twobit_file=twobit_file)

        # Look in history and system builds.
        if not genome:
            # Look in history for chromosome len file.
            len_ds = trans.db_dataset_for(dbkey)
            if len_ds:
                genome = Genome(dbkey, dbkey_name, len_file=len_ds.get_file_name())
            # Look in system builds.
            elif dbkey in self.genomes:
                genome = self.genomes[dbkey]

        if not genome:
            raise ObjectNotFound(f"genome not found for key {dbkey}")
        return genome.to_dict(num=num, chrom=chrom, low=low)

    def has_reference_data(self, dbkey, dbkey_owner=None):
        """
        Returns true if there is reference data for the specified dbkey. If dbkey is custom,
        dbkey_owner is needed to determine if there is reference data.
        """
        self.check_and_reload()
        # Look for key in built-in builds.
        if dbkey in self.genomes and self.genomes[dbkey].twobit_file:
            # There is built-in reference data.
            return True

        # Look for key in owner's custom builds.
        if dbkey_owner and "dbkeys" in dbkey_owner.preferences:
            user_keys = loads(dbkey_owner.preferences["dbkeys"])
            # Fasta + converted datasets can provide reference data.
            if dbkey in user_keys and "fasta" in user_keys[dbkey]:
                return True
        return False

    def reference(self, trans, dbkey, chrom, low, high):
        """
        Return reference data for a build.

        :returns: a ``GenomeRegion`` carrying the sequence, or the message
            returned by ``convert_dataset`` when that call returns a truthy value
        :raises ReferenceDataError: if there is no reference data for ``dbkey``
        """
        self.check_and_reload()
        # If there is no dbkey owner, default to current user.
        dbkey_owner, dbkey = decode_dbkey(dbkey)
        if dbkey_owner:
            dbkey_user = get_user_by_username(trans.sa_session, dbkey_owner)
        else:
            dbkey_user = trans.user

        if not self.has_reference_data(dbkey, dbkey_user):
            raise ReferenceDataError(f"No reference data for {dbkey}")

        #
        # Get twobit file with reference data.
        #
        builtin = self.genomes.get(dbkey)
        if builtin is not None and builtin.twobit_file:
            # Built-in twobit.
            twobit_file_name = builtin.twobit_file
        else:
            # has_reference_data() succeeded without a built-in twobit file,
            # so the reference must come from the owner's custom build fasta.
            user_keys = loads(dbkey_user.preferences["dbkeys"])
            dbkey_attributes = user_keys[dbkey]
            fasta_dataset = trans.sa_session.get(HistoryDatasetAssociation, dbkey_attributes["fasta"])
            msg = fasta_dataset.convert_dataset(trans, "twobit")
            if msg:
                return msg
            twobit_dataset = fasta_dataset.get_converted_dataset(trans, "twobit")
            twobit_file_name = twobit_dataset.get_file_name()

        return self._get_reference_data(twobit_file_name, chrom, low, high)

    @staticmethod
    def _get_reference_data(twobit_file_name, chrom, low, high):
        """
        Read ``chrom[low:high]`` from a twobit file and wrap it in a ``GenomeRegion``.

        If ``chrom`` is not present in the twobit file the returned region has
        ``sequence`` set to ``None``.
        """
        # Default covers a chrom that is absent from the twobit file.
        seq_data = None
        # Read and return reference data.
        with open(twobit_file_name, "rb") as f:
            twobit = TwoBitFile(f)
            if chrom in twobit:
                seq_data = twobit[chrom].get(int(low), int(high))
        return GenomeRegion(chrom=chrom, start=low, end=high, sequence=seq_data)