Warning
This document is for an old release of Galaxy. Alternatively, you can view this page in the latest release (if it exists) or go to the top of the latest release's documentation.
Source code for galaxy.managers.roles
"""
Manager and Serializer for Roles.
"""
import logging
from sqlalchemy import false
from sqlalchemy.orm import exc as sqlalchemy_exceptions
import galaxy.exceptions
from galaxy import model
from galaxy.exceptions import RequestParameterInvalidException
from galaxy.managers import base
from galaxy.managers.context import ProvidesUserContext
from galaxy.model import Role
from galaxy.util import unicodify
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
class RoleManager(base.ModelManager):
    """
    Business logic for roles.
    """
    # Model class queried by ``get``.
    model_class = model.Role
    foreign_key_name = 'role'

    # Association models linking roles to users and to groups.
    user_assoc = model.UserRoleAssociation
    group_assoc = model.GroupRoleAssociation

    def get(self, trans: ProvidesUserContext, decoded_role_id: int) -> model.Role:
        """
        Method loads the role from the DB based on the given role id.

        :param trans: context providing the current user, the admin flag and
            the application's security agent
        :param decoded_role_id: id of the role to load from the DB
        :type decoded_role_id: int

        :returns: the loaded Role object
        :rtype: galaxy.model.Role

        :raises: InconsistentDatabase, RequestParameterInvalidException, InternalServerError
        """
        try:
            role = (self.session().query(self.model_class)
                    .filter(self.model_class.id == decoded_role_id).one())
        except sqlalchemy_exceptions.MultipleResultsFound:
            raise galaxy.exceptions.InconsistentDatabase('Multiple roles found with the same id.')
        except sqlalchemy_exceptions.NoResultFound:
            raise galaxy.exceptions.RequestParameterInvalidException('No accessible role found with the id provided.')
        except Exception as e:
            # Chain explicitly so the underlying database error stays attached as the cause.
            raise galaxy.exceptions.InternalServerError(f"Error loading from the database.{unicodify(e)}") from e

        # The role exists; reject it unless the user is an admin or the security
        # agent's ``ok_to_display`` check passes. The message is the same as in the
        # not-found case (presumably so the two cannot be told apart -- confirm).
        if not (trans.user_is_admin or trans.app.security_agent.ok_to_display(trans.user, role)):
            raise galaxy.exceptions.RequestParameterInvalidException('No accessible role found with the id provided.')
        return role

    def list_displayable_roles(self, trans: ProvidesUserContext):
        """
        Return the non-deleted roles the current user may see.

        A role is included when the user is an admin or when the security
        agent's ``ok_to_display`` check passes for that user and role.

        :param trans: context providing the session, the current user, the
            admin flag and the application's security agent

        :returns: list of the displayable Role objects
        :rtype: list
        """
        not_deleted = trans.sa_session.query(Role).filter(Role.deleted == false())
        return [
            role for role in not_deleted
            if trans.user_is_admin or trans.app.security_agent.ok_to_display(trans.user, role)
        ]

    def create_role(self, trans: ProvidesUserContext, role_definition_model) -> model.Role:
        """
        Create a new role and associate it with the given users and groups.

        :param trans: context providing the session, id decoding
            (``trans.security``) and the application's security agent
        :param role_definition_model: object exposing ``name``, ``description``,
            ``user_ids`` and ``group_ids``; the two id collections hold encoded
            ids and may be empty or ``None``

        :returns: the newly created Role object
        :rtype: galaxy.model.Role

        :raises: RequestParameterInvalidException if a role with that name
            already exists or if any user/group id matches no record
        """
        name = role_definition_model.name
        description = role_definition_model.description
        user_ids = role_definition_model.user_ids or []
        group_ids = role_definition_model.group_ids or []

        if trans.sa_session.query(Role).filter(Role.name == name).first():
            raise RequestParameterInvalidException(f"A role with that name already exists [{name}]")

        # Resolve every referenced user and group BEFORE the role is added to the
        # session, so invalid input cannot leave a half-created role pending.
        users = self._resolve_encoded_ids(trans, model.User, user_ids, "user")
        groups = self._resolve_encoded_ids(trans, model.Group, group_ids, "group")

        role_type = Role.types.ADMIN  # TODO: allow non-admins to create roles
        role = Role(name=name, description=description, type=role_type)
        trans.sa_session.add(role)

        # Create the UserRoleAssociations
        for user in users:
            trans.app.security_agent.associate_user_role(user, role)
        # Create the GroupRoleAssociations
        for group in groups:
            trans.app.security_agent.associate_group_role(group, role)

        trans.sa_session.flush()
        return role

    @staticmethod
    def _resolve_encoded_ids(trans: ProvidesUserContext, item_class, encoded_ids, label: str):
        """
        Decode each encoded id and load the matching ``item_class`` record.

        :param trans: context providing the session and ``trans.security.decode_id``
        :param item_class: mapped model class to load (e.g. ``model.User``)
        :param encoded_ids: iterable of encoded ids
        :param label: human-readable noun used in the error message

        :returns: list of loaded objects, in the same order as ``encoded_ids``
        :rtype: list

        :raises: RequestParameterInvalidException if an id matches no record
        """
        resolved = []
        for encoded_id in encoded_ids:
            item = trans.sa_session.query(item_class).get(trans.security.decode_id(encoded_id))
            # ``Query.get`` returns None for an unknown primary key; fail loudly here
            # rather than passing None on to the association step.
            if item is None:
                raise RequestParameterInvalidException(f"No {label} found with the id provided [{encoded_id}]")
            resolved.append(item)
        return resolved