Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.api.library_contents
"""
API operations on the contents of a data library.
"""
import logging
from sqlalchemy.orm.exc import (
MultipleResultsFound,
NoResultFound,
)
from galaxy import (
exceptions,
managers,
util,
)
from galaxy.actions.library import LibraryActions, validate_path_upload
from galaxy.managers.collections_util import (
api_payload_to_create_params,
dictify_dataset_collection_instance
)
from galaxy.model import (
ExtendedMetadata,
ExtendedMetadataIndex,
tags
)
from galaxy.web import expose_api
from galaxy.webapps.base.controller import (
BaseAPIController,
HTTPBadRequest,
url_for,
UsesFormDefinitionsMixin,
UsesLibraryMixin,
UsesLibraryMixinItems
)
log = logging.getLogger(__name__)
[docs]class LibraryContentsController(BaseAPIController, UsesLibraryMixin, UsesLibraryMixinItems, UsesFormDefinitionsMixin, LibraryActions):
def __init__(self, app):
    """
    Initialize the controller and create the HDA manager it uses.

    :param app: the application object; it is passed to the parent
        initializer and to ``managers.hdas.HDAManager``
    """
    super().__init__(app)
    hda_manager = managers.hdas.HDAManager(app)
    self.hda_manager = hda_manager
@expose_api
def index(self, trans, library_id, **kwd):
    """
    GET /api/libraries/{library_id}/contents:

    Return a flat list of the files and folders of a library.

    .. note:: The folder tree is walked recursively, so this endpoint is slow for large libraries.

    .. seealso:: :class:`galaxy.webapps.galaxy.api.FolderContentsController.index` for a faster non-recursive solution

    :param  library_id: the encoded id of the library
    :type   library_id: str

    :returns:   list of dictionaries, the root folder first, each of the form:

        * id:   the encoded id of the library item (prefixed with ``F`` for folders)
        * name: the 'library path', i.e. the relationship of the library item to the root
        * type: 'file' or 'folder'
        * url:  the url to get detailed information on the library item
    :rtype:     list

    :raises:  MalformedId, InconsistentDatabase, RequestParameterInvalidException, InternalServerError
    """
    listing = []
    user_roles = trans.get_current_user_roles()

    def collect(folder, found):
        """Append the visible descendants of ``folder`` to ``found`` (depth first) and return ``found``."""
        is_admin = trans.user_is_admin
        for subfolder in folder.active_folders:
            if is_admin:
                visible = True
            else:
                visible, _ = trans.app.security_agent.check_folder_contents(trans.user, user_roles, subfolder)
            if visible and not subfolder.deleted:
                # The path and type are stashed on the object for the caller to read back.
                subfolder.api_path = '/'.join((folder.api_path, subfolder.name))
                subfolder.api_type = 'folder'
                found.append(subfolder)
                collect(subfolder, found)
        for library_dataset in folder.datasets:
            if is_admin:
                visible = True
            else:
                visible = trans.app.security_agent.can_access_dataset(
                    user_roles,
                    library_dataset.library_dataset_dataset_association.dataset
                )
            if visible and not library_dataset.deleted:
                library_dataset.api_path = '/'.join((folder.api_path, library_dataset.name))
                library_dataset.api_type = 'file'
                found.append(library_dataset)
        return found

    def describe(item_id, item_type, item_path):
        """Build the dictionary returned for a single library item."""
        return {
            'id': item_id,
            'type': item_type,
            'name': item_path,
            'url': url_for('library_content', library_id=library_id, id=item_id),
        }

    try:
        decoded_library_id = self.decode_id(library_id)
    except Exception:
        raise exceptions.MalformedId('Malformed library id ( %s ) specified, unable to decode.' % library_id)

    try:
        library_model = trans.app.model.Library
        library_query = trans.sa_session.query(library_model)
        library = library_query.filter(library_model.table.c.id == decoded_library_id).one()
    except MultipleResultsFound:
        raise exceptions.InconsistentDatabase('Multiple libraries found with the same id.')
    except NoResultFound:
        raise exceptions.RequestParameterInvalidException('No library found with the id provided.')
    except Exception as e:
        raise exceptions.InternalServerError('Error loading from the database.' + util.unicodify(e))

    # An inaccessible library is reported exactly like a missing one.
    may_view = trans.user_is_admin or trans.app.security_agent.can_access_library(user_roles, library)
    if not may_view:
        raise exceptions.RequestParameterInvalidException('No library found with the id provided.')

    # The root folder always comes first and is named '/'.
    root_id = 'F' + trans.security.encode_id(library.root_folder.id)
    listing.append(describe(root_id, 'folder', '/'))

    # Every other item follows, with paths built relative to the empty root path.
    library.root_folder.api_path = ''
    for item in collect(library.root_folder, []):
        item_id = trans.security.encode_id(item.id)
        if item.api_type == 'folder':
            item_id = 'F' + item_id
        listing.append(describe(item_id, item.api_type, item.api_path))
    return listing
@expose_api
def show(self, trans, id, library_id, **kwd):
    """
    GET /api/libraries/{library_id}/contents/{id}

    Return detailed information about one library file or folder.

    :param  id:         the encoded id of the library item to return
                        (folder ids carry an ``F`` prefix)
    :type   id:         str
    :param  library_id: the encoded id of the library that contains this item
    :type   library_id: str

    :returns:   detailed library item information, with ids encoded
    :rtype:     dict

    .. seealso::
        :func:`galaxy.model.LibraryDataset.to_dict` and
        :attr:`galaxy.model.LibraryFolder.dict_element_visible_keys`
    """
    class_name, content_id = self._decode_library_content_id(id)
    encode = trans.security.encode_id

    if class_name == 'LibraryFolder':
        folder = self.get_library_folder(trans, content_id, check_ownership=False, check_accessible=True)
        info = folder.to_dict(view='element', value_mapper={'id': trans.security.encode_id})
        info['id'] = 'F' + str(info['id'])
        # The parent id is None for root folders, in which case it is left as is.
        if info['parent_id'] is not None:
            info['parent_id'] = 'F' + str(encode(info['parent_id']))
        info['parent_library_id'] = encode(info['parent_library_id'])
        return info

    library_dataset = self.get_library_dataset(trans, content_id, check_ownership=False, check_accessible=True)
    info = library_dataset.to_dict(view='element')
    for key in ('id', 'ldda_id'):
        info[key] = encode(info[key])
    info['folder_id'] = 'F' + str(encode(info['folder_id']))
    info['parent_library_id'] = encode(info['parent_library_id'])

    # Tags are read from the dataset association and returned as a single string.
    tag_handler = tags.GalaxyTagHandler(trans.sa_session)
    ldda_tags = library_dataset.library_dataset_dataset_association.tags
    info['tags'] = tag_handler.get_tags_str(ldda_tags)
    return info
@expose_api
def create(self, trans, library_id, payload, **kwd):
    """
    POST /api/libraries/{library_id}/contents:

    Create a new library file or folder.

    To copy an HDA into a library send ``create_type`` of 'file' and
    the HDA's encoded id in ``from_hda_id`` (and optionally ``ldda_message``).

    To copy an HDCA into a library send ``create_type`` of 'file' and
    the HDCA's encoded id in ``from_hdca_id`` (and optionally ``ldda_message``).

    :type   library_id: str
    :param  library_id: the encoded id of the library where to create the new item
    :type   payload:    dict
    :param  payload:    dictionary structure containing:

        * folder_id:    the encoded id of the parent folder of the new item
        * create_type:  the type of item to create ('file', 'folder' or 'collection')
        * from_hda_id:  (optional, only if create_type is 'file') the
          encoded id of an accessible HDA to copy into the library
        * from_hdca_id: (optional, only if create_type is 'file') the
          encoded id of an HDCA to copy into the library
        * ldda_message: (optional) the new message attribute of the LDDA created
        * extended_metadata: (optional) sub-dictionary containing any extended
          metadata to associate with the item
        * upload_option: (optional) one of 'upload_file' (default), 'upload_directory' or 'upload_paths'
        * server_dir: (optional, only if upload_option is
          'upload_directory') relative path of the subdirectory of Galaxy
          ``library_import_dir`` (if admin) or ``user_library_import_dir``
          (if non-admin) to upload. All and only the files (i.e.
          no subdirectories) contained in the specified directory will be
          uploaded.
        * filesystem_paths: (optional, only if upload_option is
          'upload_paths' and the user is an admin) file paths on the
          Galaxy server to upload to the library, one file per line
        * link_data_only: (optional, only when upload_option is
          'upload_directory' or 'upload_paths') either 'copy_files'
          (default) or 'link_to_files'. Setting to 'link_to_files'
          symlinks instead of copying the files
        * name: (optional, only if create_type is 'folder') name of the
          folder to create
        * description: (optional, only if create_type is 'folder')
          description of the folder to create
        * tag_using_filenames: (optional)
          create tags on datasets using the file's original name
        * tags: (optional)
          create the given list of tags on datasets

    :returns:   a dictionary describing the new item unless ``from_hdca_id`` is supplied,
                in that case a list of such dictionaries is returned. When a required
                parameter is missing or invalid, or the parent folder cannot be loaded,
                the response status is set to 400 and an error message is returned.
    :rtype:     object
    """
    if 'create_type' not in payload:
        trans.response.status = 400
        return "Missing required 'create_type' parameter."
    create_type = payload.pop('create_type')
    if create_type not in ('file', 'folder', 'collection'):
        trans.response.status = 400
        return "Invalid value for 'create_type' parameter ( %s ) specified." % create_type

    if 'folder_id' not in payload:
        trans.response.status = 400
        return "Missing required 'folder_id' parameter."
    # Only the id half of the decoded pair is used here.
    _, folder_id = self._decode_library_content_id(payload.pop('folder_id'))

    try:
        # security is checked in the downstream controller
        parent = self.get_library_folder(trans, folder_id, check_ownership=False, check_accessible=False)
    except Exception as e:
        # Flag the failure like the other parameter errors above; without a
        # status the message would be delivered as if the request succeeded.
        trans.response.status = 400
        return util.unicodify(e)
    # The rest of the security happens in the library_common controller.
    real_folder_id = trans.security.encode_id(parent.id)

    payload['tag_using_filenames'] = util.string_as_bool(payload.get('tag_using_filenames', None))
    payload['tags'] = util.listify(payload.get('tags', None))

    # are we copying an HDA/HDCA to the library folder?
    #   we'll need the id and any message to attach, then branch to that private function
    from_hda_id = payload.pop('from_hda_id', None)
    from_hdca_id = payload.pop('from_hdca_id', None)
    ldda_message = payload.pop('ldda_message', '')
    if create_type == 'file':
        if from_hda_id:
            return self._copy_hda_to_library_folder(trans, self.hda_manager, self.decode_id(from_hda_id), real_folder_id, ldda_message)
        if from_hdca_id:
            return self._copy_hdca_to_library_folder(trans, self.hda_manager, self.decode_id(from_hdca_id), real_folder_id, ldda_message)

    # check for extended metadata, store it and pop it out of the param
    # otherwise sanitize_param will have a fit
    ex_meta_payload = payload.pop('extended_metadata', None)

    # Now create the desired content object, either file or folder.
    if create_type == 'file':
        status, output = self._upload_library_dataset(trans, library_id, real_folder_id, **payload)
    elif create_type == 'folder':
        status, output = self._create_folder(trans, real_folder_id, library_id, **payload)
    elif create_type == 'collection':
        # Not delegating to library_common, so need to check access to parent
        # folder here.
        self.check_user_can_add_to_library_item(trans, parent, check_accessible=True)
        create_params = api_payload_to_create_params(payload)
        create_params['parent'] = parent
        service = trans.app.dataset_collections_service
        dataset_collection_instance = service.create(**create_params)
        return [dictify_dataset_collection_instance(dataset_collection_instance, security=trans.security, parent=parent)]

    if status != 200:
        trans.response.status = status
        return output

    rval = []
    for v in output.values():
        if ex_meta_payload is not None:
            # If there is extended metadata, store it, attach it to the dataset, and index it
            ex_meta = ExtendedMetadata(ex_meta_payload)
            trans.sa_session.add(ex_meta)
            v.extended_metadata = ex_meta
            trans.sa_session.add(v)
            trans.sa_session.flush()
            for path, value in self._scan_json_block(ex_meta_payload):
                meta_i = ExtendedMetadataIndex(ex_meta, path, value)
                trans.sa_session.add(meta_i)
            trans.sa_session.flush()
        # Dataset associations are reported through the library dataset they belong to.
        if isinstance(v, trans.app.model.LibraryDatasetDatasetAssociation):
            v = v.library_dataset
        encoded_id = trans.security.encode_id(v.id)
        if create_type == 'folder':
            encoded_id = 'F' + encoded_id
        rval.append(dict(id=encoded_id,
                         name=v.name,
                         url=url_for('library_content', library_id=library_id, id=encoded_id)))
    return rval
def _upload_library_dataset(self, trans, library_id, folder_id, **kwd):
    """
    Check an upload request and pass it on to ``self._upload_dataset``.

    When ``replace_id`` is given in ``kwd`` the existing library dataset is
    loaded and checked for access and modify permission, and its folder and
    library are used. Otherwise the folder identified by ``folder_id`` is
    loaded and checked for access and add permission.

    :param trans:       the current transaction
    :param library_id:  the encoded id of the library; not read here, the
                        library is taken from the target folder instead
    :param folder_id:   the encoded id of the folder to upload into, used
                        when no ``replace_id`` is given
    :param kwd:         upload parameters; ``replace_id``, ``upload_option``
                        and ``roles`` are read here and everything is
                        forwarded to ``self._upload_dataset``

    :returns:   a ``(status, output)`` pair: ``(200, outputs)`` on success,
                ``(400, message)`` for an invalid ``upload_option``, a role
                error, an empty upload result or an upload result that is a
                string, and the first two items of the upload result when it
                is a tuple
    :rtype:     tuple
    """
    replace_id = kwd.get('replace_id', None)
    replace_dataset = None
    upload_option = kwd.get('upload_option', 'upload_file')
    roles = kwd.get('roles', '')
    is_admin = trans.user_is_admin
    current_user_roles = trans.get_current_user_roles()

    if replace_id not in ('', None, 'None'):
        replace_dataset = trans.sa_session.query(trans.app.model.LibraryDataset).get(trans.security.decode_id(replace_id))
        self._check_access(trans, is_admin, replace_dataset, current_user_roles)
        self._check_modify(trans, is_admin, replace_dataset, current_user_roles)
        library = replace_dataset.folder.parent_library
        folder = replace_dataset.folder
    else:
        folder = trans.sa_session.query(trans.app.model.LibraryFolder).get(trans.security.decode_id(folder_id))
        self._check_access(trans, is_admin, folder, current_user_roles)
        self._check_add(trans, is_admin, folder, current_user_roles)
        library = folder.parent_library

    error = False
    message = None
    if upload_option == 'upload_paths':
        validate_path_upload(trans)  # Duplicate check made in _upload_dataset.
        # NOTE(review): ``roles`` are not validated on this branch, which
        # matches the previous behavior - confirm that this is intended.
    elif upload_option not in ('upload_file', 'upload_directory'):
        error = True
        message = 'Invalid upload_option'
    elif roles:
        # Check to see if the user selected roles to associate with the DATASET_ACCESS permission
        # on the dataset that would cause accessibility issues.
        access_kwds = dict(DATASET_ACCESS_in=roles)
        _, _, error, message = \
            trans.app.security_agent.derive_roles_from_access(trans, library.id, 'api', library=True, **access_kwds)
    if error:
        return 400, message

    created_outputs = self._upload_dataset(trans,
                                           library_id=trans.security.encode_id(library.id),
                                           folder_id=trans.security.encode_id(folder.id),
                                           replace_dataset=replace_dataset,
                                           **kwd)
    if not created_outputs:
        return 400, "Upload failed"
    if isinstance(created_outputs, str):
        # A plain string result is an error message.
        return 400, created_outputs
    if isinstance(created_outputs, tuple):
        # A tuple result already carries its own status and output.
        return created_outputs[0], created_outputs[1]
    return 200, created_outputs
def _scan_json_block(self, meta, prefix=""):
"""
Scan a json style data structure, and emit all fields and their values.
Example paths
Data
{ "data" : [ 1, 2, 3 ] }
Path:
/data == [1,2,3]
/data/[0] == 1
"""
if isinstance(meta, dict):
for a in meta:
yield from self._scan_json_block(meta[a], prefix + "/" + a)
elif isinstance(meta, list):
for i, a in enumerate(meta):
yield from self._scan_json_block(a, prefix + "[%d]" % (i))
else:
# BUG: Everything is cast to string, which can lead to false positives
# for cross type comparisions, ie "True" == True
yield prefix, ("%s" % (meta)).encode("utf8", errors='replace')
@expose_api
def update(self, trans, id, library_id, payload, **kwd):
    """
    PUT /api/libraries/{library_id}/contents/{id}

    Create an ImplicitlyConvertedDatasetAssociation between two library datasets.

    Nothing is done when ``converted_dataset_id`` is absent from the payload.

    .. seealso:: :class:`galaxy.model.ImplicitlyConvertedDatasetAssociation`

    :type   id:         str
    :param  id:         the encoded id of the library dataset used as the parent of the association
    :type   library_id: str
    :param  library_id: the encoded id of the library that contains this item
    :type   payload:    dict
    :param  payload:    dictionary structure containing::
        'converted_dataset_id': the encoded id of the library dataset holding the converted data

    :rtype:     None
    :returns:   None
    """
    if 'converted_dataset_id' not in payload:
        return

    converted_id = payload.pop('converted_dataset_id')
    source = self.get_library_dataset(trans, id, check_ownership=False, check_accessible=False)
    converted = self.get_library_dataset(trans, converted_id, check_ownership=False, check_accessible=False)

    # The association links the dataset associations, not the library datasets themselves.
    association_cls = trans.app.model.ImplicitlyConvertedDatasetAssociation
    association = association_cls(
        parent=source.library_dataset_dataset_association,
        dataset=converted.library_dataset_dataset_association,
        file_type=converted.library_dataset_dataset_association.extension,
        metadata_safe=True,
    )
    session = trans.sa_session
    session.add(association)
    session.flush()
def _decode_library_content_id(self, content_id):
if len(content_id) % 16 == 0:
return 'LibraryDataset', content_id
elif content_id.startswith('F'):
return 'LibraryFolder', content_id[1:]
else:
raise HTTPBadRequest('Malformed library content id ( %s ) specified, unable to decode.' % str(content_id))
@expose_api
def delete(self, trans, library_id, id, **kwd):
    """
    DELETE /api/libraries/{library_id}/contents/{id}

    Delete the LibraryDataset with the given ``id``.

    :type   id:     str
    :param  id:     the encoded id of the library dataset to delete
    :type   kwd:    dict
    :param  kwd:    (optional) dictionary structure containing:

        * payload:     a dictionary itself containing:
            * purge:   if True, purge the LD

    :rtype:     dict
    :returns:   an error object if an error occurred or a dictionary containing:
        * id:         the encoded id of the library dataset,
        * deleted:    if the library dataset was marked as deleted,
        * purged:     if the library dataset was purged

    The response status is set to 403 when the user is neither an admin nor
    allowed to modify the library dataset, and to 500 when an unexpected
    error occurs. HTTP exceptions are re-raised.
    """
    purge = False
    if kwd.get('payload', None):
        purge = util.string_as_bool(kwd['payload'].get('purge', False))

    rval = {'id': id}
    try:
        ld = self.get_library_dataset(trans, id, check_ownership=False, check_accessible=True)
        user_is_admin = trans.user_is_admin
        # Use the same role accessor as the other methods of this controller
        # instead of dereferencing trans.user, which fails when no user is set.
        can_modify = trans.app.security_agent.can_modify_library_item(trans.get_current_user_roles(), ld)
        log.debug('is_admin: %s, can_modify: %s', user_is_admin, can_modify)
        if not (user_is_admin or can_modify):
            trans.response.status = 403
            rval.update({'error': 'Unauthorized to delete or purge this library dataset'})
            return rval

        ld.deleted = True
        if purge:
            ld.purged = True
            trans.sa_session.add(ld)
            trans.sa_session.flush()

            # TODO: had to change this up a bit from Dataset.user_can_purge
            ldda = ld.library_dataset_dataset_association
            dataset = ldda.dataset
            # The underlying dataset may only go when every history association
            # is purged and this is its only library association.
            no_history_assoc = len(dataset.history_associations) == len(dataset.purged_history_associations)
            no_library_assoc = dataset.library_associations == [ldda]
            can_purge_dataset = not dataset.purged and no_history_assoc and no_library_assoc

            if can_purge_dataset:
                try:
                    dataset.full_delete()
                    trans.sa_session.add(dataset)
                except Exception:
                    # Removing the underlying dataset stays best effort, but
                    # the failure is recorded instead of silently dropped.
                    log.exception('library_contents API, delete: unable to fully delete the dataset of library dataset %s', id)
                # flush now to preserve deleted state in case of later interruption
                trans.sa_session.flush()
            rval['purged'] = True
        trans.sa_session.flush()
        rval['deleted'] = True

    except exceptions.httpexceptions.HTTPInternalServerError:
        log.exception('Library_contents API, delete: uncaught HTTPInternalServerError: %s, %s',
                      id, str(kwd))
        raise
    except exceptions.httpexceptions.HTTPException:
        raise
    except Exception as exc:
        log.exception('library_contents API, delete: uncaught exception: %s, %s',
                      id, str(kwd))
        trans.response.status = 500
        rval.update({'error': util.unicodify(exc)})
    return rval