Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.managers.roles

"""
Manager and Serializer for Roles.
"""
import logging

from sqlalchemy import false
from sqlalchemy.orm import exc as sqlalchemy_exceptions

import galaxy.exceptions
from galaxy import model
from galaxy.exceptions import RequestParameterInvalidException
from galaxy.managers import base
from galaxy.managers.context import ProvidesUserContext
from galaxy.model import Role
from galaxy.util import unicodify

log = logging.getLogger(__name__)


[docs]class RoleManager(base.ModelManager): """ Business logic for roles. """ model_class = model.Role foreign_key_name = 'role' user_assoc = model.UserRoleAssociation group_assoc = model.GroupRoleAssociation
[docs] def get(self, trans: ProvidesUserContext, decoded_role_id): """ Method loads the role from the DB based on the given role id. :param decoded_role_id: id of the role to load from the DB :type decoded_role_id: int :returns: the loaded Role object :rtype: galaxy.model.Role :raises: InconsistentDatabase, RequestParameterInvalidException, InternalServerError """ try: role = (self.session().query(self.model_class) .filter(self.model_class.id == decoded_role_id).one()) except sqlalchemy_exceptions.MultipleResultsFound: raise galaxy.exceptions.InconsistentDatabase('Multiple roles found with the same id.') except sqlalchemy_exceptions.NoResultFound: raise galaxy.exceptions.RequestParameterInvalidException('No accessible role found with the id provided.') except Exception as e: raise galaxy.exceptions.InternalServerError(f"Error loading from the database.{unicodify(e)}") if not (trans.user_is_admin or trans.app.security_agent.ok_to_display(trans.user, role)): raise galaxy.exceptions.RequestParameterInvalidException('No accessible role found with the id provided.') return role
[docs] def list_displayable_roles(self, trans: ProvidesUserContext): roles = [] for role in trans.sa_session.query(Role).filter(Role.deleted == false()): if trans.user_is_admin or trans.app.security_agent.ok_to_display(trans.user, role): roles.append(role) return roles
[docs] def create_role(self, trans: ProvidesUserContext, role_definition_model) -> model.Role: name = role_definition_model.name description = role_definition_model.description user_ids = role_definition_model.user_ids or [] group_ids = role_definition_model.group_ids or [] if trans.sa_session.query(Role).filter(Role.name == name).first(): raise RequestParameterInvalidException(f"A role with that name already exists [{name}]") role_type = Role.types.ADMIN # TODO: allow non-admins to create roles role = Role(name=name, description=description, type=role_type) trans.sa_session.add(role) users = [trans.sa_session.query(model.User).get(trans.security.decode_id(i)) for i in user_ids] groups = [trans.sa_session.query(model.Group).get(trans.security.decode_id(i)) for i in group_ids] # Create the UserRoleAssociations for user in users: trans.app.security_agent.associate_user_role(user, role) # Create the GroupRoleAssociations for group in groups: trans.app.security_agent.associate_group_role(group, role) trans.sa_session.flush() return role