Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.api.short_term_storage
"""
API operations around galaxy.web.short_term_storage infrastructure.
"""
from uuid import UUID
from galaxy.web.short_term_storage import (
ShortTermStorageMonitor,
ShortTermStorageServeCancelledInformation,
ShortTermStorageServeCompletedInformation,
)
from galaxy.webapps.base.api import GalaxyFileResponse
from . import (
depends,
Router,
)
# Router on which this module's endpoints are registered through the
# ``@router.cbv`` / ``@router.get`` decorators used below.
router = Router(tags=["short_term_storage"])
# Class-based view exposing the short term storage monitor over HTTP.
#
# NOTE: plain comments are used instead of docstrings on the endpoint
# methods because FastAPI publishes a handler docstring as the operation
# description, which would alter the generated API schema.
@router.cbv
class FastAPIShortTermStorage:
    # Monitor used by every endpoint to resolve request IDs and query state.
    short_term_storage_monitor: ShortTermStorageMonitor = depends(ShortTermStorageMonitor)  # type: ignore[type-abstract]  # https://github.com/python/mypy/issues/4717

    # Resolve the request ID to its storage target and report whether the
    # monitor considers that target ready.
    @router.get(
        "/api/short_term_storage/{storage_request_id}/ready",
        summary="Determine if specified storage request ID is ready for download.",
        response_description="Boolean indicating if the storage is ready.",
    )
    def is_ready(self, storage_request_id: UUID) -> bool:
        monitor = self.short_term_storage_monitor
        target = monitor.recover_target(storage_request_id)
        return monitor.is_ready(target)

    # Resolve the request ID to its storage target and ask the monitor how it
    # should be served: a completed result is returned as a file response,
    # anything else must be a cancelled result whose exception is raised.
    @router.get(
        "/api/short_term_storage/{storage_request_id}",
        summary="Serve the staged download specified by request ID.",
        response_description="Raw contents of the file.",
        response_class=GalaxyFileResponse,
        responses={
            200: {
                "description": "The archive file containing the History.",
            },
            204: {
                "description": "Request was cancelled without an exception condition recorded.",
            },
        },
    )
    def serve(self, storage_request_id: UUID):
        monitor = self.short_term_storage_monitor
        target = monitor.recover_target(storage_request_id)
        outcome = monitor.get_serve_info(target)

        if not isinstance(outcome, ShortTermStorageServeCompletedInformation):
            # The only other expected result is a cancelled request; the
            # assert also narrows the type before reading message_exception.
            assert isinstance(outcome, ShortTermStorageServeCancelledInformation)
            raise outcome.message_exception

        return GalaxyFileResponse(
            path=outcome.target.path,
            media_type=outcome.mime_type,
            filename=outcome.filename,
        )