Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.web.stack.transport
"""Web application stack operations
"""
from __future__ import absolute_import
import logging
import threading
from galaxy.util import unicodify
try:
import uwsgi
except ImportError:
uwsgi = None
log = logging.getLogger(__name__)
class ApplicationStackTransport(object):
    """Base class for application stack message transports.

    Manages the lifecycle of an optional background dispatcher thread whose
    target is ``_dispatch_messages``. In this base class that target does
    nothing; subclasses override it to actually receive messages.
    """
    # Sentinel message; subclasses may compare received messages against it.
    SHUTDOWN_MSG = '__SHUTDOWN__'

    def __init__(self, app, stack, dispatcher=None):
        """ Pre-fork initialization.

        :param app: the application object, stored as ``self.app``.
        :param stack: the application stack, stored as ``self.stack``.
        :param dispatcher: optional message dispatcher; its ``handler_count``
            attribute decides whether a dispatcher thread is worth running.
        """
        self.app = app
        self.stack = stack
        self.dispatcher = dispatcher
        self.dispatcher_thread = None
        # Both flags stay off until ``start()`` flips ``can_run``.
        self.can_run = False
        self.running = False

    def _dispatch_messages(self):
        """Dispatcher thread target. Intentionally a no-op in the base class."""

    def start_if_needed(self):
        """Start the dispatcher thread if allowed, not yet started, and handlers exist."""
        # Don't unnecessarily start a thread that we don't need.
        if not self.can_run or self.running or self.dispatcher_thread:
            return
        if not (self.dispatcher and self.dispatcher.handler_count):
            return
        self.running = True
        thread_name = self.__class__.__name__ + ".dispatcher_thread"
        self.dispatcher_thread = threading.Thread(name=thread_name, target=self._dispatch_messages)
        self.dispatcher_thread.start()
        log.info('%s dispatcher started', self.__class__.__name__)

    def stop_if_unneeded(self):
        """Stop and join the dispatcher thread once the dispatcher has no handlers left."""
        if not (self.can_run and self.running and self.dispatcher_thread and self.dispatcher):
            return
        if self.dispatcher.handler_count:
            # Handlers are still registered, so the thread is still needed.
            return
        # Clear the flag first so a loop polling ``self.running`` can exit, then wait for it.
        self.running = False
        self.dispatcher_thread.join()
        self.dispatcher_thread = None
        log.info('%s dispatcher stopped', self.__class__.__name__)

    def start(self):
        """ Post-fork initialization.
        """
        self.can_run = True
        self.start_if_needed()

    def shutdown(self):
        """Clear the running flag and join the dispatcher thread, if there is one."""
        self.running = False
        if not self.dispatcher_thread:
            return
        log.info('Joining application stack transport dispatcher thread')
        self.dispatcher_thread.join()
        self.dispatcher_thread = None
class UWSGIFarmMessageTransport(ApplicationStackTransport):
    """ Communication via uWSGI Mule Farm messages. Communication is unidirectional (workers -> mules).
    """
    # Define any static lock names here, additional locks will be appended for each configured farm's message handler
    _locks = []

    def __init__(self, app, stack, dispatcher=None):
        """Pre-fork initialization; defers entirely to the base class."""
        super(UWSGIFarmMessageTransport, self).__init__(app, stack, dispatcher=dispatcher)

    def init_late_prefork(self):
        """Check that enough uWSGI locks exist, then register one lock name per lock farm.

        :raises RuntimeError: if the ``locks`` uWSGI option plus one is smaller
            than the number of farms in ``self.stack._lock_farms``.
        """
        available = int(uwsgi.opt.get('locks', 0)) + 1
        required = len(self.stack._lock_farms)
        if available < required:
            template = 'Need %i uWSGI locks but only %i exist(s): Set `locks = %i` in uWSGI configuration'
            raise RuntimeError(template % (required, available, required - 1))
        # Build the full list first so ``_locks`` is only touched once all names are computed.
        farm_lock_names = ['RECV_MSG_FARM_' + farm for farm in sorted(self.stack._lock_farms)]
        self._locks.extend(farm_lock_names)
        # this would be nice, but in my 2.0.15 uWSGI, the uwsgi module has no set_option function, and I don't know if it'd work even if the function existed as documented
        # if len(self.lock_map) > 1:
        #     uwsgi.set_option('locks', len(self.lock_map))
        #     log.debug('Created %s uWSGI locks' % len(self.lock_map))

    def __lock(self, name_or_id):
        """Acquire a uWSGI lock given either its number or its registered name."""
        try:
            uwsgi.lock(name_or_id)
        except TypeError:
            # Retry with the position of ``name_or_id`` in ``_locks`` as the lock number.
            lock_id = self._locks.index(name_or_id)
            uwsgi.lock(lock_id)

    def __unlock(self, name_or_id):
        """Release a uWSGI lock given either its number or its registered name."""
        try:
            uwsgi.unlock(name_or_id)
        except TypeError:
            # Retry with the position of ``name_or_id`` in ``_locks`` as the lock number.
            lock_id = self._locks.index(name_or_id)
            uwsgi.unlock(lock_id)

    def _farm_recv_msg_lock_num(self):
        """Return the index in ``_locks`` of the receive lock for this stack's farm."""
        lock_name = 'RECV_MSG_FARM_' + self.stack._farm_name
        return self._locks.index(lock_name)

    def _dispatch_messages(self):
        """Dispatcher thread body: receive farm messages under the farm lock until told to stop.

        Each iteration takes the farm's receive lock, fetches one message and
        either clears ``self.running`` (on ``SHUTDOWN_MSG``) or passes the
        message to ``self.dispatcher.dispatch``. Exceptions are logged and the
        loop continues; the lock is always released.
        """
        # this could be moved to the base class if locking was abstracted and a get_message method was added
        log.info('Application stack message dispatcher thread starting up')
        # we are going to do this a lot, so cache the lock number
        recv_lock = self._farm_recv_msg_lock_num()
        while self.running:
            self.__lock(recv_lock)
            try:
                log.debug('Acquired message lock, waiting for new message')
                message = unicodify(uwsgi.farm_get_msg())
                log.debug('Received message: %s', message)
                if message == self.SHUTDOWN_MSG:
                    self.running = False
                else:
                    self.dispatcher.dispatch(message)
            except Exception:
                log.exception('Exception in mule message handling')
            finally:
                self.__unlock(recv_lock)
                log.debug('Released lock')
        log.info('Application stack message dispatcher thread exiting')

    # TODO: start_if_needed would be called on a web worker by the stack's register_message_handler function if a
    # function were registered in a web handler, that should probably be prevented.

    def start(self):
        """ Post-fork initialization.

        This is mainly done here for the future possibility that we'll be able to run mules post-fork without exec()ing. In a programmed mule it could be done at __init__ time.

        :raises RuntimeError: if this process is a mule that is in no farm, or
            in more than one farm.
        """
        if not self.stack._is_mule:
            return
        if not uwsgi.in_farm():
            mule_id = uwsgi.mule_id()
            main_mules = [x for x in self.stack._configured_mules if x.endswith('galaxy/main.py')]
            suggested_ids = ','.join(map(str, range(1, len(main_mules) + 1)))
            raise RuntimeError('Mule %s is not in a farm! Set `farm = <pool_name>:%s` in uWSGI configuration'
                               % (mule_id, suggested_ids))
        if len(self.stack._farms) > 1:
            raise RuntimeError('Mule %s is in multiple farms! This configuration is not supported due to locking issues' % uwsgi.mule_id())
        # only mules receive messages so don't bother starting the dispatcher if we're not a mule (although
        # currently it doesn't have any registered handlers and so wouldn't start anyway)
        super(UWSGIFarmMessageTransport, self).start()

    def shutdown(self):
        """Run the base-class shutdown, but only when this process is a mule."""
        if not self.stack._is_mule:
            return
        super(UWSGIFarmMessageTransport, self).shutdown()

    def send_message(self, msg, dest):
        """Send ``msg`` to the uWSGI farm named ``dest``."""
        log.debug('Sending message to farm %s: %s', dest, msg)
        uwsgi.farm_msg(dest, msg)