Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.web_stack.transport
"""Web application stack operations
"""
import logging
import threading
from galaxy.util import unicodify
try:
import uwsgi
except ImportError:
uwsgi = None
log = logging.getLogger(__name__)
[docs]class ApplicationStackTransport:
SHUTDOWN_MSG = '__SHUTDOWN__'
[docs] def __init__(self, app, stack, dispatcher=None):
""" Pre-fork initialization.
"""
self.app = app
self.stack = stack
self.can_run = False
self.running = False
self.dispatcher = dispatcher
self.dispatcher_thread = None
def _dispatch_messages(self):
pass
[docs] def start_if_needed(self):
# Don't unnecessarily start a thread that we don't need.
if self.can_run and not self.running and not self.dispatcher_thread and self.dispatcher and self.dispatcher.handler_count:
self.running = True
self.dispatcher_thread = threading.Thread(name=self.__class__.__name__ + ".dispatcher_thread", target=self._dispatch_messages)
self.dispatcher_thread.start()
log.info('%s dispatcher started', self.__class__.__name__)
[docs] def stop_if_unneeded(self):
if self.can_run and self.running and self.dispatcher_thread and self.dispatcher and not self.dispatcher.handler_count:
self.running = False
self.dispatcher_thread.join()
self.dispatcher_thread = None
log.info('%s dispatcher stopped', self.__class__.__name__)
[docs] def start(self):
""" Post-fork initialization.
"""
self.can_run = True
self.start_if_needed()
[docs] def shutdown(self):
self.running = False
if self.dispatcher_thread:
log.info('Joining application stack transport dispatcher thread')
self.dispatcher_thread.join()
self.dispatcher_thread = None
[docs]class UWSGIFarmMessageTransport(ApplicationStackTransport):
""" Communication via uWSGI Mule Farm messages. Communication is unidirectional (workers -> mules).
"""
# Define any static lock names here, additional locks will be appended for each configured farm's message handler
_locks = []
[docs] def init_late_prefork(self):
num = int(uwsgi.opt.get('locks', 0)) + 1
need = len(self.stack._lock_farms)
if num < need:
raise RuntimeError('Need %i uWSGI locks but only %i exist(s): Set `locks = %i` in uWSGI configuration' % (need, num, need - 1))
self._locks.extend(['RECV_MSG_FARM_' + x for x in sorted(self.stack._lock_farms)])
# this would be nice, but in my 2.0.15 uWSGI, the uwsgi module has no set_option function, and I don't know if it'd work even if the function existed as documented
# if len(self.lock_map) > 1:
# uwsgi.set_option('locks', len(self.lock_map))
# log.debug('Created %s uWSGI locks' % len(self.lock_map))
[docs] def __init__(self, app, stack, dispatcher=None):
super().__init__(app, stack, dispatcher=dispatcher)
def __lock(self, name_or_id):
try:
uwsgi.lock(name_or_id)
except TypeError:
uwsgi.lock(self._locks.index(name_or_id))
def __unlock(self, name_or_id):
try:
uwsgi.unlock(name_or_id)
except TypeError:
uwsgi.unlock(self._locks.index(name_or_id))
def _farm_recv_msg_lock_num(self):
return self._locks.index('RECV_MSG_FARM_' + self.stack._farm_name)
def _dispatch_messages(self):
# this could be moved to the base class if locking was abstracted and a get_message method was added
log.info('Application stack message dispatcher thread starting up')
# we are going to do this a lot, so cache the lock number
lock = self._farm_recv_msg_lock_num()
while self.running:
msg = None
self.__lock(lock)
try:
log.debug('Acquired message lock, waiting for new message')
msg = unicodify(uwsgi.farm_get_msg())
log.debug('Received message: %s', msg)
if msg == self.SHUTDOWN_MSG:
self.running = False
else:
self.dispatcher.dispatch(msg)
except Exception:
log.exception('Exception in mule message handling')
finally:
self.__unlock(lock)
log.debug('Released lock')
log.info('Application stack message dispatcher thread exiting')
# TODO: start_if_needed would be called on a web worker by the stack's register_message_handler function if a
# function were registered in a web handler, that should probably be prevented.
[docs] def start(self):
""" Post-fork initialization.
This is mainly done here for the future possibility that we'll be able to run mules post-fork without exec()ing. In a programmed mule it could be done at __init__ time.
"""
if self.stack._is_mule:
if not uwsgi.in_farm():
raise RuntimeError('Mule %s is not in a farm! Set `farm = <pool_name>:%s` in uWSGI configuration'
% (uwsgi.mule_id(),
','.join(map(str, range(1, len([x for x in self.stack._configured_mules if x.endswith('galaxy/main.py')]) + 1)))))
elif len(self.stack._farms) > 1:
raise RuntimeError('Mule %s is in multiple farms! This configuration is not supported due to locking issues' % uwsgi.mule_id())
# only mules receive messages so don't bother starting the dispatcher if we're not a mule (although
# currently it doesn't have any registered handlers and so wouldn't start anyway)
super().start()
[docs] def send_message(self, msg, dest):
log.debug('Sending message to farm %s: %s', dest, msg)
uwsgi.farm_msg(dest, msg)