Source code for tool_shed.test.base.test_db_util

import logging

from sqlalchemy import (
    and_,
    false,
    true,
)

import galaxy.model
import galaxy.model.tool_shed_install
import tool_shed.webapp.model as model
from galaxy_test.driver.driver_util import galaxy_context as ga_session
from galaxy_test.driver.driver_util import install_context as install_session
from galaxy_test.driver.driver_util import tool_shed_context as sa_session

# Module-level logger used by the tool shed test database helpers in this file.
log = logging.getLogger("test.tool_shed.test_db_util")


def delete_obj(obj):
    """Delete ``obj`` through the tool shed session and flush the change.

    :param obj: mapped object handed to ``sa_session.delete``.
    """
    session = sa_session
    session.delete(obj)
    session.flush()
def delete_user_roles(user):
    """Delete every entry of ``user.roles`` and flush the tool shed session.

    :param user: object whose ``roles`` collection holds the entries to delete.
    """
    for association in user.roles:
        sa_session.delete(association)
    # A single flush after the loop covers all of the deletions above.
    sa_session.flush()
def flush(obj):
    """Add ``obj`` to the tool shed session and flush the session.

    :param obj: mapped object to add before flushing.
    """
    session = sa_session
    session.add(obj)
    session.flush()
def get_all_repositories():
    """Return a list of every ``model.Repository`` found by the tool shed session."""
    repository_query = sa_session.query(model.Repository)
    return repository_query.all()
def get_all_installed_repositories(actually_installed=False):
    """Return ``ToolShedRepository`` rows queried through the install session.

    :param actually_installed: when true, keep only rows that are not deleted,
        not uninstalled, and whose status equals
        ``installation_status.INSTALLED``; otherwise return every row.
    :returns: list of ``ToolShedRepository`` objects.
    """
    repository_class = galaxy.model.tool_shed_install.ToolShedRepository
    query = install_session.query(repository_class)
    if not actually_installed:
        return query.all()
    columns = repository_class.table.c
    installed_only = and_(
        columns.deleted == false(),
        columns.uninstalled == false(),
        columns.status == repository_class.installation_status.INSTALLED,
    )
    return query.filter(installed_only).all()
def get_category_by_name(name):
    """Look up a tool shed category by its exact name.

    :param name: value compared against the ``name`` column.
    :returns: the first matching ``model.Category``, or ``None`` if none matches.
    """
    name_matches = model.Category.table.c.name == name
    return sa_session.query(model.Category).filter(name_matches).first()
def get_default_user_permissions_by_role(role):
    """Return all ``model.DefaultUserPermissions`` rows that reference ``role``.

    :param role: object whose ``id`` is compared against the ``role_id`` column.
    :returns: list of matching ``model.DefaultUserPermissions`` objects.
    """
    permissions = model.DefaultUserPermissions
    for_role = permissions.table.c.role_id == role.id
    return sa_session.query(permissions).filter(for_role).all()
def get_default_user_permissions_by_user(user):
    """Return all ``model.DefaultUserPermissions`` rows that reference ``user``.

    :param user: object whose ``id`` is compared against the ``user_id`` column.
    :returns: list of matching ``model.DefaultUserPermissions`` objects.
    """
    permissions = model.DefaultUserPermissions
    for_user = permissions.table.c.user_id == user.id
    return sa_session.query(permissions).filter(for_user).all()
def get_galaxy_repository_by_name_owner_changeset_revision(repository_name, owner, changeset_revision):
    """Find a ``ToolShedRepository`` by name, owner and changeset revision.

    The lookup goes through the install session.

    :param repository_name: value compared against the ``name`` column.
    :param owner: value compared against the ``owner`` column.
    :param changeset_revision: value compared against the
        ``changeset_revision`` column.
    :returns: the first matching ``ToolShedRepository``, or ``None``.
    """
    repository_class = galaxy.model.tool_shed_install.ToolShedRepository
    columns = repository_class.table.c
    criteria = and_(
        columns.name == repository_name,
        columns.owner == owner,
        columns.changeset_revision == changeset_revision,
    )
    return install_session.query(repository_class).filter(criteria).first()
def get_installed_repository_by_id(repository_id):
    """Fetch a ``ToolShedRepository`` by id through the install session.

    :param repository_id: value compared against the ``id`` column.
    :returns: the first matching ``ToolShedRepository``, or ``None``.
    """
    repository_class = galaxy.model.tool_shed_install.ToolShedRepository
    id_matches = repository_class.table.c.id == repository_id
    return install_session.query(repository_class).filter(id_matches).first()
def get_installed_repository_by_name_owner(repository_name, owner, return_multiple=False):
    """Find ``ToolShedRepository`` rows by name and owner via the install session.

    :param repository_name: value compared against the ``name`` column.
    :param owner: value compared against the ``owner`` column.
    :param return_multiple: when true, return the list of all matches instead
        of only the first one.
    :returns: a list of matches if ``return_multiple`` is true; otherwise the
        first match, or ``None`` when nothing matches.
    """
    repository_class = galaxy.model.tool_shed_install.ToolShedRepository
    columns = repository_class.table.c
    criteria = and_(columns.name == repository_name, columns.owner == owner)
    matches = install_session.query(repository_class).filter(criteria)
    return matches.all() if return_multiple else matches.first()
def get_private_role(user):
    """Return the private role of ``user``.

    A role counts as private when its name equals ``user.email`` and its
    description equals ``"Private Role for <email>"``.

    :param user: object exposing ``email`` and an ``all_roles()`` method.
    :returns: the first role from ``user.all_roles()`` that matches.
    :raises AssertionError: if no role matches.
    """
    private_role = next(
        (
            candidate
            for candidate in user.all_roles()
            if candidate.name == user.email and candidate.description == f"Private Role for {user.email}"
        ),
        None,
    )
    if private_role is None:
        raise AssertionError(f"Private role not found for user '{user.email}'")
    return private_role
def get_role(user, role_name):
    """Return the first of the user's roles whose name equals ``role_name``.

    :param user: object exposing an ``all_roles()`` method.
    :param role_name: name to compare against each role's ``name``.
    :returns: the matching role, or ``None`` when no role has that name.
    """
    return next((candidate for candidate in user.all_roles() if candidate.name == role_name), None)
def get_repository_role_association(repository_id, role_id):
    """Find the association between a repository and a role.

    :param repository_id: value compared against the ``repository_id`` column.
    :param role_id: value compared against the ``role_id`` column.
    :returns: the first matching ``model.RepositoryRoleAssociation``, or ``None``.
    """
    columns = model.RepositoryRoleAssociation.table.c
    criteria = and_(
        columns.role_id == role_id,
        columns.repository_id == repository_id,
    )
    return sa_session.query(model.RepositoryRoleAssociation).filter(criteria).first()
def get_repository_reviews(repository_id, reviewer_user_id=None, changeset_revision=None):
    """Return the non-deleted reviews of a repository, optionally narrowed.

    The base query always restricts to ``repository_id`` and to rows whose
    ``deleted`` column is false. Each optional argument adds one more
    restriction when it is truthy.

    :param repository_id: value compared against the ``repository_id`` column.
    :param reviewer_user_id: if given, keep only reviews whose ``user_id``
        equals this value.
    :param changeset_revision: if given, keep only reviews whose
        ``changeset_revision`` equals this value.
    :returns: list of matching ``model.RepositoryReview`` objects.
    """
    columns = model.RepositoryReview.table.c
    criteria = [
        columns.repository_id == repository_id,
        columns.deleted == false(),
    ]
    # Apply each optional filter independently. Previously a changeset revision
    # passed without a reviewer was silently ignored.
    if changeset_revision:
        criteria.append(columns.changeset_revision == changeset_revision)
    if reviewer_user_id:
        criteria.append(columns.user_id == reviewer_user_id)
    return sa_session.query(model.RepositoryReview).filter(and_(*criteria)).all()
def get_reviews_ordered_by_changeset_revision(repository_id, changelog_tuples, reviewer_user_id=None):
    """Return a repository's reviews arranged in changelog order.

    Reviews are fetched with ``get_repository_reviews`` and matched to the
    changelog by comparing the string forms of the review's
    ``changeset_revision`` and the changelog hash. Reviews whose revision does
    not appear in ``changelog_tuples`` are left out.

    :param repository_id: forwarded to ``get_repository_reviews``.
    :param changelog_tuples: iterable of two-item tuples; the second item is
        the changeset hash, the first is unused here.
    :param reviewer_user_id: forwarded to ``get_repository_reviews``.
    :returns: list of reviews, grouped by changelog entry in changelog order.
    """
    # Bucket the reviews by revision once, keeping their original relative order.
    reviews_by_revision = {}
    for review in get_repository_reviews(repository_id, reviewer_user_id=reviewer_user_id):
        reviews_by_revision.setdefault(str(review.changeset_revision), []).append(review)

    ordered_reviews = []
    for _ctx_rev, changeset_hash in changelog_tuples:
        ordered_reviews.extend(reviews_by_revision.get(str(changeset_hash), ()))
    return ordered_reviews
def get_repository_by_id(repository_id):
    """Fetch a tool shed repository by id.

    :param repository_id: value compared against the ``id`` column.
    :returns: the first matching ``model.Repository``, or ``None``.
    """
    id_matches = model.Repository.table.c.id == repository_id
    return sa_session.query(model.Repository).filter(id_matches).first()
def get_repository_downloadable_revisions(repository_id):
    """Return the metadata rows of a repository that are flagged downloadable.

    :param repository_id: value compared against the ``repository_id`` column.
    :returns: list of ``model.RepositoryMetadata`` objects whose
        ``downloadable`` column is true.
    """
    columns = model.RepositoryMetadata.table.c
    criteria = and_(
        columns.repository_id == repository_id,
        columns.downloadable == true(),
    )
    return sa_session.query(model.RepositoryMetadata).filter(criteria).all()
def get_repository_metadata_for_changeset_revision(repository_id, changeset_revision):
    """Fetch the metadata row for one changeset revision of a repository.

    :param repository_id: value compared against the ``repository_id`` column.
    :param changeset_revision: value compared against the
        ``changeset_revision`` column.
    :returns: the first matching ``model.RepositoryMetadata``, or ``None``.
    """
    columns = model.RepositoryMetadata.table.c
    criteria = and_(
        columns.repository_id == repository_id,
        columns.changeset_revision == changeset_revision,
    )
    return sa_session.query(model.RepositoryMetadata).filter(criteria).first()
def get_repository_review_by_user_id_changeset_revision(user_id, repository_id, changeset_revision):
    """Fetch one user's review of a specific repository changeset revision.

    Unlike ``get_repository_reviews``, this lookup does not filter on the
    ``deleted`` column.

    :param user_id: value compared against the ``user_id`` column.
    :param repository_id: value compared against the ``repository_id`` column.
    :param changeset_revision: value compared against the
        ``changeset_revision`` column.
    :returns: the first matching ``model.RepositoryReview``, or ``None``.
    """
    columns = model.RepositoryReview.table.c
    criteria = and_(
        columns.user_id == user_id,
        columns.repository_id == repository_id,
        columns.changeset_revision == changeset_revision,
    )
    return sa_session.query(model.RepositoryReview).filter(criteria).first()
def get_role_by_name(role_name):
    """Look up a tool shed role by its exact name.

    :param role_name: value compared against the ``name`` column.
    :returns: the first matching ``model.Role``, or ``None``.
    """
    name_matches = model.Role.table.c.name == role_name
    return sa_session.query(model.Role).filter(name_matches).first()
def get_user(email):
    """Look up a tool shed user by email address.

    :param email: value compared against the ``email`` column.
    :returns: the first matching ``model.User``, or ``None``.
    """
    email_matches = model.User.table.c.email == email
    return sa_session.query(model.User).filter(email_matches).first()
def get_user_by_name(username):
    """Look up a tool shed user by username.

    :param username: value compared against the ``username`` column.
    :returns: the first matching ``model.User``, or ``None``.
    """
    username_matches = model.User.table.c.username == username
    return sa_session.query(model.User).filter(username_matches).first()
def mark_obj_deleted(obj):
    """Set ``obj.deleted`` to ``True`` and persist it via the tool shed session.

    :param obj: mapped object; it is added to the session and flushed.
    """
    obj.deleted = True
    session = sa_session
    session.add(obj)
    session.flush()
def refresh(obj):
    """Refresh ``obj`` through the tool shed session.

    :param obj: mapped object passed to ``sa_session.refresh``.
    """
    session = sa_session
    session.refresh(obj)
def ga_refresh(obj):
    """Refresh ``obj`` through the install session.

    :param obj: mapped object passed to ``install_session.refresh``.
    """
    # NOTE(review): despite the ``ga_`` prefix this uses ``install_session``,
    # not ``ga_session`` -- confirm this is intended.
    session = install_session
    session.refresh(obj)
def get_galaxy_private_role(user):
    """Return the private role of a Galaxy ``user``.

    The private role is the first role from ``user.all_roles()`` whose name
    equals ``user.email`` and whose description equals
    ``"Private Role for <email>"``.

    :param user: object exposing ``email`` and an ``all_roles()`` method.
    :returns: the matching role.
    :raises AssertionError: if no role matches.
    """
    for candidate in user.all_roles():
        name_matches = candidate.name == user.email
        if name_matches and candidate.description == f"Private Role for {user.email}":
            return candidate
    raise AssertionError(f"Private role not found for user '{user.email}'")
def get_galaxy_user(email):
    """Look up a Galaxy user by email address through the Galaxy session.

    :param email: value compared against the ``email`` column.
    :returns: the first matching ``galaxy.model.User``, or ``None``.
    """
    user_class = galaxy.model.User
    email_matches = user_class.table.c.email == email
    return ga_session.query(user_class).filter(email_matches).first()
def get_repository_by_name_and_owner(name, owner_username, return_multiple=False):
    """Find tool shed repositories by name and owner username.

    The owner is resolved with ``get_user_by_name``; repositories are then
    matched on the ``name`` column and on ``user_id`` equal to the owner's id.

    :param name: value compared against the ``name`` column.
    :param owner_username: username used to look up the owning user.
    :param return_multiple: when true, return the list of all matches instead
        of only the first one. The default keeps the single-result behavior.
    :returns: a list of ``model.Repository`` objects if ``return_multiple`` is
        true; otherwise the first match, or ``None`` when nothing matches.
    """
    owner = get_user_by_name(owner_username)
    if owner is None:
        # An unknown owner cannot own any repository. Report "no match" rather
        # than failing with AttributeError on ``None.id``.
        return [] if return_multiple else None
    columns = model.Repository.table.c
    matches = sa_session.query(model.Repository).filter(
        and_(columns.name == name, columns.user_id == owner.id)
    )
    # ``return_multiple`` used to be accepted but ignored; honor it the same way
    # ``get_installed_repository_by_name_owner`` does.
    return matches.all() if return_multiple else matches.first()
def get_repository_metadata_by_repository_id_changeset_revision(repository_id, changeset_revision):
    """Fetch a repository's metadata row for a given changeset revision.

    :param repository_id: value compared against the ``repository_id`` column.
    :param changeset_revision: value compared against the
        ``changeset_revision`` column.
    :returns: the first matching ``model.RepositoryMetadata``, or ``None``.
    """
    metadata_class = model.RepositoryMetadata
    same_repository = metadata_class.table.c.repository_id == repository_id
    same_revision = metadata_class.table.c.changeset_revision == changeset_revision
    return sa_session.query(metadata_class).filter(and_(same_repository, same_revision)).first()