Source code for galaxy.webapps.galaxy.api.library_contents

"""
API operations on the contents of a data library.
"""

import logging
from typing import Optional

from galaxy import (
    exceptions,
    managers,
    util,
)
from galaxy.actions.library import (
    LibraryActions,
    validate_path_upload,
)
from galaxy.managers.collections_util import (
    api_payload_to_create_params,
    dictify_dataset_collection_instance,
)
from galaxy.model import (
    ExtendedMetadata,
    ExtendedMetadataIndex,
    Library,
    LibraryDataset,
    LibraryFolder,
    tags,
)
from galaxy.model.base import transaction
from galaxy.structured_app import StructuredApp
from galaxy.web import expose_api
from galaxy.webapps.base.controller import (
    HTTPBadRequest,
    url_for,
    UsesFormDefinitionsMixin,
    UsesLibraryMixinItems,
)
from galaxy.webapps.galaxy.api import BaseGalaxyAPIController

log = logging.getLogger(__name__)


[docs]class LibraryContentsController( BaseGalaxyAPIController, UsesLibraryMixinItems, UsesFormDefinitionsMixin, LibraryActions ):
[docs] def __init__(self, app: StructuredApp, hda_manager: managers.hdas.HDAManager): super().__init__(app) self.hda_manager = hda_manager
[docs] @expose_api def index(self, trans, library_id, **kwd): """ GET /api/libraries/{library_id}/contents: Return a list of library files and folders. .. note:: This endpoint is slow for large libraries. Returns all content traversing recursively through all folders. .. seealso:: :class:`galaxy.webapps.galaxy.api.FolderContentsController.index` for a faster non-recursive solution :param library_id: the encoded id of the library :type library_id: str :returns: list of dictionaries of the form: * id: the encoded id of the library item * name: the 'library path' or relationship of the library item to the root * type: 'file' or 'folder' * url: the url to get detailed information on the library item :rtype: list :raises: MalformedId, InconsistentDatabase, RequestParameterInvalidException, InternalServerError """ rval = [] current_user_roles = trans.get_current_user_roles() def traverse(folder): admin = trans.user_is_admin rval = [] for subfolder in folder.active_folders: if not admin: can_access, folder_ids = trans.app.security_agent.check_folder_contents( trans.user, current_user_roles, subfolder ) if (admin or can_access) and not subfolder.deleted: subfolder.api_path = f"{folder.api_path}/{subfolder.name}" subfolder.api_type = "folder" rval.append(subfolder) rval.extend(traverse(subfolder)) for ld in folder.datasets: if not admin: can_access = trans.app.security_agent.can_access_dataset( current_user_roles, ld.library_dataset_dataset_association.dataset ) if (admin or can_access) and not ld.deleted: ld.api_path = f"{folder.api_path}/{ld.name}" ld.api_type = "file" rval.append(ld) return rval library = trans.sa_session.get(Library, self.decode_id(library_id)) if not library: raise exceptions.RequestParameterInvalidException("No library found with the id provided.") if not (trans.user_is_admin or trans.app.security_agent.can_access_library(current_user_roles, library)): raise exceptions.RequestParameterInvalidException("No library found with the id provided.") encoded_id = f"F{trans.security.encode_id(library.root_folder.id)}" # appending root folder rval.append( dict( id=encoded_id, type="folder", name="/", url=url_for("library_content", library_id=library_id, id=encoded_id), ) ) library.root_folder.api_path = "" # appending all other items in the library recursively for content in traverse(library.root_folder): encoded_id = trans.security.encode_id(content.id) if content.api_type == "folder": encoded_id = f"F{encoded_id}" rval.append( dict( id=encoded_id, type=content.api_type, name=content.api_path, url=url_for( "library_content", library_id=library_id, id=encoded_id, ), ) ) return rval
[docs] @expose_api def show(self, trans, id, library_id, **kwd): """ GET /api/libraries/{library_id}/contents/{id} Returns information about library file or folder. :param id: the encoded id of the library item to return :type id: str :param library_id: the encoded id of the library that contains this item :type library_id: str :returns: detailed library item information :rtype: dict .. seealso:: :func:`galaxy.model.LibraryDataset.to_dict` and :attr:`galaxy.model.LibraryFolder.dict_element_visible_keys` """ class_name, content_id = self._decode_library_content_id(id) if class_name == "LibraryFolder": content = self.get_library_folder(trans, content_id, check_ownership=False, check_accessible=True) rval = content.to_dict(view="element", value_mapper={"id": trans.security.encode_id}) rval["id"] = f"F{str(rval['id'])}" if rval["parent_id"] is not None: # This can happen for root folders. rval["parent_id"] = f"F{str(trans.security.encode_id(rval['parent_id']))}" rval["parent_library_id"] = trans.security.encode_id(rval["parent_library_id"]) else: content = self.get_library_dataset(trans, content_id, check_ownership=False, check_accessible=True) rval = content.to_dict(view="element") rval["id"] = trans.security.encode_id(rval["id"]) rval["ldda_id"] = trans.security.encode_id(rval["ldda_id"]) rval["folder_id"] = f"F{str(trans.security.encode_id(rval['folder_id']))}" rval["parent_library_id"] = trans.security.encode_id(rval["parent_library_id"]) tag_manager = tags.GalaxyTagHandler(trans.sa_session) rval["tags"] = tag_manager.get_tags_list(content.library_dataset_dataset_association.tags) return rval
[docs] @expose_api def create(self, trans, library_id, payload, **kwd): """ POST /api/libraries/{library_id}/contents: Create a new library file or folder. To copy an HDA into a library send ``create_type`` of 'file' and the HDA's encoded id in ``from_hda_id`` (and optionally ``ldda_message``). To copy an HDCA into a library send ``create_type`` of 'file' and the HDCA's encoded id in ``from_hdca_id`` (and optionally ``ldda_message``). :type library_id: str :param library_id: the encoded id of the library where to create the new item :type payload: dict :param payload: dictionary structure containing: * folder_id: the encoded id of the parent folder of the new item * create_type: the type of item to create ('file', 'folder' or 'collection') * from_hda_id: (optional, only if create_type is 'file') the encoded id of an accessible HDA to copy into the library * ldda_message: (optional) the new message attribute of the LDDA created * extended_metadata: (optional) sub-dictionary containing any extended metadata to associate with the item * upload_option: (optional) one of 'upload_file' (default), 'upload_directory' or 'upload_paths' * server_dir: (optional, only if upload_option is 'upload_directory') relative path of the subdirectory of Galaxy ``library_import_dir`` (if admin) or ``user_library_import_dir`` (if non-admin) to upload. All and only the files (i.e. no subdirectories) contained in the specified directory will be uploaded. * filesystem_paths: (optional, only if upload_option is 'upload_paths' and the user is an admin) file paths on the Galaxy server to upload to the library, one file per line * link_data_only: (optional, only when upload_option is 'upload_directory' or 'upload_paths') either 'copy_files' (default) or 'link_to_files'. Setting to 'link_to_files' symlinks instead of copying the files * name: (optional, only if create_type is 'folder') name of the folder to create * description: (optional, only if create_type is 'folder') description of the folder to create * tag_using_filenames: (optional) create tags on datasets using the file's original name * tags: (optional) create the given list of tags on datasets :returns: a dictionary describing the new item unless ``from_hdca_id`` is supplied, in that case a list of such dictionaries is returned. :rtype: object """ if trans.user_is_bootstrap_admin: raise exceptions.RealUserRequiredException("Only real users can create a new library file or folder.") if "create_type" not in payload: raise exceptions.RequestParameterMissingException("Missing required 'create_type' parameter.") create_type = payload.pop("create_type") if create_type not in ("file", "folder", "collection"): raise exceptions.RequestParameterInvalidException( f"Invalid value for 'create_type' parameter ( {create_type} ) specified." ) if "upload_option" in payload and payload["upload_option"] not in ( "upload_file", "upload_directory", "upload_paths", ): raise exceptions.RequestParameterInvalidException( f"Invalid value for 'upload_option' parameter ( {payload['upload_option']} ) specified." ) if "folder_id" not in payload: raise exceptions.RequestParameterMissingException("Missing required 'folder_id' parameter.") folder_id = payload.pop("folder_id") _, folder_id = self._decode_library_content_id(folder_id) folder_id = trans.security.decode_id(folder_id) # security is checked in the downstream controller parent = self.get_library_folder(trans, folder_id, check_ownership=False, check_accessible=False) # The rest of the security happens in the library_common controller. payload["tag_using_filenames"] = util.string_as_bool(payload.get("tag_using_filenames", None)) payload["tags"] = util.listify(payload.get("tags", None)) # are we copying an HDA to the library folder? # we'll need the id and any message to attach, then branch to that private function from_hda_id, from_hdca_id, ldda_message = ( payload.pop("from_hda_id", None), payload.pop("from_hdca_id", None), payload.pop("ldda_message", ""), ) if create_type == "file": if from_hda_id: return self._copy_hda_to_library_folder( trans, self.hda_manager, self.decode_id(from_hda_id), folder_id, ldda_message ) if from_hdca_id: return self._copy_hdca_to_library_folder( trans, self.hda_manager, self.decode_id(from_hdca_id), folder_id, ldda_message ) # check for extended metadata, store it and pop it out of the param # otherwise sanitize_param will have a fit ex_meta_payload = payload.pop("extended_metadata", None) # Now create the desired content object, either file or folder. if create_type == "file": status, output = self._upload_library_dataset(trans, folder_id, **payload) elif create_type == "folder": status, output = self._create_folder(trans, folder_id, **payload) elif create_type == "collection": # Not delegating to library_common, so need to check access to parent # folder here. self.check_user_can_add_to_library_item(trans, parent, check_accessible=True) create_params = api_payload_to_create_params(payload) create_params["parent"] = parent dataset_collection_manager = trans.app.dataset_collection_manager dataset_collection_instance = dataset_collection_manager.create(**create_params) return [ dictify_dataset_collection_instance( dataset_collection_instance, security=trans.security, url_builder=trans.url_builder, parent=parent ) ] if status != 200: trans.response.status = status return output else: rval = [] for v in output.values(): if ex_meta_payload is not None: # If there is extended metadata, store it, attach it to the dataset, and index it ex_meta = ExtendedMetadata(ex_meta_payload) trans.sa_session.add(ex_meta) v.extended_metadata = ex_meta trans.sa_session.add(v) with transaction(trans.sa_session): trans.sa_session.commit() for path, value in self._scan_json_block(ex_meta_payload): meta_i = ExtendedMetadataIndex(ex_meta, path, value) trans.sa_session.add(meta_i) with transaction(trans.sa_session): trans.sa_session.commit() if isinstance(v, trans.app.model.LibraryDatasetDatasetAssociation): v = v.library_dataset encoded_id = trans.security.encode_id(v.id) if create_type == "folder": encoded_id = f"F{encoded_id}" rval.append( dict( id=encoded_id, name=v.name, url=url_for("library_content", library_id=library_id, id=encoded_id) ) ) return rval
def _upload_library_dataset(self, trans, folder_id: int, **kwd): replace_dataset: Optional[LibraryDataset] = None upload_option = kwd.get("upload_option", "upload_file") dbkey = kwd.get("dbkey", "?") if isinstance(dbkey, list): last_used_build = dbkey[0] else: last_used_build = dbkey is_admin = trans.user_is_admin current_user_roles = trans.get_current_user_roles() folder = trans.sa_session.get(LibraryFolder, folder_id) self._check_access(trans, is_admin, folder, current_user_roles) self._check_add(trans, is_admin, folder, current_user_roles) library = folder.parent_library if folder and last_used_build in ["None", None, "?"]: last_used_build = folder.genome_build error = False if upload_option == "upload_paths": validate_path_upload(trans) # Duplicate check made in _upload_dataset. elif roles := kwd.get("roles", ""): # Check to see if the user selected roles to associate with the DATASET_ACCESS permission # on the dataset that would cause accessibility issues. vars = dict(DATASET_ACCESS_in=roles) permissions, in_roles, error, message = trans.app.security_agent.derive_roles_from_access( trans, library.id, "api", library=True, **vars ) if error: return 400, message else: created_outputs_dict = self._upload_dataset( trans, folder_id=folder.id, replace_dataset=replace_dataset, **kwd ) if created_outputs_dict: if isinstance(created_outputs_dict, str): return 400, created_outputs_dict elif isinstance(created_outputs_dict, tuple): return created_outputs_dict[0], created_outputs_dict[1] return 200, created_outputs_dict else: return 400, "Upload failed" def _scan_json_block(self, meta, prefix=""): """ Scan a json style data structure, and emit all fields and their values. Example paths Data { "data" : [ 1, 2, 3 ] } Path: /data == [1,2,3] /data/[0] == 1 """ if isinstance(meta, dict): for a in meta: yield from self._scan_json_block(meta[a], f"{prefix}/{a}") elif isinstance(meta, list): for i, a in enumerate(meta): yield from self._scan_json_block(a, prefix + "[%d]" % (i)) else: # BUG: Everything is cast to string, which can lead to false positives # for cross type comparisions, ie "True" == True yield prefix, (f"{meta}").encode()
[docs] @expose_api def update(self, trans, id, library_id, payload, **kwd): """ PUT /api/libraries/{library_id}/contents/{id} Create an ImplicitlyConvertedDatasetAssociation. .. seealso:: :class:`galaxy.model.ImplicitlyConvertedDatasetAssociation` :type id: str :param id: the encoded id of the library item to return :type library_id: str :param library_id: the encoded id of the library that contains this item :type payload: dict :param payload: dictionary structure containing:: 'converted_dataset_id': :rtype: None :returns: None """ if "converted_dataset_id" in payload: converted_id = payload.pop("converted_dataset_id") content = self.get_library_dataset(trans, id, check_ownership=False, check_accessible=False) content_conv = self.get_library_dataset(trans, converted_id, check_ownership=False, check_accessible=False) assoc = trans.app.model.ImplicitlyConvertedDatasetAssociation( parent=content.library_dataset_dataset_association, dataset=content_conv.library_dataset_dataset_association, file_type=content_conv.library_dataset_dataset_association.extension, metadata_safe=True, ) trans.sa_session.add(assoc) with transaction(trans.sa_session): trans.sa_session.commit()
def _decode_library_content_id(self, content_id): if len(content_id) % 16 == 0: return "LibraryDataset", content_id elif content_id.startswith("F"): return "LibraryFolder", content_id[1:] else: raise HTTPBadRequest(f"Malformed library content id ( {str(content_id)} ) specified, unable to decode.")
[docs] @expose_api def delete(self, trans, library_id, id, **kwd): """ DELETE /api/libraries/{library_id}/contents/{id} Delete the LibraryDataset with the given ``id``. :type id: str :param id: the encoded id of the library dataset to delete :type kwd: dict :param kwd: (optional) dictionary structure containing: * payload: a dictionary itself containing: * purge: if True, purge the LD :rtype: dict :returns: an error object if an error occurred or a dictionary containing: * id: the encoded id of the library dataset, * deleted: if the library dataset was marked as deleted, * purged: if the library dataset was purged """ purge = False if kwd.get("payload", None): purge = util.string_as_bool(kwd["payload"].get("purge", False)) rval = {"id": id} try: ld = self.get_library_dataset(trans, id, check_ownership=False, check_accessible=True) user_is_admin = trans.user_is_admin can_modify = trans.app.security_agent.can_modify_library_item(trans.user.all_roles(), ld) log.debug("is_admin: %s, can_modify: %s", user_is_admin, can_modify) if not (user_is_admin or can_modify): trans.response.status = 403 rval.update({"error": "Unauthorized to delete or purge this library dataset"}) return rval ld.deleted = True if purge: ld.purged = True trans.sa_session.add(ld) with transaction(trans.sa_session): trans.sa_session.commit() # TODO: had to change this up a bit from Dataset.user_can_purge dataset = ld.library_dataset_dataset_association.dataset no_history_assoc = len(dataset.history_associations) == len(dataset.purged_history_associations) no_library_assoc = dataset.library_associations == [ld.library_dataset_dataset_association] can_purge_dataset = not dataset.purged and no_history_assoc and no_library_assoc if can_purge_dataset: try: ld.library_dataset_dataset_association.dataset.full_delete() trans.sa_session.add(ld.dataset) except Exception: pass # flush now to preserve deleted state in case of later interruption with transaction(trans.sa_session): trans.sa_session.commit() rval["purged"] = True with transaction(trans.sa_session): trans.sa_session.commit() rval["deleted"] = True except exceptions.httpexceptions.HTTPInternalServerError: log.exception("Library_contents API, delete: uncaught HTTPInternalServerError: %s, %s", id, str(kwd)) raise except exceptions.httpexceptions.HTTPException: raise except Exception as exc: log.exception("library_contents API, delete: uncaught exception: %s, %s", id, str(kwd)) trans.response.status = 500 rval.update({"error": util.unicodify(exc)}) return rval