Source code for galaxy.managers.folders

"""
Manager and Serializer for Library Folders.
"""
import logging
from dataclasses import dataclass
from typing import (
    List,
    Optional,
    Tuple,
    Union,
)

from sqlalchemy import (
    and_,
    false,
    func,
    not_,
    or_,
)
from sqlalchemy.orm import aliased
from sqlalchemy.orm.exc import (
    MultipleResultsFound,
    NoResultFound,
)

from galaxy import (
    model,
    util,
)
from galaxy.exceptions import (
    AuthenticationRequired,
    InconsistentDatabase,
    InsufficientPermissionsException,
    InternalServerError,
    ItemAccessibilityException,
    MalformedId,
    RequestParameterInvalidException,
)
from galaxy.model.scoped_session import galaxy_scoped_session
from galaxy.schema.schema import LibraryFolderContentsIndexQueryPayload
from galaxy.security import RBACAgent
from galaxy.security.idencoding import IdEncodingHelper

log = logging.getLogger(__name__)


[docs]@dataclass class SecurityParams: """Contains security data bundled for reusability.""" user_role_ids: List[model.Role] security_agent: RBACAgent is_admin: bool
LDDA_SORT_COLUMN_MAP = { "name": lambda ldda, dataset: ldda.name, "description": lambda ldda, dataset: ldda.message, "type": lambda ldda, dataset: ldda.extension, "size": lambda ldda, dataset: dataset.file_size, "update_time": lambda ldda, dataset: ldda.update_time, } FOLDER_SORT_COLUMN_MAP = { "name": lambda folder: folder.name, "description": lambda folder: folder.description, "update_time": lambda folder: folder.update_time, } # =============================================================================
[docs]class FolderManager: """ Interface/service object for interacting with folders. """
[docs] def get(self, trans, decoded_folder_id: int, check_manageable: bool = False, check_accessible: bool = True): """ Get the folder from the DB. :param decoded_folder_id: decoded folder id :param check_manageable: flag whether the check that user can manage item :param check_accessible: flag whether to check that user can access item :returns: the requested folder :rtype: LibraryFolder :raises: InconsistentDatabase, RequestParameterInvalidException, InternalServerError """ try: folder = ( trans.sa_session.query(trans.app.model.LibraryFolder) .filter(trans.app.model.LibraryFolder.table.c.id == decoded_folder_id) .one() ) except MultipleResultsFound: raise InconsistentDatabase("Multiple folders found with the same id.") except NoResultFound: raise RequestParameterInvalidException("No folder found with the id provided.") except Exception as e: raise InternalServerError(f"Error loading from the database.{util.unicodify(e)}") folder = self.secure(trans, folder, check_manageable, check_accessible) return folder
[docs] def secure(self, trans, folder, check_manageable=True, check_accessible=True): """ Check if (a) user can manage folder or (b) folder is accessible to user. :param folder: folder item :type folder: LibraryFolder :param check_manageable: flag whether to check that user can manage item :type check_manageable: bool :param check_accessible: flag whether to check that user can access item :type check_accessible: bool :returns: the original folder :rtype: LibraryFolder """ # all folders are accessible to an admin if trans.user_is_admin: return folder if check_manageable: folder = self.check_manageable(trans, folder) if check_accessible: folder = self.check_accessible(trans, folder) return folder
[docs] def check_modifyable(self, trans, folder): """ Check whether the user can modify the folder (name and description). :returns: the original folder :rtype: LibraryFolder :raises: AuthenticationRequired, InsufficientPermissionsException """ if not trans.user: raise AuthenticationRequired("Must be logged in to manage Galaxy items.", type="error") current_user_roles = trans.get_current_user_roles() if not trans.app.security_agent.can_modify_library_item(current_user_roles, folder): raise InsufficientPermissionsException("You don't have permissions to modify this folder.", type="error") else: return folder
[docs] def check_manageable(self, trans, folder): """ Check whether the user can manage the folder. :returns: the original folder :rtype: LibraryFolder :raises: AuthenticationRequired, InsufficientPermissionsException """ if not trans.user: raise AuthenticationRequired("Must be logged in to manage Galaxy items.", type="error") current_user_roles = trans.get_current_user_roles() if not trans.app.security_agent.can_manage_library_item(current_user_roles, folder): raise InsufficientPermissionsException("You don't have permissions to manage this folder.", type="error") else: return folder
[docs] def check_accessible(self, trans, folder): """ Check whether the folder is accessible to current user. By default every folder is accessible (contents have their own permissions). """ return folder
[docs] def get_folder_dict(self, trans, folder): """ Return folder data in the form of a dictionary. :param folder: folder item :type folder: LibraryFolder :returns: dict with data about the folder :rtype: dictionary """ folder_dict = folder.to_dict(view="element") folder_dict = trans.security.encode_all_ids(folder_dict, True) folder_dict["id"] = f"F{folder_dict['id']}" if folder_dict["parent_id"] is not None: folder_dict["parent_id"] = f"F{folder_dict['parent_id']}" folder_dict["update_time"] = folder.update_time return folder_dict
[docs] def create(self, trans, parent_folder_id, new_folder_name, new_folder_description=""): """ Create a new folder under the given folder. :param parent_folder_id: decoded id :type parent_folder_id: int :param new_folder_name: name of the new folder :type new_folder_name: str :param new_folder_description: description of the folder (optional, defaults to empty string) :type new_folder_description: str :returns: the new folder :rtype: LibraryFolder :raises: InsufficientPermissionsException """ parent_folder = self.get(trans, parent_folder_id) current_user_roles = trans.get_current_user_roles() if not ( trans.user_is_admin or trans.app.security_agent.can_add_library_item(current_user_roles, parent_folder) ): raise InsufficientPermissionsException( "You do not have proper permission to create folders under given folder." ) new_folder = trans.app.model.LibraryFolder(name=new_folder_name, description=new_folder_description) # We are associating the last used genome build with folders, so we will always # initialize a new folder with the first dbkey in genome builds list which is currently # ? unspecified (?) new_folder.genome_build = trans.app.genome_builds.default_value parent_folder.add_folder(new_folder) trans.sa_session.add(new_folder) trans.sa_session.flush() # New folders default to having the same permissions as their parent folder trans.app.security_agent.copy_library_permissions(trans, parent_folder, new_folder) return new_folder
[docs] def update(self, trans, folder, name=None, description=None): """ Update the given folder's name or description. :param folder: the model object :type folder: LibraryFolder :param name: new name for the library folder :type name: str :param description: new description for the library folder :type description: str :returns: the folder :rtype: LibraryFolder :raises: ItemAccessibilityException, InsufficientPermissionsException """ changed = False if not trans.user_is_admin: folder = self.check_modifyable(trans, folder) if folder.deleted is True: raise ItemAccessibilityException("You cannot update a deleted library folder. Undelete it first.") if name is not None and name != folder.name: folder.name = name changed = True if description is not None and description != folder.description: folder.description = description changed = True if changed: trans.sa_session.add(folder) trans.sa_session.flush() return folder
[docs] def delete(self, trans, folder, undelete=False): """ Mark given folder deleted/undeleted based on the flag. :param folder: the model object :type folder: LibraryFolder :param undelete: flag whether to delete (when False) or undelete :type undelete: Bool :returns: the folder :rtype: LibraryFolder :raises: ItemAccessibilityException """ if not trans.user_is_admin: folder = self.check_manageable(trans, folder) if undelete: folder.deleted = False else: folder.deleted = True trans.sa_session.add(folder) trans.sa_session.flush() return folder
[docs] def get_current_roles(self, trans, folder): """ Find all roles currently connected to relevant permissions on the folder. :param folder: the model object :type folder: LibraryFolder :returns: dict of current roles for all available permission types :rtype: dictionary """ # Omit duplicated roles by converting to set modify_roles = set( trans.app.security_agent.get_roles_for_action( folder, trans.app.security_agent.permitted_actions.LIBRARY_MODIFY ) ) manage_roles = set( trans.app.security_agent.get_roles_for_action( folder, trans.app.security_agent.permitted_actions.LIBRARY_MANAGE ) ) add_roles = set( trans.app.security_agent.get_roles_for_action( folder, trans.app.security_agent.permitted_actions.LIBRARY_ADD ) ) modify_folder_role_list = [ (modify_role.name, trans.security.encode_id(modify_role.id)) for modify_role in modify_roles ] manage_folder_role_list = [ (manage_role.name, trans.security.encode_id(manage_role.id)) for manage_role in manage_roles ] add_library_item_role_list = [(add_role.name, trans.security.encode_id(add_role.id)) for add_role in add_roles] return dict( modify_folder_role_list=modify_folder_role_list, manage_folder_role_list=manage_folder_role_list, add_library_item_role_list=add_library_item_role_list, )
[docs] def can_add_item(self, trans, folder): """ Return true if the user has permissions to add item to the given folder. """ if trans.user_is_admin: return True current_user_roles = trans.get_current_user_roles() add_roles = set( trans.app.security_agent.get_roles_for_action( folder, trans.app.security_agent.permitted_actions.LIBRARY_ADD ) ) for role in current_user_roles: if role in add_roles: return True return False
[docs] def cut_the_prefix(self, encoded_folder_id): """ Remove the prefix from the encoded folder id. :param encoded_folder_id: encoded id of the Folder object with 'F' prepended :type encoded_folder_id: string :returns: encoded Folder id without the 'F' prefix :rtype: string :raises: MalformedId """ if (len(encoded_folder_id) % 16 == 1) and encoded_folder_id.startswith("F"): cut_id = encoded_folder_id[1:] else: raise MalformedId(f"Malformed folder id ( {str(encoded_folder_id)} ) specified, unable to decode.") return cut_id
[docs] def decode_folder_id(self, trans, encoded_folder_id): """ Decode the folder id given that it has already lost the prefixed 'F'. :param encoded_folder_id: encoded id of the Folder object :type encoded_folder_id: string :returns: decoded Folder id :rtype: int :raises: MalformedId """ return trans.security.decode_id(encoded_folder_id, object_name="folder")
[docs] def cut_and_decode(self, trans, encoded_folder_id): """ Cuts the folder prefix (the prepended 'F') and returns the decoded id. :param encoded_folder_id: encoded id of the Folder object :type encoded_folder_id: string :returns: decoded Folder id :rtype: int """ return self.decode_folder_id(trans, self.cut_the_prefix(encoded_folder_id))
[docs] def get_contents( self, trans, folder: model.LibraryFolder, payload: LibraryFolderContentsIndexQueryPayload, ) -> Tuple[List[Union[model.LibraryFolder, model.LibraryDataset]], int]: """Retrieves the contents of the given folder that match the provided filters and pagination parameters. Returns a tuple with the list of paginated contents and the total number of items contained in the folder.""" limit = payload.limit offset = payload.offset sa_session = trans.sa_session security_params = SecurityParams( user_role_ids=[role.id for role in trans.get_current_user_roles()], security_agent=trans.app.security_agent, is_admin=trans.user_is_admin, ) content_items: List[Union[model.LibraryFolder, model.LibraryDataset]] = [] sub_folders_query = self._get_sub_folders_query(sa_session, folder, security_params, payload) total_sub_folders: int = sub_folders_query.count() if payload.order_by in FOLDER_SORT_COLUMN_MAP: sort_column = FOLDER_SORT_COLUMN_MAP[payload.order_by](model.LibraryFolder) sub_folders_query = sub_folders_query.order_by(sort_column.desc() if payload.sort_desc else sort_column) else: # Sort by name alphabetically by default sub_folders_query = sub_folders_query.order_by(model.LibraryFolder.name) if limit is not None and limit > 0: sub_folders_query = sub_folders_query.limit(limit) if offset is not None: sub_folders_query = sub_folders_query.offset(offset) folders = sub_folders_query.all() content_items.extend(folders) # Update pagination num_folders_returned = len(folders) num_folders_skipped = total_sub_folders - num_folders_returned if limit is not None and limit > 0: limit -= num_folders_returned if offset: offset -= num_folders_skipped offset = max(0, offset) datasets_query = self._get_contained_datasets_query(sa_session, folder, security_params, payload) total_datasets = datasets_query.count() if limit is not None and limit > 0: datasets_query = datasets_query.limit(limit) if offset is not None: datasets_query = datasets_query.offset(offset) datasets = datasets_query.all() content_items.extend(datasets) return (content_items, total_sub_folders + total_datasets)
def _get_sub_folders_query( self, sa_session: galaxy_scoped_session, folder: model.LibraryFolder, security: SecurityParams, payload: LibraryFolderContentsIndexQueryPayload, ): """Builds a query to retrieve all the sub-folders contained in the given folder applying filters.""" item_model = model.LibraryFolder item_permission_model = model.LibraryFolderPermissions search_text = payload.search_text query = sa_session.query(item_model) query = query.filter(item_model.parent_id == folder.id) query = self._filter_by_include_deleted( query, item_model, item_permission_model, payload.include_deleted, security ) if search_text: search_text = search_text.lower() query = query.filter( or_( func.lower(item_model.name).contains(search_text, autoescape=True), func.lower(item_model.description).contains(search_text, autoescape=True), ) ) query = query.group_by(item_model.id) return query def _get_contained_datasets_query( self, sa_session: galaxy_scoped_session, folder: model.LibraryFolder, security: SecurityParams, payload: LibraryFolderContentsIndexQueryPayload, ): """Builds a query to retrieve all the datasets contained in the given folder applying filters.""" search_text = payload.search_text item_model = model.LibraryDataset item_permission_model = model.LibraryDatasetPermissions access_action = security.security_agent.permitted_actions.DATASET_ACCESS.action query = sa_session.query(item_model) query = query.filter(item_model.folder_id == folder.id) query = self._filter_by_include_deleted( query, item_model, item_permission_model, payload.include_deleted, security ) ldda = aliased(model.LibraryDatasetDatasetAssociation) associated_dataset = aliased(model.Dataset) query = query.outerjoin(item_model.library_dataset_dataset_association.of_type(ldda)) if not security.is_admin: # Non-admin users require ACCESS permission # We check against the actual dataset and not the ldda (for now?) dataset_permission = aliased(model.DatasetPermissions) is_public_dataset = not_( sa_session.query(model.DatasetPermissions) .filter( model.DatasetPermissions.dataset_id == associated_dataset.id, model.DatasetPermissions.action == access_action, ) .exists() ) query = query.outerjoin(ldda.dataset.of_type(associated_dataset)) query = query.outerjoin(associated_dataset.actions.of_type(dataset_permission)) query = query.filter( or_( # The dataset is public is_public_dataset, # The user has explicit access and_( dataset_permission.action == access_action, dataset_permission.role_id.in_(security.user_role_ids), ), ) ) if search_text: search_text = search_text.lower() query = query.filter( or_( func.lower(ldda.name).contains(search_text, autoescape=True), func.lower(ldda.message).contains(search_text, autoescape=True), ) ) sort_column = LDDA_SORT_COLUMN_MAP[payload.order_by](ldda, associated_dataset) query = query.order_by(sort_column.desc() if payload.sort_desc else sort_column) query = query.group_by(item_model.id, sort_column) return query def _filter_by_include_deleted( self, query, item_model, item_permissions_model, include_deleted: Optional[bool], security: SecurityParams ): if include_deleted: # Admins or users with MODIFY permissions can see deleted contents if not security.is_admin: item_permission = aliased(item_permissions_model) query = query.outerjoin(item_model.actions.of_type(item_permission)) query = query.filter( or_( item_model.deleted == false(), # Is not deleted # User has MODIFY permission and_( item_permission.action == security.security_agent.permitted_actions.LIBRARY_MODIFY.action, item_permission.role_id.in_(security.user_role_ids), ), ) ) else: query = query.filter(item_model.deleted == false()) return query
[docs] def build_folder_path( self, sa_session: galaxy_scoped_session, security: IdEncodingHelper, folder: model.LibraryFolder ) -> List[Tuple[str, str]]: """ Returns the folder path from root to the given folder. The path items are tuples with the name and id of each folder for breadcrumb building purposes. """ current_folder = folder path_to_root = [(f"F{security.encode_id(current_folder.id)}", current_folder.name)] while current_folder.parent_id is not None: parent_folder = sa_session.query(model.LibraryFolder).get(current_folder.parent_id) current_folder = parent_folder path_to_root.insert(0, (f"F{security.encode_id(current_folder.id)}", current_folder.name)) return path_to_root