Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.web.stack.transport

"""Web application stack operations
"""
from __future__ import absolute_import

import logging
import threading

from galaxy.util import unicodify

try:
    import uwsgi
except ImportError:
    uwsgi = None


log = logging.getLogger(__name__)


[docs]class ApplicationStackTransport(object): SHUTDOWN_MSG = '__SHUTDOWN__'
[docs] def __init__(self, app, stack, dispatcher=None): """ Pre-fork initialization. """ self.app = app self.stack = stack self.can_run = False self.running = False self.dispatcher = dispatcher self.dispatcher_thread = None
[docs] def init_late_prefork(self): pass
def _dispatch_messages(self): pass
[docs] def start_if_needed(self): # Don't unnecessarily start a thread that we don't need. if self.can_run and not self.running and not self.dispatcher_thread and self.dispatcher and self.dispatcher.handler_count: self.running = True self.dispatcher_thread = threading.Thread(name=self.__class__.__name__ + ".dispatcher_thread", target=self._dispatch_messages) self.dispatcher_thread.start() log.info('%s dispatcher started', self.__class__.__name__)
[docs] def stop_if_unneeded(self): if self.can_run and self.running and self.dispatcher_thread and self.dispatcher and not self.dispatcher.handler_count: self.running = False self.dispatcher_thread.join() self.dispatcher_thread = None log.info('%s dispatcher stopped', self.__class__.__name__)
[docs] def start(self): """ Post-fork initialization. """ self.can_run = True self.start_if_needed()
[docs] def send_message(self, msg, dest): pass
[docs] def shutdown(self): self.running = False if self.dispatcher_thread: log.info('Joining application stack transport dispatcher thread') self.dispatcher_thread.join() self.dispatcher_thread = None
[docs]class UWSGIFarmMessageTransport(ApplicationStackTransport): """ Communication via uWSGI Mule Farm messages. Communication is unidirectional (workers -> mules). """ # Define any static lock names here, additional locks will be appended for each configured farm's message handler _locks = []
[docs] def init_late_prefork(self): num = int(uwsgi.opt.get('locks', 0)) + 1 need = len(self.stack._lock_farms) if num < need: raise RuntimeError('Need %i uWSGI locks but only %i exist(s): Set `locks = %i` in uWSGI configuration' % (need, num, need - 1)) self._locks.extend(['RECV_MSG_FARM_' + x for x in sorted(self.stack._lock_farms)])
# this would be nice, but in my 2.0.15 uWSGI, the uwsgi module has no set_option function, and I don't know if it'd work even if the function existed as documented # if len(self.lock_map) > 1: # uwsgi.set_option('locks', len(self.lock_map)) # log.debug('Created %s uWSGI locks' % len(self.lock_map))
[docs] def __init__(self, app, stack, dispatcher=None): super(UWSGIFarmMessageTransport, self).__init__(app, stack, dispatcher=dispatcher)
def __lock(self, name_or_id): try: uwsgi.lock(name_or_id) except TypeError: uwsgi.lock(self._locks.index(name_or_id)) def __unlock(self, name_or_id): try: uwsgi.unlock(name_or_id) except TypeError: uwsgi.unlock(self._locks.index(name_or_id)) def _farm_recv_msg_lock_num(self): return self._locks.index('RECV_MSG_FARM_' + self.stack._farm_name) def _dispatch_messages(self): # this could be moved to the base class if locking was abstracted and a get_message method was added log.info('Application stack message dispatcher thread starting up') # we are going to do this a lot, so cache the lock number lock = self._farm_recv_msg_lock_num() while self.running: msg = None self.__lock(lock) try: log.debug('Acquired message lock, waiting for new message') msg = unicodify(uwsgi.farm_get_msg()) log.debug('Received message: %s', msg) if msg == self.SHUTDOWN_MSG: self.running = False else: self.dispatcher.dispatch(msg) except Exception: log.exception('Exception in mule message handling') finally: self.__unlock(lock) log.debug('Released lock') log.info('Application stack message dispatcher thread exiting') # TODO: start_if_needed would be called on a web worker by the stack's register_message_handler function if a # function were registered in a web handler, that should probably be prevented.
[docs] def start(self): """ Post-fork initialization. This is mainly done here for the future possibility that we'll be able to run mules post-fork without exec()ing. In a programmed mule it could be done at __init__ time. """ if self.stack._is_mule: if not uwsgi.in_farm(): raise RuntimeError('Mule %s is not in a farm! Set `farm = <pool_name>:%s` in uWSGI configuration' % (uwsgi.mule_id(), ','.join(map(str, range(1, len([x for x in self.stack._configured_mules if x.endswith('galaxy/main.py')]) + 1))))) elif len(self.stack._farms) > 1: raise RuntimeError('Mule %s is in multiple farms! This configuration is not supported due to locking issues' % uwsgi.mule_id()) # only mules receive messages so don't bother starting the dispatcher if we're not a mule (although # currently it doesn't have any registered handlers and so wouldn't start anyway) super(UWSGIFarmMessageTransport, self).start()
[docs] def shutdown(self): if self.stack._is_mule: super(UWSGIFarmMessageTransport, self).shutdown()
[docs] def send_message(self, msg, dest): log.debug('Sending message to farm %s: %s', dest, msg) uwsgi.farm_msg(dest, msg)