Source code for galaxy.web_stack.handlers

"""Utilities for dealing with the Galaxy 'handler' process pattern.

A 'handler' is a named Python process running the Galaxy application responsible
for some activity such as queuing up jobs or scheduling workflows.
"""

import logging
import os
import random
from enum import Enum
from typing import Tuple

from sqlalchemy.orm import object_session

from galaxy.exceptions import HandlerAssignmentError
from galaxy.model.base import transaction
from galaxy.util import (
    ExecutionTimer,
    listify,
)

log = logging.getLogger(__name__)


[docs]class HANDLER_ASSIGNMENT_METHODS(str, Enum): MEM_SELF = "mem-self" DB_SELF = "db-self" DB_PREASSIGN = "db-preassign" DB_TRANSACTION_ISOLATION = "db-transaction-isolation" DB_SKIP_LOCKED = "db-skip-locked"
[docs] @classmethod def has_value(cls, value): return value in cls._value2member_map_
[docs]class HandlerAssignmentSkip(Exception): """Exception for handler assignment methods to raise if the next method should be tried."""
[docs]class ConfiguresHandlers: DEFAULT_HANDLER_TAG = "_default_" DEFAULT_BASE_HANDLER_POOLS: Tuple[str, ...] = ()
[docs] def add_handler(self, handler_id, tags): if handler_id not in self.handlers: self.handlers[handler_id] = (handler_id,) for tag in tags: if tag in self.handlers and handler_id not in self.handlers[tag]: self.handlers[tag].append(handler_id) else: self.handlers[tag] = [handler_id]
[docs] @staticmethod def xml_to_dict(config, config_element): handling_config_dict = {} processes = {} handling_config_dict["processes"] = processes # Parse handlers if config_element is not None: for handler in ConfiguresHandlers._findall_with_required(config_element, "handler"): handler_id = handler.get("id") if handler_id in processes: log.error("Handler '%s' overlaps handler with the same name, ignoring", handler_id) else: log.debug("Read definition for handler '%s'", handler_id) plugins = [] for plugin in ConfiguresHandlers._findall_with_required(handler, "plugin", ["id"]): plugins.append(plugin.get("id")) tags = [x.strip() for x in handler.get("tags", ConfiguresHandlers.DEFAULT_HANDLER_TAG).split(",")] handler_def = {"tags": tags} if plugins: handler_def["plugins"] = plugins processes[handler_id] = handler_def default_handler = ConfiguresHandlers.get_xml_default(config, config_element) if default_handler: handling_config_dict["default"] = default_handler assign = listify(config_element.attrib.get("assign_with", []), do_strip=True) if len(assign) > 0: handling_config_dict["assign"] = assign max_grab_str = config_element.attrib.get("max_grab", None) if max_grab_str: handling_config_dict["max_grab"] = int(max_grab_str) ready_window_size_str = config_element.attrib.get("ready_window_size", None) if ready_window_size_str: handling_config_dict["ready_window_size"] = int(ready_window_size_str) return handling_config_dict
def _init_handlers(self, handling_config_dict=None): handling_config_dict = handling_config_dict or {} for handler_id, process in handling_config_dict.get("processes", {}).items(): process = process or {} if handler_id in self.handlers: log.error("Handler '%s' overlaps handler with the same name, ignoring", handler_id) else: log.debug("Read definition for handler '%s'", handler_id) self._parse_handler(handler_id, process) self.add_handler(handler_id, process.get("tags") or [self.DEFAULT_HANDLER_TAG]) self.default_handler_id = self._ensure_default_set( handling_config_dict.get("default"), list(self.handlers.keys()), required=False ) def _init_handler_assignment_methods(self, handling_config_dict=None): handling_config_dict = handling_config_dict or {} self.__is_handler = None # This is set by the stack job handler init code self.pool_for_tag = {} self._handler_assignment_method_methods = { HANDLER_ASSIGNMENT_METHODS.MEM_SELF: self._assign_mem_self_handler, HANDLER_ASSIGNMENT_METHODS.DB_SELF: self._assign_db_self_handler, HANDLER_ASSIGNMENT_METHODS.DB_PREASSIGN: self._assign_db_preassign_handler, HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION: self._assign_db_tag, HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED: self._assign_db_tag, } if handling_config_dict: for method in handling_config_dict.get("assign", []): method = method.lower() assert HANDLER_ASSIGNMENT_METHODS.has_value( method ), "Invalid job handler assignment method '{}', must be one of: {}".format( method, ", ".join(h.value for h in HANDLER_ASSIGNMENT_METHODS) ) try: self.handler_assignment_methods.append(method) except AttributeError: self.handler_assignment_methods_configured = True self.handler_assignment_methods = [method] if self.handler_assignment_methods == [HANDLER_ASSIGNMENT_METHODS.MEM_SELF]: self.app.config.track_jobs_in_database = False self.handler_max_grab = handling_config_dict.get("max_grab", self.handler_max_grab) if self.handler_max_grab is not None: self.handler_max_grab = int(self.handler_max_grab) def _set_default_handler_assignment_methods(self): if not self.handler_assignment_methods_configured: if not self.app.config.track_jobs_in_database: # DEPRECATED: You should just set mem_self as the only method if you want this log.warning( "The `track_jobs_in_database` option is deprecated, please set `%s` as the job" " handler assignment method in the job handler configuration", HANDLER_ASSIGNMENT_METHODS.MEM_SELF, ) self.handler_assignment_methods = [HANDLER_ASSIGNMENT_METHODS.MEM_SELF] else: self.handler_assignment_methods = [self.app.application_stack.get_preferred_handler_assignment_method()] # If the stack has handler pools it can override these defaults self.app.application_stack.init_job_handling(self) log.info( "%s: No job handler assignment method is set, defaulting to '%s', set the `assign_with` attribute" " on <handlers> to override the default", self.__class__.__name__, self.handler_assignment_methods[0], ) def _parse_handler(self, handler_id, handler_def): pass
[docs] @staticmethod def get_xml_default(config, parent): rval = parent.get("default") if "default_from_environ" in parent.attrib: environ_var = parent.attrib["default_from_environ"] rval = os.environ.get(environ_var, rval) elif "default_from_config" in parent.attrib: config_val = parent.attrib["default_from_config"] rval = config.config_dict.get(config_val, rval) return rval
def _get_default(self, config, parent, names, auto=False, required=True): """ Returns the default attribute set in a parent tag like <handlers> or <destinations>, or return the ID of the child, if there is no explicit default and only one child. :param parent: Object representing a tag that may or may not have a 'default' attribute. :type parent: ``lxml.etree._Element`` :param names: The list of destination or handler IDs or tags that were loaded. :type names: list of str :param auto: Automatically set a default if there is no default in the parent tag and there is only one child. :type auto: bool :param required: Require a default to be set or determined automatically, else raise Exception :type required: bool :returns: str -- id or tag representing the default. """ rval = ConfiguresHandlers.get_xml_default(config, parent) return self._ensure_default_set(rval, names, auto=auto, required=required) def _ensure_default_set(self, rval, names, auto=False, required=True): if rval is not None: # If the parent element has a 'default' attribute, use the id or tag in that attribute if required and rval not in names: raise Exception(f"default attribute '{rval}' does not match a defined id or tag in a child element") log.debug(f"default set to child with id or tag '{rval}'") elif auto and len(names) == 1: log.info(f"Setting default to child with id '{names[0]}'") rval = names[0] elif required: raise Exception("No default specified, please specify a valid id or tag with the 'default' attribute") return rval @staticmethod def _findall_with_required(parent, match, attribs=None): """Like ``lxml.etree.Element.findall()``, except only returns children that have the specified attribs. :param parent: Parent element in which to find. :type parent: ``lxml.etree._Element`` :param match: Name of child elements to find. :type match: str :param attribs: List of required attributes in children elements. :type attribs: list of str :returns: list of ``lxml.etree._Element`` """ rval = [] if attribs is None: attribs = ("id",) for elem in parent.findall(match): for attrib in attribs: if attrib not in elem.attrib: log.warning(f"required '{attrib}' attribute is missing from <{match}> element") break else: rval.append(elem) return rval @property def deterministic_handler_assignment(self): return self.handler_assignment_methods and any( filter(lambda x: x == HANDLER_ASSIGNMENT_METHODS.DB_PREASSIGN), self.handler_assignment_methods ) def _get_is_handler(self): """Indicate whether the current server is configured as a handler. :return: bool """ if self.__is_handler is not None: return self.__is_handler if ( HANDLER_ASSIGNMENT_METHODS.DB_SELF in self.handler_assignment_methods or HANDLER_ASSIGNMENT_METHODS.MEM_SELF in self.handler_assignment_methods ): return True for collection in self.handlers.values(): if self.app.config.server_name in collection: return True if ( not self.handlers and not self.handler_assignment_methods_configured and ( HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION in self.handler_assignment_methods or HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED in self.handler_assignment_methods ) ): return True return False def _set_is_handler(self, value): self.__is_handler = value is_handler = property(_get_is_handler, _set_is_handler) def _get_single_item(self, collection, index=None): """Given a collection of handlers or destinations, return one item from the collection at random.""" # Done like this to avoid random under the assumption it's faster to avoid it if len(collection) == 1: return collection[0] elif index is None: return random.choice(collection) else: return collection[index % len(collection)] @property def handler_tags(self): """Get an iterable of all configured handler tags.""" return filter(lambda k: isinstance(self.handlers[k], list), self.handlers.keys()) @property def self_handler_tags(self): """Get an iterable of the current process's configured handler tags.""" return [k for k in self.handler_tags if self.app.config.server_name in self.handlers[k]] or [ self.DEFAULT_HANDLER_TAG ] # If these get to be any more complex we should probably modularize them, or at least move to a separate class def _assign_handler_direct(self, obj, configured, flush=True): """Directly assign a handler if the object has been preconfigured to a known single static handler. :param obj: Same as :method:`ConfiguresHandlers.assign_handler()`. :param configured: Same as :method:`ConfiguresHandlers.assign_handler()`. :returns: str -- A valid handler ID, or False if no handler was assigned. """ if self.app.config.track_jobs_in_database and configured: try: handlers = self.handlers[configured] except KeyError: handlers = None if handlers == (configured,): obj.set_handler(configured) if flush: _timed_flush_obj(obj) return configured return False def _assign_mem_self_handler(self, obj, method, configured, queue_callback=None, flush=True, **kwargs): """Assign object to this handler using this process's in-memory queue. This method ignores all handler configuration. :param obj: Same as :method:`ConfiguresHandlers.assign_handler()`. :param method: Same as :method:`ConfiguresHandlers._assign_db_preassign_handler()`. :param configured: Ignored. :param queue_callback: Callback to be executed when the job should be queued (i.e. a callback to the handler's ``put()`` method). No arguments are passed. :type queue_callback: callable :returns: str -- This process's server name (handler ID). """ assert ( queue_callback is not None ), f"Cannot perform '{HANDLER_ASSIGNMENT_METHODS.MEM_SELF}' handler assignment: `queue_callback` is None" if configured: log.warning( "(%s) Ignoring handler assignment to '%s' because configured handler assignment method" " '' overrides per-tool handler assignment", obj.log_str(), HANDLER_ASSIGNMENT_METHODS.MEM_SELF, configured, ) if flush: _timed_flush_obj(obj) queue_callback() return self.app.config.server_name def _assign_db_self_handler(self, obj, method, configured, flush=True, **kwargs): """Assign object to this process by setting its ``handler`` column in the database to this process. This only occurs if there is not an explicitly configured handler assignment for the object. Otherwise, it is passed to the DB_PREASSIGN method for assignment. :param obj: Same as :method:`ConfiguresHandlers.assign_handler()`. :param method: Same as :method:`ConfiguresHandlers._assign_db_preassign_handler()`. :param configured: Same as :method:`ConfiguresHandlers.assign_handler()`. :returns: str -- The assigned handler ID. """ if configured: return self._handler_assignment_method_methods[HANDLER_ASSIGNMENT_METHODS.DB_PREASSIGN]( obj, method, configured, **kwargs ) obj.set_handler(self.app.config.server_name) if flush: _timed_flush_obj(obj) return self.app.config.server_name def _assign_db_preassign_handler(self, obj, method, configured, index=None, flush=True, **kwargs): """Assign object to a handler by setting its ``handler`` column in the database to a handler selected at random from the known handlers in the appropriate tag. Given a handler ID or tag, return a handler matching it, of those handlers that are statically configured in the job configuration, or known via preconfigured pools. :param obj: Same as :method:`ConfiguresHandlers.assign_handler()`. :param method: Assignment method currently being checked. :type method: Value in :data:`HANDLER_ASSIGNMENT_METHODS`. :param configured: Same as :method:`ConfiguresHandlers.assign_handler()`. :param index: Generate "consistent" "random" handlers with this index if specified. :type index: int :raises KeyError: if the configured or default handler is not a known handler ID or tag. :returns: str -- A valid job handler ID. """ handler = configured if handler is None: handler = self.default_handler_id or self.DEFAULT_HANDLER_TAG # Get a random handler ID from the possible handlers. If the admin has configured a tool with a handler tag that # does not exist, or if there are no default handlers and configured is None, this will raise KeyError to # assign_handler, which should log it, try the next method (if any), and if no other methods succeed, raise # HandlerAssigmentError. handler_id = self._get_single_item(self.handlers[handler], index=index) if handler != handler_id: log.debug( "(%s) Selected handler '%s' by random choice from handler tag '%s'", obj.log_str(), handler_id, handler ) obj.set_handler(handler_id) if flush: _timed_flush_obj(obj) return handler_id def _assign_db_tag(self, obj, method, configured, flush=True, **kwargs): """Assign object to a handler by setting its ``handler`` column in the database to either the configured handler ID or tag, or to the default tag (or ``_default_``) :param obj: Same as :method:`ConfiguresHandlers.assign_handler()`. :param method: Same as :method:`ConfiguresHandlers._assign_db_preassign_handler()`. :param configured: Same as :method:`ConfiguresHandlers.assign_handler()`. :returns: str -- The assigned handler pool. """ handler = configured if handler is None: handler = self.default_handler_id or self.DEFAULT_HANDLER_TAG obj.set_handler(handler) if flush: _timed_flush_obj(obj) return handler
[docs] def assign_handler(self, obj, configured=None, **kwargs): """Set a job handler, flush obj Called assignment methods should raise py:class:`HandlerAssignmentSkip` to indicate that the next method should be tried. :param obj: Object to assign a handler to (must be a model object with ``handler`` attribute and ``log_str`` callable). :type obj: instance of :class:`galaxy.model.Job` or other model object with a ``set_handler()`` method. :param configured: Preconfigured handler (ID, tag, or None) for the given object. :type configured: str or None. :returns: bool -- True on successful assignment, False otherwise. """ # It's a bit awkward that the method that actually hands off job execution is in the JobConfiguration, but # that's currently the best place for it. It's worth noting that this method is also part of the # WorkflowSchedulingManager, which acts like a combined JobConfiguration and JobManager. Combining those two # classes would probably be reasonable (and would remove the need for the queue callback). if self._assign_handler_direct(obj, configured, flush=kwargs.get("flush", True)): log.info( "(%s) Skipped handler assignment logic due to explicit configuration` to a single handler: %s", obj.log_str(), configured, ) return True for method in self.handler_assignment_methods: try: handler = self._handler_assignment_method_methods[method](obj, method, configured=configured, **kwargs) log.info("(%s) Handler '%s' assigned using '%s' assignment method", obj.log_str(), handler, method) return handler except HandlerAssignmentSkip: log.debug( "(%s) Handler assignment method '%s' did not assign a handler, trying next method", obj.log_str(), method, ) except Exception: log.exception("Caught exception in handler assignment method: %s", method) else: # Ideally we could just expunge the object from the SA session here, but in most cases, some of its related # objects have already been created, so instead we'll just have to fail it. log.error("(%s) Failed to select handler", obj.log_str()) raise HandlerAssignmentError("Job handler assignment failed.", obj=obj)
def _timed_flush_obj(obj): obj_flush_timer = ExecutionTimer() sa_session = object_session(obj) with transaction(sa_session): sa_session.commit() log.info(f"Flushed transaction for {obj.log_str()} {obj_flush_timer}")