# Source code for galaxy.workflow.schedulers.core

"""Define the stock Galaxy workflow scheduling plugin.

Currently it simply schedules the whole workflow up front when offered.
"""

import logging
from typing import TYPE_CHECKING

from galaxy.work import context
from galaxy.workflow import (
    run,
    run_request,
)
from . import ActiveWorkflowSchedulingPlugin

if TYPE_CHECKING:
    from galaxy.model import WorkflowInvocation


# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)


class CoreWorkflowSchedulingPlugin(ActiveWorkflowSchedulingPlugin):
    """Stock workflow scheduling plugin, registered as ``plugin_type = "core"``.

    ``schedule`` hands the whole invocation to ``run.schedule`` in one call.
    """

    plugin_type = "core"

    def __init__(self, **kwds):
        """Accept arbitrary keyword arguments and ignore them.

        No instance state is set here; ``self.app`` is only assigned by
        ``startup``.
        """

    def startup(self, app):
        """Remember ``app`` on the instance so ``schedule`` can use it.

        :param app: application object later passed to
            ``context.WorkRequestContext`` as ``app``.
        """
        self.app = app

    def shutdown(self):
        """Do nothing; this plugin has no shutdown work of its own."""

    def schedule(self, workflow_invocation: "WorkflowInvocation") -> None:
        """Schedule ``workflow_invocation`` by delegating to ``run.schedule``.

        Reads ``self.app``, which this class only sets in ``startup``, so
        ``startup`` must have been called first.

        :param workflow_invocation: invocation whose ``workflow`` and
            ``history`` attributes are read to build the scheduling call.
        """
        workflow = workflow_invocation.workflow
        history = workflow_invocation.history
        request_context = context.WorkRequestContext(
            app=self.app, history=history, user=history.user
        )  # trans-like object not tied to a web-thread.
        workflow_run_config = run_request.workflow_request_to_run_config(workflow_invocation)
        run.schedule(
            trans=request_context,
            workflow=workflow,
            workflow_run_config=workflow_run_config,
            workflow_invocation=workflow_invocation,
        )
# Names exported by a star-import of this module: only the plugin class.
__all__ = ("CoreWorkflowSchedulingPlugin",)