Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release, if it exists there, or view the top-level page of the latest release's documentation.

Source code for galaxy.webapps.galaxy.api.datasets

"""
API operations on the contents of a history dataset.
"""
import logging
from io import (
    BytesIO,
    IOBase,
)
from typing import (
    Any,
    cast,
    Dict,
    List,
    Optional,
)

from fastapi import (
    Body,
    Depends,
    Path,
    Query,
    Request,
)
from starlette.responses import StreamingResponse

from galaxy.schema import (
    FilterQueryParams,
    SerializationParams,
)
from galaxy.schema.fields import DecodedDatabaseIdField
from galaxy.schema.schema import (
    AnyHDA,
    AnyHistoryContentItem,
    AsyncTaskResultSummary,
    DatasetAssociationRoles,
    DatasetSourceType,
    UpdateDatasetPermissionsPayload,
)
from galaxy.util.zipstream import ZipstreamWrapper
from galaxy.webapps.base.api import GalaxyFileResponse
from galaxy.webapps.galaxy.api import (
    depends,
    DependsOnTrans,
    Router,
)
from galaxy.webapps.galaxy.api.common import (
    get_filter_query_params,
    get_query_parameters_from_request_excluding,
    get_update_permission_payload,
    query_serialization_params,
)
from galaxy.webapps.galaxy.services.datasets import (
    ComputeDatasetHashPayload,
    ConvertedDatasetsMap,
    DatasetInheritanceChain,
    DatasetsService,
    DatasetStorageDetails,
    DatasetTextContentDetails,
    DeleteDatasetBatchPayload,
    DeleteDatasetBatchResult,
    RequestDataType,
)

# Module-level logger for this API module.
log = logging.getLogger(__name__)

# All endpoints in this module are grouped under the "datasets" OpenAPI tag.
router = Router(tags=["datasets"])

# Reusable FastAPI path/query parameter definitions shared by the endpoints
# defined below. Declaring them once keeps descriptions and defaults consistent
# across the routes that accept the same parameter.
DatasetIDPathParam: DecodedDatabaseIdField = Path(..., description="The encoded database identifier of the dataset.")

HistoryIDPathParam: DecodedDatabaseIdField = Path(..., description="The encoded database identifier of the History.")

# Selects whether the target dataset is a history dataset (HDA) or a library
# dataset (LDDA); defaults to HDA.
DatasetSourceQueryParam: DatasetSourceType = Query(
    default=DatasetSourceType.hda,
    description="Whether this dataset belongs to a history (HDA) or a library (LDDA).",
)

PreviewQueryParam = Query(
    default=False,
    description=(
        "Whether to get preview contents to be directly displayed on the web. "
        "If preview is False (default) the contents will be downloaded instead."
    ),
)

FilenameQueryParam = Query(
    default=None,
    description="If non-null, get the specified filename from the extra files for this dataset.",
)

ToExtQueryParam = Query(
    default=None,
    description=(
        "The file extension when downloading the display data. Use the value `data` to "
        "let the server infer it from the data type."
    ),
)

# NOTE: explicitly documented as experimental by its own description below.
RawQueryParam = Query(
    default=False,
    description=(
        "The query parameter 'raw' should be considered experimental and may be dropped at "
        "some point in the future without warning. Generally, data should be processed by its "
        "datatype prior to display."
    ),
)

@router.cbv
class FastAPIDatasets:
    """API operations on the contents of a history dataset.

    All endpoints delegate the actual work to :class:`DatasetsService`; this
    class only declares the FastAPI routes, parameters, and response handling.
    """

    service: DatasetsService = depends(DatasetsService)

    @router.get(
        "/api/datasets",
        summary="Search datasets or collections using a query system.",
    )
    def index(
        self,
        trans=DependsOnTrans,
        history_id: Optional[DecodedDatabaseIdField] = Query(
            default=None,
            description="Optional identifier of a History. Use it to restrict the search within a particular History.",
        ),
        serialization_params: SerializationParams = Depends(query_serialization_params),
        filter_query_params: FilterQueryParams = Depends(get_filter_query_params),
    ) -> List[AnyHistoryContentItem]:
        return self.service.index(trans, history_id, serialization_params, filter_query_params)

    @router.get(
        "/api/datasets/{dataset_id}/storage",
        summary="Display user-facing storage details related to the objectstore a dataset resides in.",
    )
    def show_storage(
        self,
        trans=DependsOnTrans,
        dataset_id: DecodedDatabaseIdField = DatasetIDPathParam,
        hda_ldda: DatasetSourceType = DatasetSourceQueryParam,
    ) -> DatasetStorageDetails:
        return self.service.show_storage(trans, dataset_id, hda_ldda)

    @router.get(
        "/api/datasets/{dataset_id}/inheritance_chain",
        summary="For internal use, this endpoint may change without warning.",
        include_in_schema=True,  # Can be changed to False if we don't really want to expose this
    )
    def show_inheritance_chain(
        self,
        trans=DependsOnTrans,
        dataset_id: DecodedDatabaseIdField = DatasetIDPathParam,
        hda_ldda: DatasetSourceType = DatasetSourceQueryParam,
    ) -> DatasetInheritanceChain:
        return self.service.show_inheritance_chain(trans, dataset_id, hda_ldda)

    @router.get(
        "/api/datasets/{dataset_id}/get_content_as_text",
        summary="Returns dataset content as Text.",
    )
    def get_content_as_text(
        self,
        trans=DependsOnTrans,
        dataset_id: DecodedDatabaseIdField = DatasetIDPathParam,
    ) -> DatasetTextContentDetails:
        return self.service.get_content_as_text(trans, dataset_id)

    @router.get(
        "/api/datasets/{dataset_id}/converted/{ext}",
        summary="Return information about datasets made by converting this dataset to a new format.",
    )
    def converted_ext(
        self,
        trans=DependsOnTrans,
        dataset_id: DecodedDatabaseIdField = DatasetIDPathParam,
        ext: str = Path(
            ...,
            description="File extension of the new format to convert this dataset to.",
        ),
        serialization_params: SerializationParams = Depends(query_serialization_params),
    ) -> AnyHDA:
        """
        Return information about datasets made by converting this dataset to a new format.

        If there is no existing converted dataset for the format in `ext`, one will be created.

        **Note**: `view` and `keys` are also available to control the serialization of the dataset.
        """
        return self.service.converted_ext(trans, dataset_id, ext, serialization_params)

    @router.get(
        "/api/datasets/{dataset_id}/converted",
        # Typo fixed: "a a map" -> "a map" in the user-facing summary.
        summary=("Return a map with all the existing converted datasets associated with this instance."),
    )
    def converted(
        self,
        trans=DependsOnTrans,
        dataset_id: DecodedDatabaseIdField = DatasetIDPathParam,
    ) -> ConvertedDatasetsMap:
        """
        Return a map of `<converted extension> : <converted id>` containing all the *existing* converted datasets.
        """
        return self.service.converted(trans, dataset_id)

    @router.put(
        "/api/datasets/{dataset_id}/permissions",
        summary="Set permissions of the given history dataset to the given role ids.",
    )
    def update_permissions(
        self,
        trans=DependsOnTrans,
        dataset_id: DecodedDatabaseIdField = DatasetIDPathParam,
        # Using a generic Dict here as an attempt on supporting multiple aliases for the permissions params.
        payload: Dict[str, Any] = Body(
            default=...,
            example=UpdateDatasetPermissionsPayload(),
        ),
    ) -> DatasetAssociationRoles:
        """Set permissions of the given history dataset to the given role ids."""
        update_payload = get_update_permission_payload(payload)
        return self.service.update_permissions(trans, dataset_id, update_payload)

    @router.get(
        "/api/histories/{history_id}/contents/{history_content_id}/extra_files",
        summary="Generate list of extra files.",
        tags=["histories"],
    )
    def extra_files(
        self,
        trans=DependsOnTrans,
        history_id: DecodedDatabaseIdField = HistoryIDPathParam,
        history_content_id: DecodedDatabaseIdField = DatasetIDPathParam,
    ):
        # NOTE: history_id is only used for routing; the service looks up the
        # content item by its own id.
        return self.service.extra_files(trans, history_content_id)

    @router.get(
        "/api/histories/{history_id}/contents/{history_content_id}/display",
        name="history_contents_display",
        summary="Displays (preview) or downloads dataset content.",
        tags=["histories"],
        response_class=StreamingResponse,
    )
    @router.head(
        "/api/histories/{history_id}/contents/{history_content_id}/display",
        name="history_contents_display",
        summary="Check if dataset content can be previewed or downloaded.",
        tags=["histories"],
    )
    def display_history_content(
        self,
        request: Request,
        trans=DependsOnTrans,
        history_id: Optional[DecodedDatabaseIdField] = Path(
            description="The encoded database identifier of the History.",
        ),
        history_content_id: DecodedDatabaseIdField = DatasetIDPathParam,
        preview: bool = PreviewQueryParam,
        filename: Optional[str] = FilenameQueryParam,
        to_ext: Optional[str] = ToExtQueryParam,
        raw: bool = RawQueryParam,
    ):
        """Streams the dataset for download or the contents preview to be displayed in a browser."""
        return self._display(request, trans, history_content_id, preview, filename, to_ext, raw)

    @router.get(
        "/api/datasets/{history_content_id}/display",
        summary="Displays (preview) or downloads dataset content.",
        response_class=StreamingResponse,
    )
    @router.head(
        "/api/datasets/{history_content_id}/display",
        summary="Check if dataset content can be previewed or downloaded.",
    )
    def display(
        self,
        request: Request,
        trans=DependsOnTrans,
        history_content_id: DecodedDatabaseIdField = DatasetIDPathParam,
        preview: bool = PreviewQueryParam,
        filename: Optional[str] = FilenameQueryParam,
        to_ext: Optional[str] = ToExtQueryParam,
        raw: bool = RawQueryParam,
    ):
        """Streams the dataset for download or the contents preview to be displayed in a browser."""
        return self._display(request, trans, history_content_id, preview, filename, to_ext, raw)

    def _display(
        self,
        request: Request,
        trans,
        history_content_id: DecodedDatabaseIdField,
        preview: bool,
        filename: Optional[str],
        to_ext: Optional[str],
        raw: bool,
    ):
        # Shared implementation for both `display` routes. Any query parameters
        # beyond the declared ones are forwarded verbatim to the service.
        extra_params = get_query_parameters_from_request_excluding(
            request, {"preview", "filename", "to_ext", "raw", "dataset"}
        )
        display_data, headers = self.service.display(
            trans, history_content_id, preview=preview, filename=filename, to_ext=to_ext, raw=raw, **extra_params
        )
        # Wrap the service result in the appropriate response type depending on
        # what the service returned: an open file, a zip stream, raw bytes, or
        # any other iterable/stream.
        if isinstance(display_data, IOBase):
            file_name = getattr(display_data, "name", None)
            if file_name:
                return GalaxyFileResponse(file_name, headers=headers, method=request.method)
        elif isinstance(display_data, ZipstreamWrapper):
            return StreamingResponse(display_data.response(), headers=headers)
        elif isinstance(display_data, bytes):
            return StreamingResponse(BytesIO(display_data), headers=headers)
        return StreamingResponse(display_data, headers=headers)

    @router.get(
        "/api/histories/{history_id}/contents/{history_content_id}/metadata_file",
        summary="Returns the metadata file associated with this history item.",
        name="get_metadata_file",
        tags=["histories"],
        operation_id="history_contents__get_metadata_file",
        response_class=GalaxyFileResponse,
    )
    def get_metadata_file_history_content(
        self,
        trans=DependsOnTrans,
        history_id: DecodedDatabaseIdField = Path(
            description="The encoded database identifier of the History.",
        ),
        history_content_id: DecodedDatabaseIdField = DatasetIDPathParam,
        metadata_file: str = Query(
            ...,
            description="The name of the metadata file to retrieve.",
        ),
    ):
        return self._get_metadata_file(trans, history_content_id, metadata_file)

    @router.get(
        "/api/datasets/{history_content_id}/metadata_file",
        summary="Returns the metadata file associated with this history item.",
        response_class=GalaxyFileResponse,
        operation_id="datasets__get_metadata_file",
    )
    def get_metadata_file_datasets(
        self,
        trans=DependsOnTrans,
        history_content_id: DecodedDatabaseIdField = DatasetIDPathParam,
        metadata_file: str = Query(
            ...,
            description="The name of the metadata file to retrieve.",
        ),
    ):
        return self._get_metadata_file(trans, history_content_id, metadata_file)

    def _get_metadata_file(
        self,
        trans,
        history_content_id: DecodedDatabaseIdField,
        metadata_file: str,
    ) -> GalaxyFileResponse:
        # Shared implementation for both `metadata_file` routes.
        metadata_file_path, headers = self.service.get_metadata_file(trans, history_content_id, metadata_file)
        return GalaxyFileResponse(path=cast(str, metadata_file_path), headers=headers)

    @router.get(
        "/api/datasets/{dataset_id}",
        summary="Displays information about and/or content of a dataset.",
    )
    def show(
        self,
        request: Request,
        trans=DependsOnTrans,
        dataset_id: DecodedDatabaseIdField = DatasetIDPathParam,
        hda_ldda: DatasetSourceType = Query(
            default=DatasetSourceType.hda,
            description=("The type of information about the dataset to be requested."),
        ),
        data_type: Optional[RequestDataType] = Query(
            default=None,
            description=(
                "The type of information about the dataset to be requested. "
                "Each of these values may require additional parameters in the request and "
                "may return different responses."
            ),
        ),
        serialization_params: SerializationParams = Depends(query_serialization_params),
    ):
        """
        **Note**: Due to the multipurpose nature of this endpoint, which can receive a wild variety of parameters
        and return different kinds of responses, the documentation here will be limited.
        To get more information please check the source code.
        """
        # Forward every query parameter not already captured by the declared
        # parameters (or by serialization params) to the service.
        exclude_params = {"hda_ldda", "data_type"}
        exclude_params.update(SerializationParams.__fields__.keys())
        extra_params = get_query_parameters_from_request_excluding(request, exclude_params)
        return self.service.show(trans, dataset_id, hda_ldda, serialization_params, data_type, **extra_params)

    @router.delete(
        "/api/datasets",
        summary="Deletes or purges a batch of datasets.",
    )
    def delete_batch(
        self,
        trans=DependsOnTrans,
        payload: DeleteDatasetBatchPayload = Body(...),
    ) -> DeleteDatasetBatchResult:
        """
        Deletes or purges a batch of datasets.

        **Warning**: only the ownership of the datasets (and upload state for HDAs) is checked,
        no other checks or restrictions are made.
        """
        return self.service.delete_batch(trans, payload)

    @router.put(
        "/api/datasets/{dataset_id}/hash",
        summary="Compute dataset hash for dataset and update model",
    )
    def compute_hash(
        self,
        trans=DependsOnTrans,
        dataset_id: DecodedDatabaseIdField = DatasetIDPathParam,
        hda_ldda: DatasetSourceType = DatasetSourceQueryParam,
        payload: ComputeDatasetHashPayload = Body(...),
    ) -> AsyncTaskResultSummary:
        # Hash computation is delegated to an async task; the summary of that
        # task is returned to the caller.
        return self.service.compute_hash(trans, dataset_id, payload, hda_ldda=hda_ldda)