Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.managers.libraries
"""
Manager and Serializer for libraries.
"""
import logging
from typing import (
Any,
Dict,
List,
Optional,
)
from sqlalchemy import and_, false, not_, or_, true
from sqlalchemy.orm.exc import MultipleResultsFound
from sqlalchemy.orm.exc import NoResultFound
from galaxy import (
exceptions,
util,
)
from galaxy.managers import (
folders,
roles,
)
from galaxy.managers.context import ProvidesAppContext
from galaxy.schema.fields import EncodedDatabaseIdField
from galaxy.util import (
pretty_print_time_interval,
unicodify,
)
log = logging.getLogger(__name__)
# =============================================================================
[docs]class LibraryManager:
"""
Interface/service object for interacting with libraries.
"""
[docs] def get(self, trans, decoded_library_id, check_accessible=True):
"""
Get the library from the DB.
:param decoded_library_id: decoded library id
:type decoded_library_id: int
:param check_accessible: flag whether to check that user can access item
:type check_accessible: bool
:returns: the requested library
:rtype: galaxy.model.Library
"""
try:
library = trans.sa_session.query(trans.app.model.Library).filter(trans.app.model.Library.table.c.id == decoded_library_id).one()
except MultipleResultsFound:
raise exceptions.InconsistentDatabase('Multiple libraries found with the same id.')
except NoResultFound:
raise exceptions.RequestParameterInvalidException('No library found with the id provided.')
except Exception as e:
raise exceptions.InternalServerError('Error loading from the database.' + unicodify(e))
library = self.secure(trans, library, check_accessible)
return library
[docs] def create(self, trans, name, description='', synopsis=''):
"""
Create a new library.
"""
if not trans.user_is_admin:
raise exceptions.ItemAccessibilityException('Only administrators can create libraries.')
else:
library = trans.app.model.Library(name=name, description=description, synopsis=synopsis)
root_folder = trans.app.model.LibraryFolder(name=name, description='')
library.root_folder = root_folder
trans.sa_session.add_all((library, root_folder))
trans.sa_session.flush()
return library
[docs] def update(self, trans, library, name=None, description=None, synopsis=None):
"""
Update the given library
"""
changed = False
if not trans.user_is_admin:
raise exceptions.ItemAccessibilityException('Only administrators can update libraries.')
if library.deleted:
raise exceptions.RequestParameterInvalidException('You cannot modify a deleted library. Undelete it first.')
if name is not None:
library.name = name
changed = True
# When library is renamed the root folder has to be renamed too.
folder_manager = folders.FolderManager()
folder_manager.update(trans, library.root_folder, name=name)
if description is not None:
library.description = description
changed = True
if synopsis is not None:
library.synopsis = synopsis
changed = True
if changed:
trans.sa_session.add(library)
trans.sa_session.flush()
return library
[docs] def delete(self, trans, library, undelete=False):
"""
Mark given library deleted/undeleted based on the flag.
"""
if not trans.user_is_admin:
raise exceptions.ItemAccessibilityException('Only administrators can delete and undelete libraries.')
if undelete:
library.deleted = False
else:
library.deleted = True
trans.sa_session.add(library)
trans.sa_session.flush()
return library
[docs] def list(self, trans, deleted: Optional[bool] = False):
"""
Return a list of libraries from the DB.
:param deleted: if True, show only ``deleted`` libraries, if False show only ``non-deleted``
:type deleted: boolean (optional)
:returns: query that will emit all accessible libraries
:rtype: sqlalchemy query
:returns: dict of 3 sets with available actions for user's accessible
libraries and a set of ids of all public libraries. These are
used for limiting the number of queries when dictifying the
libraries later on.
:rtype: dict
"""
is_admin = trans.user_is_admin
query = trans.sa_session.query(trans.app.model.Library)
library_access_action = trans.app.security_agent.permitted_actions.LIBRARY_ACCESS.action
restricted_library_ids = {lp.library_id for lp in (
trans.sa_session.query(trans.model.LibraryPermissions).filter(
trans.model.LibraryPermissions.table.c.action == library_access_action
).distinct())}
prefetched_ids = {'restricted_library_ids': restricted_library_ids}
if is_admin:
if deleted is None:
# Flag is not specified, do not filter on it.
pass
elif deleted:
query = query.filter(trans.app.model.Library.table.c.deleted == true())
else:
query = query.filter(trans.app.model.Library.table.c.deleted == false())
else:
# Nonadmins can't see deleted libraries
if deleted:
raise exceptions.AdminRequiredException()
else:
query = query.filter(trans.app.model.Library.table.c.deleted == false())
current_user_role_ids = [role.id for role in trans.get_current_user_roles()]
all_actions = trans.sa_session.query(trans.model.LibraryPermissions).filter(trans.model.LibraryPermissions.table.c.role_id.in_(current_user_role_ids))
library_add_action = trans.app.security_agent.permitted_actions.LIBRARY_ADD.action
library_modify_action = trans.app.security_agent.permitted_actions.LIBRARY_MODIFY.action
library_manage_action = trans.app.security_agent.permitted_actions.LIBRARY_MANAGE.action
accessible_restricted_library_ids = set()
allowed_library_add_ids = set()
allowed_library_modify_ids = set()
allowed_library_manage_ids = set()
for action in all_actions:
if action.action == library_access_action:
accessible_restricted_library_ids.add(action.library_id)
if action.action == library_add_action:
allowed_library_add_ids.add(action.library_id)
if action.action == library_modify_action:
allowed_library_modify_ids.add(action.library_id)
if action.action == library_manage_action:
allowed_library_manage_ids.add(action.library_id)
query = query.filter(or_(
not_(trans.model.Library.table.c.id.in_(restricted_library_ids)),
trans.model.Library.table.c.id.in_(accessible_restricted_library_ids)
))
prefetched_ids['allowed_library_add_ids'] = allowed_library_add_ids
prefetched_ids['allowed_library_modify_ids'] = allowed_library_modify_ids
prefetched_ids['allowed_library_manage_ids'] = allowed_library_manage_ids
return query, prefetched_ids
[docs] def secure(self, trans, library, check_accessible=True):
"""
Check if library is accessible to user.
:param library: library
:type library: galaxy.model.Library
:param check_accessible: flag whether to check that user can access library
:type check_accessible: bool
:returns: the original library
:rtype: galaxy.model.Library
"""
# all libraries are accessible to an admin
if trans.user_is_admin:
return library
if check_accessible:
library = self.check_accessible(trans, library)
return library
[docs] def check_accessible(self, trans, library):
"""
Check whether the library is accessible to current user.
"""
if not trans.app.security_agent.can_access_library(trans.get_current_user_roles(), library):
raise exceptions.ObjectNotFound('Library with the id provided was not found.')
elif library.deleted:
raise exceptions.ObjectNotFound('Library with the id provided is deleted.')
else:
return library
[docs] def get_library_dict(self, trans, library, prefetched_ids=None):
"""
Return library data in the form of a dictionary.
:param library: library
:type library: galaxy.model.Library
:param prefetched_ids: dict of 3 sets with available actions for user's accessible
libraries and a set of ids of all public libraries. These are
used for limiting the number of queries when dictifying a
set of libraries.
:type prefetched_ids: dict
:returns: dict with data about the library
:rtype: dictionary
"""
restricted_library_ids = prefetched_ids.get('restricted_library_ids', None) if prefetched_ids else None
allowed_library_add_ids = prefetched_ids.get('allowed_library_add_ids', None) if prefetched_ids else None
allowed_library_modify_ids = prefetched_ids.get('allowed_library_modify_ids', None) if prefetched_ids else None
allowed_library_manage_ids = prefetched_ids.get('allowed_library_manage_ids', None) if prefetched_ids else None
library_dict = library.to_dict(view='element', value_mapper={'id': trans.security.encode_id, 'root_folder_id': trans.security.encode_id})
library_dict['public'] = False if (restricted_library_ids and library.id in restricted_library_ids) else True
library_dict['create_time_pretty'] = pretty_print_time_interval(library.create_time, precise=True)
if not trans.user_is_admin:
if prefetched_ids:
library_dict['can_user_add'] = True if (allowed_library_add_ids and library.id in allowed_library_add_ids) else False
library_dict['can_user_modify'] = True if (allowed_library_modify_ids and library.id in allowed_library_modify_ids) else False
library_dict['can_user_manage'] = True if (allowed_library_manage_ids and library.id in allowed_library_manage_ids) else False
else:
current_user_roles = trans.get_current_user_roles()
library_dict['can_user_add'] = trans.app.security_agent.can_add_library_item(current_user_roles, library)
library_dict['can_user_modify'] = trans.app.security_agent.can_modify_library_item(current_user_roles, library)
library_dict['can_user_manage'] = trans.app.security_agent.can_manage_library_item(current_user_roles, library)
else:
library_dict['can_user_add'] = True
library_dict['can_user_modify'] = True
library_dict['can_user_manage'] = True
return library_dict
[docs] def get_current_roles(self, trans, library):
"""
Load all permissions currently related to the given library.
:param library: the model object
:type library: galaxy.model.Library
:rtype: dictionary
:returns: dict of current roles for all available permission types
"""
access_library_role_list = [(access_role.name, trans.security.encode_id(access_role.id)) for access_role in self.get_access_roles(trans, library)]
modify_library_role_list = [(modify_role.name, trans.security.encode_id(modify_role.id)) for modify_role in self.get_modify_roles(trans, library)]
manage_library_role_list = [(manage_role.name, trans.security.encode_id(manage_role.id)) for manage_role in self.get_manage_roles(trans, library)]
add_library_item_role_list = [(add_role.name, trans.security.encode_id(add_role.id)) for add_role in self.get_add_roles(trans, library)]
return dict(access_library_role_list=access_library_role_list,
modify_library_role_list=modify_library_role_list,
manage_library_role_list=manage_library_role_list,
add_library_item_role_list=add_library_item_role_list)
[docs] def get_access_roles(self, trans, library):
"""
Load access roles for all library permissions
"""
return set(library.get_access_roles(trans.app.security_agent))
[docs] def get_modify_roles(self, trans, library):
"""
Load modify roles for all library permissions
"""
return set(trans.app.security_agent.get_roles_for_action(library, trans.app.security_agent.permitted_actions.LIBRARY_MODIFY))
[docs] def get_manage_roles(self, trans, library):
"""
Load manage roles for all library permissions
"""
return set(trans.app.security_agent.get_roles_for_action(library, trans.app.security_agent.permitted_actions.LIBRARY_MANAGE))
[docs] def get_add_roles(self, trans, library):
"""
Load add roles for all library permissions
"""
return set(trans.app.security_agent.get_roles_for_action(library, trans.app.security_agent.permitted_actions.LIBRARY_ADD))
[docs] def set_permission_roles(self, trans, library, access_roles, modify_roles, manage_roles, add_roles):
"""
Set permissions on the given library.
"""
[docs] def make_public(self, trans, library):
"""
Makes the given library public (removes all access roles)
"""
trans.app.security_agent.make_library_public(library)
return self.is_public(trans, library)
[docs] def is_public(self, trans, library):
"""
Return true if lib is public.
"""
return trans.app.security_agent.library_is_public(library)
[docs]def get_containing_library_from_library_dataset(trans, library_dataset):
"""Given a library_dataset, get the containing library"""
folder = library_dataset.folder
while folder.parent:
folder = folder.parent
# We have folder set to the library's root folder, which has the same name as the library
for library in trans.sa_session.query(trans.model.Library).filter(
and_(trans.model.Library.table.c.deleted == false(),
trans.model.Library.table.c.name == folder.name)):
# Just to double-check
if library.root_folder == folder:
return library
return None
[docs]class LibrariesManager:
"""
Interface/service object for sharing logic between controllers.
"""
[docs] def __init__(
self,
folder_manager: folders.FolderManager,
library_manager: LibraryManager,
role_manager: roles.RoleManager,
):
self.folder_manager = folder_manager
self.library_manager = library_manager
self.role_manager = role_manager
[docs] def index(self, trans: ProvidesAppContext, deleted: Optional[bool] = False) -> List[Any]:
"""Returns a list of summary data for all libraries.
:param deleted: if True, show only ``deleted`` libraries, if False show only ``non-deleted``
:type deleted: boolean (optional)
:returns: list of dictionaries containing library information
:rtype: list
.. seealso:: :attr:`galaxy.model.Library.dict_collection_visible_keys`
"""
query, prefetched_ids = self.library_manager.list(trans, deleted)
libraries = []
for library in query:
libraries.append(self.library_manager.get_library_dict(trans, library, prefetched_ids))
return libraries
[docs] def show(self, trans, id: EncodedDatabaseIdField):
""" Returns detailed information about a library.
:param id: the encoded id of the library
:type id: an encoded id string
:param deleted: if True, allow information on a ``deleted`` library
:type deleted: boolean
:returns: detailed library information
:rtype: dict
.. seealso:: :attr:`galaxy.model.Library.dict_element_visible_keys`
:raises: MalformedId, ObjectNotFound
"""
library = self.library_manager.get(trans, trans.security.decode_id(id, object_name='library'))
library_dict = self.library_manager.get_library_dict(trans, library)
return library_dict
[docs] def create(self, trans, payload: Dict[str, str]):
"""Creates a new library.
.. note:: Currently, only admin users can create libraries.
:param payload: dictionary structure containing::
:param name: (required) the new library's name
:type name: str
:param description: the new library's description
:type description: str
:param synopsis: the new library's synopsis
:type synopsis: str
:type payload: dict
:returns: detailed library information
:rtype: dict
:raises: RequestParameterMissingException
"""
name = payload.get('name', None)
if not name:
raise exceptions.RequestParameterMissingException("Missing required parameter 'name'.")
description = payload.get('description', '')
synopsis = payload.get('synopsis', '')
if synopsis in ['None', None]:
synopsis = ''
library = self.library_manager.create(trans, name, description, synopsis)
library_dict = self.library_manager.get_library_dict(trans, library)
return library_dict
[docs] def update(self, trans, id: EncodedDatabaseIdField, payload: Dict[str, str]):
"""Updates the library defined by an ``encoded_id`` with the data in the payload.
.. note:: Currently, only admin users can update libraries. Also the library must not be `deleted`.
:param id: the encoded id of the library
:type id: an encoded id string
:param payload: dictionary structure containing::
:param name: new library's name, cannot be empty
:type name: str
:param description: new library's description
:type description: str
:param synopsis: new library's synopsis
:type synopsis: str
:type payload: dict
:returns: detailed library information
:rtype: dict
:raises: RequestParameterMissingException
"""
library = self.library_manager.get(trans, trans.security.decode_id(id, object_name='library'))
name = payload.get('name', None)
if name == '':
raise exceptions.RequestParameterMissingException("Parameter 'name' of library is required. You cannot remove it.")
description = payload.get('description', None)
synopsis = payload.get('synopsis', None)
updated_library = self.library_manager.update(trans, library, name, description, synopsis)
library_dict = self.library_manager.get_library_dict(trans, updated_library)
return library_dict
[docs] def delete(self, trans, id: EncodedDatabaseIdField, undelete: Optional[bool] = False):
"""Marks the library with the given ``id`` as `deleted` (or removes the `deleted` mark if the `undelete` param is true)
.. note:: Currently, only admin users can un/delete libraries.
:param id: the encoded id of the library to un/delete
:type id: an encoded id string
:param undelete: (optional) flag specifying whether the item should be deleted or undeleted, defaults to false:
:type undelete: bool
:returns: detailed library information
:rtype: dictionary
.. seealso:: :attr:`galaxy.model.Library.dict_element_visible_keys`
"""
library = self.library_manager.get(trans, trans.security.decode_id(id, object_name='library'))
library = self.library_manager.delete(trans, library, undelete)
library_dict = self.library_manager.get_library_dict(trans, library)
return library_dict
[docs] def get_permissions(
self,
trans,
id: EncodedDatabaseIdField,
scope: Optional[str] = 'current',
is_library_access: Optional[bool] = False,
page: Optional[int] = 1,
page_limit: Optional[int] = 10,
query: Optional[str] = None,
):
"""Load all permissions for the given library id and return it.
:param id: the encoded id of the library
:type id: an encoded id string
:param scope: either 'current' or 'available'
:type scope: string
:param is_library_access: indicates whether the roles available for the library access are requested
:type is_library_access: bool
:returns: dictionary with all applicable permissions' values
:rtype: dictionary
:raises: InsufficientPermissionsException
"""
current_user_roles = trans.get_current_user_roles()
is_admin = trans.user_is_admin
library = self.library_manager.get(trans, trans.security.decode_id(id, object_name='library'))
if not (is_admin or trans.app.security_agent.can_manage_library_item(current_user_roles, library)):
raise exceptions.InsufficientPermissionsException('You do not have proper permission to access permissions of this library.')
if scope == 'current' or scope is None:
roles = self.library_manager.get_current_roles(trans, library)
return roles
# Return roles that are available to select.
elif scope == 'available':
roles, total_roles = trans.app.security_agent.get_valid_roles(trans, library, query, page, page_limit, is_library_access)
return_roles = []
for role in roles:
role_id = trans.security.encode_id(role.id)
return_roles.append(dict(id=role_id, name=role.name, type=role.type))
return dict(roles=return_roles, page=page, page_limit=page_limit, total=total_roles)
else:
raise exceptions.RequestParameterInvalidException("The value of 'scope' parameter is invalid. Alllowed values: current, available")
[docs] def set_permissions(self, trans, id: EncodedDatabaseIdField, payload: Dict[str, Any]):
"""Set permissions of the given library to the given role ids.
:param id: the encoded id of the library to set the permissions of
:type id: an encoded id string
:param payload: dictionary structure containing:
:param action: (required) describes what action should be performed
available actions: remove_restrictions, set_permissions
:type action: str
:param access_ids[]: list of Role.id defining roles that should have access permission on the library
:type access_ids[]: string or list
:param add_ids[]: list of Role.id defining roles that should have add item permission on the library
:type add_ids[]: string or list
:param manage_ids[]: list of Role.id defining roles that should have manage permission on the library
:type manage_ids[]: string or list
:param modify_ids[]: list of Role.id defining roles that should have modify permission on the library
:type modify_ids[]: string or list
:type: dictionary
:returns: dict of current roles for all available permission types
:rtype: dictionary
:raises: RequestParameterInvalidException, InsufficientPermissionsException, InternalServerError
RequestParameterMissingException
"""
is_admin = trans.user_is_admin
current_user_roles = trans.get_current_user_roles()
library = self.library_manager.get(trans, trans.security.decode_id(id, object_name='library'))
if not (is_admin or trans.app.security_agent.can_manage_library_item(current_user_roles, library)):
raise exceptions.InsufficientPermissionsException('You do not have proper permission to modify permissions of this library.')
new_access_roles_ids = util.listify(payload.get('access_ids[]', None))
new_add_roles_ids = util.listify(payload.get('add_ids[]', None))
new_manage_roles_ids = util.listify(payload.get('manage_ids[]', None))
new_modify_roles_ids = util.listify(payload.get('modify_ids[]', None))
action = payload.get('action', None)
if action is None:
if payload is not None:
return self.set_permissions_old(trans, library, payload)
else:
raise exceptions.RequestParameterMissingException('The mandatory parameter "action" is missing.')
elif action == 'remove_restrictions':
is_public = self.library_manager.make_public(trans, library)
if not is_public:
raise exceptions.InternalServerError('An error occurred while making library public.')
elif action == 'set_permissions':
# ACCESS LIBRARY ROLES
valid_access_roles = []
invalid_access_roles_names = []
for role_id in new_access_roles_ids:
role = self.role_manager.get(trans, trans.security.decode_id(role_id, object_name='role'))
valid_roles, total_roles = trans.app.security_agent.get_valid_roles(trans, library, is_library_access=True)
if role in valid_roles:
valid_access_roles.append(role)
else:
invalid_access_roles_names.append(role_id)
if len(invalid_access_roles_names) > 0:
log.warning("The following roles could not be added to the library access permission: " + str(invalid_access_roles_names))
# ADD TO LIBRARY ROLES
valid_add_roles = []
invalid_add_roles_names = []
for role_id in new_add_roles_ids:
role = self.role_manager.get(trans, trans.security.decode_id(role_id, object_name='role'))
valid_roles, total_roles = trans.app.security_agent.get_valid_roles(trans, library)
if role in valid_roles:
valid_add_roles.append(role)
else:
invalid_add_roles_names.append(role_id)
if len(invalid_add_roles_names) > 0:
log.warning("The following roles could not be added to the add library item permission: " + str(invalid_add_roles_names))
# MANAGE LIBRARY ROLES
valid_manage_roles = []
invalid_manage_roles_names = []
for role_id in new_manage_roles_ids:
role = self.role_manager.get(trans, trans.security.decode_id(role_id, object_name='role'))
valid_roles, total_roles = trans.app.security_agent.get_valid_roles(trans, library)
if role in valid_roles:
valid_manage_roles.append(role)
else:
invalid_manage_roles_names.append(role_id)
if len(invalid_manage_roles_names) > 0:
log.warning("The following roles could not be added to the manage library permission: " + str(invalid_manage_roles_names))
# MODIFY LIBRARY ROLES
valid_modify_roles = []
invalid_modify_roles_names = []
for role_id in new_modify_roles_ids:
role = self.role_manager.get(trans, trans.security.decode_id(role_id, object_name='role'))
valid_roles, total_roles = trans.app.security_agent.get_valid_roles(trans, library)
if role in valid_roles:
valid_modify_roles.append(role)
else:
invalid_modify_roles_names.append(role_id)
if len(invalid_modify_roles_names) > 0:
log.warning("The following roles could not be added to the modify library permission: " + str(invalid_modify_roles_names))
permissions = {trans.app.security_agent.permitted_actions.LIBRARY_ACCESS: valid_access_roles}
permissions.update({trans.app.security_agent.permitted_actions.LIBRARY_ADD: valid_add_roles})
permissions.update({trans.app.security_agent.permitted_actions.LIBRARY_MANAGE: valid_manage_roles})
permissions.update({trans.app.security_agent.permitted_actions.LIBRARY_MODIFY: valid_modify_roles})
trans.app.security_agent.set_all_library_permissions(trans, library, permissions)
trans.sa_session.refresh(library)
# Copy the permissions to the root folder
trans.app.security_agent.copy_library_permissions(trans, library, library.root_folder)
else:
raise exceptions.RequestParameterInvalidException('The mandatory parameter "action" has an invalid value.'
'Allowed values are: "remove_restrictions", set_permissions"')
roles = self.library_manager.get_current_roles(trans, library)
return roles
[docs] def set_permissions_old(self, trans, library, payload):
"""
*** old implementation for backward compatibility ***
Updates the library permissions.
"""
params = util.Params(payload)
permissions = {}
for k, v in trans.app.model.Library.permitted_actions.items():
role_params = params.get(k + '_in', [])
in_roles = [trans.sa_session.query(trans.app.model.Role).get(trans.security.decode_id(x)) for x in util.listify(role_params)]
permissions[trans.app.security_agent.get_action(v.action)] = in_roles
trans.app.security_agent.set_all_library_permissions(trans, library, permissions)
trans.sa_session.refresh(library)
# Copy the permissions to the root folder
trans.app.security_agent.copy_library_permissions(trans, library, library.root_folder)
item = library.to_dict(view='element', value_mapper={'id': trans.security.encode_id, 'root_folder_id': trans.security.encode_id})
return item