"""
Manager and Serializer for Library Folders.
"""
import logging
from dataclasses import dataclass
from typing import (
List,
Optional,
Tuple,
Union,
)
from sqlalchemy import (
and_,
exists,
false,
func,
not_,
or_,
select,
)
from sqlalchemy.exc import (
MultipleResultsFound,
NoResultFound,
)
from sqlalchemy.orm import aliased
from galaxy import (
model,
util,
)
from galaxy.exceptions import (
AuthenticationRequired,
InconsistentDatabase,
InsufficientPermissionsException,
InternalServerError,
ItemAccessibilityException,
MalformedId,
RequestParameterInvalidException,
)
from galaxy.model import (
Dataset,
DatasetPermissions,
LibraryDataset,
LibraryDatasetDatasetAssociation,
LibraryDatasetPermissions,
LibraryFolder,
LibraryFolderPermissions,
)
from galaxy.model.base import transaction
from galaxy.model.scoped_session import galaxy_scoped_session
from galaxy.schema.schema import LibraryFolderContentsIndexQueryPayload
from galaxy.security import RBACAgent
log = logging.getLogger(__name__)
@dataclass
class SecurityParams:
    """Contains security data bundled for reusability."""

    # Ids (not Role objects) of the roles held by the current user; ``get_contents``
    # fills this with ``role.id`` values and the statement builders pass it to
    # ``role_id.in_(...)`` filters.
    user_role_ids: List[int]
    # Agent used to look up the permitted actions (e.g. DATASET_ACCESS, LIBRARY_MODIFY).
    security_agent: RBACAgent
    # When true the permission filters are skipped by the statement builders.
    is_admin: bool
# Maps an ``order_by`` key to a callable that picks the attribute to sort datasets by.
# Each callable receives the LDDA entity and its associated dataset entity, in that order.
LDDA_SORT_COLUMN_MAP = dict(
    name=lambda ldda, dataset: ldda.name,
    description=lambda ldda, dataset: ldda.message,
    type=lambda ldda, dataset: ldda.extension,
    size=lambda ldda, dataset: dataset.file_size,
    update_time=lambda ldda, dataset: ldda.update_time,
)

# Maps an ``order_by`` key to a callable that picks the attribute to sort folders by.
# Folders only support a subset of the dataset sort keys.
FOLDER_SORT_COLUMN_MAP = dict(
    name=lambda folder: folder.name,
    description=lambda folder: folder.description,
    update_time=lambda folder: folder.update_time,
)
# =============================================================================
[docs]class FolderManager:
"""
Interface/service object for interacting with folders.
"""
def get(self, trans, decoded_folder_id: int, check_manageable: bool = False, check_accessible: bool = True):
    """
    Get the folder from the DB.

    :param decoded_folder_id: decoded folder id
    :param check_manageable: flag whether to check that user can manage item
    :param check_accessible: flag whether to check that user can access item
    :returns: the requested folder
    :rtype: LibraryFolder
    :raises: InconsistentDatabase, RequestParameterInvalidException, InternalServerError,
        plus whatever ``secure`` raises when the requested checks fail
    """
    # Each database error is translated into an API exception; the original
    # exception is chained explicitly so the root cause stays in the traceback.
    try:
        folder = get_folder(trans.sa_session, decoded_folder_id)
    except MultipleResultsFound as e:
        raise InconsistentDatabase("Multiple folders found with the same id.") from e
    except NoResultFound as e:
        raise RequestParameterInvalidException("No folder found with the id provided.") from e
    except Exception as e:
        raise InternalServerError(f"Error loading from the database.{util.unicodify(e)}") from e
    return self.secure(trans, folder, check_manageable, check_accessible)
def secure(self, trans, folder, check_manageable=True, check_accessible=True):
    """
    Run the requested permission checks on the folder for the current user.

    :param folder: folder item
    :type folder: LibraryFolder
    :param check_manageable: flag whether to check that user can manage item
    :type check_manageable: bool
    :param check_accessible: flag whether to check that user can access item
    :type check_accessible: bool
    :returns: the original folder
    :rtype: LibraryFolder
    """
    # Admins bypass every check; everybody else goes through the requested ones,
    # manageability first and accessibility second.
    if not trans.user_is_admin:
        if check_manageable:
            folder = self.check_manageable(trans, folder)
        if check_accessible:
            folder = self.check_accessible(trans, folder)
    return folder
def check_modifyable(self, trans, folder):
    """
    Verify that the current user may modify the folder (name and description).

    :returns: the original folder
    :rtype: LibraryFolder
    :raises: AuthenticationRequired, InsufficientPermissionsException
    """
    # Anonymous sessions can never modify library items.
    if not trans.user:
        raise AuthenticationRequired("Must be logged in to manage Galaxy items.", type="error")
    user_roles = trans.get_current_user_roles()
    may_modify = trans.app.security_agent.can_modify_library_item(user_roles, folder)
    if may_modify:
        return folder
    raise InsufficientPermissionsException("You don't have permissions to modify this folder.", type="error")
def check_manageable(self, trans, folder):
    """
    Verify that the current user may manage the folder.

    :returns: the original folder
    :rtype: LibraryFolder
    :raises: AuthenticationRequired, InsufficientPermissionsException
    """
    # Anonymous sessions can never manage library items.
    if not trans.user:
        raise AuthenticationRequired("Must be logged in to manage Galaxy items.", type="error")
    user_roles = trans.get_current_user_roles()
    may_manage = trans.app.security_agent.can_manage_library_item(user_roles, folder)
    if may_manage:
        return folder
    raise InsufficientPermissionsException("You don't have permissions to manage this folder.", type="error")
def check_accessible(self, trans, folder):
    """
    Accessibility check for the current user.

    Folders carry no access restriction of their own (their contents have their
    own permissions), so the folder is handed back unchanged.
    """
    return folder
def get_folder_dict(self, trans, folder):
    """
    Serialize the folder into a dictionary.

    :param folder: folder item
    :type folder: LibraryFolder
    :returns: dict with data about the folder
    :rtype: dictionary
    """
    serialized = folder.to_dict(view="element")
    # The folder's update time is always set explicitly on top of the "element" view.
    serialized["update_time"] = folder.update_time
    return serialized
def create(self, trans, parent_folder_id, new_folder_name, new_folder_description=""):
    """
    Create a new folder under the given folder.

    :param parent_folder_id: decoded id
    :type parent_folder_id: int
    :param new_folder_name: name of the new folder
    :type new_folder_name: str
    :param new_folder_description: description of the folder (optional, defaults to empty string)
    :type new_folder_description: str
    :returns: the new folder
    :rtype: LibraryFolder
    :raises: InsufficientPermissionsException
    """
    parent = self.get(trans, parent_folder_id)
    user_roles = trans.get_current_user_roles()
    security_agent = trans.app.security_agent
    # Admins may always add; other users need the add permission on the parent folder.
    may_add = trans.user_is_admin or security_agent.can_add_library_item(user_roles, parent)
    if not may_add:
        raise InsufficientPermissionsException(
            "You do not have proper permission to create folders under given folder."
        )
    child = LibraryFolder(name=new_folder_name, description=new_folder_description)
    # The last used genome build is associated with folders, so a new folder always
    # starts with the default value of the genome builds list (currently "?", unspecified).
    child.genome_build = trans.app.genome_builds.default_value
    parent.add_folder(child)
    session = trans.sa_session
    session.add(child)
    with transaction(session):
        session.commit()
    # New folders default to having the same permissions as their parent folder
    security_agent.copy_library_permissions(trans, parent, child)
    return child
def update(self, trans, folder, name=None, description=None):
    """
    Change the name and/or description of the given folder.

    :param folder: the model object
    :type folder: LibraryFolder
    :param name: new name for the library folder
    :type name: str
    :param description: new description for the library folder
    :type description: str
    :returns: the folder
    :rtype: LibraryFolder
    :raises: ItemAccessibilityException, InsufficientPermissionsException
    """
    if not trans.user_is_admin:
        folder = self.check_modifyable(trans, folder)
    if folder.deleted is True:
        raise ItemAccessibilityException("You cannot update a deleted library folder. Undelete it first.")
    # Only values that were provided and actually differ are written.
    dirty = False
    for attribute, new_value in (("name", name), ("description", description)):
        if new_value is not None and new_value != getattr(folder, attribute):
            setattr(folder, attribute, new_value)
            dirty = True
    # Skip the database round trip entirely when nothing changed.
    if dirty:
        trans.sa_session.add(folder)
        with transaction(trans.sa_session):
            trans.sa_session.commit()
    return folder
def delete(self, trans, folder, undelete=False):
    """
    Flag the given folder as deleted, or as not deleted when ``undelete`` is set.

    :param folder: the model object
    :type folder: LibraryFolder
    :param undelete: flag whether to delete (when False) or undelete
    :type undelete: Bool
    :returns: the folder
    :rtype: LibraryFolder
    :raises: AuthenticationRequired, InsufficientPermissionsException
    """
    # Non-admin users need the manage permission on the folder.
    if not trans.user_is_admin:
        folder = self.check_manageable(trans, folder)
    folder.deleted = not undelete
    session = trans.sa_session
    session.add(folder)
    with transaction(session):
        session.commit()
    return folder
def get_current_roles(self, trans, folder):
    """
    Collect the roles currently attached to each relevant permission of the folder.

    :param folder: the model object
    :type folder: LibraryFolder
    :returns: dict of current roles for all available permission types; every
        entry is a list of ``(role name, encoded role id)`` tuples
    :rtype: dictionary
    """
    security_agent = trans.app.security_agent
    actions = security_agent.permitted_actions
    # Sets are used to drop duplicated roles.
    modify_roles = set(security_agent.get_roles_for_action(folder, actions.LIBRARY_MODIFY))
    manage_roles = set(security_agent.get_roles_for_action(folder, actions.LIBRARY_MANAGE))
    add_roles = set(security_agent.get_roles_for_action(folder, actions.LIBRARY_ADD))

    def name_and_encoded_id(roles):
        return [(role.name, trans.security.encode_id(role.id)) for role in roles]

    return {
        "modify_folder_role_list": name_and_encoded_id(modify_roles),
        "manage_folder_role_list": name_and_encoded_id(manage_roles),
        "add_library_item_role_list": name_and_encoded_id(add_roles),
    }
def can_add_item(self, trans, folder):
    """
    Tell whether the current user may add items to the given folder.

    Admins always can; other users need at least one of their roles to be
    among the roles attached to the folder's LIBRARY_ADD action.
    """
    if trans.user_is_admin:
        return True
    user_roles = trans.get_current_user_roles()
    security_agent = trans.app.security_agent
    roles_allowed_to_add = set(
        security_agent.get_roles_for_action(folder, security_agent.permitted_actions.LIBRARY_ADD)
    )
    return any(role in roles_allowed_to_add for role in user_roles)
def cut_the_prefix(self, encoded_folder_id):
    """
    Strip the leading 'F' from an encoded folder id.

    :param encoded_folder_id: encoded id of the Folder object with 'F' prepended
    :type encoded_folder_id: string
    :returns: encoded Folder id without the 'F' prefix
    :rtype: string
    :raises: MalformedId
    """
    # A valid id is an 'F' followed by a multiple of 16 characters.
    has_expected_length = len(encoded_folder_id) % 16 == 1
    if not (has_expected_length and encoded_folder_id.startswith("F")):
        raise MalformedId(f"Malformed folder id ( {str(encoded_folder_id)} ) specified, unable to decode.")
    return encoded_folder_id[1:]
def decode_folder_id(self, trans, encoded_folder_id):
    """
    Decode a folder id whose 'F' prefix has already been removed.

    :param encoded_folder_id: encoded id of the Folder object
    :type encoded_folder_id: string
    :returns: decoded Folder id
    :rtype: int
    :raises: MalformedId
    """
    id_codec = trans.security
    return id_codec.decode_id(encoded_folder_id, object_name="folder")
def cut_and_decode(self, trans, encoded_folder_id):
    """
    Strip the folder prefix (the prepended 'F') and decode what is left.

    :param encoded_folder_id: encoded id of the Folder object
    :type encoded_folder_id: string
    :returns: decoded Folder id
    :rtype: int
    """
    unprefixed_id = self.cut_the_prefix(encoded_folder_id)
    return self.decode_folder_id(trans, unprefixed_id)
def get_contents(
    self,
    trans,
    folder: LibraryFolder,
    payload: LibraryFolderContentsIndexQueryPayload,
) -> Tuple[List[Union[LibraryFolder, LibraryDataset]], int]:
    """Retrieves the contents of the given folder that match the provided filters and pagination parameters.

    Sub-folders are listed first and datasets second; the limit and offset apply to that
    combined listing.

    Returns a tuple with the list of paginated contents and the total number of items contained in the folder."""
    sa_session = trans.sa_session
    offset = payload.offset
    # A missing, zero or negative limit means "no limit". The remaining page capacity is
    # tracked separately so that a page completely filled by sub-folders (0 slots left)
    # is not mistaken for "no limit" when the datasets are queried.
    remaining: Optional[int] = payload.limit if payload.limit is not None and payload.limit > 0 else None
    security_params = SecurityParams(
        user_role_ids=[role.id for role in trans.get_current_user_roles()],
        security_agent=trans.app.security_agent,
        is_admin=trans.user_is_admin,
    )
    content_items: List[Union[LibraryFolder, LibraryDataset]] = []

    sub_folders_stmt = self._get_sub_folders_statement(sa_session, folder, security_params, payload)
    # Count before ordering/pagination so the total covers every matching sub-folder.
    total_sub_folders = get_count(sa_session, sub_folders_stmt)
    if payload.order_by in FOLDER_SORT_COLUMN_MAP:
        sort_column = FOLDER_SORT_COLUMN_MAP[payload.order_by](LibraryFolder)
        sub_folders_stmt = sub_folders_stmt.order_by(sort_column.desc() if payload.sort_desc else sort_column)
    else:  # Sort by name alphabetically by default
        sub_folders_stmt = sub_folders_stmt.order_by(LibraryFolder.name)
    if remaining is not None:
        sub_folders_stmt = sub_folders_stmt.limit(remaining)
    if offset is not None:
        sub_folders_stmt = sub_folders_stmt.offset(offset)
    folders = sa_session.scalars(sub_folders_stmt).all()
    content_items.extend(folders)

    # Update pagination: the datasets continue the listing where the sub-folders ended.
    num_folders_returned = len(folders)
    num_folders_skipped = total_sub_folders - num_folders_returned
    if remaining is not None:
        remaining -= num_folders_returned
    if offset:
        offset = max(0, offset - num_folders_skipped)

    datasets_stmt = self._get_contained_datasets_statement(sa_session, folder, security_params, payload)
    # The total always includes the datasets, even when none fit in the current page.
    total_datasets = get_count(sa_session, datasets_stmt)
    if remaining is None or remaining > 0:
        if remaining is not None:
            datasets_stmt = datasets_stmt.limit(remaining)
        if offset is not None:
            datasets_stmt = datasets_stmt.offset(offset)
        datasets = sa_session.scalars(datasets_stmt).all()
        content_items.extend(datasets)
    return (content_items, total_sub_folders + total_datasets)
def _get_sub_folders_statement(
    self,
    sa_session: galaxy_scoped_session,
    folder: LibraryFolder,
    security: SecurityParams,
    payload: LibraryFolderContentsIndexQueryPayload,
):
    """Builds the statement selecting the direct sub-folders of the given folder, with filters applied.

    Ordering and pagination are left to the caller.
    """
    stmt = select(LibraryFolder).where(LibraryFolder.parent_id == folder.id)
    stmt = self._filter_by_include_deleted(
        stmt, LibraryFolder, LibraryFolderPermissions, payload.include_deleted, security
    )
    needle = payload.search_text
    if needle:
        # Case-insensitive substring match on either the name or the description.
        needle = needle.lower()
        name_matches = func.lower(LibraryFolder.name).contains(needle, autoescape=True)
        description_matches = func.lower(LibraryFolder.description).contains(needle, autoescape=True)
        stmt = stmt.where(or_(name_matches, description_matches))
    # Grouping by id yields one row per folder even if a join above produced several.
    return stmt.group_by(LibraryFolder.id)
def _get_contained_datasets_statement(
    self,
    sa_session: galaxy_scoped_session,
    folder: LibraryFolder,
    security: SecurityParams,
    payload: LibraryFolderContentsIndexQueryPayload,
):
    """Builds the statement selecting the datasets directly contained in the given folder.

    Applies the deleted/permission/search filters and the requested ordering;
    pagination is left to the caller.
    """
    access_action = security.security_agent.permitted_actions.DATASET_ACCESS.action
    stmt = select(LibraryDataset).where(LibraryDataset.folder_id == folder.id)
    stmt = self._filter_by_include_deleted(
        stmt, LibraryDataset, LibraryDatasetPermissions, payload.include_deleted, security
    )
    ldda = aliased(LibraryDatasetDatasetAssociation)
    associated_dataset = aliased(Dataset)
    stmt = stmt.outerjoin(LibraryDataset.library_dataset_dataset_association.of_type(ldda))

    if not security.is_admin:  # Non-admin users require ACCESS permission
        # We check against the actual dataset and not the ldda (for now?)
        dataset_permission = aliased(DatasetPermissions)
        # Public means: no ACCESS permission row exists at all for the dataset.
        is_public_dataset = not_(
            exists()
            .where(DatasetPermissions.dataset_id == associated_dataset.id)
            .where(DatasetPermissions.action == access_action)
        )
        # Explicit access means: an ACCESS permission row exists for one of the user's roles.
        has_explicit_access = and_(
            dataset_permission.action == access_action,
            dataset_permission.role_id.in_(security.user_role_ids),
        )
        stmt = stmt.outerjoin(ldda.dataset.of_type(associated_dataset))
        stmt = stmt.outerjoin(associated_dataset.actions.of_type(dataset_permission))
        stmt = stmt.where(or_(is_public_dataset, has_explicit_access))

    needle = payload.search_text
    if needle:
        # Case-insensitive substring match on either the LDDA name or its message.
        needle = needle.lower()
        name_matches = func.lower(ldda.name).contains(needle, autoescape=True)
        message_matches = func.lower(ldda.message).contains(needle, autoescape=True)  # type:ignore[attr-defined]
        stmt = stmt.where(or_(name_matches, message_matches))

    # NOTE(review): the Dataset alias appears to be joined only in the non-admin branch above,
    # while the "size" sort key reads from it - confirm sorting by size works for admins.
    sort_column = LDDA_SORT_COLUMN_MAP[payload.order_by](ldda, associated_dataset)
    ordering = sort_column.desc() if payload.sort_desc else sort_column
    stmt = stmt.order_by(ordering)
    # The sort column is part of the grouping because it is referenced by the ORDER BY.
    return stmt.group_by(LibraryDataset.id, sort_column)
def _filter_by_include_deleted(
    self, stmt, item_model, item_permissions_model, include_deleted: Optional[bool], security: SecurityParams
):
    """Restricts the statement according to the deleted flag of the items.

    Without ``include_deleted`` only non-deleted items are kept. With it, admins get
    no restriction at all, and other users get the non-deleted items plus the deleted
    ones for which one of their roles holds the MODIFY permission.
    """
    if not include_deleted:
        return stmt.where(item_model.deleted == false())
    if security.is_admin:
        # Admins can see deleted contents without any further condition.
        return stmt
    item_permission = aliased(item_permissions_model)
    modify_action = security.security_agent.permitted_actions.LIBRARY_MODIFY.action
    # User has MODIFY permission
    can_modify = and_(
        item_permission.action == modify_action,
        item_permission.role_id.in_(security.user_role_ids),
    )
    is_not_deleted = item_model.deleted == false()
    stmt = stmt.outerjoin(item_model.actions.of_type(item_permission))
    return stmt.where(or_(is_not_deleted, can_modify))
def build_folder_path(
    self, sa_session: galaxy_scoped_session, folder: model.LibraryFolder
) -> List[Tuple[int, Optional[str]]]:
    """
    Returns the folder path from root to the given folder.

    Each path item is an ``(id, name)`` tuple, for breadcrumb building purposes.
    The parents are loaded one by one through the session.
    """
    # Walk up from the given folder to the root, then flip the collected path.
    path_from_leaf = [(folder.id, folder.name)]
    node = folder
    while node.parent_id is not None:
        parent = sa_session.get(LibraryFolder, node.parent_id)
        assert parent
        node = parent
        path_from_leaf.append((node.id, node.name))
    path_from_leaf.reverse()
    return path_from_leaf
def get_folder(session, folder_id):
    """Load the LibraryFolder with the given id, expecting exactly one matching row."""
    folder_by_id = select(LibraryFolder).where(LibraryFolder.id == folder_id)
    result = session.execute(folder_by_id)
    return result.scalar_one()
def get_count(session, statement):
    """Return the number of rows the given statement would produce."""
    # The statement is wrapped as the FROM clause of a ``SELECT count(*)``.
    counting_stmt = select(func.count()).select_from(statement)
    return session.scalar(counting_stmt)