# Source code for galaxy.webapps.galaxy.api.short_term_storage

"""
API operations around galaxy.web.short_term_storage infrastructure.
"""
from galaxy.web.short_term_storage import (
    ShortTermStorageMonitor,
    ShortTermStorageServeCancelledInformation,
    ShortTermStorageServeCompletedInformation,
)
from galaxy.webapps.base.api import GalaxyFileResponse
from . import (
    depends,
    Router,
)

# Router object whose ``cbv`` and ``get`` decorators are applied to the
# endpoint class in this module; it is constructed with the single tag
# "short_term_storage".
router = Router(tags=["short_term_storage"])


# Class-based view grouping the short term storage endpoints. Every handler
# works through the injected ``short_term_storage_monitor``.
#
# NOTE(review): the handlers are documented with comments rather than
# docstrings on purpose - a web framework may surface handler docstrings in
# the generated API description, which would change the published schema.
@router.cbv
class FastAPIShortTermStorage:
    # typing here is not ideal, mypy is the issue xref https://github.com/python/mypy/issues/5374
    short_term_storage_monitor: ShortTermStorageMonitor = depends(ShortTermStorageMonitor)  # type: ignore[misc]

    # Readiness probe: map the request ID to its storage target, then return
    # whatever the monitor's ``is_ready`` reports for that target.
    @router.get(
        "/api/short_term_storage/{storage_request_id}/ready",
        summary="Determine if specified storage request ID is ready for download.",
        response_description="Boolean indicating if the storage is ready.",
    )
    def is_ready(self, storage_request_id: str) -> bool:
        monitor = self.short_term_storage_monitor
        target = monitor.recover_target(storage_request_id)
        return monitor.is_ready(target)

    # Download endpoint: map the request ID to its storage target and ask the
    # monitor for the serve information. Completed information is turned into
    # a file response; anything else must be cancelled information, whose
    # ``message_exception`` is raised.
    @router.get(
        "/api/short_term_storage/{storage_request_id}",
        summary="Serve the staged download specified by request ID.",
        response_description="Raw contents of the file.",
        response_class=GalaxyFileResponse,
        responses={
            200: {
                "description": "The archive file containing the History.",
            },
            204: {
                "description": "Request was cancelled without an exception condition recorded.",
            },
        },
    )
    def serve(self, storage_request_id: str):
        monitor = self.short_term_storage_monitor
        target = monitor.recover_target(storage_request_id)
        info = monitor.get_serve_info(target)

        if not isinstance(info, ShortTermStorageServeCompletedInformation):
            # The only other expected outcome is a cancelled request; the
            # assert narrows the type before its exception is raised.
            assert isinstance(info, ShortTermStorageServeCancelledInformation)
            raise info.message_exception

        return GalaxyFileResponse(
            path=info.target.path,
            media_type=info.mime_type,
            filename=info.filename,
        )