Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.services.workflows
import logging
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
Union,
)
from galaxy import (
exceptions,
web,
)
from galaxy.managers.context import ProvidesUserContext
from galaxy.managers.workflows import (
RefactorResponse,
WorkflowContentsManager,
WorkflowSerializer,
WorkflowsManager,
)
from galaxy.model import StoredWorkflow
from galaxy.schema.invocation import WorkflowInvocationResponse
from galaxy.schema.schema import (
InvocationsStateCounts,
WorkflowIndexQueryPayload,
)
from galaxy.schema.workflows import (
InvokeWorkflowPayload,
StoredWorkflowDetailed,
)
from galaxy.util.tool_shed.tool_shed_registry import Registry
from galaxy.webapps.galaxy.services.base import ServiceBase
from galaxy.webapps.galaxy.services.notifications import NotificationService
from galaxy.webapps.galaxy.services.sharable import ShareableService
from galaxy.workflow.run import queue_invoke
from galaxy.workflow.run_request import build_workflow_run_configs
# Module-level logger, named after this module's import path (``__name__``).
log = logging.getLogger(__name__)
class WorkflowsService(ServiceBase):
    """Service layer for stored-workflow operations.

    Delegates to a ``WorkflowsManager`` and a ``WorkflowContentsManager`` to
    list, show, invoke, refactor, delete and undelete workflows, and exposes a
    ``ShareableService`` for workflows as ``shareable_service``.
    """

    def __init__(
        self,
        workflows_manager: WorkflowsManager,
        workflow_contents_manager: WorkflowContentsManager,
        serializer: WorkflowSerializer,
        tool_shed_registry: Registry,
        notification_service: NotificationService,
    ):
        """Store the collaborators and build the sharing sub-service.

        ``notification_service`` is not kept on the instance; it is only
        passed on to the ``ShareableService`` constructed here.
        """
        self._workflows_manager = workflows_manager
        self._workflow_contents_manager = workflow_contents_manager
        self._serializer = serializer
        self.shareable_service = ShareableService(workflows_manager, serializer, notification_service)
        self._tool_shed_registry = tool_shed_registry

    # NOTE(review): ``WorkflowIndexPayload`` is not imported in the visible part
    # of this module (only ``WorkflowIndexQueryPayload`` is) -- confirm that it
    # is defined in this file.
    def index(
        self,
        trans: ProvidesUserContext,
        payload: WorkflowIndexPayload,
        include_total_count: bool = False,
    ) -> Tuple[List[Dict[str, Any]], Optional[int]]:
        """List the workflows selected by ``payload``.

        Every stored workflow returned by the manager's index query is
        serialized with ``to_dict`` (ids encoded) and extended with
        ``annotations``, ``url``, ``owner``, ``source_metadata``,
        ``number_of_steps`` (unless ``payload.skip_step_counts`` is set) and
        ``show_in_tool_panel`` (``False`` when there is no current user).

        When ``payload.missing_tools`` is truthy the result has a different
        shape: one summary dict per tool shed repository that provides a
        missing tool, with the keys ``shed``, ``repository``, ``owner``,
        ``tools`` and ``workflows``. ``shed`` is ``None`` when the tool shed
        named in the tool id does not match any registered tool shed URL.

        :returns: ``(items, total_matches)`` where ``total_matches`` is the
            count returned by ``WorkflowsManager.index_query``.
        """
        user = trans.user
        missing_tools = payload.missing_tools
        query, total_matches = self._workflows_manager.index_query(trans, payload, include_total_count)
        encode_id = trans.security.encode_id

        rval = []
        for wf in query.all():
            item = wf.to_dict(value_mapper={"id": encode_id, "latest_workflow_id": encode_id})
            encoded_id = encode_id(wf.id)
            item["annotations"] = [x.annotation for x in wf.annotations]
            item["url"] = web.url_for("workflow", id=encoded_id)
            item["owner"] = wf.user.username
            item["source_metadata"] = wf.latest_workflow.source_metadata
            if not payload.skip_step_counts:
                item["number_of_steps"] = wf.latest_workflow.step_count
            item["show_in_tool_panel"] = wf.show_in_tool_panel(user_id=user.id) if user is not None else False
            rval.append(item)

        if not missing_tools:
            return rval, total_matches

        # Group missing tool shed tools by "<toolshed>/<owner>/<repository>".
        workflows_by_toolshed: Dict[str, Dict[str, Any]] = {}
        for value in rval:
            stored_workflow = self._workflows_manager.get_stored_workflow(trans, value["id"], by_stored_id=True)
            tools = self._workflow_contents_manager.get_all_tools(stored_workflow.latest_workflow)
            missing_tool_ids = [
                tool["tool_id"] for tool in tools if trans.app.toolbox.is_missing_shed_tool(tool["tool_id"])
            ]
            if not missing_tool_ids:
                continue
            workflow_name = value["name"]
            for tool_id in missing_tool_ids:
                # The tool id must consist of exactly six "/"-separated parts;
                # anything else raises ValueError here, as it always did.
                toolshed, _, owner, name, _, _ = tool_id.split("/")
                repo_identifier = "/".join((toolshed, owner, name))
                entry = workflows_by_toolshed.get(repo_identifier)
                if entry is None:
                    # Resolve the shed URL once per repository. The lookup
                    # returns None for an unregistered tool shed, so guard the
                    # rstrip instead of failing with AttributeError.
                    shed_url = self.__get_full_shed_url(toolshed)
                    workflows_by_toolshed[repo_identifier] = dict(
                        shed=shed_url.rstrip("/") if shed_url is not None else None,
                        repository=name,
                        owner=owner,
                        tools=[tool_id],
                        workflows=[workflow_name],
                    )
                    continue
                if tool_id not in entry["tools"]:
                    entry["tools"].append(tool_id)
                if workflow_name not in entry["workflows"]:
                    entry["workflows"].append(workflow_name)
        return list(workflows_by_toolshed.values()), total_matches

    def invoke_workflow(
        self,
        trans,
        workflow_id,
        payload: InvokeWorkflowPayload,
    ) -> Union[WorkflowInvocationResponse, List[WorkflowInvocationResponse]]:
        """Queue one invocation of a workflow per run configuration.

        :returns: a list of invocation responses when ``payload.batch`` is
            truthy, otherwise the single invocation response.
        :raises exceptions.AuthenticationRequired: for anonymous users.
        :raises exceptions.RequestParameterInvalidException: when the payload
            yields a number of run configurations other than one but
            ``payload.batch`` is not set.
        :raises exceptions.MessageException: when a tool used by the workflow
            is not available in the toolbox.
        """
        if trans.anonymous:
            raise exceptions.AuthenticationRequired("You need to be logged in to run workflows.")
        trans.check_user_activation()
        # Get workflow + accessibility check.
        by_stored_id = not payload.instance
        stored_workflow = self._workflows_manager.get_stored_accessible_workflow(trans, workflow_id, by_stored_id)
        version = payload.version
        if version is None and payload.instance:
            # ``workflow_id`` identifies a Workflow rather than the StoredWorkflow.
            workflow = stored_workflow.get_internal_version_by_id(workflow_id)
        else:
            workflow = stored_workflow.get_internal_version(version)
        run_configs = build_workflow_run_configs(trans, workflow, payload.model_dump(exclude_unset=True))
        is_batch = payload.batch
        if not is_batch and len(run_configs) != 1:
            raise exceptions.RequestParameterInvalidException("Must specify 'batch' to use batch parameters.")

        # Refuse to queue anything if a required tool is unavailable.
        require_exact_tool_versions = payload.require_exact_tool_versions
        tools = self._workflow_contents_manager.get_all_tools(workflow)
        missing_tools = [
            tool
            for tool in tools
            if not trans.app.toolbox.has_tool(
                tool["tool_id"], tool_version=tool["tool_version"], exact=require_exact_tool_versions
            )
        ]
        if missing_tools:
            missing_tools_message = "Workflow was not invoked; the following required tools are not installed: "
            if require_exact_tool_versions:
                missing_tools_message += ", ".join(
                    f"{tool['tool_id']} (version {tool['tool_version']})" for tool in missing_tools
                )
            else:
                missing_tools_message += ", ".join(tool["tool_id"] for tool in missing_tools)
            raise exceptions.MessageException(missing_tools_message)

        invocations = []
        for run_config in run_configs:
            workflow_scheduler_id = payload.scheduler
            # TODO: workflow scheduler hints
            work_request_params = dict(scheduler=workflow_scheduler_id)
            workflow_invocation = queue_invoke(
                trans=trans,
                workflow=workflow,
                workflow_run_config=run_config,
                request_params=work_request_params,
                flush=False,
            )
            invocations.append(workflow_invocation)
        # Invocations are queued with flush=False; commit once for all of them.
        trans.sa_session.commit()
        encoded_invocations = [WorkflowInvocationResponse(**invocation.to_dict()) for invocation in invocations]
        if is_batch:
            return encoded_invocations
        return encoded_invocations[0]

    def delete(self, trans, workflow_id):
        """Delete the stored workflow ``workflow_id`` after the manager's security check."""
        workflow_to_delete = self._workflows_manager.get_stored_workflow(trans, workflow_id)
        self._workflows_manager.check_security(trans, workflow_to_delete)
        self._workflows_manager.delete(workflow_to_delete)

    def undelete(self, trans, workflow_id):
        """Undelete the stored workflow ``workflow_id`` after the manager's security check."""
        workflow_to_undelete = self._workflows_manager.get_stored_workflow(trans, workflow_id)
        self._workflows_manager.check_security(trans, workflow_to_undelete)
        self._workflows_manager.undelete(workflow_to_undelete)

    def get_versions(self, trans, workflow_id, instance: bool):
        """Describe every version of an accessible stored workflow.

        ``workflow_id`` is treated as a stored workflow id unless ``instance``
        is true.

        :returns: one dict per entry of ``reversed(stored_workflow.workflows)``
            with ``version`` (the position in that reversed sequence),
            ``update_time`` (ISO formatted) and ``steps`` (number of steps).
        """
        stored_workflow: StoredWorkflow = self._workflows_manager.get_stored_accessible_workflow(
            trans, workflow_id, by_stored_id=not instance
        )
        return [
            {"version": i, "update_time": w.update_time.isoformat(), "steps": len(w.steps)}
            for i, w in enumerate(reversed(stored_workflow.workflows))
        ]

    def invocation_counts(self, trans, workflow_id, instance: bool) -> InvocationsStateCounts:
        """Return ``invocation_counts()`` of an accessible stored workflow.

        ``workflow_id`` is treated as a stored workflow id unless ``instance``
        is true.
        """
        stored_workflow: StoredWorkflow = self._workflows_manager.get_stored_accessible_workflow(
            trans, workflow_id, by_stored_id=not instance
        )
        return stored_workflow.invocation_counts()

    def refactor(
        self,
        trans,
        workflow_id,
        payload,
        instance: bool,
    ) -> RefactorResponse:
        """Apply the refactoring described by ``payload`` to a stored workflow.

        The workflow is looked up by stored workflow id unless ``instance`` is
        true; the refactoring itself is delegated to the contents manager.
        """
        stored_workflow = self._workflows_manager.get_stored_workflow(trans, workflow_id, by_stored_id=not instance)
        return self._workflow_contents_manager.refactor(trans, stored_workflow, payload)

    def show_workflow(self, trans, workflow_id, instance, legacy, version) -> StoredWorkflowDetailed:
        """Return the detailed representation of a workflow.

        :param instance: when true, ``workflow_id`` is not a stored workflow id
            and, if ``version`` is ``None``, the version is derived from the
            position of the matching workflow among the stored versions.
        :param legacy: selects the ``"legacy"`` serialization style instead of
            ``"instance"``.
        :raises exceptions.ItemAccessibilityException: when the workflow is not
            importable, not owned by the current user, the user is not an
            admin, and the workflow has not been shared with the user.
        """
        stored_workflow = self._workflows_manager.get_stored_workflow(trans, workflow_id, by_stored_id=not instance)
        if stored_workflow.importable is False and stored_workflow.user != trans.user and not trans.user_is_admin:
            wf_count = 0 if not trans.user else trans.user.count_stored_workflow_user_assocs(stored_workflow)
            if wf_count == 0:
                message = "Workflow is neither importable, nor owned by or shared with current user"
                raise exceptions.ItemAccessibilityException(message)
        style = "legacy" if legacy else "instance"
        if version is None and instance:
            # A Workflow instance may not be the latest workflow version attached to StoredWorkflow.
            # This figures out the correct version so that we return the correct Workflow and version.
            for i, workflow in enumerate(reversed(stored_workflow.workflows)):
                if workflow.id == workflow_id:
                    version = i
                    break
        detailed_workflow = StoredWorkflowDetailed(
            **self._workflow_contents_manager.workflow_to_dict(trans, stored_workflow, style=style, version=version)
        )
        return detailed_workflow

    def _get_workflows_list(
        self,
        trans: ProvidesUserContext,
        payload,
    ):
        """Return only the item list of :meth:`index`, discarding the total count."""
        workflows, _ = self.index(trans, payload)
        return workflows

    def __get_full_shed_url(self, url) -> Optional[str]:
        """Return the first registered tool shed URL that contains ``url``.

        Matching is a plain substring test against the values of the
        registry's ``tool_sheds`` mapping.

        :returns: the matching URL, or ``None`` when nothing matches.
        """
        for shed_url in self._tool_shed_registry.tool_sheds.values():
            if url in shed_url:
                return shed_url
        return None