galaxy.web.short_term_storage package

galaxy.web.short_term_storage.now()

Return a new datetime representing UTC day and time.

class galaxy.web.short_term_storage.ShortTermStorageConfiguration(short_term_storage_directory: str, default_storage_duration: Union[int, float, NoneType] = None, maximum_storage_duration: Union[int, float, NoneType] = None)[source]

Bases: object

short_term_storage_directory: str
default_storage_duration: Optional[Union[int, float]] = None
maximum_storage_duration: Optional[Union[int, float]] = None
__init__(short_term_storage_directory: str, default_storage_duration: Optional[Union[int, float]] = None, maximum_storage_duration: Optional[Union[int, float]] = None) -> None
class galaxy.web.short_term_storage.ShortTermStorageTargetSecurity(user_id: Union[int, NoneType] = None, session_id: Union[int, NoneType] = None)[source]

Bases: object

user_id: Optional[int] = None
session_id: Optional[int] = None
to_dict() -> Dict[str, Optional[int]][source]
classmethod from_dict(as_dict: Dict[str, Optional[int]]) -> ShortTermStorageTargetSecurity[source]
__init__(user_id: Optional[int] = None, session_id: Optional[int] = None) -> None
class galaxy.web.short_term_storage.ShortTermStorageTarget(request_id: uuid.UUID, raw_path: str, duration: Union[int, float, NoneType] = None)[source]

Bases: object

request_id: UUID
raw_path: str
duration: Optional[Union[int, float]] = None
property path
__init__(request_id: UUID, raw_path: str, duration: Optional[Union[int, float]] = None) -> None
class galaxy.web.short_term_storage.ShortTermStorageServeCompletedInformation(target: galaxy.web.short_term_storage.ShortTermStorageTarget, mime_type: str, filename: str, security: galaxy.web.short_term_storage.ShortTermStorageTargetSecurity)[source]

Bases: object

target: ShortTermStorageTarget
mime_type: str
filename: str
security: ShortTermStorageTargetSecurity
__init__(target: ShortTermStorageTarget, mime_type: str, filename: str, security: ShortTermStorageTargetSecurity) -> None
class galaxy.web.short_term_storage.ShortTermStorageServeCancelledInformation(target: galaxy.web.short_term_storage.ShortTermStorageTarget, status_code: int, exception: Union[Dict[str, Any], NoneType])[source]

Bases: object

target: ShortTermStorageTarget
status_code: int
exception: Optional[Dict[str, Any]]
property message_exception: MessageException
__init__(target: ShortTermStorageTarget, status_code: int, exception: Optional[Dict[str, Any]]) -> None
class galaxy.web.short_term_storage.ShortTermStorageAllocator[source]

Bases: object

abstract new_target(filename: str, mime_type: str, duration: Optional[int] = None, security: Optional[ShortTermStorageTargetSecurity] = None) -> ShortTermStorageTarget[source]

Return a new ShortTermStorageTarget for this short term file request.

class galaxy.web.short_term_storage.ShortTermStorageMonitor[source]

Bases: object

abstract is_ready(target: ShortTermStorageTarget) -> bool[source]

Check if storage is ready.

abstract get_serve_info(target: ShortTermStorageTarget) -> Union[ShortTermStorageServeCompletedInformation, ShortTermStorageServeCancelledInformation][source]

Get information required to serve this short term storage target.

abstract finalize(target: ShortTermStorageTarget) -> None[source]

Indicate the file is ready to be served.

abstract cancel(target: ShortTermStorageTarget, exception: Optional[MessageException] = None) -> None[source]

Store metadata for failed task.

The implementation is also responsible for marking the target as finalized.

abstract target_path(target: ShortTermStorageTarget) -> Path[source]

Return fully qualified path on this server for specified target.

abstract cleanup() -> None[source]

Clean up old requests.

abstract recover_target(request_id: UUID) -> ShortTermStorageTarget[source]

Return an existing ShortTermStorageTarget from a specified request_id.

class galaxy.web.short_term_storage.ShortTermStorageManager(config: ShortTermStorageConfiguration)[source]

Bases: ShortTermStorageAllocator, ShortTermStorageMonitor

__init__(config: ShortTermStorageConfiguration)[source]
new_target(filename: str, mime_type: str, duration: Optional[Union[int, float]] = None, security: Optional[ShortTermStorageTargetSecurity] = None) -> ShortTermStorageTarget[source]

Return a new ShortTermStorageTarget for this short term file request.

recover_target(request_id: UUID) -> ShortTermStorageTarget[source]

Return an existing ShortTermStorageTarget from a specified request_id.

is_ready(target: ShortTermStorageTarget) -> bool[source]

Check if storage is ready.

get_serve_info(target: ShortTermStorageTarget) -> Union[ShortTermStorageServeCompletedInformation, ShortTermStorageServeCancelledInformation][source]

Get information required to serve this short term storage target.

cancel(target: ShortTermStorageTarget, exception: Optional[MessageException] = None)[source]

Write metadata for failed task.

finalize(target: ShortTermStorageTarget) -> None[source]

Indicate the file is ready to be served.

target_path(target: ShortTermStorageTarget) -> Path[source]

Return fully qualified path on this server for specified target.

cleanup()[source]

Clean up old requests.

galaxy.web.short_term_storage.storage_context(short_term_storage_request_id: UUID, short_term_storage_monitor: ShortTermStorageMonitor)[source]