Source code for galaxy.workflow.schedulers.core

""" The class defines the stock Galaxy workflow scheduling plugin - currently
it simply schedules the whole workflow up front when offered.
"""

import logging
from typing import TYPE_CHECKING

from import context
from galaxy.workflow import (
from . import ActiveWorkflowSchedulingPlugin

    from galaxy.model import WorkflowInvocation

# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)

class CoreWorkflowSchedulingPlugin(ActiveWorkflowSchedulingPlugin):
    """Workflow scheduling plugin registered under the type name ``"core"``.

    ``schedule`` converts the invocation's stored request into a run
    configuration and passes the whole invocation to ``run.schedule`` in a
    single call.
    """

    plugin_type = "core"

    def __init__(self, **kwds):
        """Accept and ignore arbitrary keyword arguments; no state is set here."""

    def startup(self, app):
        """Keep a reference to ``app`` for use by :meth:`schedule`.

        :param app: application object handed to the plugin at startup.
        """
        # NOTE(review): the assignment target was missing in the extracted
        # source ("= app"); ``self.app`` is the reconstructed target - confirm.
        self.app = app

    def shutdown(self):
        """No-op."""

    def schedule(self, workflow_invocation: "WorkflowInvocation") -> None:
        """Schedule ``workflow_invocation`` via ``run.schedule``.

        :param workflow_invocation: the invocation to schedule; its
            ``workflow`` and ``history`` attributes are read, and
            ``history.user`` is used as the requesting user.
        """
        workflow = workflow_invocation.workflow
        history = workflow_invocation.history
        # trans-like object not tied to a web-thread.
        # NOTE(review): the first argument was missing in the extracted source
        # ("WorkRequestContext(, history=...)"); ``app=self.app`` is the
        # reconstructed value - confirm against WorkRequestContext's signature.
        request_context = context.WorkRequestContext(
            app=self.app,
            history=history,
            user=history.user,
        )
        workflow_run_config = run_request.workflow_request_to_run_config(workflow_invocation)
        run.schedule(
            trans=request_context,
            workflow=workflow,
            workflow_run_config=workflow_run_config,
            workflow_invocation=workflow_invocation,
        )

# Limit ``from <this module> import *`` to the plugin class.
__all__ = ("CoreWorkflowSchedulingPlugin",)