Source code for galaxy.managers.libraries

"""
Manager and Serializer for libraries.
"""

import logging
from typing import (
    Dict,
    Optional,
    Set,
    Tuple,
)

from sqlalchemy import (
    asc,
    false,
    func,
    not_,
    or_,
    select,
    true,
)
from sqlalchemy.exc import (
    MultipleResultsFound,
    NoResultFound,
)
from sqlalchemy.orm import Query

from galaxy import exceptions
from galaxy.managers.folders import FolderManager
from galaxy.model import (
    Library,
    LibraryPermissions,
    Role,
)
from galaxy.model.base import transaction
from galaxy.util import (
    pretty_print_time_interval,
    unicodify,
)

log = logging.getLogger(__name__)


# =============================================================================
[docs]class LibraryManager: """ Interface/service object for interacting with libraries. """
[docs] def get(self, trans, decoded_library_id: int, check_accessible: bool = True) -> Library: """ Get the library from the DB. :param decoded_library_id: decoded library id :type decoded_library_id: int :param check_accessible: flag whether to check that user can access item :type check_accessible: bool :returns: the requested library :rtype: galaxy.model.Library """ try: library = get_library(trans.sa_session, decoded_library_id) except MultipleResultsFound: raise exceptions.InconsistentDatabase("Multiple libraries found with the same id.") except NoResultFound: raise exceptions.RequestParameterInvalidException("No library found with the id provided.") except Exception as e: raise exceptions.InternalServerError(f"Error loading from the database.{unicodify(e)}") library = self.secure(trans, library, check_accessible) return library
[docs] def create(self, trans, name: str, description: Optional[str] = "", synopsis: Optional[str] = "") -> Library: """ Create a new library. """ if not trans.user_is_admin: raise exceptions.ItemAccessibilityException("Only administrators can create libraries.") else: library = trans.app.model.Library(name=name, description=description, synopsis=synopsis) root_folder = trans.app.model.LibraryFolder(name=name, description="") library.root_folder = root_folder trans.sa_session.add_all((library, root_folder)) with transaction(trans.sa_session): trans.sa_session.commit() return library
[docs] def update( self, trans, library: Library, name: Optional[str] = None, description: Optional[str] = None, synopsis: Optional[str] = None, ) -> Library: """ Update the given library """ changed = False if not trans.user_is_admin: current_user_roles = trans.get_current_user_roles() library_modify_roles = self.get_modify_roles(trans, library) user_can_modify = any(role in library_modify_roles for role in current_user_roles) if not user_can_modify: raise exceptions.ItemAccessibilityException("You don't have permission update libraries.") if library.deleted: raise exceptions.RequestParameterInvalidException("You cannot modify a deleted library. Undelete it first.") if name is not None: library.name = name changed = True # When library is renamed the root folder has to be renamed too. folder_manager = FolderManager() folder_manager.update(trans, library.root_folder, name=name) if description is not None: library.description = description changed = True if synopsis is not None: library.synopsis = synopsis changed = True if changed: trans.sa_session.add(library) with transaction(trans.sa_session): trans.sa_session.commit() return library
[docs] def delete(self, trans, library: Library, undelete: Optional[bool] = False) -> Library: """ Mark given library deleted/undeleted based on the flag. """ if not trans.user_is_admin: raise exceptions.ItemAccessibilityException("Only administrators can delete and undelete libraries.") if undelete: library.deleted = False else: library.deleted = True trans.sa_session.add(library) with transaction(trans.sa_session): trans.sa_session.commit() return library
[docs] def list(self, trans, deleted: Optional[bool] = False) -> Tuple[Query, Dict[str, Set]]: """ Return a list of libraries from the DB. :param deleted: if True, show only ``deleted`` libraries, if False show only ``non-deleted`` :type deleted: boolean (optional) :returns: iterable that will emit all accessible libraries :rtype: sqlalchemy ScalarResult :returns: dict of 3 sets with available actions for user's accessible libraries and a set of ids of all public libraries. These are used for limiting the number of queries when dictifying the libraries later on. :rtype: dict """ is_admin = trans.user_is_admin library_access_action = trans.app.security_agent.permitted_actions.LIBRARY_ACCESS.action restricted_library_ids = set(get_library_ids(trans.sa_session, library_access_action)) prefetched_ids = {"restricted_library_ids": restricted_library_ids} if is_admin: libraries = get_libraries_for_admins(trans.sa_session, deleted=deleted) else: # Nonadmins can't see deleted libraries if deleted: raise exceptions.AdminRequiredException() current_user_role_ids = [role.id for role in trans.get_current_user_roles()] library_add_action = trans.app.security_agent.permitted_actions.LIBRARY_ADD.action library_modify_action = trans.app.security_agent.permitted_actions.LIBRARY_MODIFY.action library_manage_action = trans.app.security_agent.permitted_actions.LIBRARY_MANAGE.action accessible_restricted_library_ids = set() allowed_library_add_ids = set() allowed_library_modify_ids = set() allowed_library_manage_ids = set() for action in get_library_permissions_by_role(trans.sa_session, current_user_role_ids): if action.action == library_access_action: accessible_restricted_library_ids.add(action.library_id) if action.action == library_add_action: allowed_library_add_ids.add(action.library_id) if action.action == library_modify_action: allowed_library_modify_ids.add(action.library_id) if action.action == library_manage_action: allowed_library_manage_ids.add(action.library_id) prefetched_ids["allowed_library_add_ids"] = allowed_library_add_ids prefetched_ids["allowed_library_modify_ids"] = allowed_library_modify_ids prefetched_ids["allowed_library_manage_ids"] = allowed_library_manage_ids libraries = get_libraries_for_nonadmins( trans.sa_session, restricted_library_ids, accessible_restricted_library_ids ) return libraries, prefetched_ids
[docs] def secure(self, trans, library: Library, check_accessible: bool = True) -> Library: """ Check if library is accessible to user. :param library: library :type library: galaxy.model.Library :param check_accessible: flag whether to check that user can access library :type check_accessible: bool :returns: the original library :rtype: galaxy.model.Library """ # all libraries are accessible to an admin if trans.user_is_admin: return library if check_accessible: library = self.check_accessible(trans, library) return library
[docs] def check_accessible(self, trans, library: Library) -> Library: """ Check whether the library is accessible to current user. """ if not trans.app.security_agent.can_access_library(trans.get_current_user_roles(), library): raise exceptions.ObjectNotFound("Library with the id provided was not found.") elif library.deleted: raise exceptions.ObjectNotFound("Library with the id provided is deleted.") else: return library
[docs] def get_library_dict(self, trans, library: Library, prefetched_ids: Optional[Dict[str, Set]] = None) -> dict: """ Return library data in the form of a dictionary. :param library: library :type library: galaxy.model.Library :param prefetched_ids: dict of 3 sets with available actions for user's accessible libraries and a set of ids of all public libraries. These are used for limiting the number of queries when dictifying a set of libraries. :type prefetched_ids: dict :returns: dict with data about the library :rtype: dictionary """ restricted_library_ids = prefetched_ids.get("restricted_library_ids", None) if prefetched_ids else None allowed_library_add_ids = prefetched_ids.get("allowed_library_add_ids", None) if prefetched_ids else None allowed_library_modify_ids = prefetched_ids.get("allowed_library_modify_ids", None) if prefetched_ids else None allowed_library_manage_ids = prefetched_ids.get("allowed_library_manage_ids", None) if prefetched_ids else None library_dict = library.to_dict(view="element") library_dict["public"] = False if (restricted_library_ids and library.id in restricted_library_ids) else True library_dict["create_time_pretty"] = pretty_print_time_interval(library.create_time, precise=True) if not trans.user_is_admin: if prefetched_ids: library_dict["can_user_add"] = ( True if (allowed_library_add_ids and library.id in allowed_library_add_ids) else False ) library_dict["can_user_modify"] = ( True if (allowed_library_modify_ids and library.id in allowed_library_modify_ids) else False ) library_dict["can_user_manage"] = ( True if (allowed_library_manage_ids and library.id in allowed_library_manage_ids) else False ) else: current_user_roles = trans.get_current_user_roles() library_dict["can_user_add"] = trans.app.security_agent.can_add_library_item( current_user_roles, library ) library_dict["can_user_modify"] = trans.app.security_agent.can_modify_library_item( current_user_roles, library ) library_dict["can_user_manage"] = trans.app.security_agent.can_manage_library_item( current_user_roles, library ) else: library_dict["can_user_add"] = True library_dict["can_user_modify"] = True library_dict["can_user_manage"] = True return library_dict
[docs] def get_current_roles(self, trans, library: Library) -> dict: """ Load all permissions currently related to the given library. :param library: the model object :type library: galaxy.model.Library :rtype: dictionary :returns: dict of current roles for all available permission types """ access_library_role_list = [ (access_role.name, trans.security.encode_id(access_role.id)) for access_role in self.get_access_roles(trans, library) ] modify_library_role_list = [ (modify_role.name, trans.security.encode_id(modify_role.id)) for modify_role in self.get_modify_roles(trans, library) ] manage_library_role_list = [ (manage_role.name, trans.security.encode_id(manage_role.id)) for manage_role in self.get_manage_roles(trans, library) ] add_library_item_role_list = [ (add_role.name, trans.security.encode_id(add_role.id)) for add_role in self.get_add_roles(trans, library) ] return dict( access_library_role_list=access_library_role_list, modify_library_role_list=modify_library_role_list, manage_library_role_list=manage_library_role_list, add_library_item_role_list=add_library_item_role_list, )
[docs] def get_access_roles(self, trans, library: Library) -> Set[Role]: """ Load access roles for all library permissions """ return set(library.get_access_roles(trans.app.security_agent))
[docs] def get_modify_roles(self, trans, library: Library) -> Set[Role]: """ Load modify roles for all library permissions """ return set( trans.app.security_agent.get_roles_for_action( library, trans.app.security_agent.permitted_actions.LIBRARY_MODIFY ) )
[docs] def get_manage_roles(self, trans, library: Library) -> Set[Role]: """ Load manage roles for all library permissions """ return set( trans.app.security_agent.get_roles_for_action( library, trans.app.security_agent.permitted_actions.LIBRARY_MANAGE ) )
[docs] def get_add_roles(self, trans, library: Library) -> Set[Role]: """ Load add roles for all library permissions """ return set( trans.app.security_agent.get_roles_for_action( library, trans.app.security_agent.permitted_actions.LIBRARY_ADD ) )
[docs] def make_public(self, trans, library: Library) -> bool: """ Makes the given library public (removes all access roles) """ trans.app.security_agent.make_library_public(library) return self.is_public(trans, library)
[docs] def is_public(self, trans, library: Library) -> bool: """ Return true if lib is public. """ return trans.app.security_agent.library_is_public(library)
[docs]def get_containing_library_from_library_dataset(trans, library_dataset) -> Optional[Library]: """Given a library_dataset, get the containing library""" folder = library_dataset.folder while folder.parent: folder = folder.parent # We have folder set to the library's root folder, which has the same name as the library stmt = select(Library).where(Library.deleted == false()).where(Library.name == folder.name) for library in trans.sa_session.scalars(stmt): # Just to double-check if library.root_folder == folder: return library return None
[docs]def get_library(session, library_id): stmt = select(Library).where(Library.id == library_id) return session.execute(stmt).scalar_one()
[docs]def get_library_ids(session, library_access_action): stmt = select(LibraryPermissions.library_id).where(LibraryPermissions.action == library_access_action).distinct() return session.scalars(stmt)
[docs]def get_library_permissions_by_role(session, role_ids): stmt = select(LibraryPermissions).where(LibraryPermissions.role_id.in_(role_ids)) return session.scalars(stmt)
[docs]def get_libraries_for_admins(session, deleted): stmt = select(Library) if deleted is None: # Flag is not specified, do not filter on it. pass elif deleted: stmt = stmt.where(Library.deleted == true()) else: stmt = stmt.where(Library.deleted == false()) stmt = stmt.order_by(asc(func.lower(Library.name))) return session.scalars(stmt)
[docs]def get_libraries_for_nonadmins(session, restricted_library_ids, accessible_restricted_library_ids): stmt = ( select(Library) .where(Library.deleted == false()) .where( or_( not_(Library.id.in_(restricted_library_ids)), Library.id.in_(accessible_restricted_library_ids), ) ) ) stmt = stmt.order_by(asc(func.lower(Library.name))) return session.scalars(stmt)