Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.test.base.test_db_util
import logging
from sqlalchemy import and_, false, true
import galaxy.model
import galaxy.model.tool_shed_install
import tool_shed.webapp.model as model
from galaxy_test.driver.driver_util import (
galaxy_context as ga_session,
install_context as install_session,
tool_shed_context as sa_session
)
log = logging.getLogger('test.tool_shed.test_db_util')
[docs]def get_all_installed_repositories(actually_installed=False):
if actually_installed:
return install_session.query(galaxy.model.tool_shed_install.ToolShedRepository) \
.filter(and_(galaxy.model.tool_shed_install.ToolShedRepository.table.c.deleted == false(),
galaxy.model.tool_shed_install.ToolShedRepository.table.c.uninstalled == false(),
galaxy.model.tool_shed_install.ToolShedRepository.table.c.status == galaxy.model.tool_shed_install.ToolShedRepository.installation_status.INSTALLED)) \
.all()
else:
return install_session.query(galaxy.model.tool_shed_install.ToolShedRepository).all()
[docs]def get_category_by_name(name):
return sa_session.query(model.Category) \
.filter(model.Category.table.c.name == name) \
.first()
[docs]def get_default_user_permissions_by_role(role):
return sa_session.query(model.DefaultUserPermissions) \
.filter(model.DefaultUserPermissions.table.c.role_id == role.id) \
.all()
[docs]def get_default_user_permissions_by_user(user):
return sa_session.query(model.DefaultUserPermissions) \
.filter(model.DefaultUserPermissions.table.c.user_id == user.id) \
.all()
[docs]def get_galaxy_repository_by_name_owner_changeset_revision(repository_name, owner, changeset_revision):
return install_session.query(galaxy.model.tool_shed_install.ToolShedRepository) \
.filter(and_(galaxy.model.tool_shed_install.ToolShedRepository.table.c.name == repository_name,
galaxy.model.tool_shed_install.ToolShedRepository.table.c.owner == owner,
galaxy.model.tool_shed_install.ToolShedRepository.table.c.changeset_revision == changeset_revision)) \
.first()
[docs]def get_installed_repository_by_id(repository_id):
return install_session.query(galaxy.model.tool_shed_install.ToolShedRepository) \
.filter(galaxy.model.tool_shed_install.ToolShedRepository.table.c.id == repository_id) \
.first()
[docs]def get_installed_repository_by_name_owner(repository_name, owner, return_multiple=False):
query = install_session.query(galaxy.model.tool_shed_install.ToolShedRepository) \
.filter(and_(galaxy.model.tool_shed_install.ToolShedRepository.table.c.name == repository_name,
galaxy.model.tool_shed_install.ToolShedRepository.table.c.owner == owner))
if return_multiple:
return query.all()
return query.first()
[docs]def get_private_role(user):
for role in user.all_roles():
if role.name == user.email and role.description == 'Private Role for %s' % user.email:
return role
raise AssertionError("Private role not found for user '%s'" % user.email)
[docs]def get_role(user, role_name):
for role in user.all_roles():
if role.name == role_name:
return role
return None
[docs]def get_repository_role_association(repository_id, role_id):
rra = sa_session.query(model.RepositoryRoleAssociation) \
.filter(and_(model.RepositoryRoleAssociation.table.c.role_id == role_id,
model.RepositoryRoleAssociation.table.c.repository_id == repository_id)) \
.first()
return rra
[docs]def get_repository_reviews(repository_id, reviewer_user_id=None, changeset_revision=None):
if reviewer_user_id and changeset_revision:
reviews = sa_session.query(model.RepositoryReview) \
.filter(and_(model.RepositoryReview.table.c.repository_id == repository_id,
model.RepositoryReview.table.c.deleted == false(),
model.RepositoryReview.table.c.changeset_revision == changeset_revision,
model.RepositoryReview.table.c.user_id == reviewer_user_id)) \
.all()
elif reviewer_user_id:
reviews = sa_session.query(model.RepositoryReview) \
.filter(and_(model.RepositoryReview.table.c.repository_id == repository_id,
model.RepositoryReview.table.c.deleted == false(),
model.RepositoryReview.table.c.user_id == reviewer_user_id)) \
.all()
else:
reviews = sa_session.query(model.RepositoryReview) \
.filter(and_(model.RepositoryReview.table.c.repository_id == repository_id,
model.RepositoryReview.table.c.deleted == false())) \
.all()
return reviews
[docs]def get_reviews_ordered_by_changeset_revision(repository_id, changelog_tuples, reviewer_user_id=None):
reviews = get_repository_reviews(repository_id, reviewer_user_id=reviewer_user_id)
ordered_reviews = []
for ctx_rev, changeset_hash in changelog_tuples:
for review in reviews:
if str(review.changeset_revision) == str(changeset_hash):
ordered_reviews.append(review)
return ordered_reviews
[docs]def get_repository_by_id(repository_id):
return sa_session.query(model.Repository) \
.filter(model.Repository.table.c.id == repository_id) \
.first()
[docs]def get_repository_downloadable_revisions(repository_id):
revisions = sa_session.query(model.RepositoryMetadata) \
.filter(and_(model.RepositoryMetadata.table.c.repository_id == repository_id,
model.RepositoryMetadata.table.c.downloadable == true())) \
.all()
return revisions
[docs]def get_repository_metadata_for_changeset_revision(repository_id, changeset_revision):
repository_metadata = sa_session.query(model.RepositoryMetadata) \
.filter(and_(model.RepositoryMetadata.table.c.repository_id == repository_id,
model.RepositoryMetadata.table.c.changeset_revision == changeset_revision)) \
.first()
return repository_metadata
[docs]def get_repository_review_by_user_id_changeset_revision(user_id, repository_id, changeset_revision):
review = sa_session.query(model.RepositoryReview) \
.filter(and_(model.RepositoryReview.table.c.user_id == user_id,
model.RepositoryReview.table.c.repository_id == repository_id,
model.RepositoryReview.table.c.changeset_revision == changeset_revision)) \
.first()
return review
[docs]def get_role_by_name(role_name):
return sa_session.query(model.Role) \
.filter(model.Role.table.c.name == role_name) \
.first()
[docs]def get_user(email):
return sa_session.query(model.User) \
.filter(model.User.table.c.email == email) \
.first()
[docs]def get_user_by_name(username):
return sa_session.query(model.User) \
.filter(model.User.table.c.username == username) \
.first()
[docs]def get_galaxy_private_role(user):
for role in user.all_roles():
if role.name == user.email and role.description == 'Private Role for %s' % user.email:
return role
raise AssertionError("Private role not found for user '%s'" % user.email)
[docs]def get_galaxy_user(email):
return ga_session.query(galaxy.model.User) \
.filter(galaxy.model.User.table.c.email == email) \
.first()
[docs]def get_repository_by_name_and_owner(name, owner_username, return_multiple=False):
owner = get_user_by_name(owner_username)
repository = sa_session.query(model.Repository) \
.filter(and_(model.Repository.table.c.name == name,
model.Repository.table.c.user_id == owner.id)) \
.first()
return repository
[docs]def get_repository_metadata_by_repository_id_changeset_revision(repository_id, changeset_revision):
repository_metadata = sa_session.query(model.RepositoryMetadata) \
.filter(and_(model.RepositoryMetadata.table.c.repository_id == repository_id,
model.RepositoryMetadata.table.c.changeset_revision == changeset_revision)) \
.first()
return repository_metadata