This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.web.stack

"""Web application stack operations
from __future__ import print_function

import inspect
import logging
import os

# The uwsgi module is automatically injected by the parent uwsgi
# process and only exists that way.  If anything works, this is a
# uwsgi-managed process.
    import uwsgi
except ImportError:
    uwsgi = None

    from uwsgidecorators import postfork as uwsgi_postfork
    uwsgi_postfork = lambda x: x    # noqa: E731
    if uwsgi is not None and hasattr(uwsgi, 'numproc'):
        print("WARNING: This is a uwsgi process but the uwsgidecorators library"
              " is unavailable.  This is likely due to using an external (not"
              " in Galaxy's virtualenv) uwsgi and you may experience errors. "
              "HINT:\n  {venv}/bin/pip install uwsgidecorators".format(
                  venv=os.environ.get('VIRTUAL_ENV', '/path/to/venv')))

log = logging.getLogger(__name__)

[docs]class ApplicationStack(object): name = None prohibited_middleware = frozenset()
[docs] @classmethod def register_postfork_function(cls, f, *args, **kwargs): f(*args, **kwargs)
[docs] def workers(self): return []
[docs] def allowed_middleware(self, middleware): if hasattr(middleware, '__name__'): middleware = middleware.__name__ return middleware not in self.prohibited_middleware
[docs] def set_postfork_server_name(self, app): pass
[docs]class UWSGIApplicationStack(ApplicationStack): name = 'uWSGI' prohibited_middleware = frozenset([ 'wrap_in_static', 'EvalException', ]) postfork_functions = []
[docs] @classmethod def register_postfork_function(cls, f, *args, **kwargs): cls.postfork_functions.append((f, args, kwargs))
[docs] def workers(self): return uwsgi.workers()
[docs] def set_postfork_server_name(self, app): app.config.server_name += ".%d" % uwsgi.worker_id()
[docs]class PasteApplicationStack(ApplicationStack): name = 'Python Paste'
[docs]class WeblessApplicationStack(ApplicationStack): name = 'Webless'
[docs]def application_stack_class(): """Returns the correct ApplicationStack class for the stack under which this Galaxy process is running. """ if uwsgi is not None and hasattr(uwsgi, 'numproc'): return UWSGIApplicationStack else: # cleverer ideas welcome for frame in inspect.stack(): if frame[1].endswith(os.path.join('pastescript', 'loadwsgi.py')): return PasteApplicationStack return WeblessApplicationStack
[docs]def application_stack_instance(): stack_class = application_stack_class() return stack_class()
[docs]def register_postfork_function(f, *args, **kwargs): application_stack_class().register_postfork_function(f, *args, **kwargs)
@uwsgi_postfork def _do_postfork(): for f, args, kwargs in [t for t in UWSGIApplicationStack.postfork_functions]: f(*args, **kwargs)