Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.web.short_term_storage

import abc
import contextlib
import json
import os
import shutil
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import (
    Any,
    Dict,
    Optional,
    Union,
)
from uuid import (
    UUID,
    uuid4,
)

from galaxy.exceptions import (
    InternalServerError,
    MessageException,
    NoContentException,
    ObjectNotFound,
)
from galaxy.exceptions.error_codes import error_codes_by_int_code
from galaxy.schema.schema import OptionalNumberT
from galaxy.util import (
    directory_hash_id,
    is_uuid,
    safe_makedirs,
)
from galaxy.web.framework.decorators import api_error_message

now = datetime.utcnow
DEFAULT_STORAGE_DURATION = 24 * 60 * 60  # store for a day by default


[docs]@dataclass class ShortTermStorageConfiguration: short_term_storage_directory: str default_storage_duration: OptionalNumberT = None maximum_storage_duration: OptionalNumberT = None
[docs]@dataclass class ShortTermStorageTargetSecurity: user_id: Optional[int] = None session_id: Optional[int] = None
[docs] def to_dict(self) -> Dict[str, Optional[int]]: return { "user_id": self.user_id, "session_id": self.session_id, }
[docs] @classmethod def from_dict(self, as_dict: Dict[str, Optional[int]]) -> "ShortTermStorageTargetSecurity": return ShortTermStorageTargetSecurity( user_id=as_dict.get("user_id"), session_id=as_dict.get("session_id"), )
[docs]@dataclass class ShortTermStorageTarget: request_id: UUID raw_path: str duration: OptionalNumberT = None @property def path(self): return Path(self.raw_path)
[docs]@dataclass class ShortTermStorageServeCompletedInformation: target: ShortTermStorageTarget mime_type: str filename: str security: ShortTermStorageTargetSecurity
[docs]@dataclass class ShortTermStorageServeCancelledInformation: target: ShortTermStorageTarget status_code: int exception: Optional[Dict[str, Any]] @property def message_exception(self) -> MessageException: serialized_exception = self.exception if not serialized_exception: raise NoContentException() exception_obj = MessageException() exception_obj.status_code = self.status_code exception_obj.err_code = error_codes_by_int_code[serialized_exception["err_code"]] exception_obj.err_msg = serialized_exception["err_msg"] return exception_obj
ShortTermStorageServeInformation = Union[ ShortTermStorageServeCompletedInformation, ShortTermStorageServeCancelledInformation ]
[docs]class ShortTermStorageAllocator(metaclass=abc.ABCMeta): # TODO: Implement upstream_mod_zip=False, upstream_gzip=False - in initial request and serving...
[docs] @abc.abstractmethod def new_target( self, filename: str, mime_type: str, duration: Optional[int] = None, security: Optional[ShortTermStorageTargetSecurity] = None, ) -> ShortTermStorageTarget: """Return a new ShortTermStorageTarget for this short term file request."""
[docs]class ShortTermStorageMonitor(metaclass=abc.ABCMeta):
[docs] @abc.abstractmethod def is_ready(self, target: ShortTermStorageTarget) -> bool: """Check if storage is ready."""
[docs] @abc.abstractmethod def get_serve_info(self, target: ShortTermStorageTarget) -> ShortTermStorageServeInformation: """Get information required to serve this short term storage target."""
[docs] @abc.abstractmethod def finalize(self, target: ShortTermStorageTarget) -> None: """Indicate the file is ready to be served."""
[docs] @abc.abstractmethod def cancel(self, target: ShortTermStorageTarget, exception: Optional[MessageException] = None) -> None: """Store metadata for failed task. Implementation is responsible for indicating target is finalized as well. """
[docs] @abc.abstractmethod def target_path(self, target: ShortTermStorageTarget) -> Path: """Return fully qualified path on this server for specified target."""
[docs] @abc.abstractmethod def cleanup(self) -> None: """Cleanup old requests."""
[docs] @abc.abstractmethod def recover_target(self, request_id: UUID) -> ShortTermStorageTarget: """Return an existing ShortTermStorageTarget from a specified request_id."""
[docs]class ShortTermStorageManager(ShortTermStorageAllocator, ShortTermStorageMonitor):
[docs] def __init__(self, config: ShortTermStorageConfiguration): self._config = config
[docs] def new_target( self, filename: str, mime_type: str, duration: OptionalNumberT = None, security: Optional[ShortTermStorageTargetSecurity] = None, ) -> ShortTermStorageTarget: if security is None: security = ShortTermStorageTargetSecurity() request_id = uuid4() target_directory = self._directory(request_id) duration = duration or self._config.default_storage_duration or DEFAULT_STORAGE_DURATION maximum_storage_duration = self._config.maximum_storage_duration if duration and maximum_storage_duration and duration > maximum_storage_duration: duration = maximum_storage_duration target = ShortTermStorageTarget( request_id=request_id, raw_path=str(target_directory / "target"), duration=duration ) safe_makedirs(target_directory) # optimize by placing the deletion time outside JSON as new file... request_info = { "filename": filename, "mime_type": mime_type, "duration": duration, "created": str(now()), "security": security.to_dict(), } self._store_metadata(target_directory, "request", request_info) return target
[docs] def recover_target(self, request_id: UUID) -> ShortTermStorageTarget: target_directory = self._directory(request_id) target = ShortTermStorageTarget(request_id=request_id, raw_path=str(target_directory / "target")) return target
[docs] def is_ready(self, target: ShortTermStorageTarget) -> bool: """Check if storage is ready.""" return self._finalized_path(target).exists()
[docs] def get_serve_info(self, target: ShortTermStorageTarget) -> ShortTermStorageServeInformation: """Get information required to serve this short term storage target.""" cancelled = self._cancelled_path(target) serve_info: ShortTermStorageServeInformation target_directory = self._directory(target) if cancelled.exists(): exception_metadata = self._load_metadata(target_directory, "cancelled") serve_info = ShortTermStorageServeCancelledInformation( target=target, status_code=exception_metadata["status_code"], exception=exception_metadata["exception"], ) else: request_metadata = self._load_metadata(target_directory, "request") serve_info = ShortTermStorageServeCompletedInformation( target=target, filename=request_metadata["filename"], mime_type=request_metadata["mime_type"], security=ShortTermStorageTargetSecurity.from_dict(request_metadata["security"]), ) return serve_info
[docs] def cancel(self, target: ShortTermStorageTarget, exception: Optional[MessageException] = None): """Write metadata for failed task.""" if exception: exception_json = { "status_code": exception.status_code, "exception": api_error_message(None, exception=exception), } else: exception_json = {"status_code": 204, "exception": None} # NO CONTENT self._store_metadata(self._directory(target), "cancelled", exception_json) self.finalize(target)
[docs] def finalize(self, target: ShortTermStorageTarget) -> None: """Indicate the file is ready to be served.""" self._finalized_path(target).touch()
[docs] def target_path(self, target: ShortTermStorageTarget) -> Path: return self._directory(target) / "target"
def _cancelled_path(self, target: ShortTermStorageTarget) -> Path: return self._directory(target) / "cancelled.json" def _finalized_path(self, target: ShortTermStorageTarget) -> Path: return self._directory(target) / "finalized" def _store_metadata(self, target_directory: Path, meta_name: str, meta_value: Any): meta_path = target_directory / f"{meta_name}.json" with open(meta_path, "w") as f: json.dump(meta_value, f) def _load_metadata(self, target_directory: Path, meta_name: str): meta_path = target_directory / f"{meta_name}.json" if not meta_path.exists(): raise ObjectNotFound with open(meta_path) as f: return json.load(f) def _load_metadata_safe(self, target_directory: Path, meta_name: str): try: return self._load_metadata(target_directory, meta_name) except ObjectNotFound: return None def _directory(self, target: Union[UUID, ShortTermStorageTarget]) -> Path: if isinstance(target, ShortTermStorageTarget): request_id = target.request_id else: request_id = target relative_directory = directory_hash_id(request_id) + [str(request_id)] return self._root.joinpath(*relative_directory) def _cleanup_if_needed(self, request_id: UUID): target_directory = self._directory(request_id) if not target_directory.exists(): return # Nothing to clean request_metadata = self._load_metadata_safe(target_directory, "request") if request_metadata is None: # Delete if metadata is lost self._delete(request_id) return duration = request_metadata["duration"] creation_datetime_str = request_metadata["created"] unprintStrptimeFmt = "%Y-%m-%d %H:%M:%S.%f" creation_datetime = datetime.strptime(creation_datetime_str, unprintStrptimeFmt) request_time = now() - creation_datetime request_seconds = request_time.total_seconds() if request_seconds > duration: self._delete(request_id) def _delete(self, request_id: UUID): shutil.rmtree(self._directory(request_id), ignore_errors=True)
[docs] def cleanup(self): for directory in self._root.glob("*/*/*/*"): request_id = os.path.basename(directory) if not is_uuid(request_id): continue self._cleanup_if_needed(UUID(request_id))
@property def _root(self) -> Path: return Path(self._config.short_term_storage_directory)
[docs]@contextlib.contextmanager def storage_context(short_term_storage_request_id: UUID, short_term_storage_monitor: ShortTermStorageMonitor): target = short_term_storage_monitor.recover_target(short_term_storage_request_id) try: yield target except MessageException as e: short_term_storage_monitor.cancel(target, exception=e) raise except Exception as e: short_term_storage_monitor.cancel(target, exception=InternalServerError(f"Unknown error: {e}")) raise short_term_storage_monitor.finalize(target)