Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.api.short_term_storage

"""
API operations around galaxy.web.short_term_storage infrastructure.
"""
from galaxy.web.short_term_storage import (
    ShortTermStorageMonitor,
    ShortTermStorageServeCancelledInformation,
    ShortTermStorageServeCompletedInformation,
)
from galaxy.webapps.base.api import GalaxyFileResponse
from . import (
    depends,
    Router,
)

router = Router(tags=["short_term_storage"])


[docs]@router.cbv class FastAPIShortTermStorage: # typing here is not ideal, mypy is the issue xref https://github.com/python/mypy/issues/5374 short_term_storage_monitor: ShortTermStorageMonitor = depends(ShortTermStorageMonitor) # type: ignore[misc]
[docs] @router.get( "/api/short_term_storage/{storage_request_id}/ready", summary="Determine if specified storage request ID is ready for download.", response_description="Boolean indicating if the storage is ready.", ) def is_ready(self, storage_request_id: str) -> bool: storage_target = self.short_term_storage_monitor.recover_target(storage_request_id) return self.short_term_storage_monitor.is_ready(storage_target)
[docs] @router.get( "/api/short_term_storage/{storage_request_id}", summary="Serve the staged download specified by request ID.", response_description="Raw contents of the file.", response_class=GalaxyFileResponse, responses={ 200: { "description": "The archive file containing the History.", }, 204: { "description": "Request was cancelled without an exception condition recorded.", }, }, ) def serve(self, storage_request_id: str): storage_target = self.short_term_storage_monitor.recover_target(storage_request_id) serve_info = self.short_term_storage_monitor.get_serve_info(storage_target) if isinstance(serve_info, ShortTermStorageServeCompletedInformation): return GalaxyFileResponse( path=serve_info.target.path, media_type=serve_info.mime_type, filename=serve_info.filename, ) assert isinstance(serve_info, ShortTermStorageServeCancelledInformation) raise serve_info.message_exception