Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.model.database_heartbeat
import datetime
import logging
import os
import socket
import threading
from galaxy.model import WorkerProcess
from galaxy.model.orm.now import now
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class DatabaseHeartbeat(object):
    """Keep this process's ``WorkerProcess`` row fresh and elect a config watcher.

    A daemon thread periodically updates the ``WorkerProcess`` row that matches
    this process's server name and hostname. On every beat the process also
    re-evaluates whether it is the designated config watcher: the active process
    with the greatest ``server_name`` wins.
    """

    def __init__(self, application_stack, heartbeat_interval=60):
        """Prepare the heartbeat; no thread is started until :meth:`start`.

        :param application_stack: object exposing ``app``; this class reads
            ``app.model.context``, ``app.config.server_name`` and
            ``app.queue_worker`` from it.
        :param heartbeat_interval: seconds to wait between heartbeats; also the
            default look-back window of :meth:`get_active_processes`.
        """
        self.application_stack = application_stack
        self.heartbeat_interval = heartbeat_interval
        self.hostname = socket.gethostname()
        self._is_config_watcher = False
        # Callables invoked with the new boolean whenever ``is_config_watcher`` is assigned.
        self._observers = []
        # Set by ``shutdown`` to wake up and terminate the heartbeat loop.
        self.exit = threading.Event()
        self.thread = None
        self.active = False
        self.pid = None

    @property
    def sa_session(self):
        """The model context of the application, used for all queries and writes."""
        return self.application_stack.app.model.context

    @property
    def server_name(self):
        """The configured server name, looked up on every access."""
        # Application stack manipulates server name after forking
        return self.application_stack.app.config.server_name

    def start(self):
        """Start the heartbeat thread; does nothing if the heartbeat is already active."""
        if self.active:
            return
        # Record the pid *before* the thread runs, otherwise the first heartbeat
        # can race with this assignment and persist ``pid=None``.
        self.pid = os.getpid()
        self.thread = threading.Thread(
            target=self.send_database_heartbeat,
            name="database_heartbeart_%s.thread" % self.server_name,
        )
        self.thread.daemon = True
        # ``send_database_heartbeat`` only loops while ``active`` is true, so the
        # flag has to be raised before the thread is started.
        self.active = True
        self.thread.start()

    def shutdown(self):
        """Stop the heartbeat thread and remove this process's ``WorkerProcess`` row.

        After the row is deleted, a ``reconfigure_watcher`` control task is sent
        through the application's queue worker (with ``noop_self=True``).
        """
        self.active = False
        self.exit.set()
        if self.thread:
            self.thread.join()
        worker_process = self.worker_process
        if worker_process:
            self.sa_session.delete(worker_process)
            self.sa_session.flush()
        self.application_stack.app.queue_worker.send_control_task('reconfigure_watcher', noop_self=True)

    def get_active_processes(self, last_seen_seconds=None):
        """Return all processes seen in ``last_seen_seconds`` seconds.

        :param last_seen_seconds: size of the look-back window in seconds;
            defaults to ``heartbeat_interval`` when ``None``.
        :returns: list of ``WorkerProcess`` rows whose ``update_time`` is newer
            than the start of the window.
        """
        if last_seen_seconds is None:
            last_seen_seconds = self.heartbeat_interval
        seconds_ago = now() - datetime.timedelta(seconds=last_seen_seconds)
        return self.sa_session.query(WorkerProcess).filter(WorkerProcess.table.c.update_time > seconds_ago).all()

    @property
    def is_config_watcher(self):
        """Whether this process is currently the designated config watcher."""
        return self._is_config_watcher

    @is_config_watcher.setter
    def is_config_watcher(self, value):
        """Store the new designation, log it and notify every registered observer."""
        self._is_config_watcher = value
        log.debug('%s %s config watcher', self.server_name, 'is' if self.is_config_watcher else 'is not')
        for callback in self._observers:
            callback(self._is_config_watcher)

    @property
    def worker_process(self):
        """The ``WorkerProcess`` row for this server name and hostname, or ``None``.

        The query is issued with ``with_for_update(of=WorkerProcess)``.
        """
        return self.sa_session.query(WorkerProcess).with_for_update(of=WorkerProcess).filter_by(
            server_name=self.server_name,
            hostname=self.hostname,
        ).first()

    def update_watcher_designation(self):
        """Send one heartbeat and re-evaluate the config watcher designation.

        Creates the ``WorkerProcess`` row if it does not exist yet, refreshes its
        ``update_time`` and ``pid``, and then updates ``is_config_watcher`` if the
        designation changed.
        """
        worker_process = self.worker_process
        if not worker_process:
            worker_process = WorkerProcess(server_name=self.server_name, hostname=self.hostname)
        worker_process.update_time = now()
        worker_process.pid = self.pid
        self.sa_session.add(worker_process)
        self.sa_session.flush()
        # We only want a single process watching the various config files on the file system.
        # We just pick the max server name for simplicity.
        # This process has just sent its heartbeat, so it is always a candidate;
        # seeding the list with it also keeps ``max`` from raising ValueError
        # when the query returns no rows.
        candidates = [self.server_name]
        candidates.extend(p.server_name for p in self.get_active_processes(self.heartbeat_interval + 1))
        is_config_watcher = self.server_name == max(candidates)
        if is_config_watcher != self.is_config_watcher:
            self.is_config_watcher = is_config_watcher

    def send_database_heartbeat(self):
        """Thread target: send heartbeats until the ``exit`` event is set.

        Returns immediately if the heartbeat is not active.
        """
        if not self.active:
            return
        while not self.exit.is_set():
            self.update_watcher_designation()
            # Sleeps for one interval, but returns early once ``exit`` is set.
            self.exit.wait(self.heartbeat_interval)