Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tool_util.verify.wait
"""Abstraction for waiting on API conditions to become true."""
import time
from typing import (
Callable,
Optional,
Union,
)
DEFAULT_POLLING_BACKOFF = 0
DEFAULT_POLLING_DELTA = 0.25
TIMEOUT_MESSAGE_TEMPLATE = "Timed out after {} seconds waiting on {}."
timeout_type = Union[int, float]
[docs]def wait_on(
function: Callable,
desc: str,
timeout: timeout_type,
delta: timeout_type = DEFAULT_POLLING_DELTA,
polling_backoff: timeout_type = DEFAULT_POLLING_BACKOFF,
sleep_: Optional[Callable] = None,
):
"""Wait for function to return non-None value.
Grow the polling interval (initially ``delta`` defaulting to 0.25 seconds)
incrementally by the supplied ``polling_backoff`` (defaulting to 0).
Throw a TimeoutAssertionError if the supplied timeout is reached without
supplied function ever returning a non-None value.
"""
sleep = sleep_ or time.sleep
total_wait = 0.0
while True:
if total_wait > timeout:
raise TimeoutAssertionError(TIMEOUT_MESSAGE_TEMPLATE.format(total_wait, desc))
value = function()
if value is not None:
return value
total_wait += delta
sleep(delta)
delta += polling_backoff