import os
from functools import partial
import galaxy.workflow.schedulers
from galaxy import model
from galaxy.exceptions import HandlerAssignmentError
from galaxy.jobs.handler import InvocationGrabber
from galaxy.model.base import transaction
from galaxy.util import plugin_config
from galaxy.util.custom_logging import get_logger
from galaxy.util.monitors import Monitors
from galaxy.util.xml_macros import load
from galaxy.web_stack.handlers import ConfiguresHandlers
from galaxy.web_stack.message import WorkflowSchedulingMessage
log = get_logger(__name__)
DEFAULT_SCHEDULER_ID = "default" # well actually this should be called DEFAULT_DEFAULT_SCHEDULER_ID...
DEFAULT_SCHEDULER_PLUGIN_TYPE = "core"
EXCEPTION_MESSAGE_SHUTDOWN = "Exception raised while attempting to shutdown workflow scheduler."
EXCEPTION_MESSAGE_NO_SCHEDULERS = "Failed to define workflow schedulers - no workflow schedulers defined."
EXCEPTION_MESSAGE_NO_DEFAULT_SCHEDULER = (
    "Failed to define workflow schedulers - no workflow scheduler found for default id '%s'."
)
EXCEPTION_MESSAGE_DUPLICATE_SCHEDULERS = (
    "Failed to define workflow schedulers - workflow scheduling plugin id '%s' duplicated."
)
EXCEPTION_MESSAGE_SERIALIZE = "Parallelization is not desired but handler assignment methods are non-deterministic. Set DB_PREASSIGN in workflow_schedulers_conf.xml."
class WorkflowSchedulingManager(ConfiguresHandlers):
"""A workflow scheduling manager based loosely on pattern established by
``galaxy.manager.JobManager``. Only schedules workflows on handler
processes.
"""
DEFAULT_BASE_HANDLER_POOLS = ("workflow-schedulers", "job-handlers")
    def __init__(self, app):
self.app = app
self.__handlers_configured = False
self.workflow_schedulers = {}
self.active_workflow_schedulers = {}
        # Passive workflow schedulers do not need to be monitored.
self.request_monitor = None
self.handlers = {}
self.handler_assignment_methods_configured = False
self.handler_assignment_methods = None
self.handler_max_grab = None
self.default_handler_id = None
self.__plugin_classes = self.__plugins_dict()
self.__init_schedulers()
if self._is_workflow_handler():
log.debug("Starting workflow schedulers")
self.__start_schedulers()
if self.active_workflow_schedulers:
self.__start_request_monitor()
        # When assigning handlers to workflows being queued - use job_conf
        # if no explicit workflow scheduling handlers have been specified,
        # otherwise use those explicit workflow scheduling handlers (on self).
if self.__handlers_configured:
self.__handlers_config = self
else:
self.__handlers_config = app.job_config
if not self._is_workflow_handler():
            # This process does not schedule workflows, but it should still check
            # for any invocations left unassigned to a handler.
self.__startup_recovery()
def __startup_recovery(self):
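        """Reassign a handler to any workflow invocations left unassigned at startup."""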
sa_session = self.app.model.context
for invocation_id in model.WorkflowInvocation.poll_unhandled_workflow_ids(sa_session):
log.info(
"(%s) Handler unassigned at startup, resubmitting workflow invocation for assignment", invocation_id
)
workflow_invocation = sa_session.get(model.WorkflowInvocation, invocation_id)
self._assign_handler(workflow_invocation)
def _handle_setup_msg(self, workflow_invocation_id=None):
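        """Handle a stack setup message by claiming the invocation for this
        process, unless another handler is already assigned."""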
sa_session = self.app.model.context
workflow_invocation = sa_session.get(model.WorkflowInvocation, workflow_invocation_id)
if workflow_invocation.handler is None:
workflow_invocation.handler = self.app.config.server_name
sa_session.add(workflow_invocation)
with transaction(sa_session):
sa_session.commit()
else:
log.warning(
"(%s) Handler '%s' received setup message for workflow invocation but handler '%s' is"
" already assigned, ignoring",
workflow_invocation.id,
self.app.config.server_name,
workflow_invocation.handler,
)
def _is_workflow_handler(self):
# If we have explicitly configured handlers, check them.
# Else just make sure we are a job handler.
if self.__handlers_configured:
is_handler = self.is_handler
else:
is_handler = self.app.is_job_handler
return is_handler
def _queue_callback(self, workflow_invocation):
# There isn't an in-memory queue for workflow schedulers, so if MEM_SELF is used just assign with the DB
workflow_invocation.handler = self.app.config.server_name
sa_session = self.app.model.context
sa_session.add(workflow_invocation)
with transaction(sa_session):
sa_session.commit()
def _message_callback(self, workflow_invocation):
return WorkflowSchedulingMessage(task="setup", workflow_invocation_id=workflow_invocation.id)
def _assign_handler(self, workflow_invocation, flush=True):
        # Use the history id as a stable index so invocations from the same
        # history are consistently assigned to the same handler, unless
        # parallelization within histories is enabled below.
random_index = workflow_invocation.history.id
queue_callback = partial(self._queue_callback, workflow_invocation)
message_callback = partial(self._message_callback, workflow_invocation)
if self.app.config.parallelize_workflow_scheduling_within_histories:
random_index = None
return self.__handlers_config.assign_handler(
workflow_invocation,
configured=None,
index=random_index,
queue_callback=queue_callback,
message_callback=message_callback,
flush=flush,
)
    def shutdown(self):
exception = None
for workflow_scheduler in self.workflow_schedulers.values():
try:
workflow_scheduler.shutdown()
except Exception as e:
exception = exception or e
log.exception(EXCEPTION_MESSAGE_SHUTDOWN)
if self.request_monitor:
try:
self.request_monitor.shutdown()
except Exception as e:
exception = exception or e
log.exception("Failed to shutdown workflow request monitor.")
if exception:
raise exception
    def queue(self, workflow_invocation, request_params, flush=True):
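        """Mark the invocation NEW, record the requested (or default) scheduler,
        and assign it to a handler.
        """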
workflow_invocation.set_state(model.WorkflowInvocation.states.NEW)
workflow_invocation.scheduler = request_params.get("scheduler", None) or self.default_scheduler_id
sa_session = self.app.model.context
sa_session.add(workflow_invocation)
# Assign handler
try:
self._assign_handler(workflow_invocation, flush=flush)
except HandlerAssignmentError:
raise RuntimeError(f"Unable to set a handler for workflow invocation '{workflow_invocation.id}'")
return workflow_invocation
def __start_schedulers(self):
for workflow_scheduler in self.workflow_schedulers.values():
workflow_scheduler.startup(self.app)
def __plugins_dict(self):
return plugin_config.plugins_dict(galaxy.workflow.schedulers, "plugin_type")
@property
def __stack_has_pool(self):
# TODO: In the future it should be possible to map workflows to handlers based on workflow params. When that
# happens, we'll need to defer pool checks until execution time.
return any(map(self.app.application_stack.has_pool, self.DEFAULT_BASE_HANDLER_POOLS))
def __init_schedulers(self):
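        """Load scheduler plugins from the workflow schedulers config file,
        falling back to the default 'core' scheduler when no config is present.
        """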
config_file = self.app.config.workflow_schedulers_config_file
use_default_scheduler = False
if not config_file or (
not os.path.exists(config_file) and not self.app.config.is_set("workflow_schedulers_config_file")
):
log.info("No workflow schedulers plugin config file defined, using default scheduler.")
use_default_scheduler = True
elif not os.path.exists(config_file):
log.info(f"Cannot find workflow schedulers plugin config file '{config_file}', using default scheduler.")
use_default_scheduler = True
if use_default_scheduler:
self.__init_default_scheduler()
else:
self.DEFAULT_BASE_HANDLER_POOLS = ("workflow-schedulers",)
plugins_element = load(config_file).getroot()
self.__init_schedulers_for_element(plugins_element)
if not self.__handlers_configured and self.__stack_has_pool:
# Stack has a pool for us so override inherited config and use the pool
self.__init_handlers()
self.__handlers_configured = True
elif use_default_scheduler:
self._set_default_handler_assignment_methods()
def __init_default_scheduler(self):
self.default_scheduler_id = DEFAULT_SCHEDULER_ID
self.__init_plugin(DEFAULT_SCHEDULER_PLUGIN_TYPE)
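    # Illustrative sketch of the configuration shape parsed by
    # __init_schedulers_for_element below (the root tag, ids, and handler names
    # are placeholders; the full set of supported attributes comes from
    # ConfiguresHandlers and the individual scheduler plugins):
    #
    #     <workflow_schedulers default="core">
    #         <core id="core"/>
    #         <handlers>
    #             <handler id="workflow_scheduler0"/>
    #         </handlers>
    #     </workflow_schedulers>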
def __init_schedulers_for_element(self, plugins_element):
plugins_kwds = dict(plugins_element.items())
self.default_scheduler_id = plugins_kwds.get("default", DEFAULT_SCHEDULER_ID)
for config_element in plugins_element:
config_element_tag = config_element.tag
if config_element_tag == "handlers":
self.__init_handlers(config_element)
# Determine the default handler(s)
self.default_handler_id = self._get_default(
self.app.config, config_element, list(self.handlers.keys()), required=False
)
else:
plugin_type = config_element_tag
plugin_element = config_element
# Configuring a scheduling plugin...
plugin_kwds = dict(plugin_element.items())
workflow_scheduler_id = plugin_kwds.get("id", None)
self.__init_plugin(plugin_type, workflow_scheduler_id, **plugin_kwds)
if not self.workflow_schedulers:
raise Exception(EXCEPTION_MESSAGE_NO_SCHEDULERS)
if self.default_scheduler_id not in self.workflow_schedulers:
raise Exception(EXCEPTION_MESSAGE_NO_DEFAULT_SCHEDULER % self.default_scheduler_id)
if (
            not self.app.config.parallelize_workflow_scheduling_within_histories
and not self.deterministic_handler_assignment
):
if self.__handlers_configured:
# There's an explicit configuration, the admin should fix it.
raise Exception(EXCEPTION_MESSAGE_SERIALIZE)
else:
log.warning(EXCEPTION_MESSAGE_SERIALIZE)
def __init_handlers(self, config_element=None):
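        """Configure handler assignment methods and handler pools from the given
        ``<handlers>`` element, or from application defaults when it is absent.
        """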
assert not self.__handlers_configured
handling_config_dict = ConfiguresHandlers.xml_to_dict(self.app.config, config_element)
self._init_handler_assignment_methods(handling_config_dict)
self._init_handlers(handling_config_dict)
if not self.handler_assignment_methods_configured:
self._set_default_handler_assignment_methods()
else:
self.app.application_stack.init_job_handling(self)
log.info("Workflow scheduling handler assignment method(s): %s", ", ".join(self.handler_assignment_methods))
for tag, handlers in [(t, h) for t, h in self.handlers.items() if isinstance(h, list)]:
log.info("Tag [%s] handlers: %s", tag, ", ".join(handlers))
self.__handlers_configured = True
def __init_plugin(self, plugin_type, workflow_scheduler_id=None, **kwds):
workflow_scheduler_id = workflow_scheduler_id or self.default_scheduler_id
if workflow_scheduler_id in self.workflow_schedulers:
raise Exception(EXCEPTION_MESSAGE_DUPLICATE_SCHEDULERS % workflow_scheduler_id)
workflow_scheduler = self.__plugin_classes[plugin_type](**kwds)
self.workflow_schedulers[workflow_scheduler_id] = workflow_scheduler
if isinstance(workflow_scheduler, galaxy.workflow.schedulers.ActiveWorkflowSchedulingPlugin):
self.active_workflow_schedulers[workflow_scheduler_id] = workflow_scheduler
def __start_request_monitor(self):
self.request_monitor = WorkflowRequestMonitor(self.app, self)
self.app.application_stack.register_postfork_function(self.request_monitor.start)
class WorkflowRequestMonitor(Monitors):
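    """Background thread that periodically drives the active workflow schedulers,
    scheduling invocations assigned to this handler process.
    """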
    def __init__(self, app, workflow_scheduling_manager):
self.app = app
self.workflow_scheduling_manager = workflow_scheduling_manager
self._init_monitor_thread(
name="WorkflowRequestMonitor.monitor_thread", target=self.__monitor, config=app.config
)
self.invocation_grabber = None
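        # If the configured handler assignment method supports grabbing unassigned
        # invocations directly from the database, set up an InvocationGrabber so
        # the monitor loop can claim them for this process.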
self_handler_tags = set(self.app.job_config.self_handler_tags)
self_handler_tags.add(self.workflow_scheduling_manager.default_handler_id)
handler_assignment_method = InvocationGrabber.get_grabbable_handler_assignment_method(
self.workflow_scheduling_manager.handler_assignment_methods
)
if handler_assignment_method:
self.invocation_grabber = InvocationGrabber(
app=app,
handler_assignment_method=handler_assignment_method,
max_grab=self.workflow_scheduling_manager.handler_max_grab,
self_handler_tags=self_handler_tags,
handler_tags=self_handler_tags,
)
def __monitor(self):
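        """Monitor loop: grab unhandled invocations when possible, run each active
        scheduler, then sleep for ``workflow_monitor_sleep`` seconds.
        """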
to_monitor = self.workflow_scheduling_manager.active_workflow_schedulers
while self.monitor_running:
try:
if self.invocation_grabber:
self.invocation_grabber.grab_unhandled_items()
monitor_step_timer = self.app.execution_timer_factory.get_timer(
"internal.galaxy.workflows.scheduling_manager.monitor_step",
"Workflow scheduling manager monitor step complete.",
)
for workflow_scheduler_id, workflow_scheduler in to_monitor.items():
if not self.monitor_running:
return
self.__schedule(workflow_scheduler_id, workflow_scheduler)
log.trace(monitor_step_timer.to_str())
except Exception:
log.exception("An exception occured scheduling while scheduling workflows")
self._monitor_sleep(self.app.config.workflow_monitor_sleep)
def __schedule(self, workflow_scheduler_id, workflow_scheduler):
invocation_ids = self.__active_invocation_ids(workflow_scheduler_id)
for invocation_id in invocation_ids:
log.debug("Attempting to schedule workflow invocation [%s]", invocation_id)
self.__attempt_schedule(invocation_id, workflow_scheduler)
if not self.monitor_running:
return
def __attempt_schedule(self, invocation_id, workflow_scheduler):
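        """Schedule a single invocation; return True if it was scheduled, False if
        it was cancelled, inactive, skipped, or scheduling failed.
        """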
with self.app.model.context() as session:
workflow_invocation = session.get(model.WorkflowInvocation, invocation_id)
try:
                if not workflow_invocation:
                    return False
                if workflow_invocation.state == workflow_invocation.states.CANCELLING:
                    workflow_invocation.cancel_invocation_steps()
                    workflow_invocation.mark_cancelled()
                    session.commit()
                    return False
                if not workflow_invocation.active:
                    return False
# This ensures we're only ever working on the 'first' active
# workflow invocation in a given history, to force sequential
# activation.
if self.app.config.history_local_serial_workflow_scheduling:
for i in workflow_invocation.history.workflow_invocations:
if i.active and i.id < workflow_invocation.id:
return False
workflow_scheduler.schedule(workflow_invocation)
log.debug("Workflow invocation [%s] scheduled", workflow_invocation.id)
except Exception:
# TODO: eventually fail this - or fail it right away?
log.exception("Exception raised while attempting to schedule workflow request.")
return False
# A workflow was obtained and scheduled...
return True
def __active_invocation_ids(self, scheduler_id):
handler = self.app.config.server_name
return model.WorkflowInvocation.poll_active_workflow_ids(
self.app.model.engine,
scheduler=scheduler_id,
handler=handler,
)
    def start(self):
self.monitor_thread.start()
    def shutdown(self):
self.shutdown_monitor()