Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.model.database_heartbeat
import datetime
import logging
import os
import socket
import threading
from galaxy.model import WorkerProcess
from galaxy.model.orm.now import now
log = logging.getLogger(__name__)
[docs]class DatabaseHeartbeat:
[docs] def __init__(self, application_stack, heartbeat_interval=60):
self.application_stack = application_stack
self.heartbeat_interval = heartbeat_interval
self.hostname = socket.gethostname()
self._is_config_watcher = False
self._observers = []
self.exit = threading.Event()
self.thread = None
self.active = False
self.pid = None
@property
def sa_session(self):
return self.application_stack.app.model.session
@property
def server_name(self):
# Application stack manipulates server name after forking
return self.application_stack.app.config.server_name
[docs] def start(self):
if not self.active:
self.thread = threading.Thread(
target=self.send_database_heartbeat, name=f"database_heartbeart_{self.server_name}.thread"
)
self.thread.daemon = True
self.active = True
self.thread.start()
self.pid = os.getpid()
[docs] def shutdown(self):
self.active = False
self.exit.set()
if self.thread:
self.thread.join()
worker_process = self.worker_process
if worker_process:
self.sa_session.delete(worker_process)
self.sa_session.flush()
self.application_stack.app.queue_worker.send_control_task("reconfigure_watcher", noop_self=True)
[docs] def get_active_processes(self, last_seen_seconds=None):
"""Return all processes seen in ``last_seen_seconds`` seconds."""
if last_seen_seconds is None:
last_seen_seconds = self.heartbeat_interval
seconds_ago = now() - datetime.timedelta(seconds=last_seen_seconds)
return self.sa_session.query(WorkerProcess).filter(WorkerProcess.update_time > seconds_ago).all()
@property
def is_config_watcher(self):
return self._is_config_watcher
@is_config_watcher.setter
def is_config_watcher(self, value):
self._is_config_watcher = value
log.debug("%s %s config watcher", self.server_name, "is" if self.is_config_watcher else "is not")
for callback in self._observers:
callback(self._is_config_watcher)
@property
def worker_process(self):
return (
self.sa_session.query(WorkerProcess)
.with_for_update(of=WorkerProcess)
.filter_by(
server_name=self.server_name,
hostname=self.hostname,
)
.first()
)
[docs] def update_watcher_designation(self):
worker_process = self.worker_process
if not worker_process:
worker_process = WorkerProcess(server_name=self.server_name, hostname=self.hostname)
worker_process.update_time = now()
worker_process.pid = self.pid
self.sa_session.add(worker_process)
self.sa_session.flush()
# We only want a single process watching the various config files on the file system.
# We just pick the max server name for simplicity
is_config_watcher = self.server_name == max(
p.server_name for p in self.get_active_processes(self.heartbeat_interval + 1)
)
if is_config_watcher != self.is_config_watcher:
self.is_config_watcher = is_config_watcher
[docs] def send_database_heartbeat(self):
if self.active:
while not self.exit.is_set():
self.update_watcher_designation()
self.exit.wait(self.heartbeat_interval)