Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.repository_registry
import logging
from sqlalchemy import (
and_,
false,
or_,
select,
)
from typing_extensions import Protocol
import tool_shed.repository_types.util as rt_util
from tool_shed.util import (
hg_util,
metadata_util,
)
from tool_shed.webapp import model
from tool_shed.webapp.model import (
Category,
Repository,
RepositoryMetadata,
)
from .structured_app import ToolShedApp
log = logging.getLogger(__name__)
[docs]class RegistryInterface(Protocol):
# only used in legacy controllers - can drop when drop old tool shed webapp
[docs] def add_entry(self, repository):
# used when creating a repository
# Used in legacy add_repository_registry_entry API endpoint - already deemed not worth pulling over to 2.0
# used in legacy undelete_repository admin controller
# used in legacy deprecate controller
...
[docs] def is_valid(self, repository) -> bool:
# probably not used outside this class but also not hurting anything
...
# stop gap to implement the same general interface as repository_registry but do nothing,
# I don't think this stuff is needed - outside potentially old test cases?
[docs]class NullRepositoryRegistry(RegistryInterface):
[docs] def __init__(self, app: ToolShedApp):
self.app = app
# all of these are only used by repository_grids - which I think the tool shed 2.0
# does not use at all - but they are part of the "public interface" consumed by
# the full registry.
self.certified_level_one_viewable_repositories_and_suites_by_category: dict = {}
self.certified_level_one_viewable_suites_by_category: dict = {}
self.viewable_repositories_and_suites_by_category: dict = {}
self.viewable_suites_by_category: dict = {}
self.viewable_valid_repositories_and_suites_by_category: dict = {}
self.viewable_valid_suites_by_category: dict = {}
[docs] def add_category_entry(self, category):
# only used in legacy controllers - can drop when drop old tool shed webapp
pass
[docs] def is_valid(self, repository) -> bool:
if repository and not repository.deleted and not repository.deprecated and repository.downloadable_revisions:
return True
return False
[docs]class Registry(RegistryInterface):
[docs] def __init__(self, app):
log.debug("Loading the repository registry...")
self.app = app
# The following lists contain tuples like ( repository.name, repository.user.username, changeset_revision )
# where the changeset_revision entry is always the latest installable changeset_revision..
self.certified_level_one_repository_and_suite_tuples = []
self.certified_level_one_suite_tuples = []
# These category dictionaries contain entries where the key is the category and the value is the integer count
# of viewable repositories within that category.
# only used internally to class and by repository_grids
self.certified_level_one_viewable_repositories_and_suites_by_category = {}
# only used internally to class and by repository_grids
self.certified_level_one_viewable_suites_by_category = {}
# not even used in legacy shed code...
# self.certified_level_two_repository_and_suite_tuples = []
# self.certified_level_two_suite_tuples = []
# self.certified_level_two_viewable_repositories_and_suites_by_category = {}
# self.certified_level_two_viewable_suites_by_category = {}
# only used internally to class
self.repository_and_suite_tuples = []
# only used internally to class
self.suite_tuples = []
# only used internally to class and by repository_grids
self.viewable_repositories_and_suites_by_category = {}
# only used internally to class and by repository_grids
self.viewable_suites_by_category = {}
# only used internally to class and by repository_grids
self.viewable_valid_repositories_and_suites_by_category = {}
# only used internally to class and by repository_grids
self.viewable_valid_suites_by_category = {}
self.load()
[docs] def load(self):
with self.sa_session.begin():
self.load_viewable_repositories_and_suites_by_category()
self.load_repository_and_suite_tuples()
[docs] def add_category_entry(self, category):
category_name = str(category.name)
if category_name not in self.viewable_repositories_and_suites_by_category:
self.viewable_repositories_and_suites_by_category[category_name] = 0
if category_name not in self.viewable_suites_by_category:
self.viewable_suites_by_category[category_name] = 0
if category_name not in self.viewable_valid_repositories_and_suites_by_category:
self.viewable_valid_repositories_and_suites_by_category[category_name] = 0
if category_name not in self.viewable_valid_suites_by_category:
self.viewable_valid_suites_by_category[category_name] = 0
if category_name not in self.certified_level_one_viewable_repositories_and_suites_by_category:
self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] = 0
if category_name not in self.certified_level_one_viewable_suites_by_category:
self.certified_level_one_viewable_suites_by_category[category_name] = 0
[docs] def add_entry(self, repository):
try:
if repository:
is_valid = self.is_valid(repository)
certified_level_one_tuple = self.get_certified_level_one_tuple(repository)
latest_installable_changeset_revision, is_level_one_certified = certified_level_one_tuple
for rca in repository.categories:
category = rca.category
category_name = str(category.name)
if category_name in self.viewable_repositories_and_suites_by_category:
self.viewable_repositories_and_suites_by_category[category_name] += 1
else:
self.viewable_repositories_and_suites_by_category[category_name] = 1
if is_valid:
if category_name in self.viewable_valid_repositories_and_suites_by_category:
self.viewable_valid_repositories_and_suites_by_category[category_name] += 1
else:
self.viewable_valid_repositories_and_suites_by_category[category_name] = 1
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if category_name in self.viewable_suites_by_category:
self.viewable_suites_by_category[category_name] += 1
else:
self.viewable_suites_by_category[category_name] = 1
if is_valid:
if category_name in self.viewable_valid_suites_by_category:
self.viewable_valid_suites_by_category[category_name] += 1
else:
self.viewable_valid_suites_by_category[category_name] = 1
if is_level_one_certified:
if category_name in self.certified_level_one_viewable_repositories_and_suites_by_category:
self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] += 1
else:
self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] = 1
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if category_name in self.certified_level_one_viewable_suites_by_category:
self.certified_level_one_viewable_suites_by_category[category_name] += 1
else:
self.certified_level_one_viewable_suites_by_category[category_name] = 1
self.load_repository_and_suite_tuple(repository)
if is_level_one_certified:
self.load_certified_level_one_repository_and_suite_tuple(repository)
except Exception:
# The viewable repository numbers and the categorized (filtered) lists of repository tuples
# may be slightly skewed, but that is no reason to result in a potential server error. All
# will be corrected at next server start.
log.exception("Handled error adding entry to repository registry")
[docs] def edit_category_entry(self, old_name, new_name):
if old_name in self.viewable_repositories_and_suites_by_category:
val = self.viewable_repositories_and_suites_by_category[old_name]
del self.viewable_repositories_and_suites_by_category[old_name]
self.viewable_repositories_and_suites_by_category[new_name] = val
else:
self.viewable_repositories_and_suites_by_category[new_name] = 0
if old_name in self.viewable_valid_repositories_and_suites_by_category:
val = self.viewable_valid_repositories_and_suites_by_category[old_name]
del self.viewable_valid_repositories_and_suites_by_category[old_name]
self.viewable_valid_repositories_and_suites_by_category[new_name] = val
else:
self.viewable_valid_repositories_and_suites_by_category[new_name] = 0
if old_name in self.viewable_suites_by_category:
val = self.viewable_suites_by_category[old_name]
del self.viewable_suites_by_category[old_name]
self.viewable_suites_by_category[new_name] = val
else:
self.viewable_suites_by_category[new_name] = 0
if old_name in self.viewable_valid_suites_by_category:
val = self.viewable_valid_suites_by_category[old_name]
del self.viewable_valid_suites_by_category[old_name]
self.viewable_valid_suites_by_category[new_name] = val
else:
self.viewable_valid_suites_by_category[new_name] = 0
if old_name in self.certified_level_one_viewable_repositories_and_suites_by_category:
val = self.certified_level_one_viewable_repositories_and_suites_by_category[old_name]
del self.certified_level_one_viewable_repositories_and_suites_by_category[old_name]
self.certified_level_one_viewable_repositories_and_suites_by_category[new_name] = val
else:
self.certified_level_one_viewable_repositories_and_suites_by_category[new_name] = 0
if old_name in self.certified_level_one_viewable_suites_by_category:
val = self.certified_level_one_viewable_suites_by_category[old_name]
del self.certified_level_one_viewable_suites_by_category[old_name]
self.certified_level_one_viewable_suites_by_category[new_name] = val
else:
self.certified_level_one_viewable_suites_by_category[new_name] = 0
[docs] def get_certified_level_one_clause_list(self):
# only used internally to class
clause_list = []
for repository in get_repositories(self.sa_session):
certified_level_one_tuple = self.get_certified_level_one_tuple(repository)
latest_installable_changeset_revision, is_level_one_certified = certified_level_one_tuple
if is_level_one_certified:
clause_list.append(
and_(
RepositoryMetadata.repository_id == repository.id,
RepositoryMetadata.changeset_revision == latest_installable_changeset_revision,
)
)
return clause_list
[docs] def get_certified_level_one_tuple(self, repository):
"""
Return True if the latest installable changeset_revision of the received repository is level one certified.
"""
# only used internally to class
if repository is None:
return (None, False)
if repository.deleted or repository.deprecated:
return (None, False)
# Get the latest installable changeset revision since that is all that is currently configured for testing.
latest_installable_changeset_revision = metadata_util.get_latest_downloadable_changeset_revision(
self.app, repository
)
if latest_installable_changeset_revision not in [None, hg_util.INITIAL_CHANGELOG_HASH]:
encoded_repository_id = self.app.security.encode_id(repository.id)
repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(
self.app, encoded_repository_id, latest_installable_changeset_revision
)
if repository_metadata:
# No repository_metadata.
return (latest_installable_changeset_revision, True)
else:
# No installable changeset_revision.
return (None, False)
[docs] def is_level_one_certified(self, repository_metadata):
# only used internally to class
if repository_metadata:
repository = repository_metadata.repository
if repository:
if repository.deprecated or repository.deleted:
return False
tuple = (
str(repository.name),
str(repository.user.username),
str(repository_metadata.changeset_revision),
)
if repository.type in [rt_util.REPOSITORY_SUITE_DEFINITION]:
return tuple in self.certified_level_one_suite_tuples
else:
return tuple in self.certified_level_one_repository_and_suite_tuples
return False
[docs] def is_valid(self, repository) -> bool:
if repository and not repository.deleted and not repository.deprecated and repository.downloadable_revisions:
return True
return False
[docs] def load_certified_level_one_repository_and_suite_tuple(self, repository):
# only used internally to class
# The received repository has been determined to be level one certified.
name = str(repository.name)
owner = str(repository.user.username)
tip_changeset_hash = repository.tip()
if tip_changeset_hash != hg_util.INITIAL_CHANGELOG_HASH:
certified_level_one_tuple = (name, owner, tip_changeset_hash)
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if certified_level_one_tuple not in self.certified_level_one_suite_tuples:
self.certified_level_one_suite_tuples.append(certified_level_one_tuple)
else:
if certified_level_one_tuple not in self.certified_level_one_repository_and_suite_tuples:
self.certified_level_one_repository_and_suite_tuples.append(certified_level_one_tuple)
[docs] def load_repository_and_suite_tuple(self, repository):
# only used internally to class
name = str(repository.name)
owner = str(repository.user.username)
for repository_metadata in repository.metadata_revisions:
changeset_revision = str(repository_metadata.changeset_revision)
tuple = (name, owner, changeset_revision)
if tuple not in self.repository_and_suite_tuples:
self.repository_and_suite_tuples.append(tuple)
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if tuple not in self.suite_tuples:
self.suite_tuples.append(tuple)
[docs] def load_repository_and_suite_tuples(self):
# only used internally to class
# Load self.certified_level_one_repository_and_suite_tuples and self.certified_level_one_suite_tuples.
clauses = self.get_certified_level_one_clause_list()
for repository in get_certified_repositories_with_user(self.sa_session, clauses, model.User):
self.load_certified_level_one_repository_and_suite_tuple(repository)
# Load self.repository_and_suite_tuples and self.suite_tuples
for repository in get_repositories_with_user(self.sa_session, model.User):
self.load_repository_and_suite_tuple(repository)
[docs] def load_viewable_repositories_and_suites_by_category(self):
# only used internally to class
# Clear all dictionaries just in case they were previously loaded.
self.certified_level_one_viewable_repositories_and_suites_by_category = {}
self.certified_level_one_viewable_suites_by_category = {}
# self.certified_level_two_viewable_repositories_and_suites_by_category = {}
# self.certified_level_two_viewable_suites_by_category = {}
self.viewable_repositories_and_suites_by_category = {}
self.viewable_suites_by_category = {}
self.viewable_valid_repositories_and_suites_by_category = {}
self.viewable_valid_suites_by_category = {}
for category in self.sa_session.scalars(select(Category)):
category_name = str(category.name)
if category not in self.certified_level_one_viewable_repositories_and_suites_by_category:
self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] = 0
if category not in self.certified_level_one_viewable_suites_by_category:
self.certified_level_one_viewable_suites_by_category[category_name] = 0
if category not in self.viewable_repositories_and_suites_by_category:
self.viewable_repositories_and_suites_by_category[category_name] = 0
if category not in self.viewable_suites_by_category:
self.viewable_suites_by_category[category_name] = 0
if category not in self.viewable_valid_repositories_and_suites_by_category:
self.viewable_valid_repositories_and_suites_by_category[category_name] = 0
if category not in self.viewable_valid_suites_by_category:
self.viewable_valid_suites_by_category[category_name] = 0
for rca in category.repositories:
repository = rca.repository
if not repository.deleted and not repository.deprecated:
is_valid = self.is_valid(repository)
encoded_repository_id = self.app.security.encode_id(repository.id)
tip_changeset_hash = repository.tip()
repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(
self.app, encoded_repository_id, tip_changeset_hash
)
self.viewable_repositories_and_suites_by_category[category_name] += 1
if is_valid:
self.viewable_valid_repositories_and_suites_by_category[category_name] += 1
if repository.type in [rt_util.REPOSITORY_SUITE_DEFINITION]:
self.viewable_suites_by_category[category_name] += 1
if is_valid:
self.viewable_valid_suites_by_category[category_name] += 1
if self.is_level_one_certified(repository_metadata):
self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] += 1
if repository.type in [rt_util.REPOSITORY_SUITE_DEFINITION]:
self.certified_level_one_viewable_suites_by_category[category_name] += 1
[docs] def remove_category_entry(self, category):
catgeory_name = str(category.name)
if catgeory_name in self.viewable_repositories_and_suites_by_category:
del self.viewable_repositories_and_suites_by_category[catgeory_name]
if catgeory_name in self.viewable_valid_repositories_and_suites_by_category:
del self.viewable_valid_repositories_and_suites_by_category[catgeory_name]
if catgeory_name in self.viewable_suites_by_category:
del self.viewable_suites_by_category[catgeory_name]
if catgeory_name in self.viewable_valid_suites_by_category:
del self.viewable_valid_suites_by_category[catgeory_name]
if catgeory_name in self.certified_level_one_viewable_repositories_and_suites_by_category:
del self.certified_level_one_viewable_repositories_and_suites_by_category[catgeory_name]
if catgeory_name in self.certified_level_one_viewable_suites_by_category:
del self.certified_level_one_viewable_suites_by_category[catgeory_name]
[docs] def remove_entry(self, repository):
try:
if repository:
is_valid = self.is_valid(repository)
certified_level_one_tuple = self.get_certified_level_one_tuple(repository)
latest_installable_changeset_revision, is_level_one_certified = certified_level_one_tuple
for rca in repository.categories:
category = rca.category
category_name = str(category.name)
if category_name in self.viewable_repositories_and_suites_by_category:
if self.viewable_repositories_and_suites_by_category[category_name] > 0:
self.viewable_repositories_and_suites_by_category[category_name] -= 1
else:
self.viewable_repositories_and_suites_by_category[category_name] = 0
if is_valid:
if category_name in self.viewable_valid_repositories_and_suites_by_category:
if self.viewable_valid_repositories_and_suites_by_category[category_name] > 0:
self.viewable_valid_repositories_and_suites_by_category[category_name] -= 1
else:
self.viewable_valid_repositories_and_suites_by_category[category_name] = 0
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if category_name in self.viewable_suites_by_category:
if self.viewable_suites_by_category[category_name] > 0:
self.viewable_suites_by_category[category_name] -= 1
else:
self.viewable_suites_by_category[category_name] = 0
if is_valid:
if category_name in self.viewable_valid_suites_by_category:
if self.viewable_valid_suites_by_category[category_name] > 0:
self.viewable_valid_suites_by_category[category_name] -= 1
else:
self.viewable_valid_suites_by_category[category_name] = 0
if is_level_one_certified:
if category_name in self.certified_level_one_viewable_repositories_and_suites_by_category:
if self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] > 0:
self.certified_level_one_viewable_repositories_and_suites_by_category[
category_name
] -= 1
else:
self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] = 0
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if category_name in self.certified_level_one_viewable_suites_by_category:
if self.certified_level_one_viewable_suites_by_category[category_name] > 0:
self.certified_level_one_viewable_suites_by_category[category_name] -= 1
else:
self.certified_level_one_viewable_suites_by_category[category_name] = 0
self.unload_repository_and_suite_tuple(repository)
if is_level_one_certified:
self.unload_certified_level_one_repository_and_suite_tuple(repository)
except Exception:
# The viewable repository numbers and the categorized (filtered) lists of repository tuples
# may be slightly skewed, but that is no reason to result in a potential server error. All
# will be corrected at next server start.
log.exception("Handled error removing entry from repository registry")
@property
def sa_session(self):
# only used internally to class
return self.app.model.session
[docs] def unload_certified_level_one_repository_and_suite_tuple(self, repository):
# The received repository has been determined to be level one certified.
name = str(repository.name)
owner = str(repository.user.username)
tip_changeset_hash = repository.tip()
if tip_changeset_hash != hg_util.INITIAL_CHANGELOG_HASH:
certified_level_one_tuple = (name, owner, tip_changeset_hash)
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if certified_level_one_tuple in self.certified_level_one_suite_tuples:
self.certified_level_one_suite_tuples.remove(certified_level_one_tuple)
else:
if certified_level_one_tuple in self.certified_level_one_repository_and_suite_tuples:
self.certified_level_one_repository_and_suite_tuples.remove(certified_level_one_tuple)
[docs] def unload_repository_and_suite_tuple(self, repository):
name = str(repository.name)
owner = str(repository.user.username)
for repository_metadata in repository.metadata_revisions:
changeset_revision = str(repository_metadata.changeset_revision)
tuple = (name, owner, changeset_revision)
if tuple in self.repository_and_suite_tuples:
self.repository_and_suite_tuples.remove(tuple)
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if tuple in self.suite_tuples:
self.suite_tuples.remove(tuple)
[docs]def get_repositories(session):
stmt = select(Repository).where(Repository.deleted == false()).where(Repository.deprecated == false())
return session.scalars(stmt)
[docs]def get_repositories_with_user(session, user_model):
stmt = (
select(Repository).where(Repository.deleted == false()).where(Repository.deprecated == false()).join(user_model)
)
return session.scalars(stmt)
[docs]def get_certified_repositories_with_user(session, where_clauses, user_model):
stmt = select(Repository).join(RepositoryMetadata).where(or_(*where_clauses)).join(user_model)
return session.scalars(stmt)