Source code for galaxy.managers.libraries

"""
Manager and Serializer for libraries.
"""
import logging
from typing import Optional

from sqlalchemy import (
    and_,
    false,
    not_,
    or_,
    true,
)
from sqlalchemy.orm.exc import (
    MultipleResultsFound,
    NoResultFound,
)

from galaxy import exceptions
from galaxy.managers.folders import FolderManager
from galaxy.util import (
    pretty_print_time_interval,
    unicodify,
)

log = logging.getLogger(__name__)


# =============================================================================
[docs]class LibraryManager: """ Interface/service object for interacting with libraries. """
[docs] def get(self, trans, decoded_library_id, check_accessible=True): """ Get the library from the DB. :param decoded_library_id: decoded library id :type decoded_library_id: int :param check_accessible: flag whether to check that user can access item :type check_accessible: bool :returns: the requested library :rtype: galaxy.model.Library """ try: library = ( trans.sa_session.query(trans.app.model.Library) .filter(trans.app.model.Library.table.c.id == decoded_library_id) .one() ) except MultipleResultsFound: raise exceptions.InconsistentDatabase("Multiple libraries found with the same id.") except NoResultFound: raise exceptions.RequestParameterInvalidException("No library found with the id provided.") except Exception as e: raise exceptions.InternalServerError(f"Error loading from the database.{unicodify(e)}") library = self.secure(trans, library, check_accessible) return library
[docs] def create(self, trans, name, description="", synopsis=""): """ Create a new library. """ if not trans.user_is_admin: raise exceptions.ItemAccessibilityException("Only administrators can create libraries.") else: library = trans.app.model.Library(name=name, description=description, synopsis=synopsis) root_folder = trans.app.model.LibraryFolder(name=name, description="") library.root_folder = root_folder trans.sa_session.add_all((library, root_folder)) trans.sa_session.flush() return library
[docs] def update(self, trans, library, name=None, description=None, synopsis=None): """ Update the given library """ changed = False if not trans.user_is_admin: current_user_roles = trans.get_current_user_roles() library_modify_roles = self.get_modify_roles(trans, library) user_can_modify = any(role in library_modify_roles for role in current_user_roles) if not user_can_modify: raise exceptions.ItemAccessibilityException("You don't have permission update libraries.") if library.deleted: raise exceptions.RequestParameterInvalidException("You cannot modify a deleted library. Undelete it first.") if name is not None: library.name = name changed = True # When library is renamed the root folder has to be renamed too. folder_manager = FolderManager() folder_manager.update(trans, library.root_folder, name=name) if description is not None: library.description = description changed = True if synopsis is not None: library.synopsis = synopsis changed = True if changed: trans.sa_session.add(library) trans.sa_session.flush() return library
[docs] def delete(self, trans, library, undelete=False): """ Mark given library deleted/undeleted based on the flag. """ if not trans.user_is_admin: raise exceptions.ItemAccessibilityException("Only administrators can delete and undelete libraries.") if undelete: library.deleted = False else: library.deleted = True trans.sa_session.add(library) trans.sa_session.flush() return library
[docs] def list(self, trans, deleted: Optional[bool] = False): """ Return a list of libraries from the DB. :param deleted: if True, show only ``deleted`` libraries, if False show only ``non-deleted`` :type deleted: boolean (optional) :returns: query that will emit all accessible libraries :rtype: sqlalchemy query :returns: dict of 3 sets with available actions for user's accessible libraries and a set of ids of all public libraries. These are used for limiting the number of queries when dictifying the libraries later on. :rtype: dict """ is_admin = trans.user_is_admin query = trans.sa_session.query(trans.app.model.Library) library_access_action = trans.app.security_agent.permitted_actions.LIBRARY_ACCESS.action restricted_library_ids = { lp.library_id for lp in ( trans.sa_session.query(trans.model.LibraryPermissions) .filter(trans.model.LibraryPermissions.table.c.action == library_access_action) .distinct() ) } prefetched_ids = {"restricted_library_ids": restricted_library_ids} if is_admin: if deleted is None: # Flag is not specified, do not filter on it. pass elif deleted: query = query.filter(trans.app.model.Library.table.c.deleted == true()) else: query = query.filter(trans.app.model.Library.table.c.deleted == false()) else: # Nonadmins can't see deleted libraries if deleted: raise exceptions.AdminRequiredException() else: query = query.filter(trans.app.model.Library.table.c.deleted == false()) current_user_role_ids = [role.id for role in trans.get_current_user_roles()] all_actions = trans.sa_session.query(trans.model.LibraryPermissions).filter( trans.model.LibraryPermissions.table.c.role_id.in_(current_user_role_ids) ) library_add_action = trans.app.security_agent.permitted_actions.LIBRARY_ADD.action library_modify_action = trans.app.security_agent.permitted_actions.LIBRARY_MODIFY.action library_manage_action = trans.app.security_agent.permitted_actions.LIBRARY_MANAGE.action accessible_restricted_library_ids = set() allowed_library_add_ids = set() allowed_library_modify_ids = set() allowed_library_manage_ids = set() for action in all_actions: if action.action == library_access_action: accessible_restricted_library_ids.add(action.library_id) if action.action == library_add_action: allowed_library_add_ids.add(action.library_id) if action.action == library_modify_action: allowed_library_modify_ids.add(action.library_id) if action.action == library_manage_action: allowed_library_manage_ids.add(action.library_id) query = query.filter( or_( not_(trans.model.Library.table.c.id.in_(restricted_library_ids)), trans.model.Library.table.c.id.in_(accessible_restricted_library_ids), ) ) prefetched_ids["allowed_library_add_ids"] = allowed_library_add_ids prefetched_ids["allowed_library_modify_ids"] = allowed_library_modify_ids prefetched_ids["allowed_library_manage_ids"] = allowed_library_manage_ids return query, prefetched_ids
[docs] def secure(self, trans, library, check_accessible=True): """ Check if library is accessible to user. :param library: library :type library: galaxy.model.Library :param check_accessible: flag whether to check that user can access library :type check_accessible: bool :returns: the original library :rtype: galaxy.model.Library """ # all libraries are accessible to an admin if trans.user_is_admin: return library if check_accessible: library = self.check_accessible(trans, library) return library
[docs] def check_accessible(self, trans, library): """ Check whether the library is accessible to current user. """ if not trans.app.security_agent.can_access_library(trans.get_current_user_roles(), library): raise exceptions.ObjectNotFound("Library with the id provided was not found.") elif library.deleted: raise exceptions.ObjectNotFound("Library with the id provided is deleted.") else: return library
[docs] def get_library_dict(self, trans, library, prefetched_ids=None): """ Return library data in the form of a dictionary. :param library: library :type library: galaxy.model.Library :param prefetched_ids: dict of 3 sets with available actions for user's accessible libraries and a set of ids of all public libraries. These are used for limiting the number of queries when dictifying a set of libraries. :type prefetched_ids: dict :returns: dict with data about the library :rtype: dictionary """ restricted_library_ids = prefetched_ids.get("restricted_library_ids", None) if prefetched_ids else None allowed_library_add_ids = prefetched_ids.get("allowed_library_add_ids", None) if prefetched_ids else None allowed_library_modify_ids = prefetched_ids.get("allowed_library_modify_ids", None) if prefetched_ids else None allowed_library_manage_ids = prefetched_ids.get("allowed_library_manage_ids", None) if prefetched_ids else None library_dict = library.to_dict( view="element", value_mapper={"id": trans.security.encode_id, "root_folder_id": trans.security.encode_id} ) library_dict["public"] = False if (restricted_library_ids and library.id in restricted_library_ids) else True library_dict["create_time_pretty"] = pretty_print_time_interval(library.create_time, precise=True) if not trans.user_is_admin: if prefetched_ids: library_dict["can_user_add"] = ( True if (allowed_library_add_ids and library.id in allowed_library_add_ids) else False ) library_dict["can_user_modify"] = ( True if (allowed_library_modify_ids and library.id in allowed_library_modify_ids) else False ) library_dict["can_user_manage"] = ( True if (allowed_library_manage_ids and library.id in allowed_library_manage_ids) else False ) else: current_user_roles = trans.get_current_user_roles() library_dict["can_user_add"] = trans.app.security_agent.can_add_library_item( current_user_roles, library ) library_dict["can_user_modify"] = trans.app.security_agent.can_modify_library_item( current_user_roles, library ) library_dict["can_user_manage"] = trans.app.security_agent.can_manage_library_item( current_user_roles, library ) else: library_dict["can_user_add"] = True library_dict["can_user_modify"] = True library_dict["can_user_manage"] = True return library_dict
[docs] def get_current_roles(self, trans, library): """ Load all permissions currently related to the given library. :param library: the model object :type library: galaxy.model.Library :rtype: dictionary :returns: dict of current roles for all available permission types """ access_library_role_list = [ (access_role.name, trans.security.encode_id(access_role.id)) for access_role in self.get_access_roles(trans, library) ] modify_library_role_list = [ (modify_role.name, trans.security.encode_id(modify_role.id)) for modify_role in self.get_modify_roles(trans, library) ] manage_library_role_list = [ (manage_role.name, trans.security.encode_id(manage_role.id)) for manage_role in self.get_manage_roles(trans, library) ] add_library_item_role_list = [ (add_role.name, trans.security.encode_id(add_role.id)) for add_role in self.get_add_roles(trans, library) ] return dict( access_library_role_list=access_library_role_list, modify_library_role_list=modify_library_role_list, manage_library_role_list=manage_library_role_list, add_library_item_role_list=add_library_item_role_list, )
[docs] def get_access_roles(self, trans, library): """ Load access roles for all library permissions """ return set(library.get_access_roles(trans.app.security_agent))
[docs] def get_modify_roles(self, trans, library): """ Load modify roles for all library permissions """ return set( trans.app.security_agent.get_roles_for_action( library, trans.app.security_agent.permitted_actions.LIBRARY_MODIFY ) )
[docs] def get_manage_roles(self, trans, library): """ Load manage roles for all library permissions """ return set( trans.app.security_agent.get_roles_for_action( library, trans.app.security_agent.permitted_actions.LIBRARY_MANAGE ) )
[docs] def get_add_roles(self, trans, library): """ Load add roles for all library permissions """ return set( trans.app.security_agent.get_roles_for_action( library, trans.app.security_agent.permitted_actions.LIBRARY_ADD ) )
[docs] def set_permission_roles(self, trans, library, access_roles, modify_roles, manage_roles, add_roles): """ Set permissions on the given library. """
[docs] def make_public(self, trans, library): """ Makes the given library public (removes all access roles) """ trans.app.security_agent.make_library_public(library) return self.is_public(trans, library)
[docs] def is_public(self, trans, library): """ Return true if lib is public. """ return trans.app.security_agent.library_is_public(library)
[docs]def get_containing_library_from_library_dataset(trans, library_dataset): """Given a library_dataset, get the containing library""" folder = library_dataset.folder while folder.parent: folder = folder.parent # We have folder set to the library's root folder, which has the same name as the library for library in trans.sa_session.query(trans.model.Library).filter( and_(trans.model.Library.table.c.deleted == false(), trans.model.Library.table.c.name == folder.name) ): # Just to double-check if library.root_folder == folder: return library return None