Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.webapp.security

"""Tool Shed Security"""
import logging

from sqlalchemy import (
    and_,
    false,
)

from galaxy.util import listify
from galaxy.util.bunch import Bunch

log = logging.getLogger(__name__)


[docs]class Action:
[docs] def __init__(self, action, description, model): self.action = action self.description = description self.model = model
[docs]class RBACAgent: """Handle Galaxy Tool Shed security""" permitted_actions = Bunch()
[docs] def associate_components(self, **kwd): raise Exception(f"No valid method of associating provided components: {kwd}")
[docs] def associate_user_role(self, user, role): raise Exception("No valid method of associating a user with a role")
[docs] def convert_permitted_action_strings(self, permitted_action_strings): """ When getting permitted actions from an untrusted source like a form, ensure that they match our actual permitted actions. """ return [ x for x in [self.permitted_actions.get(action_string) for action_string in permitted_action_strings] if x is not None ]
[docs] def create_user_role(self, user, app): raise Exception("Unimplemented Method")
[docs] def create_private_user_role(self, user): raise Exception("Unimplemented Method")
[docs] def get_action(self, name, default=None): """Get a permitted action by its dict key or action name""" for k, v in self.permitted_actions.items(): if k == name or v.action == name: return v return default
[docs] def get_actions(self): """Get all permitted actions as a list of Action objects""" return list(self.permitted_actions.__dict__.values())
[docs] def get_item_actions(self, action, item): raise Exception(f"No valid method of retrieving action ({action}) for item {item}.")
[docs] def get_private_user_role(self, user): raise Exception("Unimplemented Method")
[docs]class CommunityRBACAgent(RBACAgent):
[docs] def __init__(self, model, permitted_actions=None): self.model = model if permitted_actions: self.permitted_actions = permitted_actions
@property def sa_session(self): """Returns a SQLAlchemy session""" return self.model.context
[docs] def allow_action(self, roles, action, item): """ Method for checking a permission for the current user ( based on roles ) to perform a specific action on an item """ item_actions = self.get_item_actions(action, item) if not item_actions: return action.model == "restrict" ret_val = False for item_action in item_actions: if item_action.role in roles: ret_val = True break return ret_val
[docs] def associate_components(self, **kwd): if "user" in kwd: if "group" in kwd: return self.associate_user_group(kwd["user"], kwd["group"]) elif "role" in kwd: return self.associate_user_role(kwd["user"], kwd["role"]) elif "role" in kwd: if "group" in kwd: return self.associate_group_role(kwd["group"], kwd["role"]) elif "repository" in kwd: return self.associate_repository_category(kwd["repository"], kwd["category"]) raise Exception(f"No valid method of associating provided components: {kwd}")
[docs] def associate_group_role(self, group, role): assoc = self.model.GroupRoleAssociation(group, role) self.sa_session.add(assoc) self.sa_session.flush() return assoc
[docs] def associate_user_group(self, user, group): assoc = self.model.UserGroupAssociation(user, group) self.sa_session.add(assoc) self.sa_session.flush() return assoc
[docs] def associate_user_role(self, user, role): assoc = self.model.UserRoleAssociation(user, role) self.sa_session.add(assoc) self.sa_session.flush() return assoc
[docs] def associate_repository_category(self, repository, category): assoc = self.model.RepositoryCategoryAssociation(repository, category) self.sa_session.add(assoc) self.sa_session.flush() return assoc
[docs] def create_private_user_role(self, user): # Create private role role = self.model.Role( name=user.email, description=f"Private Role for {user.email}", type=self.model.Role.types.PRIVATE ) self.sa_session.add(role) self.sa_session.flush() # Add user to role self.associate_components(role=role, user=user) return role
[docs] def create_user_role(self, user, app): self.get_private_user_role(user, auto_create=True)
[docs] def get_item_actions(self, action, item): # item must be one of: Dataset, Library, LibraryFolder, LibraryDataset, LibraryDatasetDatasetAssociation return [permission for permission in item.actions if permission.action == action.action]
[docs] def get_private_user_role(self, user, auto_create=False): role = ( self.sa_session.query(self.model.Role) .filter( and_( self.model.Role.table.c.name == user.email, self.model.Role.table.c.type == self.model.Role.types.PRIVATE, ) ) .first() ) if not role: if auto_create: return self.create_private_user_role(user) else: return None return role
[docs] def set_entity_group_associations(self, groups=None, users=None, roles=None, delete_existing_assocs=True): if groups is None: groups = [] if users is None: users = [] if roles is None: roles = [] for group in groups: if delete_existing_assocs: for a in group.roles + group.users: self.sa_session.delete(a) self.sa_session.flush() for role in roles: self.associate_components(group=group, role=role) for user in users: self.associate_components(group=group, user=user)
[docs] def set_entity_role_associations( self, roles=None, users=None, groups=None, repositories=None, delete_existing_assocs=True ): if roles is None: roles = [] if users is None: users = [] if groups is None: groups = [] if repositories is None: repositories = [] for role in roles: if delete_existing_assocs: for a in role.users + role.groups: self.sa_session.delete(a) self.sa_session.flush() for user in users: self.associate_components(user=user, role=role) for group in groups: self.associate_components(group=group, role=role)
[docs] def set_entity_user_associations(self, users=None, roles=None, groups=None, delete_existing_assocs=True): if users is None: users = [] if roles is None: roles = [] if groups is None: groups = [] for user in users: if delete_existing_assocs: for a in user.non_private_roles + user.groups: self.sa_session.delete(a) self.sa_session.flush() self.sa_session.refresh(user) for role in roles: # Make sure we are not creating an additional association with a PRIVATE role if role not in user.roles: self.associate_components(user=user, role=role) for group in groups: self.associate_components(user=user, group=group)
[docs] def can_push(self, app, user, repository): if user: return user.username in listify(repository.allow_push()) return False
[docs] def user_can_administer_repository(self, user, repository): """Return True if the received user can administer the received repository.""" if user: if repository: repository_admin_role = repository.admin_role for rra in repository.roles: role = rra.role if role.id == repository_admin_role.id: # We have the repository's admin role, so see if the user is associated with it. for ura in role.users: role_member = ura.user if role_member.id == user.id: return True # The user is not directly associated with the role, so see if they are a member # of a group that is associated with the role. for gra in role.groups: group = gra.group for uga in group.users: member = uga.user if member.id == user.id: return True return False
[docs] def user_can_import_repository_archive(self, user, archive_owner): # This method should be called only if the current user is not an admin. if user.username == archive_owner: return True # A member of the IUC is authorized to create new repositories that are owned by another user. iuc_group = ( self.sa_session.query(self.model.Group) .filter( and_( self.model.Group.table.c.name == "Intergalactic Utilities Commission", self.model.Group.table.c.deleted == false(), ) ) .first() ) if iuc_group is not None: for uga in iuc_group.users: if uga.user.id == user.id: return True return False
[docs]def get_permitted_actions(filter=None): """Utility method to return a subset of RBACAgent's permitted actions""" if filter is None: return RBACAgent.permitted_actions tmp_bunch = Bunch() [tmp_bunch.__dict__.__setitem__(k, v) for k, v in RBACAgent.permitted_actions.items() if k.startswith(filter)] return tmp_bunch