""" The class defines the stock Galaxy workflow scheduling plugin - currently
it simply schedules the whole workflow up front when offered.
"""
import logging
from typing import TYPE_CHECKING
from galaxy.work import context
from galaxy.workflow import (
run,
run_request,
)
from . import ActiveWorkflowSchedulingPlugin
if TYPE_CHECKING:
from galaxy.model import WorkflowInvocation
log = logging.getLogger(__name__)
[docs]class CoreWorkflowSchedulingPlugin(ActiveWorkflowSchedulingPlugin):
plugin_type = "core"
[docs] def __init__(self, **kwds):
pass
[docs] def startup(self, app):
self.app = app
[docs] def shutdown(self):
pass
[docs] def schedule(self, workflow_invocation: "WorkflowInvocation") -> None:
workflow = workflow_invocation.workflow
history = workflow_invocation.history
request_context = context.WorkRequestContext(
app=self.app, history=history, user=history.user
) # trans-like object not tied to a web-thread.
workflow_run_config = run_request.workflow_request_to_run_config(workflow_invocation)
run.schedule(
trans=request_context,
workflow=workflow,
workflow_run_config=workflow_run_config,
workflow_invocation=workflow_invocation,
)
__all__ = ("CoreWorkflowSchedulingPlugin",)