import logging
from typing import List
from sqlalchemy import (
false,
select,
)
import galaxy.model
import galaxy.model.tool_shed_install
import tool_shed.webapp.model as model
from galaxy.managers.users import (
get_user_by_email,
get_user_by_username,
)
# Module-level logger, registered under a fixed dotted name rather than __name__.
log = logging.getLogger("test.tool_shed.test_db_util")
def sa_session():
    """Return the ``tool_shed_context`` object exposed by the sibling ``driver`` module."""
    # Imported at call time instead of at module import time.
    from .driver import tool_shed_context

    return tool_shed_context
def install_session():
    """Return the ``install_context`` object exposed by ``galaxy_test.driver.driver_util``."""
    # Imported at call time instead of at module import time.
    from galaxy_test.driver.driver_util import install_context

    return install_context
def flush(obj):
    """Add ``obj`` to the tool shed session, then flush that session."""
    session = sa_session()
    session.add(obj)
    session.flush()
def get_all_repositories():
    """Return all ``model.Repository`` objects selected through the tool shed session."""
    every_repository = select(model.Repository)
    return sa_session().scalars(every_repository).all()
def get_all_installed_repositories(session=None) -> List[galaxy.model.tool_shed_install.ToolShedRepository]:
    """Return the installed repositories that are neither deleted nor uninstalled.

    :param session: session used to run the query; when ``None`` the session
        returned by ``install_session()`` is used.
    :returns: all ``ToolShedRepository`` objects whose ``deleted`` and
        ``uninstalled`` flags are false and whose ``status`` equals
        ``installation_status.INSTALLED``.
    """
    db = install_session() if session is None else session
    repo_cls = galaxy.model.tool_shed_install.ToolShedRepository
    # Multiple criteria passed to a single where() are joined with AND,
    # exactly like a chain of where() calls.
    query = select(repo_cls).where(
        repo_cls.deleted == false(),
        repo_cls.uninstalled == false(),
        repo_cls.status == repo_cls.installation_status.INSTALLED,
    )
    return db.scalars(query).all()
def get_installed_repository_by_id(repository_id):
    """Fetch a ``ToolShedRepository`` by primary key through the install session.

    :param repository_id: identifier passed to the session's ``get``.
    :returns: whatever ``install_session().get(...)`` returns for that identifier.
    """
    db = install_session()
    repo_cls = galaxy.model.tool_shed_install.ToolShedRepository
    return db.get(repo_cls, repository_id)
def get_installed_repository_by_name_owner(repository_name, owner, return_multiple=False, session=None):
    """Look up installed repositories by name and owner.

    :param repository_name: value compared against ``ToolShedRepository.name``.
    :param owner: value compared against ``ToolShedRepository.owner``.
    :param return_multiple: when true, return every match; otherwise return
        only the first match of a query limited to one row.
    :param session: session used to run the query; when ``None`` the session
        returned by ``install_session()`` is used.
    :returns: all matches when ``return_multiple`` is true, otherwise the
        result of ``first()`` on the limited query.
    """
    db = install_session() if session is None else session
    repo_cls = galaxy.model.tool_shed_install.ToolShedRepository
    # Both criteria in one where() are AND-ed, same as chained where() calls.
    query = select(repo_cls).where(
        repo_cls.name == repository_name,
        repo_cls.owner == owner,
    )
    if not return_multiple:
        return db.scalars(query.limit(1)).first()
    return db.scalars(query).all()
def get_role(user, role_name):
    """Return the first role of ``user`` whose ``name`` equals ``role_name``.

    :param user: object providing ``all_roles()``, which yields role objects.
    :param role_name: name to match against each role's ``name`` attribute.
    :returns: the first matching role, or ``None`` when no role matches.
    """
    matching = (candidate for candidate in user.all_roles() if candidate.name == role_name)
    return next(matching, None)
def get_repository_role_association(repository_id, role_id):
    """Return the first ``RepositoryRoleAssociation`` linking a role and a repository.

    :param repository_id: value compared against the association's ``repository_id``.
    :param role_id: value compared against the association's ``role_id``.
    :returns: the result of ``first()`` on a query limited to one row.
    """
    assoc_cls = model.RepositoryRoleAssociation
    # Criteria given to one where() are AND-ed, same as chained where() calls.
    query = select(assoc_cls).where(
        assoc_cls.role_id == role_id,
        assoc_cls.repository_id == repository_id,
    )
    return sa_session().scalars(query.limit(1)).first()
def get_repository_by_id(repository_id):
    """Fetch a ``model.Repository`` by primary key through the tool shed session.

    :param repository_id: identifier passed to the session's ``get``.
    :returns: whatever ``sa_session().get(...)`` returns for that identifier.
    """
    session = sa_session()
    return session.get(model.Repository, repository_id)
def get_user(email):
    """Look up a tool shed user by email.

    Delegates to ``get_user_by_email`` with the tool shed session and
    ``model.User`` as the model class, and returns its result unchanged.
    """
    session = sa_session()
    return get_user_by_email(session, email, model.User)
def refresh(obj):
    """Refresh ``obj`` through the tool shed session."""
    session = sa_session()
    session.refresh(obj)
def ga_refresh(obj):
    """Refresh ``obj`` through the install session."""
    session = install_session()
    session.refresh(obj)
def get_repository_by_name_and_owner(name, owner_username, return_multiple=False):
    """Look up tool shed repositories by name and owner username.

    :param name: value compared against ``model.Repository.name``.
    :param owner_username: username resolved to a user via
        ``get_user_by_username``; the user's ``id`` is compared against
        ``model.Repository.user_id``.
    :param return_multiple: when true, return every matching repository;
        otherwise return only the first match of a query limited to one row.
        Previously this flag was accepted but ignored; it now behaves like the
        same flag on ``get_installed_repository_by_name_owner``.
    :returns: all matches when ``return_multiple`` is true, otherwise the
        first match or ``None``. When no owner is found the result is an
        empty list or ``None`` respectively.
    """
    session = sa_session()
    owner = get_user_by_username(session, owner_username, model.User)
    if owner is None:
        # NOTE(review): get_user_by_username is defined elsewhere; this guard
        # assumes it returns None for an unknown username. Without it,
        # ``owner.id`` below would raise AttributeError instead of reporting
        # "no such repository".
        return [] if return_multiple else None
    stmt = select(model.Repository).where(
        model.Repository.name == name,
        model.Repository.user_id == owner.id,
    )
    if return_multiple:
        return session.scalars(stmt).all()
    return session.scalars(stmt.limit(1)).first()