Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.managers.folders
"""
Manager and Serializer for Library Folders.
"""
import logging
from sqlalchemy.orm.exc import (
MultipleResultsFound,
NoResultFound
)
from galaxy.exceptions import (
AuthenticationRequired,
InconsistentDatabase,
InsufficientPermissionsException,
InternalServerError,
ItemAccessibilityException,
MalformedId,
RequestParameterInvalidException
)
from galaxy.util import unicodify
log = logging.getLogger(__name__)
# =============================================================================
[docs]class FolderManager(object):
"""
Interface/service object for interacting with folders.
"""
[docs] def get(self, trans, decoded_folder_id, check_manageable=False, check_accessible=True):
"""
Get the folder from the DB.
:param decoded_folder_id: decoded folder id
:type decoded_folder_id: int
:param check_manageable: flag whether the check that user can manage item
:type check_manageable: bool
:param check_accessible: flag whether to check that user can access item
:type check_accessible: bool
:returns: the requested folder
:rtype: LibraryFolder
:raises: InconsistentDatabase, RequestParameterInvalidException, InternalServerError
"""
try:
folder = trans.sa_session.query(trans.app.model.LibraryFolder).filter(trans.app.model.LibraryFolder.table.c.id == decoded_folder_id).one()
except MultipleResultsFound:
raise InconsistentDatabase('Multiple folders found with the same id.')
except NoResultFound:
raise RequestParameterInvalidException('No folder found with the id provided.')
except Exception as e:
raise InternalServerError('Error loading from the database.' + unicodify(e))
folder = self.secure(trans, folder, check_manageable, check_accessible)
return folder
[docs] def secure(self, trans, folder, check_manageable=True, check_accessible=True):
"""
Check if (a) user can manage folder or (b) folder is accessible to user.
:param folder: folder item
:type folder: LibraryFolder
:param check_manageable: flag whether to check that user can manage item
:type check_manageable: bool
:param check_accessible: flag whether to check that user can access item
:type check_accessible: bool
:returns: the original folder
:rtype: LibraryFolder
"""
# all folders are accessible to an admin
if trans.user_is_admin:
return folder
if check_manageable:
folder = self.check_manageable(trans, folder)
if check_accessible:
folder = self.check_accessible(trans, folder)
return folder
[docs] def check_manageable(self, trans, folder):
"""
Check whether the user can manage the folder.
:returns: the original folder
:rtype: LibraryFolder
:raises: AuthenticationRequired, InsufficientPermissionsException
"""
if not trans.user:
raise AuthenticationRequired("Must be logged in to manage Galaxy items.", type='error')
current_user_roles = trans.get_current_user_roles()
if not trans.app.security_agent.can_manage_library_item(current_user_roles, folder):
raise InsufficientPermissionsException("You don't have permissions to manage this folder.", type='error')
else:
return folder
[docs] def check_accessible(self, trans, folder):
"""
Check whether the folder is accessible to current user.
By default every folder is accessible (contents have their own permissions).
"""
return folder
[docs] def get_folder_dict(self, trans, folder):
"""
Return folder data in the form of a dictionary.
:param folder: folder item
:type folder: LibraryFolder
:returns: dict with data about the folder
:rtype: dictionary
"""
folder_dict = folder.to_dict(view='element')
folder_dict = trans.security.encode_all_ids(folder_dict, True)
folder_dict['id'] = 'F' + folder_dict['id']
if folder_dict['parent_id'] is not None:
folder_dict['parent_id'] = 'F' + folder_dict['parent_id']
folder_dict['update_time'] = folder.update_time.strftime("%Y-%m-%d %I:%M %p")
return folder_dict
[docs] def create(self, trans, parent_folder_id, new_folder_name, new_folder_description=''):
"""
Create a new folder under the given folder.
:param parent_folder_id: decoded id
:type parent_folder_id: int
:param new_folder_name: name of the new folder
:type new_folder_name: str
:param new_folder_description: description of the folder (optional, defaults to empty string)
:type new_folder_description: str
:returns: the new folder
:rtype: LibraryFolder
:raises: InsufficientPermissionsException
"""
parent_folder = self.get(trans, parent_folder_id)
current_user_roles = trans.get_current_user_roles()
if not (trans.user_is_admin or trans.app.security_agent.can_add_library_item(current_user_roles, parent_folder)):
raise InsufficientPermissionsException('You do not have proper permission to create folders under given folder.')
new_folder = trans.app.model.LibraryFolder(name=new_folder_name, description=new_folder_description)
# We are associating the last used genome build with folders, so we will always
# initialize a new folder with the first dbkey in genome builds list which is currently
# ? unspecified (?)
new_folder.genome_build = trans.app.genome_builds.default_value
parent_folder.add_folder(new_folder)
trans.sa_session.add(new_folder)
trans.sa_session.flush()
# New folders default to having the same permissions as their parent folder
trans.app.security_agent.copy_library_permissions(trans, parent_folder, new_folder)
return new_folder
[docs] def update(self, trans, folder, name=None, description=None):
"""
Update the given folder's name or description.
:param folder: the model object
:type folder: LibraryFolder
:param name: new name for the library folder
:type name: str
:param description: new description for the library folder
:type description: str
:returns: the folder
:rtype: LibraryFolder
:raises: ItemAccessibilityException, InsufficientPermissionsException
"""
changed = False
if not trans.user_is_admin:
if not self.check_manageable(trans, folder):
raise InsufficientPermissionsException("You do not have proper permission to update the library folder.")
if folder.deleted is True:
raise ItemAccessibilityException("You cannot update a deleted library folder. Undelete it first.")
if name is not None and name != folder.name:
folder.name = name
changed = True
if description is not None and description != folder.description:
folder.description = description
changed = True
if changed:
trans.sa_session.add(folder)
trans.sa_session.flush()
return folder
[docs] def delete(self, trans, folder, undelete=False):
"""
Mark given folder deleted/undeleted based on the flag.
:param folder: the model object
:type folder: LibraryFolder
:param undelete: flag whether to delete (when False) or undelete
:type undelete: Bool
:returns: the folder
:rtype: LibraryFolder
:raises: ItemAccessibilityException
"""
if not trans.user_is_admin:
folder = self.check_manageable(trans, folder)
if undelete:
folder.deleted = False
else:
folder.deleted = True
trans.sa_session.add(folder)
trans.sa_session.flush()
return folder
[docs] def get_current_roles(self, trans, folder):
"""
Find all roles currently connected to relevant permissions
on the folder.
:param folder: the model object
:type folder: LibraryFolder
:returns: dict of current roles for all available permission types
:rtype: dictionary
"""
# Omit duplicated roles by converting to set
modify_roles = set(trans.app.security_agent.get_roles_for_action(folder, trans.app.security_agent.permitted_actions.LIBRARY_MODIFY))
manage_roles = set(trans.app.security_agent.get_roles_for_action(folder, trans.app.security_agent.permitted_actions.LIBRARY_MANAGE))
add_roles = set(trans.app.security_agent.get_roles_for_action(folder, trans.app.security_agent.permitted_actions.LIBRARY_ADD))
modify_folder_role_list = [(modify_role.name, trans.security.encode_id(modify_role.id)) for modify_role in modify_roles]
manage_folder_role_list = [(manage_role.name, trans.security.encode_id(manage_role.id)) for manage_role in manage_roles]
add_library_item_role_list = [(add_role.name, trans.security.encode_id(add_role.id)) for add_role in add_roles]
return dict(modify_folder_role_list=modify_folder_role_list,
manage_folder_role_list=manage_folder_role_list,
add_library_item_role_list=add_library_item_role_list)
[docs] def can_add_item(self, trans, folder):
"""
Return true if the user has permissions to add item to the given folder.
"""
if trans.user_is_admin:
return True
current_user_roles = trans.get_current_user_roles()
add_roles = set(trans.app.security_agent.get_roles_for_action(folder, trans.app.security_agent.permitted_actions.LIBRARY_ADD))
for role in current_user_roles:
if role in add_roles:
return True
return False
[docs] def cut_the_prefix(self, encoded_folder_id):
"""
Remove the prefix from the encoded folder id.
:param encoded_folder_id: encoded id of the Folder object with 'F' prepended
:type encoded_folder_id: string
:returns: encoded Folder id without the 'F' prefix
:rtype: string
:raises: MalformedId
"""
if ((len(encoded_folder_id) % 16 == 1) and encoded_folder_id.startswith('F')):
cut_id = encoded_folder_id[1:]
else:
raise MalformedId('Malformed folder id ( %s ) specified, unable to decode.' % str(encoded_folder_id))
return cut_id
[docs] def decode_folder_id(self, trans, encoded_folder_id):
"""
Decode the folder id given that it has already lost the prefixed 'F'.
:param encoded_folder_id: encoded id of the Folder object
:type encoded_folder_id: string
:returns: decoded Folder id
:rtype: int
:raises: MalformedId
"""
try:
decoded_id = trans.security.decode_id(encoded_folder_id)
except ValueError:
raise MalformedId("Malformed folder id ( %s ) specified, unable to decode" % (str(encoded_folder_id)))
return decoded_id
[docs] def cut_and_decode(self, trans, encoded_folder_id):
"""
Cuts the folder prefix (the prepended 'F') and returns the decoded id.
:param encoded_folder_id: encoded id of the Folder object
:type encoded_folder_id: string
:returns: decoded Folder id
:rtype: int
"""
return self.decode_folder_id(trans, self.cut_the_prefix(encoded_folder_id))