This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.jobs.runners.util.retry

import logging
from itertools import count
from time import sleep

log = logging.getLogger(__name__)

DEFAULT_MAX_RETRIES = -1  # By default don't retry.
# RetryActionExecutor.__init__ falls back to the constants below; without
# them every construction fails with NameError because dict.get() evaluates
# its default argument eagerly. The interval values mirror the keyword
# defaults in _retry_over_time's signature (2, 2, 30).
DEFAULT_INTERVAL_START = 2.0
DEFAULT_INTERVAL_STEP = 2.0
DEFAULT_INTERVAL_MAX = 30.0
DEFAULT_CATCH = (Exception,)
# NOTE(review): generic noun interpolated into the "Failed to execute %s"
# log message when no description is supplied - confirm the wording.
DEFAULT_DESCRIPTION = "action"


class RetryActionExecutor(object):
    """Run a callable, retrying with a growing delay while it raises.

    All configuration is passed as optional keyword arguments whose names
    match kombu's ``ensure_connection`` so that settings stay consistent
    across Pulsar:

    :keyword max_retries: Retry limit handed to ``_retry_over_time``. A
        falsy value (``None``, ``0``, ``""``) is stored as ``None``, which
        ``_retry_over_time`` treats as "retry forever"; any other value is
        coerced with ``int``. Defaults to ``DEFAULT_MAX_RETRIES``.
    :keyword interval_start: Initial delay in seconds (coerced to float).
    :keyword interval_step: Amount added to the delay after each failure
        (coerced to float).
    :keyword interval_max: Delay ceiling handed to ``_retry_over_time``
        (coerced to float).
    :keyword errback: Callable invoked as
        ``errback(exc, interval, description)`` after each failed attempt
        that will be retried. Defaults to a handler that logs at INFO
        level; pass a falsy value to disable the callback.
    :keyword catch: Exception class, or tuple of classes, that triggers a
        retry. Defaults to ``DEFAULT_CATCH``.
    :keyword description: Fallback text used by the default errback when
        ``execute`` is called without a description.
    """

    def __init__(self, **kwds):
        # Use variables that match kombu to keep things consistent across
        # Pulsar.
        # http://ask.github.io/kombu/reference/kombu.connection.html#kombu.connection.BrokerConnection.ensure_connection
        raw_max_retries = kwds.get("max_retries", DEFAULT_MAX_RETRIES)
        # Falsy input means "no limit"; everything else must parse as int.
        self.max_retries = None if not raw_max_retries else int(raw_max_retries)
        self.interval_start = float(kwds.get("interval_start", DEFAULT_INTERVAL_START))
        self.interval_step = float(kwds.get("interval_step", DEFAULT_INTERVAL_STEP))
        self.interval_max = float(kwds.get("interval_max", DEFAULT_INTERVAL_MAX))
        self.errback = kwds.get("errback", self.__default_errback)
        self.catch = kwds.get("catch", DEFAULT_CATCH)
        self.default_description = kwds.get("description", DEFAULT_DESCRIPTION)

    def execute(self, action, description=None):
        """Call ``action()`` and return its result, retrying on failure.

        :param action: Zero-argument callable to run.
        :param description: Optional text identifying the action; it is
            forwarded unchanged (possibly ``None``) to the errback.
        :returns: Whatever ``action`` returns on its first successful call.
        :raises: The exception raised by ``action`` once the retry limit is
            reached. Exceptions not matched by ``self.catch`` propagate
            immediately.
        """

        def on_error(exc, intervals, retries, interval=0):
            # Always advance the interval generator, even when no errback is
            # configured, so the returned delay keeps growing.
            interval = next(intervals)
            if self.errback:
                # The errback is always called with three positional
                # arguments; description may be None.
                self.errback(exc, interval, description)
            return interval

        return _retry_over_time(
            action,
            catch=self.catch,
            max_retries=self.max_retries,
            interval_start=self.interval_start,
            interval_step=self.interval_step,
            interval_max=self.interval_max,
            errback=on_error,
        )

    def __default_errback(self, exc, interval, description=None):
        """Log the failure, with traceback, before the next retry."""
        description = description or self.default_description
        log.info(
            "Failed to execute %s, retrying in %s seconds.",
            description,
            interval,
            exc_info=True
        )
# Following functions are derived from Kombu versions @
# https://github.com/celery/kombu/blob/master/kombu/utils/__init__.py
# BSD License (https://github.com/celery/kombu/blob/master/LICENSE)
def _retry_over_time(fun, catch, args=(), kwargs=None, errback=None,
                     max_retries=None, interval_start=2, interval_step=2,
                     interval_max=30):
    """Retry the function over and over until max retries is exceeded.

    For each retry we sleep for a while before we try again, this interval
    is increased for every retry until the max seconds is reached.

    :param fun: The function to try
    :param catch: Exceptions to catch, can be either tuple or
        a single exception class.
    :keyword args: Positional arguments passed on to the function.
    :keyword kwargs: Keyword arguments passed on to the function.
    :keyword errback: Optional callable invoked as
        ``errback(exc, interval_range, retries)`` after a caught failure
        that will be retried. It receives the interval generator and must
        return the number of seconds to sleep. When omitted, the next
        value of the generator is used.
    :keyword max_retries: Maximum number of retries before we give up.
        If this is not set (``None`` or ``0``), we will retry forever.
        A negative value makes the first failure propagate immediately.
    :keyword interval_start: How long (in seconds) we start sleeping
        between retries.
    :keyword interval_step: By how much the interval is increased for
        each retry.
    :keyword interval_max: Maximum number of seconds to sleep
        between retries.
    :returns: The return value of the first successful ``fun`` call.
    :raises: The caught exception, re-raised unchanged, once the number of
        failed attempts reaches ``max_retries``.
    """
    if kwargs is None:
        kwargs = {}
    # NOTE: the generator's stop value is interval_max + interval_start, so
    # the yielded delay can exceed interval_max by up to interval_start.
    # This matches the Kombu code the function was derived from.
    interval_range = __fxrange(interval_start,
                               interval_max + interval_start,
                               interval_step, repeatlast=True)
    for retries in count():
        try:
            return fun(*args, **kwargs)
        except catch as exc:
            if max_retries and retries >= max_retries:
                raise
            tts = float(errback(exc, interval_range, retries) if errback
                        else next(interval_range))
            # A zero delay skips the sleep call entirely.
            if tts:
                sleep(tts)


def __fxrange(start=1.0, stop=None, step=1.0, repeatlast=False):
    """Yield floats from ``start`` upward in increments of ``step``.

    :param start: First value produced (converted to float).
    :param stop: Inclusive upper bound. A falsy value (``None`` or ``0``)
        disables the bound, so the sequence never ends.
    :param step: Increment applied after each yielded value.
    :param repeatlast: When true, keep yielding the last in-range value
        forever once the bound has been passed; otherwise stop iterating.
    """
    cur = start * 1.0
    while True:
        if not stop or cur <= stop:
            yield cur
            cur += step
        else:
            if not repeatlast:
                break
            # cur has already stepped past stop; back up one step to get the
            # last value that was yielded.
            yield cur - step