Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.web_stack.handlers
"""Utilities for dealing with the Galaxy 'handler' process pattern.
A 'handler' is a named Python process running the Galaxy application responsible
for some activity such as queuing up jobs or scheduling workflows.
"""
import logging
import os
import random
from collections.abc import (
Iterable,
Sequence,
)
from enum import Enum
from typing import (
Any,
Callable,
Protocol,
TYPE_CHECKING,
TypeVar,
Union,
)
from sqlalchemy.orm import object_session
from typing_extensions import (
Concatenate,
Literal,
)
from galaxy.exceptions import HandlerAssignmentError
from galaxy.util import (
ExecutionTimer,
listify,
unicodify,
)
if TYPE_CHECKING:
from galaxy.structured_app import MinimalManagerApp
from galaxy.util import Element
log = logging.getLogger(__name__)
[docs]
class HANDLER_ASSIGNMENT_METHODS(str, Enum):
MEM_SELF = "mem-self"
DB_SELF = "db-self"
DB_PREASSIGN = "db-preassign"
DB_TRANSACTION_ISOLATION = "db-transaction-isolation"
DB_SKIP_LOCKED = "db-skip-locked"
[docs]
class HandlerAssignmentSkip(Exception):
"""Exception for handler assignment methods to raise if the next method should be tried."""
T = TypeVar("T")
[docs]
class ConfiguresHandlers:
DEFAULT_HANDLER_TAG = "_default_"
DEFAULT_BASE_HANDLER_POOLS: tuple[str, ...] = ()
[docs]
def __init__(self, app: "MinimalManagerApp") -> None:
self.app = app
self.handler_assignment_methods: list[HANDLER_ASSIGNMENT_METHODS] = []
self.handler_assignment_methods_configured = False
self.handler_max_grab: Union[int, None] = None
self.handlers: dict[str, list[str]] = {}
[docs]
def add_handler(self, handler_id: str, tags: list[str]) -> None:
if handler_id not in self.handlers:
self.handlers[handler_id] = [handler_id]
for tag in tags:
if tag in self.handlers and handler_id not in self.handlers[tag]:
self.handlers[tag].append(handler_id)
else:
self.handlers[tag] = [handler_id]
[docs]
@staticmethod
def xml_to_dict(config, config_element: Union["Element", None]) -> dict[str, Any]:
processes: dict[str, dict[str, list[str]]] = {}
handling_config_dict: dict[str, Any] = {"processes": processes}
# Parse handlers
if config_element is not None:
for handler in ConfiguresHandlers._findall_with_required(config_element, "handler"):
handler_id = handler.get("id")
assert handler_id is not None # guaranteed by _findall_with_required()
if handler_id in processes:
log.error("Handler '%s' overlaps handler with the same name, ignoring", handler_id)
else:
log.debug("Read definition for handler '%s'", handler_id)
plugins = []
for plugin in ConfiguresHandlers._findall_with_required(handler, "plugin", ["id"]):
plugin_id = plugin.get("id")
assert plugin_id is not None # guaranteed by _findall_with_required()
plugins.append(plugin_id)
tags = [x.strip() for x in handler.get("tags", ConfiguresHandlers.DEFAULT_HANDLER_TAG).split(",")]
handler_def = {"tags": tags}
if plugins:
handler_def["plugins"] = plugins
processes[handler_id] = handler_def
default_handler = ConfiguresHandlers.get_xml_default(config, config_element)
if default_handler:
handling_config_dict["default"] = default_handler
assign = listify(config_element.attrib.get("assign_with", []), do_strip=True)
if len(assign) > 0:
handling_config_dict["assign"] = assign
max_grab_str = config_element.attrib.get("max_grab", None)
if max_grab_str:
handling_config_dict["max_grab"] = int(max_grab_str)
ready_window_size_str = config_element.attrib.get("ready_window_size", None)
if ready_window_size_str:
handling_config_dict["ready_window_size"] = int(ready_window_size_str)
return handling_config_dict
def _init_handlers(self, handling_config_dict: Union[dict, None]) -> None:
handling_config_dict = handling_config_dict or {}
for handler_id, process in handling_config_dict.get("processes", {}).items():
process = process or {}
if handler_id in self.handlers:
log.error("Handler '%s' overlaps handler with the same name, ignoring", handler_id)
else:
log.debug("Read definition for handler '%s'", handler_id)
self._parse_handler(handler_id, process)
self.add_handler(handler_id, process.get("tags") or [self.DEFAULT_HANDLER_TAG])
self.default_handler_id = self._ensure_default_set(
handling_config_dict.get("default"), list(self.handlers.keys()), required=False
)
def _init_handler_assignment_methods(self, handling_config_dict: Union[dict, None] = None) -> None:
handling_config_dict = handling_config_dict or {}
self.__is_handler: Union[bool, None] = None
# This is set by the stack job handler init code
self.pool_for_tag: dict[str, str] = {}
self._handler_assignment_method_methods: dict[
HANDLER_ASSIGNMENT_METHODS,
Callable[Concatenate[ModelWithHandler, HANDLER_ASSIGNMENT_METHODS, Union[str, None], bool, ...], str],
] = {
HANDLER_ASSIGNMENT_METHODS.MEM_SELF: self._assign_mem_self_handler,
HANDLER_ASSIGNMENT_METHODS.DB_SELF: self._assign_db_self_handler,
HANDLER_ASSIGNMENT_METHODS.DB_PREASSIGN: self._assign_db_preassign_handler,
HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION: self._assign_db_tag,
HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED: self._assign_db_tag,
}
if handling_config_dict:
for method in handling_config_dict.get("assign", []):
method = method.lower()
assert HANDLER_ASSIGNMENT_METHODS.has_value(
method
), "Invalid job handler assignment method '{}', must be one of: {}".format(
method, ", ".join(h.value for h in HANDLER_ASSIGNMENT_METHODS)
)
self.handler_assignment_methods.append(method)
self.handler_assignment_methods_configured = True
if self.handler_assignment_methods == [HANDLER_ASSIGNMENT_METHODS.MEM_SELF]:
self.app.config.track_jobs_in_database = False
if (max_grab := handling_config_dict.get("max_grab")) is not None:
self.handler_max_grab = int(max_grab)
def _set_default_handler_assignment_methods(self) -> None:
if not self.handler_assignment_methods_configured:
if not self.app.config.track_jobs_in_database:
# DEPRECATED: You should just set mem_self as the only method if you want this
log.warning(
"The `track_jobs_in_database` option is deprecated, please set `%s` as the job"
" handler assignment method in the job handler configuration",
HANDLER_ASSIGNMENT_METHODS.MEM_SELF,
)
self.handler_assignment_methods = [HANDLER_ASSIGNMENT_METHODS.MEM_SELF]
else:
self.handler_assignment_methods = [self.app.application_stack.get_preferred_handler_assignment_method()]
# If the stack has handler pools it can override these defaults
self.app.application_stack.init_job_handling(self)
log.info(
"%s: No job handler assignment method is set, defaulting to '%s', set the `assign_with` attribute"
" on <handlers> to override the default",
self.__class__.__name__,
self.handler_assignment_methods[0],
)
def _parse_handler(self, handler_id, handler_def):
pass
[docs]
@staticmethod
def get_xml_default(config, parent: "Element"):
rval = parent.get("default")
if "default_from_environ" in parent.attrib:
environ_var = unicodify(parent.attrib["default_from_environ"])
rval = os.environ.get(environ_var, rval)
elif "default_from_config" in parent.attrib:
config_val = unicodify(parent.attrib["default_from_config"])
rval = config.config_dict.get(config_val, rval)
return rval
def _get_default(
self, config, parent: "Element", names: list[str], auto: bool = False, required: bool = True
) -> Union[str, None]:
"""
Returns the default attribute set in a parent tag like <handlers> or
<destinations>, or return the ID of the child, if there is no explicit
default and only one child.
:param parent: Object representing a tag that may or may not have a 'default' attribute.
:type parent: ``lxml.etree._Element``
:param names: The list of destination or handler IDs or tags that were loaded.
:type names: list of str
:param auto: Automatically set a default if there is no default in the parent tag and there is only one child.
:type auto: bool
:param required: Require a default to be set or determined automatically, else raise Exception
:type required: bool
:returns: str -- id or tag representing the default.
"""
rval = ConfiguresHandlers.get_xml_default(config, parent)
return self._ensure_default_set(rval, names, auto=auto, required=required)
def _ensure_default_set(
self, rval: Union[str, None], names: list[str], auto: bool = False, required: bool = True
) -> Union[str, None]:
if rval is not None:
# If the parent element has a 'default' attribute, use the id or tag in that attribute
if required and rval not in names:
raise Exception(f"default attribute '{rval}' does not match a defined id or tag in a child element")
log.debug(f"default set to child with id or tag '{rval}'")
elif auto and len(names) == 1:
log.info(f"Setting default to child with id '{names[0]}'")
rval = names[0]
elif required:
raise Exception("No default specified, please specify a valid id or tag with the 'default' attribute")
return rval
@staticmethod
def _findall_with_required(
parent: "Element", match: str, attribs: Union[Iterable[str], None] = None
) -> list["Element"]:
"""Like ``lxml.etree.Element.findall()``, except only returns children that have the specified attribs.
:param parent: Parent element in which to find.
:type parent: ``lxml.etree._Element``
:param match: Name of child elements to find.
:type match: str
:param attribs: List of required attributes in children elements.
:type attribs: list of str
:returns: list of ``lxml.etree._Element``
"""
rval: list[Element] = []
if attribs is None:
attribs = ("id",)
for elem in parent.findall(match):
for attrib in attribs:
if attrib not in elem.attrib:
log.warning(f"required '{attrib}' attribute is missing from <{match}> element")
break
else:
rval.append(elem)
return rval
@property
def deterministic_handler_assignment(self):
return self.handler_assignment_methods and any(
filter(lambda x: x == HANDLER_ASSIGNMENT_METHODS.DB_PREASSIGN), self.handler_assignment_methods
)
def _get_is_handler(self) -> bool:
"""Indicate whether the current server is configured as a handler.
:return: bool
"""
if self.__is_handler is not None:
return self.__is_handler
if (
HANDLER_ASSIGNMENT_METHODS.DB_SELF in self.handler_assignment_methods
or HANDLER_ASSIGNMENT_METHODS.MEM_SELF in self.handler_assignment_methods
):
return True
for collection in self.handlers.values():
if self.app.config.server_name in collection:
return True
if (
not self.handlers
and not self.handler_assignment_methods_configured
and (
HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION in self.handler_assignment_methods
or HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED in self.handler_assignment_methods
)
):
return True
return False
def _set_is_handler(self, value: bool) -> None:
self.__is_handler = value
is_handler = property(_get_is_handler, _set_is_handler)
def _get_single_item(self, collection: Sequence[T], index: Union[int, None] = None) -> T:
"""Given a collection of handlers or destinations, return one item from the collection at random."""
# Done like this to avoid random under the assumption it's faster to avoid it
if len(collection) == 1:
return collection[0]
elif index is None:
return random.choice(collection)
else:
return collection[index % len(collection)]
@property
def handler_tags(self):
"""Get an iterable of all configured handler tags."""
return filter(lambda k: self.handlers[k] != [k], self.handlers.keys())
@property
def self_handler_tags(self):
"""Get an iterable of the current process's configured handler tags."""
return [k for k in self.handler_tags if self.app.config.server_name in self.handlers[k]] or [
self.DEFAULT_HANDLER_TAG
]
# If these get to be any more complex we should probably modularize them, or at least move to a separate class
def _assign_handler_direct(
self, obj: ModelWithHandler, configured: Union[str, None], flush: bool = True
) -> Union[str, Literal[False]]:
"""Directly assign a handler if the object has been preconfigured to a known single static handler.
:param obj: Same as :method:`ConfiguresHandlers.assign_handler()`.
:param configured: Same as :method:`ConfiguresHandlers.assign_handler()`.
:returns: str -- A valid handler ID, or False if no handler was assigned.
"""
if self.app.config.track_jobs_in_database and configured:
try:
handlers = self.handlers[configured]
except KeyError:
handlers = None
if handlers == [configured]:
obj.set_handler(configured)
if flush:
_timed_flush_obj(obj)
return configured
return False
def _assign_mem_self_handler(
self,
obj: ModelWithHandler,
method: HANDLER_ASSIGNMENT_METHODS,
configured: Union[str, None],
flush: bool,
queue_callback=None,
**kwargs,
) -> str:
"""Assign object to this handler using this process's in-memory queue.
This method ignores all handler configuration.
:param obj: Same as :method:`ConfiguresHandlers.assign_handler()`.
:param method: Same as :method:`ConfiguresHandlers._assign_db_preassign_handler()`.
:param configured: Ignored.
:param queue_callback: Callback to be executed when the job should be queued (i.e. a callback to the handler's
``put()`` method). No arguments are passed.
:type queue_callback: callable
:returns: str -- This process's server name (handler ID).
"""
assert (
queue_callback is not None
), f"Cannot perform '{HANDLER_ASSIGNMENT_METHODS.MEM_SELF}' handler assignment: `queue_callback` is None"
if configured:
log.warning(
"(%s) Ignoring handler assignment to '%s' because configured handler assignment method"
" '' overrides per-tool handler assignment",
obj.log_str(),
HANDLER_ASSIGNMENT_METHODS.MEM_SELF,
configured,
)
if flush:
_timed_flush_obj(obj)
queue_callback()
return self.app.config.server_name
def _assign_db_self_handler(
self,
obj: ModelWithHandler,
method: HANDLER_ASSIGNMENT_METHODS,
configured: Union[str, None],
flush: bool,
**kwargs,
) -> str:
"""Assign object to this process by setting its ``handler`` column in the database to this process.
This only occurs if there is not an explicitly configured handler assignment for the object. Otherwise, it is
passed to the DB_PREASSIGN method for assignment.
:param obj: Same as :method:`ConfiguresHandlers.assign_handler()`.
:param method: Same as :method:`ConfiguresHandlers._assign_db_preassign_handler()`.
:param configured: Same as :method:`ConfiguresHandlers.assign_handler()`.
:returns: str -- The assigned handler ID.
"""
if configured:
return self._handler_assignment_method_methods[HANDLER_ASSIGNMENT_METHODS.DB_PREASSIGN](
obj, method, configured, flush, **kwargs
)
obj.set_handler(self.app.config.server_name)
if flush:
_timed_flush_obj(obj)
return self.app.config.server_name
def _assign_db_preassign_handler(
self,
obj: ModelWithHandler,
method: HANDLER_ASSIGNMENT_METHODS,
configured: Union[str, None],
flush: bool,
index: Union[int, None] = None,
**kwargs,
) -> str:
"""Assign object to a handler by setting its ``handler`` column in the database to a handler selected at random
from the known handlers in the appropriate tag.
Given a handler ID or tag, return a handler matching it, of those handlers that are statically configured in
the job configuration, or known via preconfigured pools.
:param obj: Same as :method:`ConfiguresHandlers.assign_handler()`.
:param method: Assignment method currently being checked.
:type method: Value in :data:`HANDLER_ASSIGNMENT_METHODS`.
:param configured: Same as :method:`ConfiguresHandlers.assign_handler()`.
:param index: Generate "consistent" "random" handlers with this index if specified.
:type index: int
:raises KeyError: if the configured or default handler is not a known handler ID or tag.
:returns: str -- A valid job handler ID.
"""
handler = configured
if handler is None:
handler = self.default_handler_id or self.DEFAULT_HANDLER_TAG
# Get a random handler ID from the possible handlers. If the admin has configured a tool with a handler tag that
# does not exist, or if there are no default handlers and configured is None, this will raise KeyError to
# assign_handler, which should log it, try the next method (if any), and if no other methods succeed, raise
# HandlerAssigmentError.
handler_id = self._get_single_item(self.handlers[handler], index=index)
if handler != handler_id:
log.debug(
"(%s) Selected handler '%s' by random choice from handler tag '%s'", obj.log_str(), handler_id, handler
)
obj.set_handler(handler_id)
if flush:
_timed_flush_obj(obj)
return handler_id
def _assign_db_tag(
self,
obj: ModelWithHandler,
method: HANDLER_ASSIGNMENT_METHODS,
configured: Union[str, None],
flush: bool,
**kwargs,
) -> str:
"""Assign object to a handler by setting its ``handler`` column in the database to either the configured handler
ID or tag, or to the default tag (or ``_default_``)
:param obj: Same as :method:`ConfiguresHandlers.assign_handler()`.
:param method: Same as :method:`ConfiguresHandlers._assign_db_preassign_handler()`.
:param configured: Same as :method:`ConfiguresHandlers.assign_handler()`.
:returns: str -- The assigned handler pool.
"""
handler = configured
if handler is None:
handler = self.default_handler_id or self.DEFAULT_HANDLER_TAG
obj.set_handler(handler)
if flush:
_timed_flush_obj(obj)
return handler
[docs]
def assign_handler(self, obj: ModelWithHandler, configured: Union[str, None] = None, flush: bool = True, **kwargs):
"""Set a job handler, flush obj
Called assignment methods should raise py:class:`HandlerAssignmentSkip` to indicate that the next method
should be tried.
:param obj: Model object to assign a handler to.
:type obj: model object with ``set_handler()`` and ``log_str()`` methods.
:param configured: Preconfigured handler (ID, tag, or None) for the given object.
:type configured: str or None.
:returns: str -- The assigned handler ID or tag.
"""
# It's a bit awkward that the method that actually hands off job execution is in the JobConfiguration, but
# that's currently the best place for it. It's worth noting that this method is also part of the
# WorkflowSchedulingManager, which acts like a combined JobConfiguration and JobManager. Combining those two
# classes would probably be reasonable (and would remove the need for the queue callback).
if handler := self._assign_handler_direct(obj, configured, flush=flush):
log.info(
"(%s) Skipped handler assignment logic due to explicit configuration` to a single handler: %s",
obj.log_str(),
configured,
)
return handler
for method in self.handler_assignment_methods:
try:
handler = self._handler_assignment_method_methods[method](obj, method, configured, flush, **kwargs)
log.info("(%s) Handler '%s' assigned using '%s' assignment method", obj.log_str(), handler, method)
return handler
except HandlerAssignmentSkip:
log.debug(
"(%s) Handler assignment method '%s' did not assign a handler, trying next method",
obj.log_str(),
method,
)
except Exception:
log.exception("Caught exception in handler assignment method: %s", method)
else:
# Ideally we could just expunge the object from the SA session here, but in most cases, some of its related
# objects have already been created, so instead we'll just have to fail it.
log.error("(%s) Failed to select handler", obj.log_str())
raise HandlerAssignmentError("Job handler assignment failed.", obj=obj)
def _timed_flush_obj(obj):
obj_flush_timer = ExecutionTimer()
sa_session = object_session(obj)
sa_session.commit()
log.info(f"Flushed transaction for {obj.log_str()} {obj_flush_timer}")