# Warning
# This document is for an old release of Galaxy. You can alternatively view this page in the latest release, if it exists, or view the top of the latest release's documentation.
# Source code for galaxy.web.short_term_storage
import abc
import contextlib
import json
import os
import shutil
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import (
Any,
Dict,
Optional,
Union,
)
from uuid import uuid4
from galaxy.exceptions import (
InternalServerError,
MessageException,
NoContentException,
)
from galaxy.exceptions.error_codes import error_codes_by_int_code
from galaxy.util import (
directory_hash_id,
is_uuid,
safe_makedirs,
)
from galaxy.web.framework.decorators import api_error_message
# Optional numeric value (int or float); ``None`` means "not specified".
OptionalNumberT = Union[int, float, None]

# Fallback retention period in seconds: one day.
DEFAULT_STORAGE_DURATION = 60 * 60 * 24

# Clock used for creation timestamps and age checks (naive datetime in UTC).
now = datetime.utcnow
@dataclass
class ShortTermStorageConfiguration:
    """Settings consumed by the short term storage manager."""

    # Root directory under which every target directory is created.
    short_term_storage_directory: str
    # Duration (seconds) used when a request does not supply one; when unset
    # the module-level DEFAULT_STORAGE_DURATION applies instead.
    default_storage_duration: OptionalNumberT = None
    # Upper bound (seconds) that any effective duration is clamped to; unset means no cap.
    maximum_storage_duration: OptionalNumberT = None
@dataclass
class ShortTermStorageTargetSecurity:
    """Optional user and session ids recorded alongside a storage target."""

    user_id: Optional[int] = None
    session_id: Optional[int] = None

    def to_dict(self) -> Dict[str, Optional[int]]:
        """Return a JSON-serializable dictionary with ``user_id`` and ``session_id``."""
        return {
            "user_id": self.user_id,
            "session_id": self.session_id,
        }

    @classmethod
    def from_dict(cls, as_dict: Dict[str, Optional[int]]) -> "ShortTermStorageTargetSecurity":
        """Build an instance from a dictionary shaped like ``to_dict`` output.

        Keys that are absent from ``as_dict`` are treated as ``None``.
        """
        # Instantiate through ``cls`` so subclasses round-trip to their own type.
        return cls(
            user_id=as_dict.get("user_id"),
            session_id=as_dict.get("session_id"),
        )
@dataclass
class ShortTermStorageTarget:
    """Handle for one short term storage request: its id plus where its file lives."""

    request_id: str  # uuid
    # Location of the target file, kept as a plain string.
    raw_path: str

    @property
    def path(self) -> Path:
        """Return ``raw_path`` as a ``pathlib.Path``."""
        return Path(self.raw_path)
@dataclass
class ShortTermStorageServeCompletedInformation:
    """Everything needed to serve a target that was not cancelled."""

    target: ShortTermStorageTarget
    # Mime type and filename as supplied when the target was requested.
    mime_type: str
    filename: str
    # User/session ids recorded with the request.
    security: ShortTermStorageTargetSecurity
@dataclass
class ShortTermStorageServeCancelledInformation:
    """Serve information for a target whose producing task was cancelled."""

    target: ShortTermStorageTarget
    # Status code stored at cancellation time (204 when no exception was recorded).
    status_code: int
    # Serialized exception payload, or ``None`` when the task was cancelled without one.
    exception: Optional[Dict[str, Any]]

    @property
    def message_exception(self) -> MessageException:
        """Rebuild a ``MessageException`` from the stored payload.

        :raises NoContentException: if no exception payload was stored.
        :raises KeyError: if the payload lacks ``err_code``/``err_msg`` or the
            error code is not present in ``error_codes_by_int_code``.
        """
        serialized_exception = self.exception
        if not serialized_exception:
            raise NoContentException()
        exception_obj = MessageException()
        exception_obj.status_code = self.status_code
        # The payload carries the integer code; map it back to the error code object.
        exception_obj.err_code = error_codes_by_int_code[serialized_exception["err_code"]]
        exception_obj.err_msg = serialized_exception["err_msg"]
        return exception_obj
# Result type of ``get_serve_info``: either the completed or the cancelled variant.
ShortTermStorageServeInformation = Union[
    ShortTermStorageServeCompletedInformation,
    ShortTermStorageServeCancelledInformation,
]
class ShortTermStorageAllocator(metaclass=abc.ABCMeta):
    """Interface for components that hand out new short term storage targets."""

    # TODO: Implement upstream_mod_zip=False, upstream_gzip=False - in initial request and serving...

    @abc.abstractmethod
    def new_target(
        self,
        filename: str,
        mime_type: str,
        # Same numeric type the concrete manager in this module accepts (int or float).
        duration: OptionalNumberT = None,
        security: Optional[ShortTermStorageTargetSecurity] = None,
    ) -> ShortTermStorageTarget:
        """Return a new ShortTermStorageTarget for this short term file request."""
class ShortTermStorageMonitor(metaclass=abc.ABCMeta):
    """Interface for tracking, finalizing and serving existing storage targets."""

    @abc.abstractmethod
    def is_ready(self, target: ShortTermStorageTarget) -> bool:
        """Check if storage is ready."""

    @abc.abstractmethod
    def get_serve_info(self, target: ShortTermStorageTarget) -> ShortTermStorageServeInformation:
        """Get information required to serve this short term storage target."""

    @abc.abstractmethod
    def finalize(self, target: ShortTermStorageTarget) -> None:
        """Indicate the file is ready to be served."""

    @abc.abstractmethod
    def cancel(self, target: ShortTermStorageTarget, exception: Optional[MessageException] = None) -> None:
        """Store metadata for failed task.

        Implementation is responsible for indicating target is finalized as well.
        """

    @abc.abstractmethod
    def target_path(self, target: ShortTermStorageTarget) -> Path:
        """Return fully qualified path on this server for specified target."""

    @abc.abstractmethod
    def recover_target(self, request_id: str) -> ShortTermStorageTarget:
        """Return an existing ShortTermStorageTarget from a specified request_id."""
class ShortTermStorageManager(ShortTermStorageAllocator, ShortTermStorageMonitor):
    """Filesystem-backed allocator and monitor for short term storage targets.

    Every target gets its own directory below the configured root. That
    directory holds ``request.json`` (request metadata), the ``target`` path
    for the payload, an empty ``finalized`` marker once the target is ready,
    and ``cancelled.json`` when the producing task was cancelled.
    """

    def __init__(self, config: ShortTermStorageConfiguration):
        """Keep the configuration every other method reads through ``self._config``.

        NOTE(review): inferred from the attributes read off ``self._config``
        (they match ShortTermStorageConfiguration) - confirm against callers.
        """
        self._config = config

    def new_target(
        self,
        filename: str,
        mime_type: str,
        duration: OptionalNumberT = None,
        security: Optional[ShortTermStorageTargetSecurity] = None,
    ) -> ShortTermStorageTarget:
        """Create the directory and request metadata for a new target.

        :param filename: filename recorded in the request metadata.
        :param mime_type: mime type recorded in the request metadata.
        :param duration: retention in seconds; a falsy value falls back to the
            configured default and then to DEFAULT_STORAGE_DURATION. The result
            is clamped to the configured maximum when one is set.
        :param security: user/session ids to record; defaults to an empty record.
        :returns: target whose ``request_id`` is a freshly generated UUID string.
        """
        if security is None:
            security = ShortTermStorageTargetSecurity()
        request_id = str(uuid4())
        target_directory = self._directory(request_id)
        target = ShortTermStorageTarget(request_id=request_id, raw_path=str(target_directory / "target"))
        safe_makedirs(target_directory)
        duration = duration or self._config.default_storage_duration or DEFAULT_STORAGE_DURATION
        maximum_storage_duration = self._config.maximum_storage_duration
        if duration and maximum_storage_duration and duration > maximum_storage_duration:
            duration = maximum_storage_duration
        # optimize by placing the deletion time outside JSON as new file...
        request_info = {
            "filename": filename,
            "mime_type": mime_type,
            "duration": duration,
            # str(datetime) form; _cleanup_if_needed parses this value back.
            "created": str(now()),
            "security": security.to_dict(),
        }
        self._store_metadata(target_directory, "request", request_info)
        return target

    def recover_target(self, request_id: str) -> ShortTermStorageTarget:
        """Return the target for an existing ``request_id``.

        :raises AssertionError: if ``request_id`` is not a UUID.
        """
        # request_id becomes part of a filesystem path, so it must be validated.
        # Raise explicitly instead of using ``assert`` so the check is not
        # stripped when Python runs with -O; the exception type is unchanged.
        if not is_uuid(request_id):
            raise AssertionError(f"Invalid short term storage request id: {request_id!r}")
        target_directory = self._directory(request_id)
        target = ShortTermStorageTarget(request_id=request_id, raw_path=str(target_directory / "target"))
        return target

    def is_ready(self, target: ShortTermStorageTarget) -> bool:
        """Check if storage is ready."""
        return self._finalized_path(target).exists()

    def get_serve_info(self, target: ShortTermStorageTarget) -> ShortTermStorageServeInformation:
        """Get information required to serve this short term storage target.

        Returns the cancelled variant when ``cancelled.json`` exists for the
        target, otherwise the completed variant built from ``request.json``.
        """
        cancelled = self._cancelled_path(target)
        serve_info: ShortTermStorageServeInformation
        target_directory = self._directory(target)
        if cancelled.exists():
            exception_metadata = self._load_metadata(target_directory, "cancelled")
            serve_info = ShortTermStorageServeCancelledInformation(
                target=target,
                status_code=exception_metadata["status_code"],
                exception=exception_metadata["exception"],
            )
        else:
            request_metadata = self._load_metadata(target_directory, "request")
            serve_info = ShortTermStorageServeCompletedInformation(
                target=target,
                filename=request_metadata["filename"],
                mime_type=request_metadata["mime_type"],
                security=ShortTermStorageTargetSecurity.from_dict(request_metadata["security"]),
            )
        return serve_info

    def cancel(self, target: ShortTermStorageTarget, exception: Optional[MessageException] = None) -> None:
        """Write metadata for failed task and mark the target finalized."""
        if exception:
            exception_json = {
                "status_code": exception.status_code,
                "exception": api_error_message(None, exception=exception),
            }
        else:
            exception_json = {"status_code": 204, "exception": None}  # NO CONTENT
        self._store_metadata(self._directory(target), "cancelled", exception_json)
        self.finalize(target)

    def finalize(self, target: ShortTermStorageTarget) -> None:
        """Indicate the file is ready to be served."""
        self._finalized_path(target).touch()

    def target_path(self, target: ShortTermStorageTarget) -> Path:
        """Return the path of the ``target`` file inside the target's directory."""
        return self._directory(target) / "target"

    def _cancelled_path(self, target: ShortTermStorageTarget) -> Path:
        """Return the path of the cancellation metadata file for ``target``."""
        return self._directory(target) / "cancelled.json"

    def _finalized_path(self, target: ShortTermStorageTarget) -> Path:
        """Return the path of the marker file whose existence means "ready"."""
        return self._directory(target) / "finalized"

    def _store_metadata(self, target_directory: Path, meta_name: str, meta_value: Any) -> None:
        """Serialize ``meta_value`` as JSON to ``<target_directory>/<meta_name>.json``."""
        meta_path = target_directory / f"{meta_name}.json"
        with open(meta_path, "w") as f:
            json.dump(meta_value, f)

    def _load_metadata(self, target_directory: Path, meta_name: str) -> Any:
        """Load and return the JSON stored at ``<target_directory>/<meta_name>.json``."""
        meta_path = target_directory / f"{meta_name}.json"
        with open(meta_path) as f:
            return json.load(f)

    def _directory(self, target: Union[str, ShortTermStorageTarget]) -> Path:
        """Return the directory for a target, given the target or its request id."""
        if isinstance(target, ShortTermStorageTarget):
            request_id = target.request_id
        else:
            request_id = target
        # Hash-derived parent directories followed by the request id itself.
        relative_directory = directory_hash_id(request_id) + [request_id]
        return self._root.joinpath(*relative_directory)

    def _cleanup_if_needed(self, request_id: str) -> None:
        """Delete the target's directory if it is older than its stored duration."""
        request_metadata = self._load_metadata(self._directory(request_id), "request")
        duration = request_metadata["duration"]
        # "created" was written with str(datetime), which leaves out the
        # fractional seconds when microsecond == 0. fromisoformat accepts both
        # spellings, whereas a fixed "...%S.%f" strptime format raises
        # ValueError on the short one and would abort the whole cleanup pass.
        creation_datetime = datetime.fromisoformat(request_metadata["created"])
        request_seconds = (now() - creation_datetime).total_seconds()
        if request_seconds > duration:
            self._delete(request_id)

    def _delete(self, request_id: str) -> None:
        """Remove the target's directory and everything in it."""
        shutil.rmtree(self._directory(request_id))

    def cleanup(self) -> None:
        """Walk the storage root and delete every expired target."""
        # NOTE(review): the four-level glob presumably mirrors the layout
        # produced by _directory (hash directories + request id) - confirm.
        for directory in self._root.glob("*/*/*/*"):
            request_id = directory.name
            if not is_uuid(request_id):
                continue
            self._cleanup_if_needed(request_id)

    @property
    def _root(self) -> Path:
        """Configured storage root as a ``Path``."""
        return Path(self._config.short_term_storage_directory)
@contextlib.contextmanager
def storage_context(short_term_storage_request_id: str, short_term_storage_monitor: ShortTermStorageMonitor):
    """Yield the target for ``short_term_storage_request_id`` and record the outcome.

    If the managed body raises, the target is cancelled through the monitor
    and the exception is re-raised: a ``MessageException`` is stored as is,
    any other exception is wrapped in an ``InternalServerError``. If the body
    completes normally, the target is finalized.
    """
    target = short_term_storage_monitor.recover_target(short_term_storage_request_id)
    try:
        yield target
    except MessageException as e:
        short_term_storage_monitor.cancel(target, exception=e)
        raise
    except Exception as e:
        short_term_storage_monitor.cancel(target, exception=InternalServerError(f"Unknown error: {e}"))
        raise
    # Only reached when the body did not raise.
    short_term_storage_monitor.finalize(target)