Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.workflow.schedulers.core
""" The class defines the stock Galaxy workflow scheduling plugin - currently
it simply schedules the whole workflow up front when offered.
"""
import logging
from typing import TYPE_CHECKING
from galaxy.work import context
from galaxy.workflow import (
run,
run_request,
)
from . import ActiveWorkflowSchedulingPlugin
if TYPE_CHECKING:
from galaxy.model import WorkflowInvocation
log = logging.getLogger(__name__)
[docs]class CoreWorkflowSchedulingPlugin(ActiveWorkflowSchedulingPlugin):
plugin_type = "core"
[docs] def schedule(self, workflow_invocation: "WorkflowInvocation") -> None:
workflow = workflow_invocation.workflow
history = workflow_invocation.history
request_context = context.WorkRequestContext(
app=self.app, history=history, user=history.user
) # trans-like object not tied to a web-thread.
workflow_run_config = run_request.workflow_request_to_run_config(workflow_invocation)
run.schedule(
trans=request_context,
workflow=workflow,
workflow_run_config=workflow_run_config,
workflow_invocation=workflow_invocation,
)
__all__ = ("CoreWorkflowSchedulingPlugin",)