Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.web_stack.handlers

"""Utilities for dealing with the Galaxy 'handler' process pattern.

A 'handler' is a named Python process running the Galaxy application responsible
for some activity such as queuing up jobs or scheduling workflows.
"""

import logging
import os
import random
from collections.abc import (
    Iterable,
    Sequence,
)
from enum import Enum
from typing import (
    Any,
    Callable,
    Protocol,
    TYPE_CHECKING,
    TypeVar,
    Union,
)

from sqlalchemy.orm import object_session
from typing_extensions import (
    Concatenate,
    Literal,
)

from galaxy.exceptions import HandlerAssignmentError
from galaxy.util import (
    ExecutionTimer,
    listify,
    unicodify,
)

if TYPE_CHECKING:
    from galaxy.structured_app import MinimalManagerApp
    from galaxy.util import Element

log = logging.getLogger(__name__)


class HANDLER_ASSIGNMENT_METHODS(str, Enum):
    """Names of the supported handler assignment methods.

    Members are also ``str`` instances, so each member compares equal to its
    plain string value (e.g. ``"db-self"``).
    """

    MEM_SELF = "mem-self"
    DB_SELF = "db-self"
    DB_PREASSIGN = "db-preassign"
    DB_TRANSACTION_ISOLATION = "db-transaction-isolation"
    DB_SKIP_LOCKED = "db-skip-locked"

    @classmethod
    def has_value(cls, value):
        """Return whether ``value`` is the value of one of this enum's members."""
        known_values = cls._value2member_map_
        return value in known_values
class HandlerAssignmentSkip(Exception):
    """Raised by a handler assignment method to signal that the next method should be tried."""
class ModelWithHandler(Protocol):
    """Structural type for model objects that can be assigned to a handler."""

    def log_str(self) -> str:
        """Return the string used to identify this object in log messages."""
        ...

    def set_handler(self, handler: str) -> None:
        """Record ``handler`` (a handler ID or tag) on this object."""
        ...
# Generic element type; lets ConfiguresHandlers._get_single_item() return the
# same type as the items of the sequence it is given.
T = TypeVar("T")
class ConfiguresHandlers:
    """Holds handler definitions and implements assignment of objects to handlers.

    ``self.handlers`` maps every handler ID *and* every handler tag to the list
    of handler IDs it resolves to; a handler ID always maps to ``[handler_id]``.
    """

    DEFAULT_HANDLER_TAG = "_default_"
    DEFAULT_BASE_HANDLER_POOLS: tuple[str, ...] = ()

    def __init__(self, app: "MinimalManagerApp") -> None:
        """Initialize empty handler state for ``app``."""
        self.app = app
        self.handler_assignment_methods: list[HANDLER_ASSIGNMENT_METHODS] = []
        self.handler_assignment_methods_configured = False
        self.handler_max_grab: Union[int, None] = None
        self.handlers: dict[str, list[str]] = {}

    def add_handler(self, handler_id: str, tags: list[str]) -> None:
        """Register ``handler_id`` under its own ID and under each tag in ``tags``.

        Registering the same handler under the same tag more than once is a
        no-op and never removes other handlers already known for that tag.

        :param handler_id: ID of the handler to register.
        :type handler_id: str
        :param tags: Tags the handler should be reachable by.
        :type tags: list of str
        """
        if handler_id not in self.handlers:
            self.handlers[handler_id] = [handler_id]
        for tag in tags:
            members = self.handlers.setdefault(tag, [])
            if handler_id not in members:
                members.append(handler_id)

    @staticmethod
    def xml_to_dict(config, config_element: Union["Element", None]) -> dict[str, Any]:
        """Convert a ``<handlers>`` XML element to the dictionary handling configuration format.

        :param config: Galaxy configuration object, used to resolve ``default_from_config``.
        :param config_element: The ``<handlers>`` element, or None if there is none.
        :returns: dict with a ``processes`` key and, when set in the XML, ``default``, ``assign``,
            ``max_grab`` and ``ready_window_size`` keys.
        """
        processes: dict[str, dict[str, list[str]]] = {}
        handling_config_dict: dict[str, Any] = {"processes": processes}
        if config_element is None:
            return handling_config_dict

        # Parse handlers
        for handler in ConfiguresHandlers._findall_with_required(config_element, "handler"):
            handler_id = handler.get("id")
            assert handler_id is not None  # guaranteed by _findall_with_required()
            if handler_id in processes:
                log.error("Handler '%s' overlaps handler with the same name, ignoring", handler_id)
                continue
            log.debug("Read definition for handler '%s'", handler_id)
            plugins = []
            for plugin in ConfiguresHandlers._findall_with_required(handler, "plugin", ["id"]):
                plugin_id = plugin.get("id")
                assert plugin_id is not None  # guaranteed by _findall_with_required()
                plugins.append(plugin_id)
            tags = [x.strip() for x in handler.get("tags", ConfiguresHandlers.DEFAULT_HANDLER_TAG).split(",")]
            handler_def = {"tags": tags}
            if plugins:
                handler_def["plugins"] = plugins
            processes[handler_id] = handler_def

        default_handler = ConfiguresHandlers.get_xml_default(config, config_element)
        if default_handler:
            handling_config_dict["default"] = default_handler
        assign = listify(config_element.attrib.get("assign_with", []), do_strip=True)
        if len(assign) > 0:
            handling_config_dict["assign"] = assign
        max_grab_str = config_element.attrib.get("max_grab", None)
        if max_grab_str:
            handling_config_dict["max_grab"] = int(max_grab_str)
        ready_window_size_str = config_element.attrib.get("ready_window_size", None)
        if ready_window_size_str:
            handling_config_dict["ready_window_size"] = int(ready_window_size_str)
        return handling_config_dict

    def _init_handlers(self, handling_config_dict: Union[dict, None]) -> None:
        """Register the handlers listed under ``processes`` and resolve the default handler.

        :param handling_config_dict: Handling configuration dictionary, or None for no configuration.
        """
        handling_config_dict = handling_config_dict or {}
        for handler_id, process in handling_config_dict.get("processes", {}).items():
            process = process or {}
            if handler_id in self.handlers:
                log.error("Handler '%s' overlaps handler with the same name, ignoring", handler_id)
            else:
                log.debug("Read definition for handler '%s'", handler_id)
                self._parse_handler(handler_id, process)
                self.add_handler(handler_id, process.get("tags") or [self.DEFAULT_HANDLER_TAG])
        self.default_handler_id = self._ensure_default_set(
            handling_config_dict.get("default"), list(self.handlers.keys()), required=False
        )

    def _init_handler_assignment_methods(self, handling_config_dict: Union[dict, None] = None) -> None:
        """Set up the assignment method dispatch table and read the configured methods.

        :param handling_config_dict: Handling configuration dictionary; its ``assign`` and ``max_grab``
            keys are read if present.
        """
        handling_config_dict = handling_config_dict or {}
        self.__is_handler: Union[bool, None] = None
        # This is set by the stack job handler init code
        self.pool_for_tag: dict[str, str] = {}
        self._handler_assignment_method_methods: dict[
            HANDLER_ASSIGNMENT_METHODS,
            Callable[Concatenate[ModelWithHandler, HANDLER_ASSIGNMENT_METHODS, Union[str, None], bool, ...], str],
        ] = {
            HANDLER_ASSIGNMENT_METHODS.MEM_SELF: self._assign_mem_self_handler,
            HANDLER_ASSIGNMENT_METHODS.DB_SELF: self._assign_db_self_handler,
            HANDLER_ASSIGNMENT_METHODS.DB_PREASSIGN: self._assign_db_preassign_handler,
            HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION: self._assign_db_tag,
            HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED: self._assign_db_tag,
        }
        if handling_config_dict:
            for method in handling_config_dict.get("assign", []):
                method = method.lower()
                assert HANDLER_ASSIGNMENT_METHODS.has_value(
                    method
                ), "Invalid job handler assignment method '{}', must be one of: {}".format(
                    method, ", ".join(h.value for h in HANDLER_ASSIGNMENT_METHODS)
                )
                self.handler_assignment_methods.append(method)
                # Only an explicitly listed method counts as "configured"; otherwise
                # _set_default_handler_assignment_methods() must still pick a default.
                self.handler_assignment_methods_configured = True
            if self.handler_assignment_methods == [HANDLER_ASSIGNMENT_METHODS.MEM_SELF]:
                self.app.config.track_jobs_in_database = False
            if (max_grab := handling_config_dict.get("max_grab")) is not None:
                self.handler_max_grab = int(max_grab)

    def _set_default_handler_assignment_methods(self) -> None:
        """Choose a default assignment method when none was explicitly configured."""
        if self.handler_assignment_methods_configured:
            return
        if not self.app.config.track_jobs_in_database:
            # DEPRECATED: You should just set mem_self as the only method if you want this
            log.warning(
                "The `track_jobs_in_database` option is deprecated, please set `%s` as the job"
                " handler assignment method in the job handler configuration",
                HANDLER_ASSIGNMENT_METHODS.MEM_SELF,
            )
            self.handler_assignment_methods = [HANDLER_ASSIGNMENT_METHODS.MEM_SELF]
        else:
            self.handler_assignment_methods = [self.app.application_stack.get_preferred_handler_assignment_method()]
            # If the stack has handler pools it can override these defaults
            self.app.application_stack.init_job_handling(self)
            log.info(
                "%s: No job handler assignment method is set, defaulting to '%s', set the `assign_with` attribute"
                " on <handlers> to override the default",
                self.__class__.__name__,
                self.handler_assignment_methods[0],
            )

    def _parse_handler(self, handler_id, handler_def):
        """Hook called by ``_init_handlers()`` for each new handler; does nothing here."""
        pass

    @staticmethod
    def get_xml_default(config, parent: "Element"):
        """Return the default named by ``parent``'s ``default*`` attributes.

        ``default_from_environ`` (an environment variable name) takes precedence over
        ``default_from_config`` (a key in ``config.config_dict``); either falls back to the plain
        ``default`` attribute when the referenced value is not set.

        :param config: Galaxy configuration object providing ``config_dict``.
        :param parent: Element that may carry the ``default`` attributes.
        :returns: The default ID or tag, or None if none is set.
        """
        rval = parent.get("default")
        if "default_from_environ" in parent.attrib:
            environ_var = unicodify(parent.attrib["default_from_environ"])
            rval = os.environ.get(environ_var, rval)
        elif "default_from_config" in parent.attrib:
            config_val = unicodify(parent.attrib["default_from_config"])
            rval = config.config_dict.get(config_val, rval)
        return rval

    def _get_default(
        self, config, parent: "Element", names: list[str], auto: bool = False, required: bool = True
    ) -> Union[str, None]:
        """
        Returns the default attribute set in a parent tag like <handlers> or
        <destinations>, or return the ID of the child, if there is no explicit
        default and only one child.

        :param parent: Object representing a tag that may or may not have a 'default' attribute.
        :type parent: ``lxml.etree._Element``
        :param names: The list of destination or handler IDs or tags that were loaded.
        :type names: list of str
        :param auto: Automatically set a default if there is no default in the parent tag and there is only
            one child.
        :type auto: bool
        :param required: Require a default to be set or determined automatically, else raise Exception
        :type required: bool

        :returns: str -- id or tag representing the default.
        """
        rval = ConfiguresHandlers.get_xml_default(config, parent)
        return self._ensure_default_set(rval, names, auto=auto, required=required)

    def _ensure_default_set(
        self, rval: Union[str, None], names: list[str], auto: bool = False, required: bool = True
    ) -> Union[str, None]:
        """Validate an explicit default, or derive one from ``names``.

        :param rval: Explicitly configured default, or None.
        :param names: Known IDs and tags.
        :param auto: If there is no explicit default and exactly one name, use that name.
        :param required: Raise if the default is missing or (when explicit) unknown.
        :raises Exception: if ``required`` and no valid default can be determined.
        :returns: The default ID or tag, or None.
        """
        if rval is not None:
            # If the parent element has a 'default' attribute, use the id or tag in that attribute
            if required and rval not in names:
                raise Exception(f"default attribute '{rval}' does not match a defined id or tag in a child element")
            log.debug(f"default set to child with id or tag '{rval}'")
        elif auto and len(names) == 1:
            log.info(f"Setting default to child with id '{names[0]}'")
            rval = names[0]
        elif required:
            raise Exception("No default specified, please specify a valid id or tag with the 'default' attribute")
        return rval

    @staticmethod
    def _findall_with_required(
        parent: "Element", match: str, attribs: Union[Iterable[str], None] = None
    ) -> list["Element"]:
        """Like ``lxml.etree.Element.findall()``, except only returns children that have the specified attribs.

        :param parent: Parent element in which to find.
        :type parent: ``lxml.etree._Element``
        :param match: Name of child elements to find.
        :type match: str
        :param attribs: List of required attributes in children elements (defaults to ``("id",)``).
        :type attribs: list of str

        :returns: list of ``lxml.etree._Element``
        """
        rval: list["Element"] = []
        if attribs is None:
            attribs = ("id",)
        for elem in parent.findall(match):
            for attrib in attribs:
                if attrib not in elem.attrib:
                    # Only the first missing attribute is reported; the element is skipped.
                    log.warning(f"required '{attrib}' attribute is missing from <{match}> element")
                    break
            else:
                rval.append(elem)
        return rval

    @property
    def deterministic_handler_assignment(self) -> bool:
        """Whether ``db-preassign`` is one of the active handler assignment methods."""
        return HANDLER_ASSIGNMENT_METHODS.DB_PREASSIGN in self.handler_assignment_methods

    def _get_is_handler(self) -> bool:
        """Indicate whether the current server is configured as a handler.

        :return: bool
        """
        # An explicitly set value always wins.
        if self.__is_handler is not None:
            return self.__is_handler
        methods = self.handler_assignment_methods
        if HANDLER_ASSIGNMENT_METHODS.DB_SELF in methods or HANDLER_ASSIGNMENT_METHODS.MEM_SELF in methods:
            return True
        server_name = self.app.config.server_name
        if any(server_name in collection for collection in self.handlers.values()):
            return True
        # No handlers defined and nothing explicitly configured: a defaulted tag-based
        # (pool) method means this process handles work itself.
        return bool(
            not self.handlers
            and not self.handler_assignment_methods_configured
            and (
                HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION in methods
                or HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED in methods
            )
        )

    def _set_is_handler(self, value: bool) -> None:
        """Explicitly mark this process as being (or not being) a handler."""
        self.__is_handler = value

    is_handler = property(_get_is_handler, _set_is_handler)

    def _get_single_item(self, collection: Sequence[T], index: Union[int, None] = None) -> T:
        """Given a collection of handlers or destinations, return one item from the collection at random.

        :param collection: Non-empty sequence to pick from.
        :param index: If given, pick deterministically using ``index`` modulo the collection length.
        """
        # Done like this to avoid random under the assumption it's faster to avoid it
        if len(collection) == 1:
            return collection[0]
        if index is None:
            return random.choice(collection)
        return collection[index % len(collection)]

    @property
    def handler_tags(self):
        """Get an iterable of all configured handler tags."""
        # A key that maps to exactly [key] is a handler ID, not a tag.
        return filter(lambda k: self.handlers[k] != [k], self.handlers.keys())

    @property
    def self_handler_tags(self):
        """Get an iterable of the current process's configured handler tags."""
        server_name = self.app.config.server_name
        own_tags = [k for k in self.handler_tags if server_name in self.handlers[k]]
        return own_tags or [self.DEFAULT_HANDLER_TAG]

    # If these get to be any more complex we should probably modularize them, or at least move to a separate class

    def _assign_handler_direct(
        self, obj: ModelWithHandler, configured: Union[str, None], flush: bool = True
    ) -> Union[str, Literal[False]]:
        """Directly assign a handler if the object has been preconfigured to a known single static handler.

        :param obj: Same as :meth:`ConfiguresHandlers.assign_handler()`.
        :param configured: Same as :meth:`ConfiguresHandlers.assign_handler()`.
        :param flush: Commit the object's session after setting the handler.

        :returns: str -- A valid handler ID, or False if no handler was assigned.
        """
        if self.app.config.track_jobs_in_database and configured:
            # Only a handler ID (which maps to exactly itself) qualifies, never a tag.
            if self.handlers.get(configured) == [configured]:
                obj.set_handler(configured)
                if flush:
                    _timed_flush_obj(obj)
                return configured
        return False

    def _assign_mem_self_handler(
        self,
        obj: ModelWithHandler,
        method: HANDLER_ASSIGNMENT_METHODS,
        configured: Union[str, None],
        flush: bool,
        queue_callback=None,
        **kwargs,
    ) -> str:
        """Assign object to this handler using this process's in-memory queue.

        This method ignores all handler configuration.

        :param obj: Same as :meth:`ConfiguresHandlers.assign_handler()`.
        :param method: Same as :meth:`ConfiguresHandlers._assign_db_preassign_handler()`.
        :param configured: Ignored (a warning is logged if it is set).
        :param flush: Commit the object's session before queueing.
        :param queue_callback: Callback to be executed when the job should be queued (i.e. a callback to the
            handler's ``put()`` method). No arguments are passed.
        :type queue_callback: callable

        :returns: str -- This process's server name (handler ID).
        """
        assert (
            queue_callback is not None
        ), f"Cannot perform '{HANDLER_ASSIGNMENT_METHODS.MEM_SELF}' handler assignment: `queue_callback` is None"
        if configured:
            log.warning(
                "(%s) Ignoring handler assignment to '%s' because configured handler assignment method"
                " '%s' overrides per-tool handler assignment",
                obj.log_str(),
                configured,
                HANDLER_ASSIGNMENT_METHODS.MEM_SELF,
            )
        if flush:
            _timed_flush_obj(obj)
        queue_callback()
        return self.app.config.server_name

    def _assign_db_self_handler(
        self,
        obj: ModelWithHandler,
        method: HANDLER_ASSIGNMENT_METHODS,
        configured: Union[str, None],
        flush: bool,
        **kwargs,
    ) -> str:
        """Assign object to this process by setting its ``handler`` column in the database to this process.

        This only occurs if there is not an explicitly configured handler assignment for the object. Otherwise, it is
        passed to the DB_PREASSIGN method for assignment.

        :param obj: Same as :meth:`ConfiguresHandlers.assign_handler()`.
        :param method: Same as :meth:`ConfiguresHandlers._assign_db_preassign_handler()`.
        :param configured: Same as :meth:`ConfiguresHandlers.assign_handler()`.
        :param flush: Commit the object's session after setting the handler.

        :returns: str -- The assigned handler ID.
        """
        if configured:
            return self._handler_assignment_method_methods[HANDLER_ASSIGNMENT_METHODS.DB_PREASSIGN](
                obj, method, configured, flush, **kwargs
            )
        server_name = self.app.config.server_name
        obj.set_handler(server_name)
        if flush:
            _timed_flush_obj(obj)
        return server_name

    def _assign_db_preassign_handler(
        self,
        obj: ModelWithHandler,
        method: HANDLER_ASSIGNMENT_METHODS,
        configured: Union[str, None],
        flush: bool,
        index: Union[int, None] = None,
        **kwargs,
    ) -> str:
        """Assign object to a handler by setting its ``handler`` column in the database to a handler selected at
        random from the known handlers in the appropriate tag.

        Given a handler ID or tag, return a handler matching it, of those handlers that are statically configured in
        the job configuration, or known via preconfigured pools.

        :param obj: Same as :meth:`ConfiguresHandlers.assign_handler()`.
        :param method: Assignment method currently being checked.
        :type method: Value in :data:`HANDLER_ASSIGNMENT_METHODS`.
        :param configured: Same as :meth:`ConfiguresHandlers.assign_handler()`.
        :param flush: Commit the object's session after setting the handler.
        :param index: Generate "consistent" "random" handlers with this index if specified.
        :type index: int

        :raises KeyError: if the configured or default handler is not a known handler ID or tag.
        :returns: str -- A valid job handler ID.
        """
        handler = configured
        if handler is None:
            handler = self.default_handler_id or self.DEFAULT_HANDLER_TAG
        # Get a random handler ID from the possible handlers. If the admin has configured a tool with a handler tag
        # that does not exist, or if there are no default handlers and configured is None, this will raise KeyError
        # to assign_handler, which should log it, try the next method (if any), and if no other methods succeed,
        # raise HandlerAssignmentError.
        handler_id = self._get_single_item(self.handlers[handler], index=index)
        if handler != handler_id:
            log.debug(
                "(%s) Selected handler '%s' by random choice from handler tag '%s'", obj.log_str(), handler_id, handler
            )
        obj.set_handler(handler_id)
        if flush:
            _timed_flush_obj(obj)
        return handler_id

    def _assign_db_tag(
        self,
        obj: ModelWithHandler,
        method: HANDLER_ASSIGNMENT_METHODS,
        configured: Union[str, None],
        flush: bool,
        **kwargs,
    ) -> str:
        """Assign object to a handler by setting its ``handler`` column in the database to either the configured
        handler ID or tag, or to the default tag (or ``_default_``)

        :param obj: Same as :meth:`ConfiguresHandlers.assign_handler()`.
        :param method: Same as :meth:`ConfiguresHandlers._assign_db_preassign_handler()`.
        :param configured: Same as :meth:`ConfiguresHandlers.assign_handler()`.
        :param flush: Commit the object's session after setting the handler.

        :returns: str -- The assigned handler pool.
        """
        handler = configured
        if handler is None:
            handler = self.default_handler_id or self.DEFAULT_HANDLER_TAG
        obj.set_handler(handler)
        if flush:
            _timed_flush_obj(obj)
        return handler

    def assign_handler(self, obj: ModelWithHandler, configured: Union[str, None] = None, flush: bool = True, **kwargs):
        """Set a job handler, flush obj

        Called assignment methods should raise :py:class:`HandlerAssignmentSkip` to indicate that the next method
        should be tried.

        :param obj: Model object to assign a handler to.
        :type obj: model object with ``set_handler()`` and ``log_str()`` methods.
        :param configured: Preconfigured handler (ID, tag, or None) for the given object.
        :type configured: str or None.
        :param flush: Commit the object's session once a handler has been set.
        :type flush: bool

        :raises HandlerAssignmentError: if no assignment method assigned a handler.
        :returns: str -- The assigned handler ID or tag.
        """
        # It's a bit awkward that the method that actually hands off job execution is in the JobConfiguration, but
        # that's currently the best place for it. It's worth noting that this method is also part of the
        # WorkflowSchedulingManager, which acts like a combined JobConfiguration and JobManager. Combining those two
        # classes would probably be reasonable (and would remove the need for the queue callback).
        if handler := self._assign_handler_direct(obj, configured, flush=flush):
            log.info(
                "(%s) Skipped handler assignment logic due to explicit configuration to a single handler: %s",
                obj.log_str(),
                configured,
            )
            return handler
        for method in self.handler_assignment_methods:
            try:
                handler = self._handler_assignment_method_methods[method](obj, method, configured, flush, **kwargs)
                log.info("(%s) Handler '%s' assigned using '%s' assignment method", obj.log_str(), handler, method)
                return handler
            except HandlerAssignmentSkip:
                log.debug(
                    "(%s) Handler assignment method '%s' did not assign a handler, trying next method",
                    obj.log_str(),
                    method,
                )
            except Exception:
                # Best effort: log the failure and fall through to the next method.
                log.exception("Caught exception in handler assignment method: %s", method)
        # Every method was tried (or none is set) without success. Ideally we could just expunge the object from
        # the SA session here, but in most cases, some of its related objects have already been created, so instead
        # we'll just have to fail it.
        log.error("(%s) Failed to select handler", obj.log_str())
        raise HandlerAssignmentError("Job handler assignment failed.", obj=obj)
def _timed_flush_obj(obj):
    """Commit the SQLAlchemy session ``obj`` belongs to and log the timer started beforehand.

    :param obj: Model object providing ``log_str()``.
    """
    flush_timer = ExecutionTimer()
    object_session(obj).commit()
    log.info(f"Flushed transaction for {obj.log_str()} {flush_timer}")