Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.tool_shed.security
"""Tool Shed Security"""
import logging
from sqlalchemy import and_, false
from galaxy.util import listify
from galaxy.util.bunch import Bunch
log = logging.getLogger(__name__)
[docs]class Action(object):
[docs]    def __init__(self, action, description, model):
        self.action = action
        self.description = description
        self.model = model
[docs]class RBACAgent(object):
    """Handle Galaxy Tool Shed security"""
    permitted_actions = Bunch()
[docs]    def associate_components(self, **kwd):
        raise Exception('No valid method of associating provided components: %s' % kwd)
[docs]    def associate_user_role(self, user, role):
        raise Exception('No valid method of associating a user with a role')
[docs]    def convert_permitted_action_strings(self, permitted_action_strings):
        """
        When getting permitted actions from an untrusted source like a
        form, ensure that they match our actual permitted actions.
        """
        return [x for x in [self.permitted_actions.get(action_string) for action_string in permitted_action_strings] if x is not None]
[docs]    def get_action(self, name, default=None):
        """Get a permitted action by its dict key or action name"""
        for k, v in self.permitted_actions.items():
            if k == name or v.action == name:
                return v
        return default
[docs]    def get_actions(self):
        """Get all permitted actions as a list of Action objects"""
        return list(self.permitted_actions.__dict__.values())
[docs]    def get_item_actions(self, action, item):
        raise Exception('No valid method of retrieving action (%s) for item %s.' % (action, item))
[docs]class CommunityRBACAgent(RBACAgent):
[docs]    def __init__(self, model, permitted_actions=None):
        self.model = model
        if permitted_actions:
            self.permitted_actions = permitted_actions
    @property
    def sa_session(self):
        """Returns a SQLAlchemy session"""
        return self.model.context
[docs]    def allow_action(self, roles, action, item):
        """
        Method for checking a permission for the current user ( based on roles ) to perform a
        specific action on an item
        """
        item_actions = self.get_item_actions(action, item)
        if not item_actions:
            return action.model == 'restrict'
        ret_val = False
        for item_action in item_actions:
            if item_action.role in roles:
                ret_val = True
                break
        return ret_val
[docs]    def associate_components(self, **kwd):
        if 'user' in kwd:
            if 'group' in kwd:
                return self.associate_user_group(kwd['user'], kwd['group'])
            elif 'role' in kwd:
                return self.associate_user_role(kwd['user'], kwd['role'])
        elif 'role' in kwd:
            if 'group' in kwd:
                return self.associate_group_role(kwd['group'], kwd['role'])
        elif 'repository' in kwd:
            return self.associate_repository_category(kwd['repository'], kwd['category'])
        raise Exception('No valid method of associating provided components: %s' % kwd)
[docs]    def associate_group_role(self, group, role):
        assoc = self.model.GroupRoleAssociation(group, role)
        self.sa_session.add(assoc)
        self.sa_session.flush()
        return assoc
[docs]    def associate_user_group(self, user, group):
        assoc = self.model.UserGroupAssociation(user, group)
        self.sa_session.add(assoc)
        self.sa_session.flush()
        return assoc
[docs]    def associate_user_role(self, user, role):
        assoc = self.model.UserRoleAssociation(user, role)
        self.sa_session.add(assoc)
        self.sa_session.flush()
        return assoc
[docs]    def associate_repository_category(self, repository, category):
        assoc = self.model.RepositoryCategoryAssociation(repository, category)
        self.sa_session.add(assoc)
        self.sa_session.flush()
        return assoc
[docs]    def create_private_user_role(self, user):
        # Create private role
        role = self.model.Role(name=user.email, description='Private Role for ' + user.email, type=self.model.Role.types.PRIVATE)
        self.sa_session.add(role)
        self.sa_session.flush()
        # Add user to role
        self.associate_components(role=role, user=user)
        return role
[docs]    def get_item_actions(self, action, item):
        # item must be one of: Dataset, Library, LibraryFolder, LibraryDataset, LibraryDatasetDatasetAssociation
        return [permission for permission in item.actions if permission.action == action.action]
[docs]    def get_private_user_role(self, user, auto_create=False):
        role = self.sa_session.query(self.model.Role) \
                              .filter(and_(self.model.Role.table.c.name == user.email,
                                           self.model.Role.table.c.type == self.model.Role.types.PRIVATE)) \
                              .first()
        if not role:
            if auto_create:
                return self.create_private_user_role(user)
            else:
                return None
        return role
[docs]    def get_repository_reviewer_role(self):
        return self.sa_session.query(self.model.Role) \
                              .filter(and_(self.model.Role.table.c.name == 'Repository Reviewer',
                                           self.model.Role.table.c.type == self.model.Role.types.SYSTEM)) \
                              .first()
[docs]    def set_entity_group_associations(self, groups=None, users=None, roles=None, delete_existing_assocs=True):
        if groups is None:
            groups = []
        if users is None:
            users = []
        if roles is None:
            roles = []
        for group in groups:
            if delete_existing_assocs:
                for a in group.roles + group.users:
                    self.sa_session.delete(a)
                    self.sa_session.flush()
            for role in roles:
                self.associate_components(group=group, role=role)
            for user in users:
                self.associate_components(group=group, user=user)
[docs]    def set_entity_role_associations(self, roles=None, users=None, groups=None, repositories=None, delete_existing_assocs=True):
        if roles is None:
            roles = []
        if users is None:
            users = []
        if groups is None:
            groups = []
        if repositories is None:
            repositories = []
        for role in roles:
            if delete_existing_assocs:
                for a in role.users + role.groups:
                    self.sa_session.delete(a)
                    self.sa_session.flush()
            for user in users:
                self.associate_components(user=user, role=role)
            for group in groups:
                self.associate_components(group=group, role=role)
[docs]    def set_entity_user_associations(self, users=None, roles=None, groups=None, delete_existing_assocs=True):
        if users is None:
            users = []
        if roles is None:
            roles = []
        if groups is None:
            groups = []
        for user in users:
            if delete_existing_assocs:
                for a in user.non_private_roles + user.groups:
                    self.sa_session.delete(a)
                    self.sa_session.flush()
            self.sa_session.refresh(user)
            for role in roles:
                # Make sure we are not creating an additional association with a PRIVATE role
                if role not in user.roles:
                    self.associate_components(user=user, role=role)
            for group in groups:
                self.associate_components(user=user, group=group)
[docs]    def can_push(self, app, user, repository):
        if user:
            return user.username in listify(repository.allow_push(app))
        return False
[docs]    def user_can_administer_repository(self, user, repository):
        """Return True if the received user can administer the received repository."""
        if user:
            if repository:
                repository_admin_role = repository.admin_role
                for rra in repository.roles:
                    role = rra.role
                    if role.id == repository_admin_role.id:
                        # We have the repository's admin role, so see if the user is associated with it.
                        for ura in role.users:
                            role_member = ura.user
                            if role_member.id == user.id:
                                return True
                        # The user is not directly associated with the role, so see if they are a member
                        # of a group that is associated with the role.
                        for gra in role.groups:
                            group = gra.group
                            for uga in group.members:
                                member = uga.user
                                if member.id == user.id:
                                    return True
        return False
[docs]    def user_can_import_repository_archive(self, user, archive_owner):
        # This method should be called only if the current user is not an admin.
        if user.username == archive_owner:
            return True
        # A member of the IUC is authorized to create new repositories that are owned by another user.
        iuc_group = self.sa_session.query(self.model.Group) \
                                   .filter(and_(self.model.Group.table.c.name == 'Intergalactic Utilities Commission',
                                                self.model.Group.table.c.deleted == false())) \
                                   .first()
        if iuc_group is not None:
            for uga in iuc_group.users:
                if uga.user.id == user.id:
                    return True
        return False
[docs]    def user_can_review_repositories(self, user):
        if user:
            roles = user.all_roles()
            if roles:
                repository_reviewer_role = self.get_repository_reviewer_role()
                if repository_reviewer_role:
                    return repository_reviewer_role in roles
        return False
[docs]    def user_can_browse_component_review(self, app, repository, component_review, user):
        if component_review and user:
            if self.can_push(app, user, repository):
                # A user with write permission on the repository can access private/public component reviews.
                return True
            else:
                if self.user_can_review_repositories(user):
                    # Reviewers can access private/public component reviews.
                    return True
        return False
[docs]def get_permitted_actions(filter=None):
    '''Utility method to return a subset of RBACAgent's permitted actions'''
    if filter is None:
        return RBACAgent.permitted_actions
    tmp_bunch = Bunch()
    [tmp_bunch.__dict__.__setitem__(k, v) for k, v in RBACAgent.permitted_actions.items() if k.startswith(filter)]
    return tmp_bunch