import logging
import galaxy.exceptions
from galaxy import (
model,
security,
)
from galaxy.managers import users
from galaxy.model.base import transaction
log = logging.getLogger(__name__)
[docs]class RBACPermissionFailedException(galaxy.exceptions.InsufficientPermissionsException):
pass
[docs]class RBACPermission:
"""
Base class for wrangling/controlling the permissions ORM models (Permissions, Roles)
that control which users can perform certain actions on their associated models
(Libraries, Datasets).
"""
permissions_class: type
permission_failed_error_class = RBACPermissionFailedException
[docs] def __init__(self, app):
self.app = app
self.user_manager = users.UserManager(app)
[docs] def session(self):
return self.app.model.context
# TODO: implement group
# TODO: how does admin play into this?
[docs] def is_permitted(self, item, user, trans=None):
raise NotImplementedError("abstract parent class")
[docs] def error_unless_permitted(self, item, user, trans=None):
if not self.is_permitted(item, user, trans=trans):
error_info = dict(model_class=item.__class__, id=getattr(item, "id", None))
raise self.permission_failed_error_class(**error_info)
[docs] def grant(self, item, user, flush=True):
raise NotImplementedError("abstract parent class")
[docs] def revoke(self, item, user, flush=True):
raise NotImplementedError("abstract parent class")
def _role_is_permitted(self, item, role):
raise NotImplementedError("abstract parent class")
def _error_unless_role_permitted(self, item, role):
if not self._role_is_permitted(item, role):
error_info = dict(model_class=item.__class__, id=getattr(item, "id", None))
raise self.permission_failed_error_class(**error_info)
def _grant_role(self, item, role, flush=True):
raise NotImplementedError("abstract parent class")
def _revoke_role(self, item, role, flush=True):
raise NotImplementedError("abstract parent class")
[docs]class DatasetRBACPermission(RBACPermission):
"""
Base class for the manage and access RBAC permissions used by dataset security.
The DatasetPermissions used by the RBAC agent are associations between a Dataset
and a single Role.
DatasetPermissions are typed (but not polymorphic themselves) by a string 'action'.
There are two types:
- manage permissions : can a role manage the permissions on a dataset
- access : can a role read/look at/copy a dataset
"""
permissions_class = model.DatasetPermissions
action_name = None
# ---- double secrect probation
def __assert_action(self):
if not self.action_name:
raise NotImplementedError("abstract parent class needs action_name")
# ---- interface
[docs] def by_dataset(self, dataset):
self.__assert_action()
all_permissions = self._all_types_by_dataset(dataset)
return list(filter(lambda p: p.action == self.action_name, all_permissions))
[docs] def by_roles(self, dataset, roles):
permissions = self.by_dataset(dataset)
return list(filter(lambda p: p.role in roles, permissions))
[docs] def by_role(self, dataset, role):
permissions = self.by_dataset(dataset)
found = list(filter(lambda p: p.role == role, permissions))
if not found:
return None
if len(found) > 1:
raise galaxy.exceptions.InconsistentDatabase(dataset=dataset.id, role=role.id)
return found[0]
[docs] def set(self, dataset, roles, flush=True):
# NOTE: this removes all previous permissions of this type
self.clear(dataset, flush=False)
permissions = []
for role in roles:
permissions.append(self._create(dataset, role, flush=False))
if flush:
session = self.session()
with transaction(session):
session.commit()
return permissions
[docs] def clear(self, dataset, flush=True):
permissions = self.by_dataset(dataset)
return self._delete(permissions, flush=flush)
# ---- private
def _create(self, dataset, role, flush=True):
permission = self.permissions_class(self.action_name, dataset, role)
self.session().add(permission)
if flush:
session = self.session()
with transaction(session):
session.commit()
return permission
def _roles(self, dataset):
return [permission.role for permission in self.by_dataset(dataset)]
def _all_types_by_dataset(self, dataset):
return dataset.actions
# as a general rule, DatasetPermissions are considered disposable
# and there is no reason to update the models
# TODO: list?
def _delete(self, permissions, flush=True):
for permission in permissions:
if permission in self.session().new:
self.session().expunge(permission)
else:
self.session().delete(permission)
if flush:
session = self.session()
with transaction(session):
session.commit()
def _revoke_role(self, dataset, role, flush=True):
role_permissions = self.by_roles(dataset, [role])
return self._delete(role_permissions, flush=flush)
[docs]def iterable_has_all(iterable, has_these):
for item in has_these:
if item not in iterable:
return False
return True
[docs]class DatasetManagePermissionFailedException(RBACPermissionFailedException):
pass
[docs]class ManageDatasetRBACPermission(DatasetRBACPermission):
"""
A class that controls the dataset permissions that control
who can manage that dataset's permissions.
When checking permissions for a user, if any of the user's roles
have permission on the dataset
"""
# TODO: We may also be able to infer/record the dataset 'owner' as well.
action_name = security.RBACAgent.permitted_actions.get("DATASET_MANAGE_PERMISSIONS").action
permission_failed_error_class = DatasetManagePermissionFailedException
# ---- interface
[docs] def is_permitted(self, dataset, user, trans=None):
if trans and trans.user_is_admin:
return True
# anonymous users cannot manage permissions on datasets
if self.user_manager.is_anonymous(user):
return False
# admin is always permitted
# TODO: could probably move this into RBACPermission and call that first
if self.user_manager.is_admin(user):
return True
for role in user.all_roles():
if self._role_is_permitted(dataset, role):
return True
return False
[docs] def grant(self, dataset, user, flush=True):
private_role = self._user_private_role(user)
return self._grant_role(dataset, private_role, flush=flush)
[docs] def revoke(self, dataset, user, flush=True):
private_role = self._user_private_role(user)
return self._revoke_role(dataset, private_role, flush=flush)
# ---- private
def _role_is_permitted(self, dataset, role):
return role in self._roles(dataset)
def _user_private_role(self, user):
# error with 401 if no user
self.user_manager.error_if_anonymous(user)
return self.user_manager.private_role(user)
def _grant_role(self, dataset, role, flush=True):
if existing := self.by_role(dataset, role):
return existing
return self._create(dataset, role, flush=flush)
def _revoke_role(self, dataset, role, flush=True):
permission = self.by_roles(dataset, [role])
return self._delete([permission], flush=flush)
[docs]class DatasetAccessPermissionFailedException(RBACPermissionFailedException):
pass
[docs]class AccessDatasetRBACPermission(DatasetRBACPermission):
"""
A class to manage access permissions on a dataset.
An user must have all the Roles of all the access permissions associated
with a dataset in order to access it.
"""
action_name = security.RBACAgent.permitted_actions.get("DATASET_ACCESS").action
permission_failed_error_class = DatasetAccessPermissionFailedException
# ---- interface
[docs] def is_permitted(self, dataset, user, trans=None):
if trans and trans.user_is_admin:
return True
current_roles = self._roles(dataset)
# NOTE: that because of short circuiting this allows
# anonymous access to public datasets
return (
self._is_public_based_on_roles(current_roles)
or self.user_manager.is_admin(user) # admin is always permitted
or self._user_has_all_roles(user, current_roles)
)
[docs] def grant(self, item, user):
pass
# not so easy
# need to check for a sharing role
# then add the new user to it
[docs] def revoke(self, item, user):
pass
# not so easy
# TODO: these are a lil off message
[docs] def is_public(self, dataset):
current_roles = self._roles(dataset)
return self._is_public_based_on_roles(current_roles)
[docs] def set_private(self, dataset, user, flush=True):
private_role = self.user_manager.private_role(user)
return self.set(dataset, [private_role], flush=flush)
# ---- private
def _is_public_based_on_roles(self, roles):
return len(roles) == 0
def _user_has_all_roles(self, user, roles):
user_roles = []
if not self.user_manager.is_anonymous(user):
user_roles = user.all_roles()
return iterable_has_all(user_roles, roles)
def _role_is_permitted(self, dataset, role):
current_roles = self._roles(dataset)
return (
self._is_public_based_on_roles(current_roles)
# if there's only one role and this is it, let em in
or ((len(current_roles) == 1) and (role == current_roles[0]))
)