Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.test.base.test_db_util

import logging
from typing import List

from sqlalchemy import (
    false,
    select,
)

import galaxy.model
import galaxy.model.tool_shed_install
import tool_shed.webapp.model as model
from galaxy.managers.users import (
    get_user_by_email,
    get_user_by_username,
)

# Module-level logger, registered under the fixed name "test.tool_shed.test_db_util"
# (a hard-coded string rather than __name__).
log = logging.getLogger("test.tool_shed.test_db_util")


def sa_session():
    """Return the ``tool_shed_context`` object provided by the test driver.

    The driver module is imported at call time rather than at module import
    time. The other helpers in this module use the returned object as a
    session (``add``, ``flush``, ``scalars``, ``get``, ``refresh``).
    """
    from galaxy_test.driver import driver_util

    return driver_util.tool_shed_context
def install_session():
    """Return the ``install_context`` object provided by the test driver.

    The driver module is imported at call time rather than at module import
    time. The helpers in this module use the returned object as the session
    for ``ToolShedRepository`` (installed repository) lookups.
    """
    from galaxy_test.driver import driver_util

    return driver_util.install_context
def flush(obj):
    """Add ``obj`` to the tool shed session and flush that session."""
    session = sa_session()
    session.add(obj)
    session.flush()
def get_all_repositories():
    """Return all ``model.Repository`` objects found via the tool shed session."""
    every_repository = select(model.Repository)
    rows = sa_session().scalars(every_repository)
    return rows.all()
def get_all_installed_repositories(session=None) -> List[galaxy.model.tool_shed_install.ToolShedRepository]:
    """Return the ``ToolShedRepository`` rows that are currently installed.

    A row qualifies when it is not deleted, not uninstalled, and its status
    equals ``ToolShedRepository.installation_status.INSTALLED``.

    :param session: session to query; when ``None`` the one returned by
        ``install_session()`` is used.
    :returns: all matching rows.
    """
    repo_cls = galaxy.model.tool_shed_install.ToolShedRepository
    active_session = install_session() if session is None else session
    # Multiple criteria passed to a single where() are joined with AND,
    # exactly like chaining one where() call per criterion.
    query = select(repo_cls).where(
        repo_cls.deleted == false(),
        repo_cls.uninstalled == false(),
        repo_cls.status == repo_cls.installation_status.INSTALLED,
    )
    return active_session.scalars(query).all()
def get_installed_repository_by_id(repository_id):
    """Look up a ``ToolShedRepository`` by primary key via the install session."""
    repo_cls = galaxy.model.tool_shed_install.ToolShedRepository
    session = install_session()
    return session.get(repo_cls, repository_id)
def get_installed_repository_by_name_owner(repository_name, owner, return_multiple=False, session=None):
    """Find installed-repository rows by repository name and owner.

    :param repository_name: value compared against ``ToolShedRepository.name``.
    :param owner: value compared against ``ToolShedRepository.owner``.
    :param return_multiple: when true, return every match; otherwise return
        only the first match (or ``None`` when nothing matches).
    :param session: session to query; when ``None`` the one returned by
        ``install_session()`` is used.
    """
    repo_cls = galaxy.model.tool_shed_install.ToolShedRepository
    active_session = session if session is not None else install_session()
    query = select(repo_cls).where(
        repo_cls.name == repository_name,
        repo_cls.owner == owner,
    )
    if not return_multiple:
        # Single-result mode: cap the query at one row.
        return active_session.scalars(query.limit(1)).first()
    return active_session.scalars(query).all()
def get_role(user, role_name):
    """Return the first role from ``user.all_roles()`` named ``role_name``.

    Returns ``None`` when no role has that name.
    """
    matching = (candidate for candidate in user.all_roles() if candidate.name == role_name)
    return next(matching, None)
def get_repository_role_association(repository_id, role_id):
    """Return the first ``RepositoryRoleAssociation`` linking the given ids.

    Matches on both ``role_id`` and ``repository_id``; returns ``None`` when
    no such association exists.
    """
    assoc_cls = model.RepositoryRoleAssociation
    query = (
        select(assoc_cls)
        .where(
            assoc_cls.role_id == role_id,
            assoc_cls.repository_id == repository_id,
        )
        .limit(1)
    )
    return sa_session().scalars(query).first()
def get_repository_by_id(repository_id):
    """Look up a ``model.Repository`` by primary key via the tool shed session."""
    session = sa_session()
    return session.get(model.Repository, repository_id)
def get_user(email):
    """Look up a tool shed ``model.User`` by email.

    Delegates to ``get_user_by_email`` with the tool shed session and returns
    whatever that helper returns.
    """
    session = sa_session()
    return get_user_by_email(session, email, model.User)
def refresh(obj):
    """Refresh ``obj`` through the tool shed session."""
    session = sa_session()
    session.refresh(obj)
def ga_refresh(obj):
    """Refresh ``obj`` through the install session (see ``install_session``)."""
    session = install_session()
    session.refresh(obj)
def get_repository_by_name_and_owner(name, owner_username, return_multiple=False):
    """Find tool shed repositories by name and owner username.

    The owner is resolved with ``get_user_by_username``; repositories are then
    matched on ``Repository.name`` and ``Repository.user_id``.

    :param name: repository name to match.
    :param owner_username: username of the repository owner.
    :param return_multiple: when true, return a list of every match; otherwise
        return only the first match. Previously this flag was accepted but
        ignored; it now behaves like the same flag on
        ``get_installed_repository_by_name_owner``.
    :returns: a single repository or ``None`` by default; a (possibly empty)
        list when ``return_multiple`` is true.
    """
    session = sa_session()
    owner = get_user_by_username(session, owner_username, model.User)
    if owner is None:
        # NOTE(review): presumably get_user_by_username returns None for an
        # unknown username - confirm. An unknown owner owns no repositories,
        # so report "no match" instead of failing on ``owner.id``.
        return [] if return_multiple else None
    stmt = (
        select(model.Repository)
        .where(model.Repository.name == name)
        .where(model.Repository.user_id == owner.id)
    )
    if return_multiple:
        return session.scalars(stmt).all()
    return session.scalars(stmt.limit(1)).first()
def get_repository_metadata_by_repository_id_changeset_revision(repository_id, changeset_revision):
    """Return the first ``RepositoryMetadata`` for a repository at a changeset.

    Matches on both ``repository_id`` and ``changeset_revision``; returns
    ``None`` when no such metadata row exists.
    """
    metadata_cls = model.RepositoryMetadata
    query = (
        select(metadata_cls)
        .where(
            metadata_cls.repository_id == repository_id,
            metadata_cls.changeset_revision == changeset_revision,
        )
        .limit(1)
    )
    return sa_session().scalars(query).first()