Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.repository_registry
import logging
from sqlalchemy import and_, false, or_
import tool_shed.repository_types.util as rt_util
from tool_shed.util import hg_util
from tool_shed.util import metadata_util
from tool_shed.webapp import model
log = logging.getLogger(__name__)
class Registry:
    """
    In-memory registry of repository statistics built from the database.

    Two kinds of state are kept:

    * Per-category counters: dictionaries whose key is a category name and whose value is the
      integer count of viewable repositories (or suites) in that category.
    * Tuple lists: lists of ``(repository.name, repository.user.username, changeset_revision)``.

    Everything is loaded when the registry is constructed and is afterwards adjusted
    incrementally through ``add_entry`` / ``remove_entry`` and the ``*_category_entry`` methods.
    """

    def __init__(self, app):
        """
        Build the registry for ``app``.

        :param app: application object.  The registry reads ``app.model.context.current`` (see
            ``sa_session``) and ``app.security.encode_id``, and passes ``app`` on to the
            ``metadata_util`` helpers.
        """
        log.debug("Loading the repository registry...")
        self.app = app
        self.certified_level_one_clause_list = self.get_certified_level_one_clause_list()
        # The following lists contain tuples like ( repository.name, repository.user.username, changeset_revision )
        # where the changeset_revision entry is always the latest installable changeset_revision.
        self.certified_level_one_repository_and_suite_tuples = []
        self.certified_level_one_suite_tuples = []
        # These category dictionaries contain entries where the key is the category and the value is the integer count
        # of viewable repositories within that category.
        self.certified_level_one_viewable_repositories_and_suites_by_category = {}
        self.certified_level_one_viewable_suites_by_category = {}
        self.certified_level_two_repository_and_suite_tuples = []
        self.certified_level_two_suite_tuples = []
        self.certified_level_two_viewable_repositories_and_suites_by_category = {}
        self.certified_level_two_viewable_suites_by_category = {}
        self.repository_and_suite_tuples = []
        self.suite_tuples = []
        self.viewable_repositories_and_suites_by_category = {}
        self.viewable_suites_by_category = {}
        self.viewable_valid_repositories_and_suites_by_category = {}
        self.viewable_valid_suites_by_category = {}
        # The tuple lists must be populated before the category counters: counting certified
        # repositories goes through is_level_one_certified(), which checks membership in the
        # certified tuple lists.  Loading the counters first always produced zero certified counts.
        self.load_repository_and_suite_tuples()
        self.load_viewable_repositories_and_suites_by_category()

    @property
    def sa_session(self):
        """Return the current database session, ``self.app.model.context.current``."""
        return self.app.model.context.current

    def _category_counters(self):
        """Return the six per-category counter dictionaries that are kept in step with each other."""
        # Looked up on every call because load_viewable_repositories_and_suites_by_category()
        # rebinds these attributes to fresh dictionaries.
        return (
            self.viewable_repositories_and_suites_by_category,
            self.viewable_suites_by_category,
            self.viewable_valid_repositories_and_suites_by_category,
            self.viewable_valid_suites_by_category,
            self.certified_level_one_viewable_repositories_and_suites_by_category,
            self.certified_level_one_viewable_suites_by_category,
        )

    @staticmethod
    def _increment(counter, category_name):
        """Add one to ``counter[category_name]``, creating the entry if it is missing."""
        counter[category_name] = counter.get(category_name, 0) + 1

    @staticmethod
    def _decrement(counter, category_name):
        """Subtract one from ``counter[category_name]`` without going below zero; a missing entry becomes 0."""
        counter[category_name] = max(counter.get(category_name, 0) - 1, 0)

    def add_category_entry(self, category):
        """
        Make sure every per-category counter has an entry for ``category``.

        Existing counts are left untouched; missing entries are created with a count of 0.

        :param category: object whose ``name`` attribute is the category name.
        """
        category_name = str(category.name)
        for counter in self._category_counters():
            counter.setdefault(category_name, 0)

    def add_entry(self, repository):
        """
        Account for ``repository`` in the category counters and the tuple lists.

        Any exception is logged and swallowed on purpose (see the comment in the handler).

        :param repository: repository to add; a falsy value is ignored.
        """
        try:
            if repository:
                is_valid = self.is_valid(repository)
                _, is_level_one_certified = self.get_certified_level_one_tuple(repository)
                is_suite = repository.type == rt_util.REPOSITORY_SUITE_DEFINITION
                for rca in repository.categories:
                    category_name = str(rca.category.name)
                    self._increment(self.viewable_repositories_and_suites_by_category, category_name)
                    if is_valid:
                        self._increment(self.viewable_valid_repositories_and_suites_by_category, category_name)
                    if is_suite:
                        self._increment(self.viewable_suites_by_category, category_name)
                        if is_valid:
                            self._increment(self.viewable_valid_suites_by_category, category_name)
                    if is_level_one_certified:
                        self._increment(
                            self.certified_level_one_viewable_repositories_and_suites_by_category, category_name
                        )
                        if is_suite:
                            self._increment(self.certified_level_one_viewable_suites_by_category, category_name)
                self.load_repository_and_suite_tuple(repository)
                if is_level_one_certified:
                    self.load_certified_level_one_repository_and_suite_tuple(repository)
        except Exception:
            # The viewable repository numbers and the categorized (filtered) lists of repository tuples
            # may be slightly skewed, but that is no reason to result in a potential server error.  All
            # will be corrected at next server start.
            log.exception("Handled error adding entry to repository registry")

    def edit_category_entry(self, old_name, new_name):
        """
        Rename a category in every per-category counter.

        The count stored under ``old_name`` is moved to ``new_name``.  When ``old_name`` has no
        entry, ``new_name`` is set to 0.

        :param old_name: current category name.
        :param new_name: replacement category name.
        """
        for counter in self._category_counters():
            counter[new_name] = counter.pop(old_name, 0)

    def get_certified_level_one_clause_list(self):
        """
        Build the filter clauses that select the certified revision of each certified repository.

        Every repository that is neither deleted nor deprecated is examined; for each one that
        ``get_certified_level_one_tuple`` reports as certified, a clause matching its id and its
        latest installable changeset revision in the repository metadata table is added.

        :returns: list of clauses, empty when no repository is certified.
        """
        clause_list = []
        active_repositories = self.sa_session.query(model.Repository).filter(
            and_(model.Repository.table.c.deleted == false(), model.Repository.table.c.deprecated == false())
        )
        for repository in active_repositories:
            latest_installable_changeset_revision, is_level_one_certified = self.get_certified_level_one_tuple(
                repository
            )
            if is_level_one_certified:
                clause_list.append(
                    and_(
                        model.RepositoryMetadata.table.c.repository_id == repository.id,
                        model.RepositoryMetadata.table.c.changeset_revision == latest_installable_changeset_revision,
                    )
                )
        return clause_list

    def get_certified_level_one_tuple(self, repository):
        """
        Report whether the latest installable changeset_revision of ``repository`` is level one certified.

        :param repository: repository to inspect; may be ``None``.
        :returns: ``(latest_installable_changeset_revision, True)`` when the repository is certified,
            otherwise ``(None, False)``.  A 2-tuple is always returned because callers unpack it.
        """
        if repository is None or repository.deleted or repository.deprecated:
            return (None, False)
        # Get the latest installable changeset revision since that is all that is currently configured for testing.
        latest_installable_changeset_revision = metadata_util.get_latest_downloadable_changeset_revision(
            self.app, repository
        )
        if latest_installable_changeset_revision in (None, hg_util.INITIAL_CHANGELOG_HASH):
            # No installable changeset_revision.
            return (None, False)
        encoded_repository_id = self.app.security.encode_id(repository.id)
        repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(
            self.app, encoded_repository_id, latest_installable_changeset_revision
        )
        if repository_metadata:
            return (latest_installable_changeset_revision, True)
        # No repository_metadata.  This path used to fall off the end of the method and return
        # None, which made the callers' tuple unpacking raise TypeError.
        return (None, False)

    def is_level_one_certified(self, repository_metadata):
        """
        Return True when the revision described by ``repository_metadata`` is in the certified tuple lists.

        Suite definitions are looked up in ``certified_level_one_suite_tuples``; every other
        repository type in ``certified_level_one_repository_and_suite_tuples``.  Missing metadata,
        a missing repository, and deleted or deprecated repositories all yield False.

        :param repository_metadata: metadata record with ``repository`` and ``changeset_revision``; may be falsy.
        """
        if not repository_metadata:
            return False
        repository = repository_metadata.repository
        if not repository:
            return False
        if repository.deprecated or repository.deleted:
            return False
        repository_tuple = (
            str(repository.name),
            str(repository.user.username),
            str(repository_metadata.changeset_revision),
        )
        if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
            return repository_tuple in self.certified_level_one_suite_tuples
        return repository_tuple in self.certified_level_one_repository_and_suite_tuples

    def is_valid(self, repository):
        """
        Return True when ``repository`` is set, not deleted, not deprecated and has downloadable revisions.

        :param repository: repository to inspect; may be ``None``.
        """
        return bool(
            repository
            and not repository.deleted
            and not repository.deprecated
            and repository.downloadable_revisions
        )

    def load_certified_level_one_repository_and_suite_tuple(self, repository):
        """
        Record the tip revision of a certified repository in the matching certified tuple list.

        Nothing is recorded while the tip is still ``hg_util.INITIAL_CHANGELOG_HASH``.

        :param repository: repository that has been determined to be level one certified.
        """
        name = str(repository.name)
        owner = str(repository.user.username)
        tip_changeset_hash = repository.tip()
        if tip_changeset_hash == hg_util.INITIAL_CHANGELOG_HASH:
            return
        certified_level_one_tuple = (name, owner, tip_changeset_hash)
        if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
            target = self.certified_level_one_suite_tuples
        else:
            target = self.certified_level_one_repository_and_suite_tuples
        if certified_level_one_tuple not in target:
            target.append(certified_level_one_tuple)

    def load_repository_and_suite_tuple(self, repository):
        """
        Record every metadata revision of ``repository`` in ``repository_and_suite_tuples``.

        Revisions of suite definitions are additionally recorded in ``suite_tuples``.

        :param repository: repository whose ``metadata_revisions`` are recorded.
        """
        name = str(repository.name)
        owner = str(repository.user.username)
        for repository_metadata in repository.metadata_revisions:
            repository_tuple = (name, owner, str(repository_metadata.changeset_revision))
            if repository_tuple not in self.repository_and_suite_tuples:
                self.repository_and_suite_tuples.append(repository_tuple)
            if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
                if repository_tuple not in self.suite_tuples:
                    self.suite_tuples.append(repository_tuple)

    def load_repository_and_suite_tuples(self):
        """Populate the certified and the general tuple lists from the database."""
        # Load self.certified_level_one_repository_and_suite_tuples and self.certified_level_one_suite_tuples.
        # Skipped when there are no clauses: or_() without arguments is deprecated in SQLAlchemy
        # and renders no filter at all, which would select every repository that has metadata.
        if self.certified_level_one_clause_list:
            certified_repositories = (
                self.sa_session.query(model.Repository)
                .join(model.RepositoryMetadata.table)
                .filter(or_(*self.certified_level_one_clause_list))
                .join(model.User.table)
            )
            for repository in certified_repositories:
                self.load_certified_level_one_repository_and_suite_tuple(repository)
        # Load self.repository_and_suite_tuples and self.suite_tuples
        active_repositories = (
            self.sa_session.query(model.Repository)
            .filter(and_(model.Repository.table.c.deleted == false(), model.Repository.table.c.deprecated == false()))
            .join(model.User.table)
        )
        for repository in active_repositories:
            self.load_repository_and_suite_tuple(repository)

    def load_viewable_repositories_and_suites_by_category(self):
        """
        Rebuild every per-category counter from the database.

        Each category gets an entry in all six counters, then every repository of the category
        that is neither deleted nor deprecated is counted.
        """
        # Clear all dictionaries just in case they were previously loaded.
        self.certified_level_one_viewable_repositories_and_suites_by_category = {}
        self.certified_level_one_viewable_suites_by_category = {}
        self.certified_level_two_viewable_repositories_and_suites_by_category = {}
        self.certified_level_two_viewable_suites_by_category = {}
        self.viewable_repositories_and_suites_by_category = {}
        self.viewable_suites_by_category = {}
        self.viewable_valid_repositories_and_suites_by_category = {}
        self.viewable_valid_suites_by_category = {}
        for category in self.sa_session.query(model.Category):
            category_name = str(category.name)
            # The counters are keyed by category name, so membership has to be tested with the
            # name as well (the category object itself was tested before, which never matched).
            self.add_category_entry(category)
            for rca in category.repositories:
                repository = rca.repository
                if repository.deleted or repository.deprecated:
                    continue
                is_valid = self.is_valid(repository)
                is_suite = repository.type == rt_util.REPOSITORY_SUITE_DEFINITION
                encoded_repository_id = self.app.security.encode_id(repository.id)
                tip_changeset_hash = repository.tip()
                repository_metadata = metadata_util.get_repository_metadata_by_changeset_revision(
                    self.app, encoded_repository_id, tip_changeset_hash
                )
                self.viewable_repositories_and_suites_by_category[category_name] += 1
                if is_valid:
                    self.viewable_valid_repositories_and_suites_by_category[category_name] += 1
                if is_suite:
                    self.viewable_suites_by_category[category_name] += 1
                    if is_valid:
                        self.viewable_valid_suites_by_category[category_name] += 1
                if self.is_level_one_certified(repository_metadata):
                    self.certified_level_one_viewable_repositories_and_suites_by_category[category_name] += 1
                    if is_suite:
                        self.certified_level_one_viewable_suites_by_category[category_name] += 1

    def remove_category_entry(self, category):
        """
        Drop ``category`` from every per-category counter; unknown categories are ignored.

        :param category: object whose ``name`` attribute is the category name.
        """
        category_name = str(category.name)
        for counter in self._category_counters():
            counter.pop(category_name, None)

    def remove_entry(self, repository):
        """
        Remove ``repository`` from the category counters and the tuple lists.

        Counters never drop below zero.  Any exception is logged and swallowed on purpose (see the
        comment in the handler).

        :param repository: repository to remove; a falsy value is ignored.
        """
        try:
            if repository:
                is_valid = self.is_valid(repository)
                _, is_level_one_certified = self.get_certified_level_one_tuple(repository)
                is_suite = repository.type == rt_util.REPOSITORY_SUITE_DEFINITION
                for rca in repository.categories:
                    category_name = str(rca.category.name)
                    self._decrement(self.viewable_repositories_and_suites_by_category, category_name)
                    if is_valid:
                        self._decrement(self.viewable_valid_repositories_and_suites_by_category, category_name)
                    if is_suite:
                        self._decrement(self.viewable_suites_by_category, category_name)
                        if is_valid:
                            self._decrement(self.viewable_valid_suites_by_category, category_name)
                    if is_level_one_certified:
                        self._decrement(
                            self.certified_level_one_viewable_repositories_and_suites_by_category, category_name
                        )
                        if is_suite:
                            self._decrement(self.certified_level_one_viewable_suites_by_category, category_name)
                self.unload_repository_and_suite_tuple(repository)
                if is_level_one_certified:
                    self.unload_certified_level_one_repository_and_suite_tuple(repository)
        except Exception:
            # The viewable repository numbers and the categorized (filtered) lists of repository tuples
            # may be slightly skewed, but that is no reason to result in a potential server error.  All
            # will be corrected at next server start.
            log.exception("Handled error removing entry from repository registry")

    def unload_certified_level_one_repository_and_suite_tuple(self, repository):
        """
        Remove the tip revision of a certified repository from the matching certified tuple list.

        Nothing is removed while the tip is still ``hg_util.INITIAL_CHANGELOG_HASH``.

        :param repository: repository that has been determined to be level one certified.
        """
        name = str(repository.name)
        owner = str(repository.user.username)
        tip_changeset_hash = repository.tip()
        if tip_changeset_hash == hg_util.INITIAL_CHANGELOG_HASH:
            return
        certified_level_one_tuple = (name, owner, tip_changeset_hash)
        if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
            target = self.certified_level_one_suite_tuples
        else:
            target = self.certified_level_one_repository_and_suite_tuples
        if certified_level_one_tuple in target:
            target.remove(certified_level_one_tuple)

    def unload_repository_and_suite_tuple(self, repository):
        """
        Remove every metadata revision of ``repository`` from ``repository_and_suite_tuples``.

        Revisions of suite definitions are additionally removed from ``suite_tuples``.

        :param repository: repository whose ``metadata_revisions`` are removed.
        """
        name = str(repository.name)
        owner = str(repository.user.username)
        for repository_metadata in repository.metadata_revisions:
            repository_tuple = (name, owner, str(repository_metadata.changeset_revision))
            if repository_tuple in self.repository_and_suite_tuples:
                self.repository_and_suite_tuples.remove(repository_tuple)
            if repository.type == rt_util.REPOSITORY_SUITE_DEFINITION:
                if repository_tuple in self.suite_tuples:
                    self.suite_tuples.remove(repository_tuple)