Source code for galaxy.util.dbkeys

"""
Functionality for dealing with dbkeys.
"""
import logging
import os.path
import re
from json import loads

from galaxy.util import (
    galaxy_directory,
    sanitize_lists_to_string,
    unicodify,
)

log = logging.getLogger(__name__)


def read_dbnames(filename):
    """Read build names from file"""
    db_names = []
    try:
        ucsc_builds = {}
        man_builds = []  # assume these are integers
        name_to_db_base = {}
        if filename is None:
            # Should only be happening with the galaxy.tools.parameters.basic:GenomeBuildParameter docstring unit test
            filename = os.path.join(galaxy_directory(), "tool-data", "shared", "ucsc", "builds.txt.sample")
        for line in open(filename):
            try:
                if line[0:1] == "#":
                    continue
                fields = line.replace("\r", "").replace("\n", "").split("\t")
                # Special case of unspecified build is at top of list
                if fields[0] == "?":
                    db_names.insert(0, (fields[0], fields[1]))
                    continue
                try:  # manual build (i.e. microbes)
                    int(fields[0])
                    man_builds.append((fields[1], fields[0]))
                except Exception:
                    # UCSC build
                    db_base = fields[0].rstrip("0123456789")
                    if db_base not in ucsc_builds:
                        ucsc_builds[db_base] = []
                        name_to_db_base[fields[1]] = db_base
                    # we want to sort within a species numerically by revision number
                    build_rev = re.compile(r"\d+$")
                    try:
                        build_rev = int(build_rev.findall(fields[0])[0])
                    except Exception:
                        build_rev = 0
                    ucsc_builds[db_base].append((build_rev, fields[0], fields[1]))
            except Exception:
                continue
        sort_names = sorted(name_to_db_base.keys())
        for name in sort_names:
            db_base = name_to_db_base[name]
            ucsc_builds[db_base].sort()
            ucsc_builds[db_base].reverse()
            ucsc_builds[db_base] = [(build, name) for _, build, name in ucsc_builds[db_base]]
            db_names = list(db_names + ucsc_builds[db_base])
        man_builds.sort()
        man_builds = [(build, name) for name, build in man_builds]
        db_names = list(db_names + man_builds)
    except Exception as e:
        log.error("ERROR: Unable to read builds file: %s", unicodify(e))
    return db_names
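
read_dbnames expects tab-separated lines of the form <build key><TAB><display name>: lines starting with # are comments, a literal "?" key is pinned to the top of the returned list, purely numeric keys are collected as manual (e.g. microbial) builds, and everything else is grouped as a UCSC build by its alphabetic base (hg19 -> hg) and sorted newest revision first. Below is a minimal sketch exercising it on a throwaway file; the entries and temporary path are invented for illustration:

import tempfile

from galaxy.util.dbkeys import read_dbnames

# A throwaway builds file; every entry here is invented for illustration.
sample = (
    "#build\tname\n"
    "?\tunspecified (?)\n"
    "hg19\tHuman Feb. 2009 (GRCh37/hg19) (hg19)\n"
    "hg18\tHuman Mar. 2006 (NCBI36/hg18) (hg18)\n"
    "12345\tEscherichia coli K-12 MG1655\n"
)
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as fh:
    fh.write(sample)

for build, name in read_dbnames(fh.name):
    print(build, "->", name)
# "?" is pinned first, hg19 precedes hg18 (revisions sort newest first),
# and the numeric manual build comes last.
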
class GenomeBuilds:
    default_value = "?"
    default_name = "unspecified (?)"

    def __init__(self, app, data_table_name="__dbkeys__", load_old_style=True):
        self._app = app
        self._data_table_name = data_table_name
        self._static_chrom_info_path = app.config.len_file_path
        # A dbkey can be listed multiple times, but with different names, so we can't use dictionaries for lookups
        if load_old_style:
            self._static_dbkeys = list(read_dbnames(app.config.builds_file_path))
        else:
            self._static_dbkeys = []

    def get_genome_build_names(self, trans=None):
        # FIXME: how to deal with key duplicates?
        rval = [(self.default_value, self.default_name)]
        # load user custom genome builds
        if trans is not None:
            if trans.history:
                # This is a little bit odd: we are adding every .len file in the current history to the dbkey list,
                # but this is the previous behavior from trans.db_names, so we'll continue to do it.
                # It does allow one-off, history-specific dbkeys to be created by a user. But we are not filtering,
                # so a len file will be listed twice (as the build name and again as the dataset name)
                # if custom dbkey creation/conversion occurred within the current history.
                datasets = trans.sa_session.query(self._app.model.HistoryDatasetAssociation).filter_by(
                    deleted=False, history_id=trans.history.id, extension="len"
                )
                for dataset in datasets:
                    rval.append((dataset.dbkey, f"{dataset.name} ({dataset.dbkey}) [History]"))
            user = trans.user
            if user and hasattr(user, "preferences") and "dbkeys" in user.preferences:
                user_keys = loads(user.preferences["dbkeys"])
                for key, chrom_dict in user_keys.items():
                    rval.append((key, f"{chrom_dict['name']} ({key}) [Custom]"))
        # Load old builds.txt static keys
        rval.extend(self._static_dbkeys)
        # load dbkeys from dbkey data table
        dbkey_table = self._app.tool_data_tables.get(self._data_table_name, None)
        if dbkey_table is not None:
            for field_dict in dbkey_table.get_named_fields_list():
                rval.append((field_dict["value"], field_dict["name"]))
        return rval

    def get_chrom_info(self, dbkey, trans=None, custom_build_hack_get_len_from_fasta_conversion=True):
        # FIXME: flag to turn off custom_build_hack_get_len_from_fasta_conversion should not be required
        chrom_info = None
        db_dataset = None
        # Collect chromInfo from custom builds
        if trans:
            db_dataset = trans.db_dataset_for(dbkey)
            if db_dataset:
                chrom_info = db_dataset.file_name
            else:
                # Do custom build handling
                if (
                    trans.user
                    and ("dbkeys" in trans.user.preferences)
                    and (dbkey in loads(trans.user.preferences["dbkeys"]))
                ):
                    custom_build_dict = loads(trans.user.preferences["dbkeys"])[dbkey]
                    # HACK: the attempt to get chrom_info below will trigger the
                    # fasta-to-len converter if the dataset is not available,
                    # which will in turn create a recursive loop when
                    # running the fasta-to-len tool. So, use a hack in the second
                    # condition below to avoid getting chrom_info when running the
                    # fasta-to-len converter.
                    if "fasta" in custom_build_dict and custom_build_hack_get_len_from_fasta_conversion:
                        # Build is defined by fasta; get len file, which is obtained from converting fasta.
                        build_fasta_dataset = trans.sa_session.query(trans.app.model.HistoryDatasetAssociation).get(
                            custom_build_dict["fasta"]
                        )
                        chrom_info = build_fasta_dataset.get_converted_dataset(trans, "len").file_name
                    elif "len" in custom_build_dict:
                        # Build is defined by len file, so use it.
                        chrom_info = (
                            trans.sa_session.query(trans.app.model.HistoryDatasetAssociation)
                            .get(custom_build_dict["len"])
                            .file_name
                        )
        # Check data table
        if not chrom_info:
            dbkey_table = self._app.tool_data_tables.get(self._data_table_name, None)
            if dbkey_table is not None:
                chrom_info = dbkey_table.get_entry("value", dbkey, "len_path", default=None)
        # use configured server len path
        if not chrom_info:
            # Default to built-in build.
            # Since we are using an unverified dbkey, we will sanitize it before use.
            chrom_info = os.path.join(self._static_chrom_info_path, f"{sanitize_lists_to_string(dbkey)}.len")
        chrom_info = os.path.abspath(chrom_info)
        return (chrom_info, db_dataset)
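
GenomeBuilds touches only a few attributes of the framework app object: config.len_file_path, config.builds_file_path, tool_data_tables, and (when a transaction with a history is supplied) model and sa_session. Its list-building behavior can therefore be sketched without a running Galaxy server. The stub classes below are hypothetical stand-ins invented for this sketch, not part of the Galaxy API:

class _StubConfig:
    # Hypothetical values; a real Galaxy config provides these paths.
    len_file_path = "/tmp/len"  # directory holding <dbkey>.len files
    builds_file_path = None


class _StubApp:
    config = _StubConfig()
    tool_data_tables = {}  # a plain dict satisfies the .get() call used above
    model = None  # only consulted when a trans with a history is passed


# load_old_style=False skips the builds.txt parsing entirely.
gb = GenomeBuilds(_StubApp(), load_old_style=False)
print(gb.get_genome_build_names())  # [('?', 'unspecified (?)')]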
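
With no transaction and no matching __dbkeys__ data table entry, get_chrom_info falls through to the configured static len directory; the dbkey is sanitized first because it may arrive from an untrusted request. Continuing the hypothetical stub above:

chrom_info, db_dataset = gb.get_chrom_info("hg19")
print(chrom_info)  # /tmp/len/hg19.len (made absolute); db_dataset is None

Note that the method only resolves a path; it does not check that the .len file actually exists on disk.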