Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tool_util.verify.wait
"""Abstraction for waiting on API conditions to become true."""
import time
from typing import Callable, Optional, Union
DEFAULT_POLLING_BACKOFF = 0
DEFAULT_POLLING_DELTA = 0.25
TIMEOUT_MESSAGE_TEMPLATE = "Timed out after {} seconds waiting on {}."
timeout_type = Union[int, float]
[docs]def wait_on(function: Callable, desc: str, timeout: timeout_type, delta: timeout_type = DEFAULT_POLLING_DELTA, polling_backoff: timeout_type = DEFAULT_POLLING_BACKOFF, sleep_: Optional[Callable] = None):
"""Wait for function to return non-None value.
Grow the polling interval (initially ``delta`` defaulting to 0.25 seconds)
incrementally by the supplied ``polling_backoff`` (defaulting to 0).
Throw a TimeoutAssertionError if the supplied timeout is reached without
supplied function ever returning a non-None value.
"""
sleep = sleep_ or time.sleep
total_wait = 0.0
while True:
if total_wait > timeout:
raise TimeoutAssertionError(TIMEOUT_MESSAGE_TEMPLATE.format(total_wait, desc))
value = function()
if value is not None:
return value
total_wait += delta
sleep(delta)
delta += polling_backoff