Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.managers.libraries
"""
Manager and Serializer for libraries.
"""
import logging
from typing import (
Dict,
Optional,
Set,
Tuple,
)
from sqlalchemy import (
asc,
false,
func,
not_,
or_,
select,
true,
)
from sqlalchemy.orm import Query
from sqlalchemy.orm.exc import (
MultipleResultsFound,
NoResultFound,
)
from galaxy import exceptions
from galaxy.managers.folders import FolderManager
from galaxy.model import (
Library,
LibraryPermissions,
Role,
)
from galaxy.model.base import transaction
from galaxy.util import (
pretty_print_time_interval,
unicodify,
)
log = logging.getLogger(__name__)
# =============================================================================
class LibraryManager:
    """
    Interface/service object for interacting with libraries.

    Every method receives the request transaction (``trans``) and uses it to
    reach the SQLAlchemy session (``trans.sa_session``), the security agent
    (``trans.app.security_agent``) and the roles of the current user.
    """

    def get(self, trans, decoded_library_id: int, check_accessible: bool = True) -> Library:
        """
        Get the library from the DB.

        :param  decoded_library_id:       decoded library id
        :type   decoded_library_id:       int
        :param  check_accessible:         flag whether to check that user can access item
        :type   check_accessible:         bool

        :returns:   the requested library
        :rtype:     galaxy.model.Library

        :raises InconsistentDatabase: more than one row matched the id
        :raises RequestParameterInvalidException: no row matched the id
        :raises InternalServerError: any other error while loading the library
        :raises ObjectNotFound: raised by :meth:`check_accessible` when the check is
            requested and the current non-admin user may not see the library
        """
        try:
            library = get_library(trans.sa_session, decoded_library_id)
        # Chain explicitly so the underlying database error stays attached as
        # ``__cause__`` of the API-level exception.
        except MultipleResultsFound as e:
            raise exceptions.InconsistentDatabase("Multiple libraries found with the same id.") from e
        except NoResultFound as e:
            raise exceptions.RequestParameterInvalidException("No library found with the id provided.") from e
        except Exception as e:
            raise exceptions.InternalServerError(f"Error loading from the database. {unicodify(e)}") from e
        return self.secure(trans, library, check_accessible)

    def create(self, trans, name: str, description: Optional[str] = "", synopsis: Optional[str] = "") -> Library:
        """
        Create a new library together with its root folder and commit both.

        :param  name:           name of the library (also used for the root folder)
        :type   name:           str
        :param  description:    description of the library
        :type   description:    str
        :param  synopsis:       synopsis of the library
        :type   synopsis:       str

        :returns:   the newly created library
        :rtype:     galaxy.model.Library

        :raises ItemAccessibilityException: the current user is not an admin
        """
        if not trans.user_is_admin:
            raise exceptions.ItemAccessibilityException("Only administrators can create libraries.")
        library = trans.app.model.Library(name=name, description=description, synopsis=synopsis)
        # The root folder carries the same name as the library it belongs to.
        root_folder = trans.app.model.LibraryFolder(name=name, description="")
        library.root_folder = root_folder
        session = trans.sa_session
        session.add_all((library, root_folder))
        with transaction(session):
            session.commit()
        return library

    def update(
        self,
        trans,
        library: Library,
        name: Optional[str] = None,
        description: Optional[str] = None,
        synopsis: Optional[str] = None,
    ) -> Library:
        """
        Update the given library.

        Only the attributes passed as non-``None`` are changed; the session is
        committed only when at least one attribute was set.

        :param  library:        library to update
        :type   library:        galaxy.model.Library
        :param  name:           new name, ``None`` to keep the current one
        :type   name:           str
        :param  description:    new description, ``None`` to keep the current one
        :type   description:    str
        :param  synopsis:       new synopsis, ``None`` to keep the current one
        :type   synopsis:       str

        :returns:   the (possibly modified) library
        :rtype:     galaxy.model.Library

        :raises ItemAccessibilityException: the current user is neither an admin
            nor holds one of the library's modify roles
        :raises RequestParameterInvalidException: the library is marked deleted
        """
        if not trans.user_is_admin:
            current_user_roles = trans.get_current_user_roles()
            library_modify_roles = self.get_modify_roles(trans, library)
            if not any(role in library_modify_roles for role in current_user_roles):
                raise exceptions.ItemAccessibilityException("You don't have permission to update libraries.")
        if library.deleted:
            raise exceptions.RequestParameterInvalidException("You cannot modify a deleted library. Undelete it first.")

        changed = False
        if name is not None:
            library.name = name
            changed = True
            # When library is renamed the root folder has to be renamed too.
            FolderManager().update(trans, library.root_folder, name=name)
        if description is not None:
            library.description = description
            changed = True
        if synopsis is not None:
            library.synopsis = synopsis
            changed = True
        if changed:
            session = trans.sa_session
            session.add(library)
            with transaction(session):
                session.commit()
        return library

    def delete(self, trans, library: Library, undelete: Optional[bool] = False) -> Library:
        """
        Mark given library deleted/undeleted based on the flag.

        :param  library:    library to mark
        :type   library:    galaxy.model.Library
        :param  undelete:   if truthy the library is marked as not deleted,
                            otherwise it is marked as deleted
        :type   undelete:   bool

        :returns:   the modified library
        :rtype:     galaxy.model.Library

        :raises ItemAccessibilityException: the current user is not an admin
        """
        if not trans.user_is_admin:
            raise exceptions.ItemAccessibilityException("Only administrators can delete and undelete libraries.")
        library.deleted = not undelete
        session = trans.sa_session
        session.add(library)
        with transaction(session):
            session.commit()
        return library

    def list(self, trans, deleted: Optional[bool] = False) -> Tuple[Query, Dict[str, Set]]:
        """
        Return a list of libraries from the DB.

        :param  deleted: if True, show only ``deleted`` libraries, if False show only ``non-deleted``
        :type   deleted: boolean (optional)

        :returns: a 2-tuple ``(libraries, prefetched_ids)``:

            * ``libraries`` - iterable (sqlalchemy ScalarResult) that will emit
              all accessible libraries
            * ``prefetched_ids`` - dict of sets of library ids. It always holds
              ``restricted_library_ids`` (libraries that have an access
              permission set); for non-admins it additionally holds
              ``allowed_library_add_ids``, ``allowed_library_modify_ids`` and
              ``allowed_library_manage_ids``. These are used for limiting the
              number of queries when dictifying the libraries later on.
        :rtype: tuple

        :raises AdminRequiredException: a non-admin asked for deleted libraries
        """
        session = trans.sa_session
        permitted_actions = trans.app.security_agent.permitted_actions
        library_access_action = permitted_actions.LIBRARY_ACCESS.action
        restricted_library_ids = set(get_library_ids(session, library_access_action))
        prefetched_ids = {"restricted_library_ids": restricted_library_ids}

        if trans.user_is_admin:
            return get_libraries_for_admins(session, deleted=deleted), prefetched_ids

        # Nonadmins can't see deleted libraries
        if deleted:
            raise exceptions.AdminRequiredException()

        current_user_role_ids = [role.id for role in trans.get_current_user_roles()]
        accessible_restricted_library_ids: Set = set()
        allowed_library_add_ids: Set = set()
        allowed_library_modify_ids: Set = set()
        allowed_library_manage_ids: Set = set()
        # Pair every action name with the set collecting the ids of libraries
        # on which one of the user's roles has that action.
        ids_by_action = (
            (library_access_action, accessible_restricted_library_ids),
            (permitted_actions.LIBRARY_ADD.action, allowed_library_add_ids),
            (permitted_actions.LIBRARY_MODIFY.action, allowed_library_modify_ids),
            (permitted_actions.LIBRARY_MANAGE.action, allowed_library_manage_ids),
        )
        for permission in get_library_permissions_by_role(session, current_user_role_ids):
            for action_name, library_ids in ids_by_action:
                if permission.action == action_name:
                    library_ids.add(permission.library_id)

        prefetched_ids["allowed_library_add_ids"] = allowed_library_add_ids
        prefetched_ids["allowed_library_modify_ids"] = allowed_library_modify_ids
        prefetched_ids["allowed_library_manage_ids"] = allowed_library_manage_ids
        libraries = get_libraries_for_nonadmins(session, restricted_library_ids, accessible_restricted_library_ids)
        return libraries, prefetched_ids

    def secure(self, trans, library: Library, check_accessible: bool = True) -> Library:
        """
        Check if library is accessible to user.

        :param  library:                 library
        :type   library:                 galaxy.model.Library
        :param  check_accessible:        flag whether to check that user can access library
        :type   check_accessible:        bool

        :returns:   the original library
        :rtype:     galaxy.model.Library

        :raises ObjectNotFound: raised by :meth:`check_accessible` when the check
            is requested and fails
        """
        # all libraries are accessible to an admin
        if trans.user_is_admin or not check_accessible:
            return library
        return self.check_accessible(trans, library)

    def check_accessible(self, trans, library: Library) -> Library:
        """
        Check whether the library is accessible to current user.

        :param  library:    library to check
        :type   library:    galaxy.model.Library

        :returns:   the original library
        :rtype:     galaxy.model.Library

        :raises ObjectNotFound: the security agent denies access for the current
            user's roles, or the library is marked deleted
        """
        current_user_roles = trans.get_current_user_roles()
        if not trans.app.security_agent.can_access_library(current_user_roles, library):
            raise exceptions.ObjectNotFound("Library with the id provided was not found.")
        if library.deleted:
            raise exceptions.ObjectNotFound("Library with the id provided is deleted.")
        return library

    def get_library_dict(self, trans, library: Library, prefetched_ids: Optional[Dict[str, Set]] = None) -> dict:
        """
        Return library data in the form of a dictionary.

        :param  library:         library
        :type   library:         galaxy.model.Library
        :param  prefetched_ids:  dict of 3 sets with available actions for user's accessible
                                 libraries and a set of ids of all public libraries. These are
                                 used for limiting the number of queries when dictifying a
                                 set of libraries.
        :type   prefetched_ids:  dict

        :returns:   dict with data about the library; on top of the ``element``
                    view it contains ``public``, ``create_time_pretty``,
                    ``can_user_add``, ``can_user_modify`` and ``can_user_manage``
        :rtype:     dictionary
        """
        prefetched = prefetched_ids or {}
        restricted_library_ids = prefetched.get("restricted_library_ids")

        library_dict = library.to_dict(view="element")
        # Without prefetched restricted ids the library is reported as public.
        library_dict["public"] = not (restricted_library_ids and library.id in restricted_library_ids)
        library_dict["create_time_pretty"] = pretty_print_time_interval(library.create_time, precise=True)

        if trans.user_is_admin:
            library_dict["can_user_add"] = True
            library_dict["can_user_modify"] = True
            library_dict["can_user_manage"] = True
        elif prefetched_ids:
            # Answer from the prefetched sets so no further queries are issued.
            for dict_key, prefetched_key in (
                ("can_user_add", "allowed_library_add_ids"),
                ("can_user_modify", "allowed_library_modify_ids"),
                ("can_user_manage", "allowed_library_manage_ids"),
            ):
                allowed_ids = prefetched.get(prefetched_key)
                library_dict[dict_key] = bool(allowed_ids and library.id in allowed_ids)
        else:
            security_agent = trans.app.security_agent
            current_user_roles = trans.get_current_user_roles()
            library_dict["can_user_add"] = security_agent.can_add_library_item(current_user_roles, library)
            library_dict["can_user_modify"] = security_agent.can_modify_library_item(current_user_roles, library)
            library_dict["can_user_manage"] = security_agent.can_manage_library_item(current_user_roles, library)
        return library_dict

    def get_current_roles(self, trans, library: Library) -> dict:
        """
        Load all permissions currently related to the given library.

        :param  library:      the model object
        :type   library:      galaxy.model.Library

        :rtype:     dictionary
        :returns:   dict of current roles for all available permission types;
                    every value is a list of ``(role name, encoded role id)`` tuples
        """

        def encode_roles(roles):
            return [(role.name, trans.security.encode_id(role.id)) for role in roles]

        return dict(
            access_library_role_list=encode_roles(self.get_access_roles(trans, library)),
            modify_library_role_list=encode_roles(self.get_modify_roles(trans, library)),
            manage_library_role_list=encode_roles(self.get_manage_roles(trans, library)),
            add_library_item_role_list=encode_roles(self.get_add_roles(trans, library)),
        )

    def get_access_roles(self, trans, library: Library) -> Set[Role]:
        """
        Load access roles for all library permissions

        :returns:   set of roles reported by ``library.get_access_roles``
        :rtype:     set
        """
        return set(library.get_access_roles(trans.app.security_agent))

    def get_modify_roles(self, trans, library: Library) -> Set[Role]:
        """
        Load modify roles for all library permissions

        :returns:   set of roles the security agent reports for ``LIBRARY_MODIFY``
        :rtype:     set
        """
        security_agent = trans.app.security_agent
        return set(security_agent.get_roles_for_action(library, security_agent.permitted_actions.LIBRARY_MODIFY))

    def get_manage_roles(self, trans, library: Library) -> Set[Role]:
        """
        Load manage roles for all library permissions

        :returns:   set of roles the security agent reports for ``LIBRARY_MANAGE``
        :rtype:     set
        """
        security_agent = trans.app.security_agent
        return set(security_agent.get_roles_for_action(library, security_agent.permitted_actions.LIBRARY_MANAGE))

    def get_add_roles(self, trans, library: Library) -> Set[Role]:
        """
        Load add roles for all library permissions

        :returns:   set of roles the security agent reports for ``LIBRARY_ADD``
        :rtype:     set
        """
        security_agent = trans.app.security_agent
        return set(security_agent.get_roles_for_action(library, security_agent.permitted_actions.LIBRARY_ADD))

    def make_public(self, trans, library: Library) -> bool:
        """
        Makes the given library public (removes all access roles)

        :returns:   the result of :meth:`is_public` after the change
        :rtype:     bool
        """
        trans.app.security_agent.make_library_public(library)
        return self.is_public(trans, library)

    def is_public(self, trans, library: Library) -> bool:
        """
        Return true if lib is public.

        :returns:   whatever the security agent's ``library_is_public`` reports
        :rtype:     bool
        """
        return trans.app.security_agent.library_is_public(library)
def get_containing_library_from_library_dataset(trans, library_dataset) -> Optional[Library]:
    """
    Given a library_dataset, get the containing library.

    Walks up from the dataset's folder to the topmost folder, then looks among
    the non-deleted libraries bearing that folder's name for the one whose root
    folder is that very folder.

    :returns:   the containing library, or ``None`` when no candidate matches
    :rtype:     galaxy.model.Library
    """
    root_folder = library_dataset.folder
    while root_folder.parent:
        root_folder = root_folder.parent
    # root_folder is now the library's root folder, which has the same name as the library
    candidates_stmt = select(Library).where(Library.deleted == false()).where(Library.name == root_folder.name)
    candidates = trans.sa_session.scalars(candidates_stmt)
    # The name alone is not a proof of ownership, so compare the root folder itself.
    return next((candidate for candidate in candidates if candidate.root_folder == root_folder), None)
def get_library(session, library_id):
    """
    Load the single library whose id equals ``library_id``.

    The row is fetched with ``scalar_one()``, so the lookup raises instead of
    returning ``None`` when it does not match exactly one row.

    :returns:   the matching library
    :rtype:     galaxy.model.Library
    """
    by_id = select(Library).where(Library.id == library_id)
    result = session.execute(by_id)
    return result.scalar_one()
def get_library_ids(session, library_access_action):
    """
    Return the distinct ids of libraries that have a permission row whose
    action equals ``library_access_action``.

    :returns:   iterable of library ids (result of ``session.scalars``)
    """
    library_id_column = LibraryPermissions.library_id
    has_action = LibraryPermissions.action == library_access_action
    return session.scalars(select(library_id_column).where(has_action).distinct())
def get_library_permissions_by_role(session, role_ids):
    """
    Return every library permission row held by one of the given roles.

    :param  role_ids:   ids of the roles to match against ``LibraryPermissions.role_id``

    :returns:   iterable of ``LibraryPermissions`` (result of ``session.scalars``)
    """
    held_by_role = LibraryPermissions.role_id.in_(role_ids)
    return session.scalars(select(LibraryPermissions).where(held_by_role))
def get_libraries_for_admins(session, deleted):
    """
    Return libraries ordered ascending by lower-cased name, without any access filtering.

    :param  deleted:    ``None`` returns all libraries, a truthy value only the
                        deleted ones, any other falsy value only the non-deleted ones

    :returns:   iterable of libraries (result of ``session.scalars``)
    """
    query = select(Library)
    # ``None`` means the flag is not specified, so do not filter on it.
    if deleted is not None:
        wanted_state = true() if deleted else false()
        query = query.where(Library.deleted == wanted_state)
    by_name = asc(func.lower(Library.name))
    return session.scalars(query.order_by(by_name))
def get_libraries_for_nonadmins(session, restricted_library_ids, accessible_restricted_library_ids):
    """
    Return the non-deleted libraries, ordered ascending by lower-cased name.

    A library is included when its id is not in ``restricted_library_ids`` or
    when its id is in ``accessible_restricted_library_ids``.

    :param  restricted_library_ids:             ids excluded unless listed as accessible
    :param  accessible_restricted_library_ids:  ids that are included regardless

    :returns:   iterable of libraries (result of ``session.scalars``)
    """
    not_restricted = not_(Library.id.in_(restricted_library_ids))
    explicitly_accessible = Library.id.in_(accessible_restricted_library_ids)
    query = select(Library).where(Library.deleted == false()).where(or_(not_restricted, explicitly_accessible))
    by_name = asc(func.lower(Library.name))
    return session.scalars(query.order_by(by_name))