Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.web_stack

"""Web application stack operations."""

import inspect
import json
import logging
import multiprocessing
import os
import sys
import threading
from typing import (
    Callable,
    Dict,
    FrozenSet,
    List,
    Optional,
    Tuple,
    Type,
)
from urllib.request import install_opener

# The uwsgi module is automatically injected by the parent uwsgi process and only exists that way.  If the import
# succeeds, this is a uwsgi-managed process; otherwise `uwsgi` is set to None.
try:
    import uwsgi
except ImportError:
    uwsgi = None

import yaml

from galaxy.util import unicodify
from galaxy.util.facts import get_facts
from galaxy.util.path import has_ext
from galaxy.util.properties import nice_config_parser
from .handlers import HANDLER_ASSIGNMENT_METHODS
from .message import ApplicationStackMessage, ApplicationStackMessageDispatcher
from .transport import ApplicationStackTransport, UWSGIFarmMessageTransport


log = logging.getLogger(__name__)


class ApplicationStackLogFilter(logging.Filter):
    """Log filter for stacks without uWSGI worker or mule identifiers.

    Sets the ``worker_id`` and ``mule_id`` record attributes (the same ones
    ``UWSGILogFilter`` fills in) to ``None`` and never rejects a record.
    """

    def filter(self, record):
        """Blank the uWSGI identifier attributes on *record* and accept it.

        :returns: always ``True``, so the record is emitted.
        """
        for attribute in ("worker_id", "mule_id"):
            setattr(record, attribute, None)
        return True
class UWSGILogFilter(logging.Filter):
    """Log filter that stamps each record with the current uWSGI worker and mule ids."""

    def filter(self, record):
        """Attach ``uwsgi.worker_id()`` and ``uwsgi.mule_id()`` to *record* and accept it.

        :returns: always ``True``, so the record is emitted.
        """
        worker, mule = uwsgi.worker_id(), uwsgi.mule_id()
        record.worker_id = worker
        record.mule_id = mule
        return True
class ApplicationStack:
    """Base class describing the application stack a Galaxy process runs under.

    The base implementation has no pools and no messaging: the pool queries
    report nothing configured and the message/lifecycle hooks are no-ops.
    Subclasses override them for a specific stack.
    """

    name: Optional[str] = None
    # Middleware names rejected by ``allowed_middleware``
    prohibited_middleware: FrozenSet[str] = frozenset()
    transport_class = ApplicationStackTransport
    log_filter_class: Type[logging.Filter] = ApplicationStackLogFilter
    log_format = '%(name)s %(levelname)s %(asctime)s [pN:%(processName)s,p:%(process)d,tN:%(threadName)s] %(message)s'
    # TODO: this belongs in the pool configuration
    server_name_template = '{server_name}'
    default_app_name = 'main'

    @classmethod
    def log_filter(cls):
        """Return a new instance of this stack's ``log_filter_class``."""
        return cls.log_filter_class()

    @classmethod
    def get_app_kwds(cls, config_section, app_name=None, for_paste_app=False):
        """Return stack-derived application keyword arguments (none for the base stack)."""
        return {}

    @classmethod
    def register_postfork_function(cls, f, *args, **kwargs):
        """Call ``f(*args, **kwargs)`` immediately; the base stack does not defer postfork functions."""
        f(*args, **kwargs)

    def __init__(self, app=None, config=None):
        """Store *app* and its config and name the current process after the server.

        :param app: application object; when given without *config*, ``app.config`` is used.
        :param config: configuration object; takes precedence over ``app.config`` when truthy.
        """
        self.app = app
        self.config = config or (app and app.config)
        self.running = False
        # Lazily probed database capabilities / derived preference (None == not yet determined)
        self._supports_returning = None
        self._supports_skip_locked = None
        self._preferred_handler_assignment_method = None
        multiprocessing.current_process().name = getattr(self.config, 'server_name', 'main')
        if app:
            log.debug("%s initialized", self.__class__.__name__)

    def supports_returning(self):
        """Return whether an ``UPDATE ... RETURNING`` statement executes against the job table.

        The probe targets ``id == -1`` and its outcome is cached; any exception
        raised while executing it is taken to mean "not supported".
        """
        if self._supports_returning is None:
            job_table = self.app.model.Job.__table__
            stmt = job_table.update().where(job_table.c.id == -1).returning(job_table.c.id)
            try:
                self.app.model.session.execute(stmt)
                self._supports_returning = True
            except Exception:
                self._supports_returning = False
        return self._supports_returning

    def supports_skip_locked(self):
        """Return whether a ``SELECT ... FOR UPDATE SKIP LOCKED`` statement executes against the job table.

        The probe targets ``id == -1`` and its outcome is cached; any exception
        raised while executing it is taken to mean "not supported".
        """
        if self._supports_skip_locked is None:
            job_table = self.app.model.Job.__table__
            stmt = job_table.select().where(job_table.c.id == -1).with_for_update(skip_locked=True)
            try:
                self.app.model.session.execute(stmt)
                self._supports_skip_locked = True
            except Exception:
                self._supports_skip_locked = False
        return self._supports_skip_locked

    def get_preferred_handler_assignment_method(self):
        """Return (and cache) ``DB_SKIP_LOCKED`` if the database supports it, else ``DB_TRANSACTION_ISOLATION``."""
        if self._preferred_handler_assignment_method is None:
            if self.app.application_stack.supports_skip_locked():
                self._preferred_handler_assignment_method = HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED
            else:
                log.debug("Database does not support WITH FOR UPDATE statement, cannot use DB-SKIP-LOCKED handler assignment")
                self._preferred_handler_assignment_method = HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION
        return self._preferred_handler_assignment_method

    def _set_default_job_handler_assignment_methods(self, job_config, base_pool):
        """Override in subclasses to set default job handler assignment methods if not explicitly configured by the
        administrator.

        Called once per job_config.
        """

    def _init_job_handler_assignment_methods(self, job_config, base_pool):
        """Apply the stack's default assignment methods unless the administrator configured them."""
        if not job_config.handler_assignment_methods_configured:
            self._set_default_job_handler_assignment_methods(job_config, base_pool)

    def _init_job_handler_subpools(self, job_config, base_pool):
        """Set up members of "subpools" ("base_pool.*") as handlers (including the base pool itself, if it exists).
        """
        for pool_name in self.configured_pools:
            if pool_name == base_pool:
                tag = job_config.DEFAULT_HANDLER_TAG
            elif pool_name.startswith(f"{base_pool}."):
                tag = pool_name.replace(f"{base_pool}.", '', 1)
            else:
                continue
            # Pools are hierarchical (so that you can have e.g. workflow schedulers use the job handlers pool if no
            # workflow schedulers pool exists), so if a pool for a given tag has already been found higher in the
            # hierarchy, don't add members from a pool lower in the hierarchy.
            if tag not in job_config.pool_for_tag:
                if self.in_pool(pool_name):
                    job_config.is_handler = True
                # pool_members() may return None (the base implementation always does); treat that as "no members"
                # rather than failing on iteration.
                for handler in self.pool_members(pool_name) or ():
                    job_config.add_handler(handler, [tag])
                job_config.pool_for_tag[tag] = pool_name

    def init_job_handling(self, job_config):
        """Automatically add pools as handlers if they are named per predefined names and there is not an explicit
        job handler assignment configuration. Also automatically set the preferred assignment method if pool handlers
        are found and an assignment method is not explicitly configured by the administrator.
        """
        stack_assignment_methods_configured = False
        for base_pool in job_config.DEFAULT_BASE_HANDLER_POOLS:
            if self.has_base_pool(base_pool):
                # Assignment methods are initialized only once, for the first base pool that exists
                if not stack_assignment_methods_configured:
                    self._init_job_handler_assignment_methods(job_config, base_pool)
                    stack_assignment_methods_configured = True
                self._init_job_handler_subpools(job_config, base_pool)

    def init_late_prefork(self):
        """Hook with no behavior in the base stack."""
        pass

    def log_startup(self):
        """Log that this server instance is running."""
        log.info(f"Galaxy server instance '{self.config.server_name}' is running")

    def start(self):
        """Hook with no behavior in the base stack."""
        # TODO: with a stack config the pools could be parsed here
        pass

    def allowed_middleware(self, middleware):
        """Return whether *middleware* (a name, or an object with ``__name__``) is not prohibited by this stack."""
        if hasattr(middleware, '__name__'):
            middleware = middleware.__name__
        return middleware not in self.prohibited_middleware

    def workers(self):
        """Return the stack's workers; the base stack reports none."""
        return []

    @property
    def pool_name(self):
        """Name of the pool this process belongs to; always ``None`` for the base stack."""
        # TODO: ideally jobs would be mappable to handlers by pool name
        return None

    @property
    def configured_pools(self):
        """Mapping keyed by configured pool name; empty for the base stack."""
        return {}

    def has_base_pool(self, pool_name):
        """Return whether *pool_name* or any "<pool_name>.*" subpool is configured."""
        return self.has_pool(pool_name) or any(pool.startswith(f"{pool_name}.") for pool in self.configured_pools)

    def has_pool(self, pool_name):
        """Return whether *pool_name* is one of the configured pools."""
        return pool_name in self.configured_pools

    def in_pool(self, pool_name):
        """Return whether this process is a member of *pool_name*; never for the base stack."""
        return False

    def pool_members(self, pool_name):
        """Return the members of *pool_name*; the base stack has none and returns ``None``."""
        return None

    @property
    def facts(self):
        """Facts from ``get_facts`` for this config, extended with ``pool_name``."""
        facts = get_facts(config=self.config)
        facts.update({'pool_name': self.pool_name})
        return facts

    def set_postfork_server_name(self, app):
        """Render ``server_name_template`` from ``facts`` and apply it to the process and ``app.config``.

        When ``GUNICORN_WORKER_ID`` is present in the environment its value is appended after a dot.
        """
        new_server_name = self.server_name_template.format(**self.facts)
        if "GUNICORN_WORKER_ID" in os.environ:
            new_server_name = f"{new_server_name}.{os.environ['GUNICORN_WORKER_ID']}"
        multiprocessing.current_process().name = app.config.server_name = new_server_name
        log.debug('server_name set to: %s', new_server_name)

    def register_message_handler(self, func, name=None):
        """No-op: the base stack has no messaging."""
        pass

    def deregister_message_handler(self, func=None, name=None):
        """No-op: the base stack has no messaging."""
        pass

    def send_message(self, dest, msg=None, target=None, params=None, **kwargs):
        """No-op: the base stack has no messaging."""
        pass

    def shutdown(self):
        """Hook with no behavior in the base stack."""
        pass
class MessageApplicationStack(ApplicationStack):
    """Application stack that can exchange messages through a transport and dispatcher."""

    def __init__(self, app=None, config=None):
        """Initialize the base stack, then create the dispatcher and the stack's transport."""
        super().__init__(app=app, config=config)
        self.use_messaging = False
        dispatcher = ApplicationStackMessageDispatcher()
        self.dispatcher = dispatcher
        self.transport = self.transport_class(app, stack=self, dispatcher=dispatcher)

    def init_late_prefork(self):
        """Forward the late prefork hook to the transport."""
        self.transport.init_late_prefork()

    def start(self):
        """Start the transport once, and only when messaging is enabled."""
        super().start()
        if not self.use_messaging or self.running:
            return
        self.transport.start()
        self.running = True

    def register_message_handler(self, func, name=None):
        """Register *func* with the dispatcher and let the transport start if it now needs to."""
        self.dispatcher.register_func(func, name)
        self.transport.start_if_needed()

    def deregister_message_handler(self, func=None, name=None):
        """Deregister a handler from the dispatcher and let the transport stop if no longer needed."""
        self.dispatcher.deregister_func(func, name)
        self.transport.stop_if_unneeded()

    def send_message(self, dest, msg=None, target=None, params=None, **kwargs):
        """Send *msg* to *dest*, building it from *target*, *params* and *kwargs* when *msg* is falsy."""
        assert msg is not None or target is not None, "Either 'msg' or 'target' parameters must be set"
        message = msg or ApplicationStackMessage(
            target=target,
            params=params,
            **kwargs
        )
        self.transport.send_message(message.encode(), dest)

    def shutdown(self):
        """Shut the transport down if messaging is enabled and the stack is running."""
        if not (self.use_messaging and self.running):
            return
        log.info('Application stack interface shutting down')
        self.transport.shutdown()
        self.running = False
class UWSGIApplicationStack(MessageApplicationStack):
    """Interface to the uWSGI application stack. Supports running additional webless Galaxy workers as mules. Mules
    must be farmed to be communicable via uWSGI mule messaging, unfarmed mules are not supported.

    Note that mules will use this as their stack class even though they start with the "webless" loading point.
    """
    name = 'uWSGI'
    prohibited_middleware = frozenset({
        'build_url_map',
        'EvalException',
    })
    transport_class = UWSGIFarmMessageTransport
    log_filter_class = UWSGILogFilter
    log_format = '%(name)s %(levelname)s %(asctime)s [pN:%(processName)s,p:%(process)d,w:%(worker_id)s,m:%(mule_id)s,tN:%(threadName)s] %(message)s'
    server_name_template = '{server_name}.{pool_name}.{instance_id}'

    # (function, args, kwargs) triples deferred by register_postfork_function
    postfork_functions: List[Tuple[Callable, List, Dict]] = []

    localhost_addrs = ('127.0.0.1', '[::1]')
    bind_all_addrs = ('', '0.0.0.0', '[::]')

    @staticmethod
    def _config_has_section(path, loader, section):
        """Return whether the file at *path* exists and, once parsed by *loader*, contains *section*."""
        if not os.path.exists(path):
            return False
        # Close the handle deterministically instead of leaking it to the garbage collector
        with open(path) as fh:
            loaded = loader(fh)
        # An empty document loads as None, which cannot contain a section
        return loaded is not None and section in loaded

    @staticmethod
    def _get_config_file(confs, loader, section):
        """uWSGI allows config merging, in which case the corresponding config file option will be a list.

        :param confs: a single config file option value, or a list of them.
        :param loader: callable taking an open file and returning the parsed document.
        :param section: key that must be present in the parsed document.
        :returns: *confs* itself when it is not a list; otherwise the only listed file containing *section*, or
                  ``None`` (with a warning) when zero or several files contain it.
        """
        conf = None
        if isinstance(confs, list):
            gconfs = [c for c in confs if UWSGIApplicationStack._config_has_section(c, loader, section)]
            if len(gconfs) == 1:
                conf = gconfs[0]
            elif len(gconfs) == 0:
                log.warning('Could not locate a config file containing a Galaxy config from: %s',
                            ', '.join(confs))
            else:
                log.warning('Multiple config files contain Galaxy configs, merging is not supported: %s',
                            ', '.join(gconfs))
        else:
            conf = confs
        return conf

    @staticmethod
    def _socket_opt_to_str(opt, val):
        """Render a uWSGI socket option as a URL-like string for the startup log.

        A value starting with ``=`` is treated as an index into the ``shared-socket`` option. Bind-all hosts are
        shown as the first localhost address. If the value cannot be parsed, ``"<opt> <val>"`` is returned.
        """
        try:
            opt = unicodify(opt)
            val = unicodify(val)
            if val.startswith('='):
                val = unicodify(uwsgi.opt.get('shared-socket', [])[int(val.split('=')[1])])
            proto = opt if opt != 'socket' else 'uwsgi'
            if proto == 'uwsgi' and ':' not in val:
                return f"uwsgi://{val}"
            else:
                proto = f"{proto}://"
                host, port = val.rsplit(':', 1)
                port = f":{port.split(',', 1)[0]}"
                if host in UWSGIApplicationStack.bind_all_addrs:
                    host = UWSGIApplicationStack.localhost_addrs[0]
                return proto + host + port
        except (IndexError, AttributeError, ValueError):
            # ValueError covers a non-numeric shared-socket index and a value without a ':' separator; this string
            # is only used for logging, so fall back to the raw option rather than failing.
            return f'{opt} {val}'

    @staticmethod
    def _socket_opts():
        """Yield ``(option, value)`` pairs for each configured ``https``, ``http`` and ``socket`` uWSGI option."""
        for opt in ('https', 'http', 'socket'):
            if opt in uwsgi.opt:
                val = uwsgi.opt[opt]
                if isinstance(val, list):
                    for v in val:
                        yield (opt, v)
                else:
                    yield (opt, val)

    @staticmethod
    def _serving_on():
        """Yield a printable address for every configured socket option."""
        for opt, val in UWSGIApplicationStack._socket_opts():
            yield UWSGIApplicationStack._socket_opt_to_str(opt, val)

    @classmethod
    def get_app_kwds(cls, config_section, app_name=None, for_paste_app=False):
        """Determine the Galaxy config file and section from the uWSGI options.

        :param config_section: section name expected to hold the Galaxy config.
        :param app_name: accepted for interface compatibility; not used here.
        :param for_paste_app: accepted for parity with ``ApplicationStack.get_app_kwds``; not used here.
        :returns: dict with ``config_file`` and ``config_section`` keys.
        """
        kwds = {
            'config_file': None,
            'config_section': config_section,
        }
        uwsgi_opt = uwsgi.opt
        config_file = None
        # check for --set galaxy_config_file=<path>, this overrides whatever config file uWSGI was loaded with (which
        # may not actually include a Galaxy config)
        if uwsgi_opt.get("galaxy_config_file"):
            config_file = uwsgi_opt.get("galaxy_config_file")
        # check for --yaml or --json uWSGI config options next
        if config_file is None:
            config_file = (UWSGIApplicationStack._get_config_file(uwsgi_opt.get("yaml"), yaml.safe_load, config_section)
                           or UWSGIApplicationStack._get_config_file(uwsgi_opt.get("json"), json.load, config_section))
        # --ini and --ini-paste don't behave the same way, but this method will only be called by mules if the main
        # application was loaded with --ini-paste, so we can make some assumptions, most notably, uWSGI does not have
        # any way to set the app name when loading with paste.deploy:loadapp(), so hardcoding the alternate section
        # name to `app:main` is fine.
        ini_opt = uwsgi_opt.get("ini") or uwsgi_opt.get("ini-paste")
        # Only fall back to the uWSGI ini options when no config file was found above; an already-located non-ini
        # config file must not be handed to the ini parser just because --ini-paste is also set.
        has_ini_config = config_file is None and ini_opt
        has_ini_config = has_ini_config or (config_file and has_ext(config_file, "ini", aliases=True, ignore="sample"))
        if has_ini_config:
            config_file = config_file or ini_opt
            parser = nice_config_parser(config_file)
            if not parser.has_section(config_section) and parser.has_section('app:main'):
                kwds['config_section'] = 'app:main'
        kwds['config_file'] = unicodify(config_file)
        return kwds

    @classmethod
    def register_postfork_function(cls, f, *args, **kwargs):
        """Defer ``f(*args, **kwargs)`` until postfork in non-mule processes; run it immediately in mules."""
        if uwsgi.mule_id() == 0:
            cls.postfork_functions.append((f, args, kwargs))
        else:
            # mules are forked from the master and run the master's postfork functions immediately before the forked
            # process is replaced. that is prevented in the _do_uwsgi_postfork function, and because programmed mules
            # are standalone non-forking processes, they should run postfork functions immediately
            f(*args, **kwargs)

    def __init__(self, app=None, config=None):
        """Initialize lazily-built farm/mule caches, then the messaging stack."""
        self._farms_dict = None
        self._mules_list = None
        self._farm_server_names_dict = None
        # If there's more than one farm *and* more than one farm will be using farm messaging, additional locks need to
        # be configured by the admin. This allows us to keep track of how many such farms are configured.
        self._lock_farms = set()
        super().__init__(app=app, config=config)

    def _set_default_job_handler_assignment_methods(self, job_config, base_pool):
        """Adjust ``job_config.handler_assignment_methods`` in place for a farm-backed deployment."""
        # Disable DB_SELF if a valid farm (pool) is configured. Use mule messaging unless the job_config doesn't allow
        # it (e.g. workflow scheduling manager), in which case, use DB-SKIP-LOCKED or DB-TRANSACTION-ISOLATION.
        #
        # TODO MULTIPOOL: if there is no default in any base_pool (and no job_config.default_handler_id) then don't
        # remove DB_SELF
        conf_class_name = job_config.__class__.__name__
        remove_methods = [HANDLER_ASSIGNMENT_METHODS.DB_SELF]
        if (HANDLER_ASSIGNMENT_METHODS.UWSGI_MULE_MESSAGE not in job_config.UNSUPPORTED_HANDLER_ASSIGNMENT_METHODS):
            add_method = HANDLER_ASSIGNMENT_METHODS.UWSGI_MULE_MESSAGE
        else:
            add_method = self.get_preferred_handler_assignment_method()
            remove_methods.append(HANDLER_ASSIGNMENT_METHODS.UWSGI_MULE_MESSAGE)
        log.debug("%s: No job handler assignment methods were configured but a uWSGI farm named '%s' exists,"
                  " automatically enabling the '%s' assignment method", conf_class_name, base_pool, add_method)
        for m in remove_methods:
            try:
                job_config.handler_assignment_methods.remove(m)
                log.debug("%s: Removed '%s' from handler assignment methods due to use of mules", conf_class_name, m)
            except ValueError:
                # the method was not in the list to begin with
                pass
        if add_method not in job_config.handler_assignment_methods:
            job_config.handler_assignment_methods.insert(0, add_method)
        log.debug("%s: handler assignment methods updated to: %s",
                  conf_class_name, ', '.join(job_config.handler_assignment_methods))

    def _init_job_handler_assignment_methods(self, job_config, base_pool):
        """Run the base initialization, then enable stack messaging if mule messaging is an assignment method."""
        super()._init_job_handler_assignment_methods(job_config, base_pool)
        # Determine if stack messaging should be enabled
        if HANDLER_ASSIGNMENT_METHODS.UWSGI_MULE_MESSAGE in job_config.handler_assignment_methods:
            self.use_messaging = True

    def _init_job_handler_subpools(self, job_config, base_pool):
        """Run the base initialization, then record which farms under *base_pool* use messaging."""
        super()._init_job_handler_subpools(job_config, base_pool)
        # Count the required number of uWSGI locks
        if job_config.use_messaging:
            for pool_name in self.configured_pools:
                if (pool_name == base_pool or pool_name.startswith(f"{base_pool}.")):
                    self._lock_farms.add(pool_name)

    @property
    def _configured_mules(self):
        """Cached list of configured mules."""
        if self._mules_list is None:
            self._mules_list = _uwsgi_configured_mules()
        return self._mules_list

    @property
    def _is_mule(self):
        """Whether this process is a mule (``uwsgi.mule_id()`` is positive)."""
        return uwsgi.mule_id() > 0

    @property
    def configured_pools(self):
        """Cached mapping of farm name to list of integer mule ids, parsed from the uWSGI ``farm`` option(s).

        Each option value is split as ``<name>:<id>[,<id>...]``.
        """
        if self._farms_dict is None:
            self._farms_dict = {}
            farms = uwsgi.opt.get('farm', [])
            farms = farms if isinstance(farms, list) else [farms]
            for farm in farms:
                farm = unicodify(farm)
                name, mules = farm.split(':', 1)
                self._farms_dict[name] = [int(m) for m in mules.split(',')]
        return self._farms_dict

    @property
    def _farms(self):
        """Names of the farms whose mule list contains this process's mule id."""
        current_mule = uwsgi.mule_id()
        return [farm for farm, mules in self.configured_pools.items() if current_mule in mules]

    def _mule_index_in_farm(self, farm_name, mule_id=None):
        """Return the zero-based position of *mule_id* (default: this mule) in *farm_name*, or -1 if absent."""
        mule_id = mule_id or uwsgi.mule_id()
        try:
            mules = self.configured_pools[farm_name]
            return mules.index(mule_id)
        except (KeyError, ValueError):
            return -1

    @property
    def _farm_name(self):
        """Name of the first farm this mule belongs to, or ``None`` when it belongs to none."""
        # TODO: to allow mules to be in multiple farms you'll need to start here
        try:
            return self._farms[0]
        except IndexError:
            return None

    @property
    def _farm_server_names(self):
        """Cached mapping of farm name to the server names rendered for each of its mules."""
        if self._farm_server_names_dict is None:
            self._farm_server_names_dict = {}
            for farm_name, mules in self.configured_pools.items():
                server_names = []
                facts = self.facts
                for mule in mules:
                    # Overlay the per-mule values on this process's facts before rendering the template
                    facts.update({
                        'pool_name': farm_name,
                        'server_id': mule,
                        'instance_id': self._mule_index_in_farm(farm_name, mule) + 1,
                    })
                    server_names.append(self.server_name_template.format(**facts))
                self._farm_server_names_dict[farm_name] = server_names
        return self._farm_server_names_dict

    @property
    def instance_id(self):
        """The worker id for web workers, the one-based farm position for farmed mules, else the mule id."""
        if not self._is_mule:
            instance_id = uwsgi.worker_id()
        elif self._farm_name:
            return self._mule_index_in_farm(self._farm_name) + 1
        else:
            instance_id = uwsgi.mule_id()
        return instance_id

    def log_startup(self):
        """Log that the instance is running; the first web worker also logs the root PID and serving addresses."""
        msg = [f"Galaxy server instance '{self.config.server_name}' is running"]
        # Log the next messages when the first worker finishes starting. This
        # may not be the first to finish (so Galaxy could be serving already),
        # but it's a good approximation and gives the correct root_pid below
        # when there is no master process.
        if not self._is_mule and self.instance_id == 1:
            # We use the same text printed by Paste to not break scripts
            # grepping for this line. Here root_pid is the same that gets
            # written to file when using the --pidfile option of uwsgi
            root_pid = uwsgi.masterpid() or os.getpid()
            msg.append('Starting server in PID %d.' % root_pid)
            serving = [f"serving on {s}" for s in UWSGIApplicationStack._serving_on()]
            # Decide on the fallback from the addresses actually found: counting entries in `msg` could never
            # trigger it, because the PID line has already been appended.
            msg.extend(serving or ['serving on unknown URL'])
        log.info('\n'.join(msg))

    def start(self):
        """Export this mule's farm names to the environment (farmed mules only), then start the stack."""
        # Does a generalized `is_worker` attribute make sense? Hard to say w/o other stack paradigms.
        if self._is_mule and self._farm_name:
            # used by main.py to send a shutdown message on termination
            os.environ['_GALAXY_UWSGI_FARM_NAMES'] = ','.join(self._farms)
        super().start()

    def in_pool(self, pool_name):
        """Return whether this process is a mule belonging to the farm *pool_name*."""
        if not self._is_mule:
            return False
        else:
            return pool_name in self._farms

    def pool_members(self, pool_name):
        """Return the server names of the mules in *pool_name*, or ``None`` for an unknown pool."""
        return self._farm_server_names.get(pool_name, None)

    def workers(self):
        """Return ``uwsgi.workers()``."""
        return uwsgi.workers()

    @property
    def facts(self):
        """Base facts extended with this process's ``pool_name``, ``server_id`` and ``instance_id``."""
        facts = super().facts
        if not self._is_mule:
            facts.update({
                'pool_name': 'web',
                'server_id': uwsgi.worker_id(),
            })
        else:
            facts.update({
                'pool_name': self._farm_name,
                'server_id': uwsgi.mule_id(),
            })
        facts['instance_id'] = self.instance_id
        return facts

    def shutdown(self):
        """Delegate to the messaging stack's shutdown."""
        super().shutdown()
class PasteApplicationStack(ApplicationStack):
    """Application stack used when Galaxy runs under Python Paste; inherits all base behavior."""

    name = 'Python Paste'
class GunicornApplicationStack(ApplicationStack):
    """Application stack used when Galaxy runs under Gunicorn."""

    name = "Gunicorn"
    # Postfork functions are only deferred when gunicorn was asked to preload the application
    do_post_fork = "--preload" in os.environ.get("GUNICORN_CMD_ARGS", "") or "--preload" in sys.argv
    postfork_functions: List[Callable] = []
    # Will be set to True by external hook
    late_postfork_event = threading.Event()

    @classmethod
    def register_postfork_function(cls, f, *args, **kwargs):
        """Run ``f(*args, **kwargs)`` now, or queue it for after the fork when ``do_post_fork`` is set."""
        # do_post_fork determines if we need to run postfork functions
        if not cls.do_post_fork:
            f(*args, **kwargs)
            return
        # if so, we call ApplicationStack.late_postfork once after forking ...
        nothing_queued_yet = not cls.postfork_functions
        if nothing_queued_yet:
            os.register_at_fork(after_in_child=cls.late_postfork)
        # ... and store everything we need to run in ApplicationStack.postfork_functions
        cls.postfork_functions.append(lambda: f(*args, **kwargs))

    @classmethod
    def run_postfork(cls):
        """Wait up to one second for ``late_postfork_event``, then call every queued postfork function."""
        cls.late_postfork_event.wait(1)
        for queued in cls.postfork_functions:
            queued()

    @classmethod
    def late_postfork(cls):
        """Start a thread that runs the queued postfork functions."""
        # We can't run postfork functions immediately, because this is before the gunicorn `post_fork` hook runs,
        # and we depend on the `post_fork` hook to set a worker id.
        runner = threading.Thread(target=cls.run_postfork)
        runner.start()

    def log_startup(self):
        """Log that the instance is running, plus the listeners from ``GUNICORN_LISTENERS`` when set."""
        lines = [f"Galaxy server instance '{self.config.server_name}' is running"]
        listeners = os.environ.get("GUNICORN_LISTENERS")
        if listeners is not None:
            lines.append(f'serving on {listeners}')
        log.info("\n".join(lines))
class WeblessApplicationStack(ApplicationStack):
    """Application stack for Galaxy processes running without a web server.

    The pools of this stack are the ones listed in ``config.attach_to_pools``; this server is the only member of
    each of them.
    """

    name = 'Webless'

    @property
    def _attached_pools(self):
        """Pools this server attaches to; empty when there is no config or ``attach_to_pools`` is unset or falsy."""
        return getattr(self.config, 'attach_to_pools', None) or ()

    def _set_default_job_handler_assignment_methods(self, job_config, base_pool):
        """Adjust ``job_config.handler_assignment_methods`` in place for a server attached to a pool."""
        # We will only get here if --attach-to-pool has been set so it is safe to assume that this handler is dynamic
        # and that we want to use one of the DB serialization methods.
        #
        # Disable DB_SELF if a valid pool is configured. Use DB "SKIP LOCKED" if the DB engine supports it, transaction
        # isolation if it doesn't, or DB_PREASSIGN if the job_config doesn't allow either.
        conf_class_name = job_config.__class__.__name__
        remove_methods = [HANDLER_ASSIGNMENT_METHODS.DB_SELF]
        add_method = self.get_preferred_handler_assignment_method()
        log.debug("%s: No job handler assignment methods were configured but this server is configured to attach to the"
                  " '%s' pool, automatically enabling the '%s' assignment method",
                  conf_class_name, base_pool, add_method)
        for m in remove_methods:
            try:
                job_config.handler_assignment_methods.remove(m)
                log.debug("%s: Removed '%s' from handler assignment methods due to use of --attach-to-pool",
                          conf_class_name, m)
            except ValueError:
                # the method was not in the list to begin with
                pass
        if add_method not in job_config.handler_assignment_methods:
            job_config.handler_assignment_methods.insert(0, add_method)
        log.debug("%s: handler assignment methods updated to: %s",
                  conf_class_name, ', '.join(job_config.handler_assignment_methods))

    def __init__(self, app=None, config=None):
        """Initialize the base stack and log the pools this server will attach to, if any."""
        super().__init__(app=app, config=config)
        if self.app and self._attached_pools:
            log.debug("Will attach to pool(s): %s", ', '.join(self._attached_pools))

    @property
    def configured_pools(self):
        """Mapping of each attached pool name to this server's name."""
        return {p: self.config.server_name for p in self._attached_pools}

    def in_pool(self, pool_name):
        """Return whether *pool_name* is one of the pools this server attaches to."""
        return pool_name in self._attached_pools

    def pool_members(self, pool_name):
        """Return a one-tuple holding this server's name if it is in *pool_name*, else ``None``."""
        return (self.config.server_name,) if self.in_pool(pool_name) else None
def application_stack_class() -> Type[ApplicationStack]:
    """Returns the correct ApplicationStack class for the stack under which this Galaxy process is running.

    Checked in order: a ``SERVER_SOFTWARE`` environment variable containing "gunicorn", an injected ``uwsgi`` module
    exposing ``numproc``, a PasteScript ``loadwsgi.py`` frame on the call stack, and finally the webless stack.
    """
    server_software = os.environ.get("SERVER_SOFTWARE", "")
    if "gunicorn" in server_software:
        return GunicornApplicationStack
    if uwsgi is not None and hasattr(uwsgi, "numproc"):
        return UWSGIApplicationStack
    # cleverer ideas welcome
    paste_loader = os.path.join('pastescript', 'loadwsgi.py')
    call_stack = inspect.stack()
    # index 1 of each stack entry is the file name of the frame
    if any(entry[1].endswith(paste_loader) for entry in call_stack):
        return PasteApplicationStack
    return WeblessApplicationStack
def application_stack_instance(app=None, config=None) -> ApplicationStack:
    """Instantiate the application stack class detected for the current process with *app* and *config*."""
    return application_stack_class()(app=app, config=config)
def application_stack_log_filter():
    """Return a new instance of the log filter class of the detected application stack."""
    filter_class = application_stack_class().log_filter_class
    return filter_class()
def application_stack_log_formatter():
    """Return a ``logging.Formatter`` built from the ``log_format`` of the detected application stack."""
    stack_format = application_stack_class().log_format
    return logging.Formatter(fmt=stack_format)
def register_postfork_function(f, *args, **kwargs):
    """Register ``f(*args, **kwargs)`` as a postfork function with the detected application stack class."""
    stack_class = application_stack_class()
    stack_class.register_postfork_function(f, *args, **kwargs)
def get_app_kwds(config_section, app_name=None):
    """Return the application keyword arguments computed by the detected application stack class."""
    stack_class = application_stack_class()
    return stack_class.get_app_kwds(config_section, app_name=app_name)
def _uwsgi_configured_mules():
    """Return the uWSGI ``mule`` option as a list, wrapping a single value.

    A single mule is either ``True`` or one string value. The string may arrive as ``bytes`` (other uWSGI option
    values in this module are passed through ``unicodify``), so both ``str`` and ``bytes`` are wrapped; returning a
    bare ``bytes`` would make callers iterate over its individual byte values.
    """
    mules = uwsgi.opt.get('mule', [])
    if mules is True or isinstance(mules, (str, bytes)):
        return [mules]
    return mules


def _do_uwsgi_postfork():
    """uWSGI post-fork hook: run the postfork functions registered on ``UWSGIApplicationStack``."""
    for i, mule in enumerate(_uwsgi_configured_mules()):
        if mule is not True and i + 1 == uwsgi.mule_id():
            # mules will inherit the postfork function list and call them immediately upon fork, but programmed mules
            # should not do that (they will call the postfork functions in-place as they start up after exec())
            UWSGIApplicationStack.postfork_functions = [(_mule_fixup, (), {})]
    # Iterate over a snapshot of the list, as the original code did
    for f, args, kwargs in list(UWSGIApplicationStack.postfork_functions):
        log.debug('Calling postfork function: %s', f)
        f(*args, **kwargs)


def _mule_fixup():
    """Call ``urllib.request.install_opener(None)``, clearing any globally installed opener."""
    install_opener(None)


# Only install the hook when running under uWSGI (the module is None otherwise)
if uwsgi:
    uwsgi.post_fork_hook = _do_uwsgi_postfork