Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.model.database_heartbeat

import datetime
import logging
import os
import socket
import threading

from galaxy.model import WorkerProcess
from galaxy.model.orm.now import now

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


class DatabaseHeartbeat:
    """Record this process in the ``WorkerProcess`` table at a fixed interval.

    A background thread refreshes (or creates) the ``WorkerProcess`` row that
    matches this process's server name and hostname, and on every beat decides
    whether this process is the single "config watcher": the recently seen
    process with the greatest ``server_name``. Registered callbacks are invoked
    whenever that designation is assigned.
    """

    def __init__(self, application_stack, heartbeat_interval=60):
        """Store configuration; no thread is started until :meth:`start`.

        :param application_stack: object exposing ``app.model.session``,
            ``app.config.server_name`` and ``app.queue_worker``.
        :param heartbeat_interval: seconds to wait between heartbeats.
        """
        self.application_stack = application_stack
        self.heartbeat_interval = heartbeat_interval
        self.hostname = socket.gethostname()
        self._is_config_watcher = False
        self._observers = []
        # Set by shutdown() to wake the heartbeat thread and end its loop.
        self.exit = threading.Event()
        self.thread = None
        self.active = False
        self.pid = None

    @property
    def sa_session(self):
        """Return the session exposed as ``application_stack.app.model.session``."""
        return self.application_stack.app.model.session

    @property
    def server_name(self):
        """Return the current ``server_name`` from the application config."""
        # Application stack manipulates server name after forking
        return self.application_stack.app.config.server_name

    def start(self):
        """Start the daemon heartbeat thread unless the heartbeat is already active.

        NOTE: ``self.exit`` is never cleared, so calling ``start()`` after
        ``shutdown()`` launches a thread that returns immediately.
        """
        if self.active:
            return
        # Record the pid *before* the thread starts: the heartbeat thread reads
        # self.pid in update_watcher_designation(), and assigning it after
        # thread.start() could let the first heartbeat store pid=None.
        self.pid = os.getpid()
        self.thread = threading.Thread(
            target=self.send_database_heartbeat,
            name=f"database_heartbeart_{self.server_name}.thread",
        )
        self.thread.daemon = True
        self.active = True
        self.thread.start()

    def shutdown(self):
        """Stop the heartbeat thread and remove this process's ``WorkerProcess`` row.

        Afterwards a ``reconfigure_watcher`` control task is sent through the
        application's queue worker (with ``noop_self=True``).
        """
        self.active = False
        self.exit.set()
        if self.thread:
            self.thread.join()
        worker_process = self.worker_process
        if worker_process:
            self.sa_session.delete(worker_process)
            self.sa_session.flush()
        self.application_stack.app.queue_worker.send_control_task('reconfigure_watcher', noop_self=True)

    def get_active_processes(self, last_seen_seconds=None):
        """Return all processes seen in ``last_seen_seconds`` seconds.

        :param last_seen_seconds: look-back window in seconds; defaults to
            ``self.heartbeat_interval`` when ``None``.
        :returns: list of ``WorkerProcess`` rows whose ``update_time`` is newer
            than the start of the window.
        """
        if last_seen_seconds is None:
            last_seen_seconds = self.heartbeat_interval
        seconds_ago = now() - datetime.timedelta(seconds=last_seen_seconds)
        return self.sa_session.query(WorkerProcess).filter(WorkerProcess.update_time > seconds_ago).all()

    def add_change_callback(self, callback):
        """Register ``callback`` to be called with the new watcher flag on assignment."""
        self._observers.append(callback)

    @property
    def is_config_watcher(self):
        """Whether this process is currently designated as the config watcher."""
        return self._is_config_watcher

    @is_config_watcher.setter
    def is_config_watcher(self, value):
        # Every assignment is logged and broadcast to all registered callbacks.
        self._is_config_watcher = value
        log.debug('%s %s config watcher', self.server_name, 'is' if self.is_config_watcher else 'is not')
        for callback in self._observers:
            callback(self._is_config_watcher)

    @property
    def worker_process(self):
        """Return this process's ``WorkerProcess`` row, or ``None`` if absent.

        The row is matched on server name and hostname and selected with
        ``with_for_update(of=WorkerProcess)``.
        """
        return self.sa_session.query(WorkerProcess).with_for_update(of=WorkerProcess).filter_by(
            server_name=self.server_name,
            hostname=self.hostname,
        ).first()

    def update_watcher_designation(self):
        """Refresh this process's row and re-evaluate the config-watcher role."""
        worker_process = self.worker_process
        if not worker_process:
            worker_process = WorkerProcess(server_name=self.server_name, hostname=self.hostname)
        worker_process.update_time = now()
        worker_process.pid = self.pid
        self.sa_session.add(worker_process)
        self.sa_session.flush()
        # We only want a single process watching the various config files on the file system.
        # We just pick the max server name for simplicity
        active_processes = self.get_active_processes(self.heartbeat_interval + 1)
        is_config_watcher = self.server_name == max(p.server_name for p in active_processes)
        # Only assign on change so observers are not notified on every beat.
        if is_config_watcher != self.is_config_watcher:
            self.is_config_watcher = is_config_watcher

    def send_database_heartbeat(self):
        """Thread target: send heartbeats until the ``exit`` event is set.

        Returns immediately when the heartbeat is not active.
        """
        if not self.active:
            return
        while not self.exit.is_set():
            self.update_watcher_designation()
            # Sleeps for the interval, but returns early once exit is set.
            self.exit.wait(self.heartbeat_interval)