Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.webapp.security
"""Tool Shed Security"""
import logging
from typing import List
from sqlalchemy import (
false,
select,
)
from galaxy.model.base import transaction
from galaxy.util import listify
from galaxy.util.bunch import Bunch
from tool_shed.webapp.model import (
Group,
Role,
)
# Name of the group that get_iuc_group() looks up; members of that group are allowed
# by user_can_import_repository_archive() to import archives owned by another user.
IUC_NAME = "Intergalactic Utilities Commission"
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class Action:
    """A single permitted action in the role-based access control registry.

    Instances are plain value holders; the agents in this module read
    ``action`` (to match actions by name) and ``model`` (compared against
    the string ``"restrict"`` when an item has no matching permissions).
    """

    def __init__(self, action, description, model):
        """
        :param action: identifier of the action; compared by ``get_action`` and ``get_item_actions``.
        :param description: human-readable description of the action.
        :param model: permission model of the action (``allow_action`` checks it against ``"restrict"``).
        """
        self.action = action
        self.description = description
        self.model = model

    def __repr__(self):
        return (
            f"{type(self).__name__}(action={self.action!r}, "
            f"description={self.description!r}, model={self.model!r})"
        )
class RBACAgent:
    """Handle Galaxy Tool Shed security"""

    # Registry of permitted actions, defined at class level so it is shared by every
    # agent that does not assign its own ``permitted_actions``.
    permitted_actions = Bunch()

    def associate_components(self, **kwd):
        """Placeholder that always raises; subclasses provide the real association logic."""
        raise Exception(f"No valid method of associating provided components: {kwd}")

    def associate_user_role(self, user, role):
        """Placeholder that always raises; subclasses provide the real association logic."""
        raise Exception("No valid method of associating a user with a role")

    def convert_permitted_action_strings(self, permitted_action_strings):
        """
        When getting permitted actions from an untrusted source like a
        form, ensure that they match our actual permitted actions.

        Strings with no entry in ``permitted_actions`` are dropped silently;
        the order of the remaining actions follows the input.
        """
        # Lazy lookup: no intermediate list is built before filtering out misses.
        looked_up = (self.permitted_actions.get(action_string) for action_string in permitted_action_strings)
        return [action for action in looked_up if action is not None]

    def get_action(self, name, default=None):
        """Get a permitted action by its dict key or action name"""
        for key, action in self.permitted_actions.items():
            if key == name or action.action == name:
                return action
        return default

    def get_actions(self):
        """Get all permitted actions as a list of Action objects"""
        return list(self.permitted_actions.__dict__.values())

    def get_item_actions(self, action, item):
        """Placeholder that always raises; subclasses provide the real lookup."""
        raise Exception(f"No valid method of retrieving action ({action}) for item {item}.")
class CommunityRBACAgent(RBACAgent):
    """RBAC agent that persists user/group/role/repository associations through ``model``.

    ``model`` must expose the association classes used below
    (``GroupRoleAssociation``, ``UserGroupAssociation``, ``UserRoleAssociation``,
    ``RepositoryCategoryAssociation``, ``Role``) and a ``context`` attribute
    used as the SQLAlchemy session.
    """

    def __init__(self, model, permitted_actions=None):
        """
        :param model: object providing the mapped classes and the ``context`` session.
        :param permitted_actions: optional registry overriding the class-level one;
            a falsy value leaves the inherited registry in place.
        """
        self.model = model
        if permitted_actions:
            self.permitted_actions = permitted_actions

    @property
    def sa_session(self):
        """Returns a SQLAlchemy scoped session"""
        return self.model.context

    def _commit(self):
        """Commit the current session inside the ``transaction`` context manager."""
        session = self.sa_session()
        with transaction(session):
            session.commit()

    def allow_action(self, roles, action, item):
        """
        Method for checking a permission for the current user ( based on roles ) to perform a
        specific action on an item

        When the item has no permission entries for ``action``, the result is
        True only if ``action.model`` equals ``"restrict"``. Otherwise the
        result is True when at least one matching entry's role is in ``roles``.
        """
        item_actions = self.get_item_actions(action, item)
        if not item_actions:
            return action.model == "restrict"
        return any(item_action.role in roles for item_action in item_actions)

    def associate_components(self, **kwd):
        """Dispatch to the matching ``associate_*`` method based on the keywords supplied.

        Supported combinations: ``user`` + ``group``, ``user`` + ``role``,
        ``group`` + ``role`` and ``repository`` + ``category``.

        :returns: the association object created by the selected method.
        :raises Exception: when no supported combination is present.
        :raises KeyError: when ``repository`` is given without ``category``.
        """
        if "user" in kwd:
            if "group" in kwd:
                return self.associate_user_group(kwd["user"], kwd["group"])
            elif "role" in kwd:
                return self.associate_user_role(kwd["user"], kwd["role"])
        elif "role" in kwd:
            if "group" in kwd:
                return self.associate_group_role(kwd["group"], kwd["role"])
        elif "repository" in kwd:
            return self.associate_repository_category(kwd["repository"], kwd["category"])
        raise Exception(f"No valid method of associating provided components: {kwd}")

    def associate_group_role(self, group, role):
        """Create, persist and return a ``GroupRoleAssociation`` for ``group`` and ``role``."""
        assoc = self.model.GroupRoleAssociation(group, role)
        self.sa_session.add(assoc)
        self._commit()
        return assoc

    def associate_user_group(self, user, group):
        """Create, persist and return a ``UserGroupAssociation`` for ``user`` and ``group``."""
        assoc = self.model.UserGroupAssociation(user, group)
        self.sa_session.add(assoc)
        self._commit()
        return assoc

    def associate_user_role(self, user, role):
        """Create, persist and return a ``UserRoleAssociation`` for ``user`` and ``role``."""
        assoc = self.model.UserRoleAssociation(user, role)
        self.sa_session.add(assoc)
        self._commit()
        return assoc

    def associate_repository_category(self, repository, category):
        """Create, persist and return a ``RepositoryCategoryAssociation``."""
        assoc = self.model.RepositoryCategoryAssociation(repository, category)
        self.sa_session.add(assoc)
        self._commit()
        return assoc

    def create_private_user_role(self, user):
        """Create a PRIVATE role named after ``user.email``, attach the user to it and return it."""
        # Create private role
        role = self.model.Role(
            name=user.email, description=f"Private Role for {user.email}", type=self.model.Role.types.PRIVATE
        )
        self.sa_session.add(role)
        # The role is committed before the association that references it is created.
        self._commit()
        # Add user to role
        self.associate_components(role=role, user=user)
        return role

    def get_item_actions(self, action, item):
        """Return the entries of ``item.actions`` whose ``action`` equals ``action.action``."""
        # item must be one of: Dataset, Library, LibraryFolder, LibraryDataset, LibraryDatasetDatasetAssociation
        # NOTE(review): the list above is the original comment; the code itself only
        # requires ``item.actions`` to be iterable -- confirm which types apply here.
        return [permission for permission in item.actions if permission.action == action.action]

    def get_private_user_role(self, user, auto_create=False):
        """Return the PRIVATE role named after ``user.email``.

        :param auto_create: when True and no role is found, create one and return it.
        :returns: the role, or None when it does not exist and ``auto_create`` is False.
        """
        role = _get_private_user_role(self.sa_session, user.email)
        if role:
            return role
        return self.create_private_user_role(user) if auto_create else None

    def set_entity_group_associations(self, groups=None, users=None, roles=None, delete_existing_assocs=True):
        """Associate every group in ``groups`` with each of ``roles`` and ``users``.

        :param delete_existing_assocs: when True, each group's current role and
            user associations are deleted first.
        """
        groups = [] if groups is None else groups
        users = [] if users is None else users
        roles = [] if roles is None else roles
        for group in groups:
            if delete_existing_assocs:
                for assoc in group.roles + group.users:
                    self.sa_session.delete(assoc)
                    # Each deletion is committed individually.
                    self._commit()
            for role in roles:
                self.associate_components(group=group, role=role)
            for user in users:
                self.associate_components(group=group, user=user)

    def set_entity_role_associations(
        self, roles=None, users=None, groups=None, repositories=None, delete_existing_assocs=True
    ):
        """Associate every role in ``roles`` with each of ``users`` and ``groups``.

        :param repositories: accepted for interface compatibility; not used by this method.
        :param delete_existing_assocs: when True, each role's current user and
            group associations are deleted first.
        """
        roles = [] if roles is None else roles
        users = [] if users is None else users
        groups = [] if groups is None else groups
        repositories = [] if repositories is None else repositories
        for role in roles:
            if delete_existing_assocs:
                for assoc in role.users + role.groups:
                    self.sa_session.delete(assoc)
                    # Each deletion is committed individually.
                    self._commit()
            for user in users:
                self.associate_components(user=user, role=role)
            for group in groups:
                self.associate_components(group=group, role=role)

    def set_entity_user_associations(self, users=None, roles=None, groups=None, delete_existing_assocs=True):
        """Associate every user in ``users`` with each of ``roles`` and ``groups``.

        :param delete_existing_assocs: when True, each user's non-private role
            associations and group associations are deleted first.
        """
        users = [] if users is None else users
        roles = [] if roles is None else roles
        groups = [] if groups is None else groups
        for user in users:
            if delete_existing_assocs:
                for assoc in user.non_private_roles + user.groups:
                    self.sa_session.delete(assoc)
                    # Each deletion is committed individually.
                    self._commit()
            # Reload the user so the ``user.roles`` check below sees the state after the deletions.
            self.sa_session.refresh(user)
            for role in roles:
                # Make sure we are not creating an additional association with a PRIVATE role
                if role not in user.roles:
                    self.associate_components(user=user, role=role)
            for group in groups:
                self.associate_components(user=user, group=group)

    def usernames_that_can_push(self, repository) -> List[str]:
        """Return ``repository.allow_push()`` normalized to a list via ``listify``."""
        return listify(repository.allow_push())

    def can_push(self, app, user, repository):
        """Return True if ``user`` is set and their username may push to ``repository``.

        ``app`` is accepted for interface compatibility and is not used.
        """
        if not user:
            return False
        return user.username in self.usernames_that_can_push(repository)

    def user_can_administer_repository(self, user, repository):
        """Return True if the received user can administer the received repository."""
        if not user or not repository:
            return False
        repository_admin_role = repository.admin_role
        for rra in repository.roles:
            role = rra.role
            if role.id != repository_admin_role.id:
                continue
            # We have the repository's admin role, so see if the user is associated with it.
            if any(ura.user.id == user.id for ura in role.users):
                return True
            # The user is not directly associated with the role, so see if they are a member
            # of a group that is associated with the role.
            for gra in role.groups:
                if any(uga.user.id == user.id for uga in gra.group.users):
                    return True
        return False

    def user_can_import_repository_archive(self, user, archive_owner):
        """Return True if ``user`` owns the archive or belongs to the IUC group."""
        # This method should be called only if the current user is not an admin.
        if user.username == archive_owner:
            return True
        # A member of the IUC is authorized to create new repositories that are owned by another user.
        iuc_group = get_iuc_group(self.sa_session)
        if iuc_group is None:
            return False
        return any(uga.user.id == user.id for uga in iuc_group.users)
def get_permitted_actions(filter=None):
    """Utility method to return a subset of RBACAgent's permitted actions

    :param filter: key prefix to select; None returns the registry object itself
        (not a copy).
    :returns: ``RBACAgent.permitted_actions`` when ``filter`` is None, otherwise a
        new ``Bunch`` holding only the entries whose key starts with ``filter``.
    """
    if filter is None:
        return RBACAgent.permitted_actions
    tmp_bunch = Bunch()
    # Plain loop: the original used a list comprehension purely for its side effects.
    for key, value in RBACAgent.permitted_actions.items():
        if key.startswith(filter):
            tmp_bunch.__dict__[key] = value
    return tmp_bunch
def get_iuc_group(session):
    """Return the first non-deleted ``Group`` whose name is ``IUC_NAME``.

    :param session: SQLAlchemy session (or scoped session) used to run the query.
    :returns: the matching ``Group``, or None when the query yields no row.
    """
    stmt = select(Group).where(Group.name == IUC_NAME).where(Group.deleted == false()).limit(1)
    return session.scalars(stmt).first()
def _get_private_user_role(session, user_email):
    """Return the first PRIVATE ``Role`` whose name equals ``user_email``.

    :param session: SQLAlchemy session (or scoped session) used to run the query.
    :returns: the matching ``Role``, or None when the query yields no row.
    """
    stmt = select(Role).where(Role.name == user_email).where(Role.type == Role.types.PRIVATE).limit(1)
    return session.scalars(stmt).first()