Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.repository_registry
import logging
from sqlalchemy import and_, false, or_
import tool_shed.repository_types.util as rt_util
from tool_shed.util import hg_util
from tool_shed.util import metadata_util
from tool_shed.webapp import model
log = logging.getLogger(__name__)
[docs]class Registry:
[docs] def __init__(self, app):
log.debug("Loading the repository registry...")
self.app = app
self.certified_level_one_clause_list = self.get_certified_level_one_clause_list()
# The following lists contain tuples like ( repository.name, repository.user.username, changeset_revision )
# where the changeset_revision entry is always the latest installable changeset_revision..
self.certified_level_one_repository_and_suite_tuples = []
self.certified_level_one_suite_tuples = []
# These category dictionaries contain entries where the key is the category and the value is the integer count
# of viewable repositories within that category.
self.certified_level_one_viewable_repositories_and_suites_by_category = {}
self.certified_level_one_viewable_suites_by_category = {}
self.certified_level_two_repository_and_suite_tuples = []
self.certified_level_two_suite_tuples = []
self.certified_level_two_viewable_repositories_and_suites_by_category = {}
self.certified_level_two_viewable_suites_by_category = {}
self.repository_and_suite_tuples = []
self.suite_tuples = []
self.viewable_repositories_and_suites_by_category = {}
self.viewable_suites_by_category = {}
self.viewable_valid_repositories_and_suites_by_category = {}
self.viewable_valid_suites_by_category = {}
self.load_viewable_repositories_and_suites_by_category()
self.load_repository_and_suite_tuples()
[docs] def add_category_entry(self, category):
category_name = str(category.name)
if category_name not in self.viewable_repositories_and_suites_by_category:
self.viewable_repositories_and_suites_by_category[category_name] = 0
if category_name not in self.viewable_suites_by_category:
self.viewable_suites_by_category[category_name] = 0
if category_name not in self.viewable_valid_repositories_and_suites_by_category:
self.viewable_valid_repositories_and_suites_by_category[category_name] = 0
if category_name not in self.viewable_valid_suites_by_category:
self.viewable_valid_suites_by_category[category_name] = 0
if category_name not in self.certified_level_one_viewable_repositories_and_suites_by_category:
self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] = 0
if category_name not in self.certified_level_one_viewable_suites_by_category:
self.certified_level_one_viewable_suites_by_category[category_name] = 0
[docs] def add_entry(self, repository):
try:
if repository:
is_valid = self.is_valid(repository)
certified_level_one_tuple = self.get_certified_level_one_tuple(repository)
latest_installable_changeset_revision, is_level_one_certified = certified_level_one_tuple
for rca in repository.categories:
category = rca.category
category_name = str(category.name)
if category_name in self.viewable_repositories_and_suites_by_category:
self.viewable_repositories_and_suites_by_category[category_name] += 1
else:
self.viewable_repositories_and_suites_by_category[category_name] = 1
if is_valid:
if category_name in self.viewable_valid_repositories_and_suites_by_category:
self.viewable_valid_repositories_and_suites_by_category[category_name] += 1
else:
self.viewable_valid_repositories_and_suites_by_category[category_name] = 1
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if category_name in self.viewable_suites_by_category:
self.viewable_suites_by_category[category_name] += 1
else:
self.viewable_suites_by_category[category_name] = 1
if is_valid:
if category_name in self.viewable_valid_suites_by_category:
self.viewable_valid_suites_by_category[category_name] += 1
else:
self.viewable_valid_suites_by_category[category_name] = 1
if is_level_one_certified:
if category_name in self.certified_level_one_viewable_repositories_and_suites_by_category:
self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] += 1
else:
self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] = 1
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if category_name in self.certified_level_one_viewable_suites_by_category:
self.certified_level_one_viewable_suites_by_category[category_name] += 1
else:
self.certified_level_one_viewable_suites_by_category[category_name] = 1
self.load_repository_and_suite_tuple(repository)
if is_level_one_certified:
self.load_certified_level_one_repository_and_suite_tuple(repository)
except Exception:
# The viewable repository numbers and the categorized (filtered) lists of repository tuples
# may be slightly skewed, but that is no reason to result in a potential server error. All
# will be corrected at next server start.
log.exception("Handled error adding entry to repository registry")
[docs] def edit_category_entry(self, old_name, new_name):
if old_name in self.viewable_repositories_and_suites_by_category:
val = self.viewable_repositories_and_suites_by_category[old_name]
del self.viewable_repositories_and_suites_by_category[old_name]
self.viewable_repositories_and_suites_by_category[new_name] = val
else:
self.viewable_repositories_and_suites_by_category[new_name] = 0
if old_name in self.viewable_valid_repositories_and_suites_by_category:
val = self.viewable_valid_repositories_and_suites_by_category[old_name]
del self.viewable_valid_repositories_and_suites_by_category[old_name]
self.viewable_valid_repositories_and_suites_by_category[new_name] = val
else:
self.viewable_valid_repositories_and_suites_by_category[new_name] = 0
if old_name in self.viewable_suites_by_category:
val = self.viewable_suites_by_category[old_name]
del self.viewable_suites_by_category[old_name]
self.viewable_suites_by_category[new_name] = val
else:
self.viewable_suites_by_category[new_name] = 0
if old_name in self.viewable_valid_suites_by_category:
val = self.viewable_valid_suites_by_category[old_name]
del self.viewable_valid_suites_by_category[old_name]
self.viewable_valid_suites_by_category[new_name] = val
else:
self.viewable_valid_suites_by_category[new_name] = 0
if old_name in self.certified_level_one_viewable_repositories_and_suites_by_category:
val = self.certified_level_one_viewable_repositories_and_suites_by_category[old_name]
del self.certified_level_one_viewable_repositories_and_suites_by_category[old_name]
self.certified_level_one_viewable_repositories_and_suites_by_category[new_name] = val
else:
self.certified_level_one_viewable_repositories_and_suites_by_category[new_name] = 0
if old_name in self.certified_level_one_viewable_suites_by_category:
val = self.certified_level_one_viewable_suites_by_category[old_name]
del self.certified_level_one_viewable_suites_by_category[old_name]
self.certified_level_one_viewable_suites_by_category[new_name] = val
else:
self.certified_level_one_viewable_suites_by_category[new_name] = 0
[docs] def get_certified_level_one_clause_list(self):
certified_level_one_tuples = []
clause_list = []
for repository in self.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deleted == false(),
model.Repository.table.c.deprecated == false())):
certified_level_one_tuple = self.get_certified_level_one_tuple(repository)
latest_installable_changeset_revision, is_level_one_certified = certified_level_one_tuple
if is_level_one_certified:
certified_level_one_tuples.append(certified_level_one_tuple)
clause_list.append(and_(
model.RepositoryMetadata.table.c.repository_id == repository.id,
model.RepositoryMetadata.table.c.changeset_revision == latest_installable_changeset_revision))
return clause_list
[docs] def get_certified_level_one_tuple(self, repository):
"""
Return True if the latest installable changeset_revision of the received repository is level one certified.
"""
if repository is None:
return (None, False)
if repository.deleted or repository.deprecated:
return (None, False)
# Get the latest installable changeset revision since that is all that is currently configured for testing.
latest_installable_changeset_revision = metadata_util.get_latest_downloadable_changeset_revision(self.app, repository)
if latest_installable_changeset_revision not in [None, hg_util.INITIAL_CHANGELOG_HASH]:
encoded_repository_id = self.app.security.encode_id(repository.id)
repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(self.app,
encoded_repository_id,
latest_installable_changeset_revision)
if repository_metadata:
# No repository_metadata.
return (latest_installable_changeset_revision, True)
else:
# No installable changeset_revision.
return (None, False)
[docs] def is_level_one_certified(self, repository_metadata):
if repository_metadata:
repository = repository_metadata.repository
if repository:
if repository.deprecated or repository.deleted:
return False
tuple = (str(repository.name), str(repository.user.username), str(repository_metadata.changeset_revision))
if repository.type in [rt_util.REPOSITORY_SUITE_DEFINITION]:
return tuple in self.certified_level_one_suite_tuples
else:
return tuple in self.certified_level_one_repository_and_suite_tuples
return False
[docs] def is_valid(self, repository):
if repository and not repository.deleted and not repository.deprecated and repository.downloadable_revisions:
return True
return False
[docs] def load_certified_level_one_repository_and_suite_tuple(self, repository):
# The received repository has been determined to be level one certified.
name = str(repository.name)
owner = str(repository.user.username)
tip_changeset_hash = repository.tip()
if tip_changeset_hash != hg_util.INITIAL_CHANGELOG_HASH:
certified_level_one_tuple = (name, owner, tip_changeset_hash)
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if certified_level_one_tuple not in self.certified_level_one_suite_tuples:
self.certified_level_one_suite_tuples.append(certified_level_one_tuple)
else:
if certified_level_one_tuple not in self.certified_level_one_repository_and_suite_tuples:
self.certified_level_one_repository_and_suite_tuples.append(certified_level_one_tuple)
[docs] def load_repository_and_suite_tuple(self, repository):
name = str(repository.name)
owner = str(repository.user.username)
for repository_metadata in repository.metadata_revisions:
changeset_revision = str(repository_metadata.changeset_revision)
tuple = (name, owner, changeset_revision)
if tuple not in self.repository_and_suite_tuples:
self.repository_and_suite_tuples.append(tuple)
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if tuple not in self.suite_tuples:
self.suite_tuples.append(tuple)
[docs] def load_repository_and_suite_tuples(self):
# Load self.certified_level_one_repository_and_suite_tuples and self.certified_level_one_suite_tuples.
for repository in self.sa_session.query(model.Repository) \
.join(model.RepositoryMetadata.table) \
.filter(or_(*self.certified_level_one_clause_list)) \
.join(model.User.table):
self.load_certified_level_one_repository_and_suite_tuple(repository)
# Load self.repository_and_suite_tuples and self.suite_tuples
for repository in self.sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.deleted == false(),
model.Repository.table.c.deprecated == false())) \
.join(model.User.table):
self.load_repository_and_suite_tuple(repository)
[docs] def load_viewable_repositories_and_suites_by_category(self):
# Clear all dictionaries just in case they were previously loaded.
self.certified_level_one_viewable_repositories_and_suites_by_category = {}
self.certified_level_one_viewable_suites_by_category = {}
self.certified_level_two_viewable_repositories_and_suites_by_category = {}
self.certified_level_two_viewable_suites_by_category = {}
self.viewable_repositories_and_suites_by_category = {}
self.viewable_suites_by_category = {}
self.viewable_valid_repositories_and_suites_by_category = {}
self.viewable_valid_suites_by_category = {}
for category in self.sa_session.query(model.Category):
category_name = str(category.name)
if category not in self.certified_level_one_viewable_repositories_and_suites_by_category:
self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] = 0
if category not in self.certified_level_one_viewable_suites_by_category:
self.certified_level_one_viewable_suites_by_category[category_name] = 0
if category not in self.viewable_repositories_and_suites_by_category:
self.viewable_repositories_and_suites_by_category[category_name] = 0
if category not in self.viewable_suites_by_category:
self.viewable_suites_by_category[category_name] = 0
if category not in self.viewable_valid_repositories_and_suites_by_category:
self.viewable_valid_repositories_and_suites_by_category[category_name] = 0
if category not in self.viewable_valid_suites_by_category:
self.viewable_valid_suites_by_category[category_name] = 0
for rca in category.repositories:
repository = rca.repository
if not repository.deleted and not repository.deprecated:
is_valid = self.is_valid(repository)
encoded_repository_id = self.app.security.encode_id(repository.id)
tip_changeset_hash = repository.tip()
repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(self.app,
encoded_repository_id,
tip_changeset_hash)
self.viewable_repositories_and_suites_by_category[category_name] += 1
if is_valid:
self.viewable_valid_repositories_and_suites_by_category[category_name] += 1
if repository.type in [rt_util.REPOSITORY_SUITE_DEFINITION]:
self.viewable_suites_by_category[category_name] += 1
if is_valid:
self.viewable_valid_suites_by_category[category_name] += 1
if self.is_level_one_certified(repository_metadata):
self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] += 1
if repository.type in [rt_util.REPOSITORY_SUITE_DEFINITION]:
self.certified_level_one_viewable_suites_by_category[category_name] += 1
[docs] def remove_category_entry(self, category):
catgeory_name = str(category.name)
if catgeory_name in self.viewable_repositories_and_suites_by_category:
del self.viewable_repositories_and_suites_by_category[catgeory_name]
if catgeory_name in self.viewable_valid_repositories_and_suites_by_category:
del self.viewable_valid_repositories_and_suites_by_category[catgeory_name]
if catgeory_name in self.viewable_suites_by_category:
del self.viewable_suites_by_category[catgeory_name]
if catgeory_name in self.viewable_valid_suites_by_category:
del self.viewable_valid_suites_by_category[catgeory_name]
if catgeory_name in self.certified_level_one_viewable_repositories_and_suites_by_category:
del self.certified_level_one_viewable_repositories_and_suites_by_category[catgeory_name]
if catgeory_name in self.certified_level_one_viewable_suites_by_category:
del self.certified_level_one_viewable_suites_by_category[catgeory_name]
[docs] def remove_entry(self, repository):
try:
if repository:
is_valid = self.is_valid(repository)
certified_level_one_tuple = self.get_certified_level_one_tuple(repository)
latest_installable_changeset_revision, is_level_one_certified = certified_level_one_tuple
for rca in repository.categories:
category = rca.category
category_name = str(category.name)
if category_name in self.viewable_repositories_and_suites_by_category:
if self.viewable_repositories_and_suites_by_category[category_name] > 0:
self.viewable_repositories_and_suites_by_category[category_name] -= 1
else:
self.viewable_repositories_and_suites_by_category[category_name] = 0
if is_valid:
if category_name in self.viewable_valid_repositories_and_suites_by_category:
if self.viewable_valid_repositories_and_suites_by_category[category_name] > 0:
self.viewable_valid_repositories_and_suites_by_category[category_name] -= 1
else:
self.viewable_valid_repositories_and_suites_by_category[category_name] = 0
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if category_name in self.viewable_suites_by_category:
if self.viewable_suites_by_category[category_name] > 0:
self.viewable_suites_by_category[category_name] -= 1
else:
self.viewable_suites_by_category[category_name] = 0
if is_valid:
if category_name in self.viewable_valid_suites_by_category:
if self.viewable_valid_suites_by_category[category_name] > 0:
self.viewable_valid_suites_by_category[category_name] -= 1
else:
self.viewable_valid_suites_by_category[category_name] = 0
if is_level_one_certified:
if category_name in self.certified_level_one_viewable_repositories_and_suites_by_category:
if self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] > 0:
self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] -= 1
else:
self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] = 0
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if category_name in self.certified_level_one_viewable_suites_by_category:
if self.certified_level_one_viewable_suites_by_category[category_name] > 0:
self.certified_level_one_viewable_suites_by_category[category_name] -= 1
else:
self.certified_level_one_viewable_suites_by_category[category_name] = 0
self.unload_repository_and_suite_tuple(repository)
if is_level_one_certified:
self.unload_certified_level_one_repository_and_suite_tuple(repository)
except Exception:
# The viewable repository numbers and the categorized (filtered) lists of repository tuples
# may be slightly skewed, but that is no reason to result in a potential server error. All
# will be corrected at next server start.
log.exception("Handled error removing entry from repository registry")
@property
def sa_session(self):
return self.app.model.context.current
[docs] def unload_certified_level_one_repository_and_suite_tuple(self, repository):
# The received repository has been determined to be level one certified.
name = str(repository.name)
owner = str(repository.user.username)
tip_changeset_hash = repository.tip()
if tip_changeset_hash != hg_util.INITIAL_CHANGELOG_HASH:
certified_level_one_tuple = (name, owner, tip_changeset_hash)
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if certified_level_one_tuple in self.certified_level_one_suite_tuples:
self.certified_level_one_suite_tuples.remove(certified_level_one_tuple)
else:
if certified_level_one_tuple in self.certified_level_one_repository_and_suite_tuples:
self.certified_level_one_repository_and_suite_tuples.remove(certified_level_one_tuple)
[docs] def unload_repository_and_suite_tuple(self, repository):
name = str(repository.name)
owner = str(repository.user.username)
for repository_metadata in repository.metadata_revisions:
changeset_revision = str(repository_metadata.changeset_revision)
tuple = (name, owner, changeset_revision)
if tuple in self.repository_and_suite_tuples:
self.repository_and_suite_tuples.remove(tuple)
if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
if tuple in self.suite_tuples:
self.suite_tuples.remove(tuple)