Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.util.heartbeat
import os
import sys
import threading
import time
import traceback
from six import iteritems
def get_current_thread_object_dict():
    """
    Get a dictionary of all 'Thread' objects created via the threading
    module keyed by thread_id. Note that not all interpreter threads
    have a thread object, only the main thread and any created via the
    'threading' module. Threads created via the low level 'thread' module
    will not be in the returned dictionary.

    HACK: This mucks with the internals of the threading module since that
    module does not expose any way to match 'Thread' objects with
    interpreter thread identifiers (though it should).

    :returns: a new dict holding the union of ``threading._active`` and
              ``threading._limbo`` at the time of the call
    """
    rval = dict()
    # Hold the lock while unioning the contents of 'active' and 'limbo'
    # threads into the return value.  Using the lock as a context manager
    # guarantees it is released even if one of the copies raises.
    # NOTE(review): in CPython '_limbo' appears to be keyed by Thread object
    # rather than by thread id -- confirm before relying on its keys.
    with threading._active_limbo_lock:
        rval.update(threading._active)
        rval.update(threading._limbo)
    return rval
class Heartbeat(threading.Thread):
    """
    Thread that periodically dumps the state of all threads to a file.

    Two append-mode logs are written: ``fname`` receives a traceback of
    every interpreter thread, and a sibling file named
    ``<fname root>.nonsleeping<ext>`` receives a short summary of the
    threads that do not look idle.
    """

    def __init__(self, config, name="Heartbeat Thread", period=20, fname="heartbeat.log"):
        """
        :param config: object providing ``server_name``; it is substituted
                       into ``fname`` when the thread starts running
        :param name: name given to the thread
        :param period: seconds between dumps; when <= 0 the thread makes no
                       periodic dumps and only idles until shut down
        :param fname: log file name, may contain the ``{server_name}`` and
                      ``{pid}`` format fields
        """
        threading.Thread.__init__(self, name=name)
        self.config = config
        self.should_stop = False
        self.period = period
        self.fname = fname
        self.file = None
        self.fname_nonsleeping = None
        self.file_nonsleeping = None
        self.pid = None
        # thread_id -> number of consecutive dumps the thread was not sleeping
        self.nonsleeping_heartbeats = {}
        # Event to wait on when sleeping, allows us to interrupt for shutdown
        self.wait_event = threading.Event()

    def run(self):
        """
        Thread body: resolve the log file names, then dump every
        ``period`` seconds until ``shutdown`` is requested.
        """
        self.pid = os.getpid()
        self.fname = self.fname.format(
            server_name=self.config.server_name,
            pid=self.pid
        )
        fname, ext = os.path.splitext(self.fname)
        self.fname_nonsleeping = fname + '.nonsleeping' + ext
        wait = self.period
        if self.period <= 0:
            # No periodic dumps requested; just wake up once a minute to
            # check whether we should stop.
            wait = 60
        while not self.should_stop:
            if self.period > 0:
                self.dump()
            self.wait_event.wait(wait)

    def open_logs(self):
        """
        Open both log files in append mode (if not already open) and write
        a "started" banner to each.
        """
        if self.file is None or self.file.closed:
            # Open both files before publishing either one, so a failure on
            # the second open cannot leave us with one open handle and a
            # missing 'file_nonsleeping'.
            main_log = open(self.fname, "a")
            try:
                nonsleeping_log = open(self.fname_nonsleeping, "a")
            except Exception:
                main_log.close()
                raise
            self.file = main_log
            self.file_nonsleeping = nonsleeping_log
            self.file.write("Heartbeat for pid %d thread started at %s\n\n" % (self.pid, time.asctime()))
            self.file_nonsleeping.write("Non-Sleeping-threads for pid %d thread started at %s\n\n" % (self.pid, time.asctime()))

    def close_logs(self):
        """
        Write a "stopped" banner to both log files and close them. Does
        nothing when the logs are not open.
        """
        if self.file is not None and not self.file.closed:
            self.file.write("Heartbeat for pid %d thread stopped at %s\n\n" % (self.pid, time.asctime()))
            self.file_nonsleeping.write("Non-Sleeping-threads for pid %d thread stopped at %s\n\n" % (self.pid, time.asctime()))
            self.file.close()
            self.file_nonsleeping.close()

    def dump(self):
        """
        Append a traceback of every interpreter thread to the main log,
        then update the non-sleeping log. Errors raised while dumping are
        written to the main log instead of being propagated.
        """
        self.open_logs()
        try:
            # Print separator with timestamp
            self.file.write("Traceback dump for all threads at %s:\n\n" % time.asctime())
            # Print the thread states
            threads = get_current_thread_object_dict()
            for thread_id, frame in sys._current_frames().items():
                if thread_id in threads:
                    thread_repr = repr(threads[thread_id])
                else:
                    thread_repr = "<No Thread object>"
                self.file.write("Thread %s, %s:\n\n" % (thread_id, thread_repr))
                traceback.print_stack(frame, file=self.file)
                self.file.write("\n")
            self.file.write("End dump\n\n")
            self.file.flush()
            self.print_nonsleeping(threads)
        except Exception:
            self.file.write("Caught exception attempting to dump thread states:")
            traceback.print_exc(None, self.file)
            self.file.write("\n")
            # Make sure the failure report actually reaches the disk.
            self.file.flush()

    def shutdown(self):
        """
        Ask the thread to stop, wait for it to finish and close the logs.
        """
        self.should_stop = True
        self.wait_event.set()
        # Join *before* closing the logs: a dump may still be in progress in
        # the heartbeat thread, and closing the files underneath it would
        # make it fail or reopen (and then leak) them.
        try:
            self.join()
        finally:
            self.close_logs()

    def thread_is_sleeping(self, last_stack_frame):
        """
        Returns True if the given stack-frame represents a known
        sleeper function (at least in python 2.5)

        :param last_stack_frame: (filename, line, function-name, text)
                                 entry of the innermost frame of a thread
        """
        _filename = last_stack_frame[0]
        _funcname = last_stack_frame[2]
        # The source text may be None when the source line is unavailable;
        # treat that as an empty line instead of crashing on it.
        _text = last_stack_frame[3] or ""
        # Ugly hack to tell if a thread is supposedly sleeping or not
        # These are the most common sleeping functions I've found.
        # Is there a better way? (python interpreter internals?)
        # Tested only with python 2.5
        if _funcname == "wait" and _text in ("waiter.acquire()", "_sleep(delay)"):
            return True
        if _funcname == "accept" and _text.endswith("_sock.accept()"):
            return True
        if _funcname in ("monitor", "__monitor", "app_loop", "check") \
                and _text.startswith("time.sleep(") and _text.endswith(")"):
            return True
        if _funcname == "drain_events" and _text == "sleep(polling_interval)":
            return True
        # Ugly hack: always skip the heartbeat thread
        # TODO: get the current thread-id in python
        # skip heartbeat thread by thread-id, not by filename
        if _filename.find("/lib/galaxy/util/heartbeat.py") != -1:
            return True
        # By default, assume the thread is not sleeping
        return False

    def get_interesting_stack_frame(self, stack_frames):
        """
        Scans the given backtrace stack frames, returns a single
        quadruple of [filename, line, function-name, text] of
        the single, deepest, most interesting frame.

        Interesting being::

          inside the galaxy source code ("/lib/galaxy"),
          preferably not an egg.

        For a galaxy frame the returned filename is shortened so that it
        starts at "/lib/galaxy/".
        """
        for _filename, _line, _funcname, _text in reversed(stack_frames):
            idx = _filename.find("/lib/galaxy/")
            if idx != -1:
                relative_filename = _filename[idx:]
                return (relative_filename, _line, _funcname, _text)
        # no "/lib/galaxy" code found, return the innermost frame
        return stack_frames[-1]

    def print_nonsleeping(self, threads_object_dict):
        """
        Append a summary of all threads that do not appear to be sleeping
        to the non-sleeping log, counting for how many consecutive
        heartbeats each of them has been busy.

        :param threads_object_dict: thread_id -> Thread object mapping used
                                    to describe the threads; when None a
                                    fresh mapping is looked up
        """
        self.file_nonsleeping.write("Non-Sleeping threads at %s:\n\n" % time.asctime())
        threads = threads_object_dict
        if threads is None:
            threads = get_current_thread_object_dict()
        frames = sys._current_frames()
        # Forget threads that no longer exist, so the counter dict cannot
        # grow without bound and a reused thread id starts again from one.
        for stale_id in [tid for tid in self.nonsleeping_heartbeats if tid not in frames]:
            del self.nonsleeping_heartbeats[stale_id]
        all_threads_are_sleeping = True
        for thread_id, frame in frames.items():
            tb = traceback.extract_stack(frame)
            if self.thread_is_sleeping(tb[-1]):
                # A sleeping thread resets its non-sleeping streak.
                self.nonsleeping_heartbeats.pop(thread_id, None)
                continue
            if thread_id in threads:
                thread_repr = repr(threads[thread_id])
            else:
                thread_repr = "<No Thread object>"
            # Count non-sleeping thread heartbeats
            heartbeats = self.nonsleeping_heartbeats.get(thread_id, 0) + 1
            self.nonsleeping_heartbeats[thread_id] = heartbeats
            good_frame = self.get_interesting_stack_frame(tb)
            self.file_nonsleeping.write("Thread %s\t%s\tnon-sleeping for %d heartbeat(s)\n File %s:%d\n Function \"%s\"\n %s\n" %
                                        (thread_id, thread_repr, heartbeats, good_frame[0], good_frame[1], good_frame[2], good_frame[3]))
            all_threads_are_sleeping = False
        if all_threads_are_sleeping:
            self.file_nonsleeping.write("All threads are sleeping.\n")
        self.file_nonsleeping.write("\n")
        self.file_nonsleeping.flush()