import logging
import os
import re
import sys
from json import loads
from typing import (
Dict,
Optional,
)
from bx.seq.twobit import TwoBitFile
from galaxy.exceptions import (
ObjectNotFound,
ReferenceDataError,
)
from galaxy.managers.users import get_user_by_username
from galaxy.model import (
HistoryDatasetAssociation,
User,
)
from galaxy.structured_app import StructuredApp
from galaxy.util.bunch import Bunch
log = logging.getLogger(__name__)
# FIXME: copied from tracks.py
# Message strings returned to browser
messages = Bunch(
PENDING="pending",
NO_DATA="no data",
NO_CHROMOSOME="no chromosome",
NO_CONVERTER="no converter",
NO_TOOL="no tool",
DATA="data",
ERROR="error",
OK="ok",
)
[docs]def decode_dbkey(dbkey):
"""Decodes dbkey and returns tuple ( username, dbkey )"""
if isinstance(dbkey, str) and ":" in dbkey:
return dbkey.split(":")
else:
return None, dbkey
[docs]class GenomeRegion:
"""
A genomic region on an individual chromosome.
"""
[docs] def __init__(self, chrom=None, start=0, end=0, sequence=None):
self.chrom = chrom
self.start = int(start)
self.end = int(end)
self.sequence = sequence
def __str__(self):
return f"{self.chrom}:{str(self.start)}-{str(self.end)}"
[docs] @staticmethod
def from_dict(obj_dict):
return GenomeRegion(chrom=obj_dict["chrom"], start=obj_dict["start"], end=obj_dict["end"])
[docs] @staticmethod
def from_str(obj_str):
# check for gene region
gene_region = obj_str.split(":")
# split gene region into components
if len(gene_region) == 2:
gene_interval = gene_region[1].split("-")
# check length
if len(gene_interval) == 2:
return GenomeRegion(chrom=gene_region[0], start=gene_interval[0], end=gene_interval[1])
# return genome region instance
return GenomeRegion()
[docs]class Genome:
"""
Encapsulates information about a known genome/dbkey.
"""
[docs] def __init__(self, key, description, len_file=None, twobit_file=None):
self.key = key
self.description = description
self.len_file = len_file
self.twobit_file = twobit_file
[docs] def to_dict(self, num=None, chrom=None, low=None):
"""
Returns representation of self as a dictionary.
"""
# if there's no len_file, there's nothing to return
if not self.len_file:
raise ReferenceDataError(f"len_file not set for {self.key}")
def check_int(s):
if s.isdigit():
return int(s)
else:
return s
def split_by_number(s):
return [check_int(c) for c in re.split("([0-9]+)", s)]
#
# Parameter check, setting.
#
if num:
num = int(num)
else:
num = sys.maxsize # just a big number
if low:
low = int(low)
if low < 0:
low = 0
else:
low = 0
#
# Get chroms data:
# (a) chrom name, len;
# (b) whether there are previous, next chroms;
# (c) index of start chrom.
#
with open(self.len_file) as f:
len_file_enumerate = enumerate(f)
chroms = {}
prev_chroms = False
start_index = 0
if chrom:
# Use starting chrom to start list.
found = False
count = 0
for line_num, line in len_file_enumerate:
if line.startswith("#"):
continue
name, len = line.split("\t")
if found:
chroms[name] = int(len)
count += 1
elif name == chrom:
# Found starting chrom.
chroms[name] = int(len)
count += 1
found = True
start_index = line_num
if line_num != 0:
prev_chroms = True
if count >= num:
break
else:
# Use low to start list.
high = low + int(num)
prev_chroms = low != 0
start_index = low
# Read chrom data from len file.
for line_num, line in len_file_enumerate:
if line_num < low:
continue
if line_num >= high:
break
if line.startswith("#"):
continue
# LEN files have format:
# <chrom_name><tab><chrom_length>
fields = line.split("\t")
chroms[fields[0]] = int(fields[1])
# Set flag to indicate whether there are more chroms after list.
next_chroms = False
try:
next(len_file_enumerate)
next_chroms = True
except StopIteration:
# No more chroms to read.
pass
to_sort = [{"chrom": chrm, "len": length} for chrm, length in chroms.items()]
to_sort.sort(key=lambda _: split_by_number(_["chrom"]))
return {
"id": self.key,
"reference": self.twobit_file is not None,
"chrom_info": to_sort,
"prev_chroms": prev_chroms,
"next_chroms": next_chroms,
"start_index": start_index,
}
[docs]class Genomes:
"""
Provides information about available genome data and methods for manipulating that data.
"""
[docs] def __init__(self, app: StructuredApp):
self.app = app
# Create list of genomes from app.genome_builds
self.genomes: Dict[str, Genome] = {}
# Store internal versions of data tables for twobit and __dbkey__
self._table_versions = {"twobit": None, "__dbkeys__": None}
self.reload_genomes()
[docs] def reload_genomes(self):
self.genomes = {}
# Store table versions for later
for table_name in self._table_versions.keys():
table = self.app.tool_data_tables.get(table_name, None)
if table is not None:
self._table_versions[table_name] = table._loaded_content_version
twobit_table = self.app.tool_data_tables.get("twobit", None)
twobit_fields = {}
if twobit_table is None:
# Add genome data (twobit files) to genomes, directly from twobit.loc
try:
twobit_path = os.path.join(self.app.config.tool_data_path, "twobit.loc")
with open(twobit_path) as f:
for line in f:
if line.startswith("#"):
continue
val = line.split()
if len(val) == 2:
key, path = val
twobit_fields[key] = path
except OSError:
# Thrown if twobit.loc does not exist.
log.exception("Error reading twobit.loc")
for key, description in self.app.genome_builds.get_genome_build_names():
self.genomes[key] = Genome(key, description)
# Add len files to genomes.
self.genomes[key].len_file = self.app.genome_builds.get_chrom_info(key)[0]
if self.genomes[key].len_file:
if not os.path.exists(self.genomes[key].len_file):
self.genomes[key].len_file = None
# Add genome data (twobit files) to genomes.
if twobit_table is not None:
self.genomes[key].twobit_file = twobit_table.get_entry("value", key, "path", default=None)
elif key in twobit_fields:
self.genomes[key].twobit_file = twobit_fields[key]
[docs] def check_and_reload(self):
# Check if tables have been modified, if so reload
for table_name, table_version in self._table_versions.items():
table = self.app.tool_data_tables.get(table_name, None)
if table is not None and not table.is_current_version(table_version):
return self.reload_genomes()
[docs] def get_build(self, dbkey):
"""Returns build for the given key."""
self.check_and_reload()
rval = None
if dbkey in self.genomes:
rval = self.genomes[dbkey]
return rval
[docs] def get_dbkeys(self, user: Optional[User], chrom_info=False):
"""Returns all known dbkeys. If chrom_info is True, only dbkeys with
chromosome lengths are returned."""
self.check_and_reload()
dbkeys = []
# Add user's custom keys to dbkeys.
if user and user.preferences and "dbkeys" in user.preferences:
user_keys_dict = loads(user.preferences["dbkeys"])
dbkeys.extend([(attributes["name"], key) for key, attributes in user_keys_dict.items()])
# Add app keys to dbkeys.
# If chrom_info is True, only include keys with len files (which contain chromosome info).
if chrom_info:
def filter_fn(b):
return b.len_file is not None
else:
def filter_fn(b):
return True
dbkeys.extend([(genome.description, genome.key) for key, genome in self.genomes.items() if filter_fn(genome)])
return dbkeys
[docs] def chroms(self, trans, dbkey=None, num=None, chrom=None, low=None):
"""
Returns a naturally sorted list of chroms/contigs for a given dbkey.
Use either chrom or low to specify the starting chrom in the return list.
"""
session = trans.sa_session
self.check_and_reload()
# If there is no dbkey owner, default to current user.
dbkey_owner, dbkey = decode_dbkey(dbkey)
if dbkey_owner:
dbkey_user = get_user_by_username(session, dbkey_owner)
else:
dbkey_user = trans.user
#
# Get/create genome object.
#
genome = None
twobit_file = None
# Look first in user's custom builds.
if dbkey_user and "dbkeys" in dbkey_user.preferences:
user_keys = loads(dbkey_user.preferences["dbkeys"])
if dbkey in user_keys:
dbkey_attributes = user_keys[dbkey]
dbkey_name = dbkey_attributes["name"]
# If there's a fasta for genome, convert to 2bit for later use.
if "fasta" in dbkey_attributes:
build_fasta = session.get(HistoryDatasetAssociation, dbkey_attributes["fasta"])
len_file = build_fasta.get_converted_dataset(trans, "len").get_file_name()
build_fasta.get_converted_dataset(trans, "twobit")
# HACK: set twobit_file to True rather than a file name because
# get_converted_dataset returns null during conversion even though
# there will eventually be a twobit file available for genome.
twobit_file = True
# Backwards compatibility: look for len file directly.
elif "len" in dbkey_attributes:
len_file = session.get(HistoryDatasetAssociation, user_keys[dbkey]["len"]).get_file_name()
if len_file:
genome = Genome(dbkey, dbkey_name, len_file=len_file, twobit_file=twobit_file)
# Look in history and system builds.
if not genome:
# Look in history for chromosome len file.
len_ds = trans.db_dataset_for(dbkey)
if len_ds:
genome = Genome(dbkey, dbkey_name, len_file=len_ds.get_file_name())
# Look in system builds.
elif dbkey in self.genomes:
genome = self.genomes[dbkey]
if not genome:
raise ObjectNotFound(f"genome not found for key {dbkey}")
return genome.to_dict(num=num, chrom=chrom, low=low)
[docs] def has_reference_data(self, dbkey, dbkey_owner=None):
"""
Returns true if there is reference data for the specified dbkey. If dbkey is custom,
dbkey_owner is needed to determine if there is reference data.
"""
self.check_and_reload()
# Look for key in built-in builds.
if dbkey in self.genomes and self.genomes[dbkey].twobit_file:
# There is built-in reference data.
return True
# Look for key in owner's custom builds.
if dbkey_owner and "dbkeys" in dbkey_owner.preferences:
user_keys = loads(dbkey_owner.preferences["dbkeys"])
if dbkey in user_keys:
dbkey_attributes = user_keys[dbkey]
if "fasta" in dbkey_attributes:
# Fasta + converted datasets can provide reference data.
return True
return False
[docs] def reference(self, trans, dbkey, chrom, low, high):
"""
Return reference data for a build.
"""
self.check_and_reload()
# If there is no dbkey owner, default to current user.
dbkey_owner, dbkey = decode_dbkey(dbkey)
if dbkey_owner:
dbkey_user = get_user_by_username(trans.sa_session, dbkey_owner)
else:
dbkey_user = trans.user
if not self.has_reference_data(dbkey, dbkey_user):
raise ReferenceDataError(f"No reference data for {dbkey}")
#
# Get twobit file with reference data.
#
twobit_file_name = None
if dbkey in self.genomes:
# Built-in twobit.
twobit_file_name = self.genomes[dbkey].twobit_file
else:
user_keys = loads(dbkey_user.preferences["dbkeys"])
dbkey_attributes = user_keys[dbkey]
fasta_dataset = trans.sa_session.get(HistoryDatasetAssociation, dbkey_attributes["fasta"])
msg = fasta_dataset.convert_dataset(trans, "twobit")
if msg:
return msg
else:
twobit_dataset = fasta_dataset.get_converted_dataset(trans, "twobit")
twobit_file_name = twobit_dataset.get_file_name()
return self._get_reference_data(twobit_file_name, chrom, low, high)
@staticmethod
def _get_reference_data(twobit_file_name, chrom, low, high):
# Read and return reference data.
with open(twobit_file_name, "rb") as f:
twobit = TwoBitFile(f)
if chrom in twobit:
seq_data = twobit[chrom].get(int(low), int(high))
return GenomeRegion(chrom=chrom, start=low, end=high, sequence=seq_data)