Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.repository_registry

import logging

from sqlalchemy import (
    and_,
    false,
    or_,
    select,
)
from typing_extensions import Protocol

import tool_shed.repository_types.util as rt_util
from tool_shed.util import (
    hg_util,
    metadata_util,
)
from tool_shed.webapp import model
from tool_shed.webapp.model import (
    Category,
    Repository,
    RepositoryMetadata,
)
from .structured_app import ToolShedApp

log = logging.getLogger(__name__)


class RegistryInterface(Protocol):
    """Structural interface implemented by the repository registry classes in this module."""

    def add_category_entry(self, category):
        """Register a category.

        Only used in legacy controllers - can drop when drop old tool shed webapp.
        """

    def add_entry(self, repository):
        """Register a repository.

        Used when creating a repository, in the legacy add_repository_registry_entry
        API endpoint (already deemed not worth pulling over to 2.0), in the legacy
        undelete_repository admin controller and in the legacy deprecate controller.
        """

    def edit_category_entry(self, old_name, new_name):
        """Rename a category entry; used in legacy admin controller."""

    def is_valid(self, repository) -> bool:
        """Report whether the repository is considered valid.

        Probably not used outside this class but also not hurting anything.
        """

    def remove_category_entry(self, category):
        """Remove a category from the registry."""

    def remove_entry(self, repository):
        """Remove a repository from the registry."""
# stop gap to implement the same general interface as repository_registry but do nothing,
# I don't think this stuff is needed - outside potentially old test cases?
class NullRepositoryRegistry(RegistryInterface):
    """Registry stand-in whose mutating methods do nothing and whose counts stay empty."""

    def __init__(self, app: ToolShedApp):
        """Keep a reference to ``app`` and create the (always empty) per-category dictionaries."""
        self.app = app
        # all of these are only used by repository_grids - which I think the tool shed 2.0
        # does not use at all - but they are part of the "public interface" consumed by
        # the full registry.
        self.certified_level_one_viewable_repositories_and_suites_by_category: dict = {}
        self.certified_level_one_viewable_suites_by_category: dict = {}
        self.viewable_repositories_and_suites_by_category: dict = {}
        self.viewable_suites_by_category: dict = {}
        self.viewable_valid_repositories_and_suites_by_category: dict = {}
        self.viewable_valid_suites_by_category: dict = {}

    def add_category_entry(self, category):
        """No-op; only used in legacy controllers - can drop when drop old tool shed webapp."""

    def add_entry(self, repository):
        """No-op; used when creating a repository, and maybe more?"""

    def edit_category_entry(self, old_name, new_name):
        """No-op."""

    def is_valid(self, repository) -> bool:
        """Return True for a repository that is not deleted, not deprecated and has downloadable revisions."""
        return bool(
            repository
            and not repository.deleted
            and not repository.deprecated
            and repository.downloadable_revisions
        )

    def remove_category_entry(self, category):
        """No-op."""

    def remove_entry(self, repository):
        """No-op."""
class Registry(RegistryInterface):
    """In-memory registry of repository tuples and per-category repository counts.

    The registry is filled from the database when constructed (see ``load``) and is
    adjusted afterwards through ``add_entry`` / ``remove_entry`` and the
    ``*_category_entry`` methods.  Errors while adjusting entries are logged rather
    than raised, so counts may drift until the next load.
    """

    def __init__(self, app):
        """Create the registry for ``app`` and immediately load it from the database."""
        log.debug("Loading the repository registry...")
        self.app = app
        # The following lists contain tuples like ( repository.name, repository.user.username, changeset_revision ).
        self.certified_level_one_repository_and_suite_tuples = []
        self.certified_level_one_suite_tuples = []
        # only used internally to class
        self.repository_and_suite_tuples = []
        # only used internally to class
        self.suite_tuples = []
        # These category dictionaries contain entries where the key is the category name and the value is
        # the integer count of viewable repositories within that category.
        # All of them are only used internally to class and by repository_grids.
        # (The "certified level two" counterparts were not even used in legacy shed code, so they are not kept.)
        self.certified_level_one_viewable_repositories_and_suites_by_category = {}
        self.certified_level_one_viewable_suites_by_category = {}
        self.viewable_repositories_and_suites_by_category = {}
        self.viewable_suites_by_category = {}
        self.viewable_valid_repositories_and_suites_by_category = {}
        self.viewable_valid_suites_by_category = {}
        self.load()

    def load(self):
        """Load the category counts and then the tuple lists inside one session transaction."""
        # NOTE(review): the category counts are computed before the certified tuple lists that
        # is_level_one_certified() consults are filled - confirm this ordering is intended.
        with self.sa_session.begin():
            self.load_viewable_repositories_and_suites_by_category()
            self.load_repository_and_suite_tuples()

    def _category_count_dicts(self):
        """Return the six per-category count dictionaries as they are currently bound on the instance."""
        return (
            self.viewable_repositories_and_suites_by_category,
            self.viewable_suites_by_category,
            self.viewable_valid_repositories_and_suites_by_category,
            self.viewable_valid_suites_by_category,
            self.certified_level_one_viewable_repositories_and_suites_by_category,
            self.certified_level_one_viewable_suites_by_category,
        )

    def _count_dicts_for(self, repository, is_valid, is_level_one_certified):
        """Return the count dictionaries in which ``repository`` has to be counted."""
        is_suite = repository.type == rt_util.REPOSITORY_SUITE_DEFINITION
        selected = [self.viewable_repositories_and_suites_by_category]
        if is_valid:
            selected.append(self.viewable_valid_repositories_and_suites_by_category)
        if is_suite:
            selected.append(self.viewable_suites_by_category)
            if is_valid:
                selected.append(self.viewable_valid_suites_by_category)
        if is_level_one_certified:
            selected.append(self.certified_level_one_viewable_repositories_and_suites_by_category)
            if is_suite:
                selected.append(self.certified_level_one_viewable_suites_by_category)
        return selected

    def add_category_entry(self, category):
        """Make sure every count dictionary has an entry (initially 0) for ``category``."""
        category_name = str(category.name)
        for counts in self._category_count_dicts():
            counts.setdefault(category_name, 0)

    def add_entry(self, repository):
        """Count ``repository`` in each of its categories and record its tuples.

        Best effort: any exception is logged and swallowed.
        """
        try:
            if repository:
                is_valid = self.is_valid(repository)
                _, is_level_one_certified = self.get_certified_level_one_tuple(repository)
                for rca in repository.categories:
                    category_name = str(rca.category.name)
                    for counts in self._count_dicts_for(repository, is_valid, is_level_one_certified):
                        counts[category_name] = counts.get(category_name, 0) + 1
                self.load_repository_and_suite_tuple(repository)
                if is_level_one_certified:
                    self.load_certified_level_one_repository_and_suite_tuple(repository)
        except Exception:
            # The viewable repository numbers and the categorized (filtered) lists of repository tuples
            # may be slightly skewed, but that is no reason to result in a potential server error.  All
            # will be corrected at next server start.
            log.exception("Handled error adding entry to repository registry")

    def edit_category_entry(self, old_name, new_name):
        """Move every count from ``old_name`` to ``new_name``; a missing ``old_name`` yields a count of 0."""
        for counts in self._category_count_dicts():
            counts[new_name] = counts.pop(old_name, 0)

    def get_certified_level_one_clause_list(self):
        """Return one ``and_`` clause (repository id + changeset revision) per level one certified repository."""
        # only used internally to class
        clause_list = []
        for repository in get_repositories(self.sa_session):
            latest_installable_changeset_revision, is_level_one_certified = self.get_certified_level_one_tuple(
                repository
            )
            if is_level_one_certified:
                clause_list.append(
                    and_(
                        RepositoryMetadata.repository_id == repository.id,
                        RepositoryMetadata.changeset_revision == latest_installable_changeset_revision,
                    )
                )
        return clause_list

    def get_certified_level_one_tuple(self, repository):
        """
        Return ``(latest_installable_changeset_revision, is_level_one_certified)`` for the received repository.

        ``(None, False)`` is returned when the repository is missing, deleted or deprecated, when it has
        no installable changeset revision, or when no metadata record exists for that revision.  A tuple
        is returned on every path because all callers unpack the result.
        """
        # only used internally to class
        if repository is None or repository.deleted or repository.deprecated:
            return (None, False)
        # Get the latest installable changeset revision since that is all that is currently configured for testing.
        latest_installable_changeset_revision = metadata_util.get_latest_downloadable_changeset_revision(
            self.app, repository
        )
        if latest_installable_changeset_revision in [None, hg_util.INITIAL_CHANGELOG_HASH]:
            # No installable changeset_revision.
            return (None, False)
        encoded_repository_id = self.app.security.encode_id(repository.id)
        repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(
            self.app, encoded_repository_id, latest_installable_changeset_revision
        )
        if repository_metadata:
            return (latest_installable_changeset_revision, True)
        # No repository_metadata: previously this path fell off the end and returned None.
        return (None, False)

    def is_level_one_certified(self, repository_metadata):
        """Return True when the metadata's (name, owner, changeset_revision) is in the matching certified list."""
        # only used internally to class
        if not repository_metadata:
            return False
        repository = repository_metadata.repository
        if not repository:
            return False
        if repository.deprecated or repository.deleted:
            return False
        entry = (
            str(repository.name),
            str(repository.user.username),
            str(repository_metadata.changeset_revision),
        )
        if repository.type in [rt_util.REPOSITORY_SUITE_DEFINITION]:
            return entry in self.certified_level_one_suite_tuples
        return entry in self.certified_level_one_repository_and_suite_tuples

    def is_valid(self, repository) -> bool:
        """Return True for a repository that is not deleted, not deprecated and has downloadable revisions."""
        return bool(
            repository
            and not repository.deleted
            and not repository.deprecated
            and repository.downloadable_revisions
        )

    def _certified_level_one_target(self, repository):
        """Return ``(tuple, list)`` for the repository tip, or ``(None, None)`` when the tip is the initial hash."""
        tip_changeset_hash = repository.tip()
        if tip_changeset_hash == hg_util.INITIAL_CHANGELOG_HASH:
            return (None, None)
        entry = (str(repository.name), str(repository.user.username), tip_changeset_hash)
        if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
            return (entry, self.certified_level_one_suite_tuples)
        return (entry, self.certified_level_one_repository_and_suite_tuples)

    def load_certified_level_one_repository_and_suite_tuple(self, repository):
        """Record the tip tuple of a repository that has been determined to be level one certified."""
        # only used internally to class
        entry, target = self._certified_level_one_target(repository)
        if entry is not None and entry not in target:
            target.append(entry)

    def load_repository_and_suite_tuple(self, repository):
        """Record one tuple per metadata revision of ``repository`` (and in the suite list for suites)."""
        # only used internally to class
        name = str(repository.name)
        owner = str(repository.user.username)
        is_suite = repository.type == rt_util.REPOSITORY_SUITE_DEFINITION
        for repository_metadata in repository.metadata_revisions:
            entry = (name, owner, str(repository_metadata.changeset_revision))
            if entry not in self.repository_and_suite_tuples:
                self.repository_and_suite_tuples.append(entry)
            if is_suite and entry not in self.suite_tuples:
                self.suite_tuples.append(entry)

    def load_repository_and_suite_tuples(self):
        """Fill the certified tuple lists and the general tuple lists from the database."""
        # only used internally to class
        # Load self.certified_level_one_repository_and_suite_tuples and self.certified_level_one_suite_tuples.
        clauses = self.get_certified_level_one_clause_list()
        for repository in get_certified_repositories_with_user(self.sa_session, clauses, model.User):
            self.load_certified_level_one_repository_and_suite_tuple(repository)
        # Load self.repository_and_suite_tuples and self.suite_tuples
        for repository in get_repositories_with_user(self.sa_session, model.User):
            self.load_repository_and_suite_tuple(repository)

    def load_viewable_repositories_and_suites_by_category(self):
        """Rebuild all six per-category count dictionaries from the categories stored in the database."""
        # only used internally to class
        # Clear all dictionaries just in case they were previously loaded.
        self.certified_level_one_viewable_repositories_and_suites_by_category = {}
        self.certified_level_one_viewable_suites_by_category = {}
        self.viewable_repositories_and_suites_by_category = {}
        self.viewable_suites_by_category = {}
        self.viewable_valid_repositories_and_suites_by_category = {}
        self.viewable_valid_suites_by_category = {}
        for category in self.sa_session.scalars(select(Category)):
            category_name = str(category.name)
            # The dictionaries are keyed by name, so the membership test has to use the name
            # (the Category object itself is never a key).
            for counts in self._category_count_dicts():
                counts.setdefault(category_name, 0)
            for rca in category.repositories:
                repository = rca.repository
                if repository.deleted or repository.deprecated:
                    continue
                is_valid = self.is_valid(repository)
                encoded_repository_id = self.app.security.encode_id(repository.id)
                tip_changeset_hash = repository.tip()
                repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(
                    self.app, encoded_repository_id, tip_changeset_hash
                )
                is_certified = self.is_level_one_certified(repository_metadata)
                for counts in self._count_dicts_for(repository, is_valid, is_certified):
                    counts[category_name] += 1

    def remove_category_entry(self, category):
        """Drop ``category`` from every count dictionary; absent entries are ignored."""
        category_name = str(category.name)
        for counts in self._category_count_dicts():
            counts.pop(category_name, None)

    def remove_entry(self, repository):
        """Un-count ``repository`` in each of its categories and forget its tuples.

        Counts never drop below 0 and a category missing from a dictionary is initialised to 0.
        Best effort: any exception is logged and swallowed.
        """
        try:
            if repository:
                is_valid = self.is_valid(repository)
                _, is_level_one_certified = self.get_certified_level_one_tuple(repository)
                for rca in repository.categories:
                    category_name = str(rca.category.name)
                    for counts in self._count_dicts_for(repository, is_valid, is_level_one_certified):
                        counts[category_name] = max(counts.get(category_name, 0) - 1, 0)
                self.unload_repository_and_suite_tuple(repository)
                if is_level_one_certified:
                    self.unload_certified_level_one_repository_and_suite_tuple(repository)
        except Exception:
            # The viewable repository numbers and the categorized (filtered) lists of repository tuples
            # may be slightly skewed, but that is no reason to result in a potential server error.  All
            # will be corrected at next server start.
            log.exception("Handled error removing entry from repository registry")

    @property
    def sa_session(self):
        """The SQLAlchemy session taken from ``self.app.model.session``."""
        # only used internally to class
        return self.app.model.session

    def unload_certified_level_one_repository_and_suite_tuple(self, repository):
        """Forget the tip tuple of a repository that has been determined to be level one certified."""
        entry, target = self._certified_level_one_target(repository)
        if entry is not None and entry in target:
            target.remove(entry)

    def unload_repository_and_suite_tuple(self, repository):
        """Forget the tuple of every metadata revision of ``repository`` (also from the suite list for suites)."""
        name = str(repository.name)
        owner = str(repository.user.username)
        is_suite = repository.type == rt_util.REPOSITORY_SUITE_DEFINITION
        for repository_metadata in repository.metadata_revisions:
            entry = (name, owner, str(repository_metadata.changeset_revision))
            if entry in self.repository_and_suite_tuples:
                self.repository_and_suite_tuples.remove(entry)
            if is_suite and entry in self.suite_tuples:
                self.suite_tuples.remove(entry)
def get_repositories(session):
    """Return the scalar result of selecting every Repository that is neither deleted nor deprecated."""
    not_deleted = Repository.deleted == false()
    not_deprecated = Repository.deprecated == false()
    return session.scalars(select(Repository).where(not_deleted, not_deprecated))
def get_repositories_with_user(session, user_model):
    """Return the scalar result of selecting non-deleted, non-deprecated Repository rows joined to ``user_model``."""
    not_deleted = Repository.deleted == false()
    not_deprecated = Repository.deprecated == false()
    query = select(Repository).where(not_deleted, not_deprecated).join(user_model)
    return session.scalars(query)
def get_certified_repositories_with_user(session, where_clauses, user_model):
    """Return the scalar result of selecting Repository rows whose metadata matches any of ``where_clauses``.

    The query joins ``RepositoryMetadata`` and ``user_model``.  The OR is seeded with
    ``false()`` so that an empty ``where_clauses`` selects nothing; a bare ``or_()``
    is deprecated and would leave the statement without any filter.
    """
    stmt = select(Repository).join(RepositoryMetadata).where(or_(false(), *where_clauses)).join(user_model)
    return session.scalars(stmt)