Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.util.task

import logging
from threading import (
    Event,
    Thread,
)

from galaxy.util import ExecutionTimer

# Module-level logger; IntervalTask uses it to report how long a timed run took.
log = logging.getLogger(__name__)


class IntervalTask:
    """Periodically invoke a callable on a background daemon thread.

    The worker thread is created in ``__init__`` but only begins executing
    once ``start()`` is called. ``shutdown()`` wakes the worker and asks it
    to stop.
    """

    def __init__(self, func, name="Periodic task", interval=3600, immediate_start=False, time_execution=False):
        """
        Run an arbitrary function `func` every `interval` seconds.
        Set `immediate_start` to True to run `func` when task is started.

        :param func: zero-argument callable invoked on every tick.
        :param name: label used for the worker thread and in timing log lines.
        :param interval: timeout, in seconds, passed to ``Event.wait`` between runs.
        :param immediate_start: when True, `func` is run once as soon as the
            worker thread begins, before the first wait.
        :param time_execution: when True, every run is timed and the result
            is logged at debug level.
        """
        self.func = func
        self.name = name
        self.interval = interval
        self.time_execution = time_execution
        self.immediate_start = immediate_start
        # Set by shutdown() to interrupt the wait in run() and end its loop.
        self.event = Event()
        self.thread = Thread(target=self.run, name=self.name, daemon=True)
        self.running = False

    def start(self):
        """Mark the task as running and launch the worker thread."""
        self.running = True
        self.thread.start()

    def _exec(self):
        """Invoke `func` once, logging the elapsed time if timing is enabled."""
        timer = ExecutionTimer() if self.time_execution else None
        self.func()
        if timer is not None:
            log.debug(f"Executed periodic task {self.name} {timer}")

    def run(self):
        """Worker loop: optionally run at once, then run after every wait."""
        if self.immediate_start:
            self._exec()
        while not self.event.is_set():
            # Returns after `interval` seconds, or earlier if shutdown() sets the event.
            self.event.wait(self.interval)
            # shutdown() clears `running` before setting the event, so a
            # wake-up caused by shutdown does not trigger one more run.
            if self.running:
                self._exec()

    def shutdown(self):
        """Stop the task and wait up to five seconds for the worker to exit.

        Safe to call before ``start()`` and from within `func` itself: the
        join is skipped in both situations, because ``Thread.join`` raises
        ``RuntimeError`` for a thread that was never started and for a
        thread that tries to join itself.
        """
        from threading import current_thread

        self.running = False
        self.event.set()
        worker = self.thread
        if worker.is_alive() and worker is not current_thread():
            worker.join(5)