Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.web.stack.transport

"""Web application stack operations
"""
from __future__ import absolute_import

import logging
import sys
import threading

try:
    import uwsgi
except ImportError:
    uwsgi = None


log = logging.getLogger(__name__)


class ApplicationStackTransport(object):
    SHUTDOWN_MSG = '__SHUTDOWN__'

    def __init__(self, app, stack, dispatcher=None):
        """ Pre-fork initialization.
        """
        self.app = app
        self.stack = stack
        self.can_run = False
        self.running = False
        self.dispatcher = dispatcher
        self.dispatcher_thread = None

    def _dispatch_messages(self):
        pass

    def start_if_needed(self):
        # Don't unnecessarily start a thread that we don't need.
        if self.can_run and not self.running and not self.dispatcher_thread and self.dispatcher and self.dispatcher.handler_count:
            self.running = True
            self.dispatcher_thread = threading.Thread(name=self.__class__.__name__ + ".dispatcher_thread", target=self._dispatch_messages)
            self.dispatcher_thread.start()
            log.info('%s dispatcher started', self.__class__.__name__)

    def stop_if_unneeded(self):
        if self.can_run and self.running and self.dispatcher_thread and self.dispatcher and not self.dispatcher.handler_count:
            self.running = False
            self.dispatcher_thread.join()
            self.dispatcher_thread = None
            log.info('%s dispatcher stopped', self.__class__.__name__)

    def start(self):
        """ Post-fork initialization.
        """
        self.can_run = True
        self.start_if_needed()

    def send_message(self, msg, dest):
        pass

    def shutdown(self):
        self.running = False
        if self.dispatcher_thread:
            log.info('Joining application stack transport dispatcher thread')
            self.dispatcher_thread.join()
            self.dispatcher_thread = None


class UWSGIFarmMessageTransport(ApplicationStackTransport):
    """ Communication via uWSGI Mule Farm messages. Communication is unidirectional (workers -> mules).
    """
    # Define any static lock names here, additional locks will be appended for each configured farm's message handler
    _locks = []

    def __initialize_locks(self):
        num = int(uwsgi.opt.get('locks', 0)) + 1
        farms = self.stack._configured_farms.keys()
        need = len(farms)
        if num < need:
            raise RuntimeError('Need %i uWSGI locks but only %i exist(s): Set `locks = %i` in uWSGI configuration' % (need, num, need - 1))
            sys.exit(1)
        self._locks.extend(['RECV_MSG_FARM_' + x for x in farms])
        # this would be nice, but in my 2.0.15 uWSGI, the uwsgi module has no set_option function, and I don't know if it'd work even if the function existed as documented
        # if len(self.lock_map) > 1:
        #     uwsgi.set_option('locks', len(self.lock_map))
        #     log.debug('Created %s uWSGI locks' % len(self.lock_map))

    def __init__(self, app, stack, dispatcher=None):
        super(UWSGIFarmMessageTransport, self).__init__(app, stack, dispatcher=dispatcher)
        self.__initialize_locks()

    def __lock(self, name_or_id):
        try:
            uwsgi.lock(name_or_id)
        except TypeError:
            uwsgi.lock(self._locks.index(name_or_id))

    def __unlock(self, name_or_id):
        try:
            uwsgi.unlock(name_or_id)
        except TypeError:
            uwsgi.unlock(self._locks.index(name_or_id))

    def _farm_recv_msg_lock_num(self):
        return self._locks.index('RECV_MSG_FARM_' + self.stack._farm_name)

    def _dispatch_messages(self):
        # this could be moved to the base class if locking was abstracted and a get_message method was added
        log.info('Application stack message dispatcher thread starting up')
        # we are going to do this a lot, so cache the lock number
        lock = self._farm_recv_msg_lock_num()
        while self.running:
            msg = None
            self.__lock(lock)
            try:
                log.debug('Acquired message lock, waiting for new message')
                msg = uwsgi.farm_get_msg()
                log.debug('Received message: %s', msg)
                if msg == self.SHUTDOWN_MSG:
                    self.running = False
                else:
                    self.dispatcher.dispatch(msg)
            except Exception:
                log.exception('Exception in mule message handling')
            finally:
                self.__unlock(lock)
                log.debug('Released lock')
        log.info('Application stack message dispatcher thread exiting')

    # TODO: start_if_needed would be called on a web worker by the stack's register_message_handler function if a
    # function were registered in a web handler, that should probably be prevented.

    def start(self):
        """ Post-fork initialization.

        This is mainly done here for the future possibility that we'll be able to run mules post-fork without exec()ing. In a programmed mule it could be done at __init__ time.
        """
        if self.stack._is_mule:
            if not uwsgi.in_farm():
                raise RuntimeError('Mule %s is not in a farm! Set `farm = <pool_name>:%s` in uWSGI configuration'
                                   % (uwsgi.mule_id(),
                                      ','.join(map(str, range(1, len([x for x in self.stack._configured_mules if x.endswith('galaxy/main.py')]) + 1)))))
            elif len(self.stack._farms) > 1:
                raise RuntimeError('Mule %s is in multiple farms! This configuration is not supported due to locking issues' % uwsgi.mule_id())
            # only mules receive messages so don't bother starting the dispatcher if we're not a mule (although
            # currently it doesn't have any registered handlers and so wouldn't start anyway)
            super(UWSGIFarmMessageTransport, self).start()

    def shutdown(self):
        if self.stack._is_mule:
            super(UWSGIFarmMessageTransport, self).shutdown()

    def send_message(self, msg, dest):
        log.debug('Sending message to farm %s: %s', dest, msg)
        uwsgi.farm_msg(dest, msg)