Source code for galaxy.webapps.galaxy.services.workflows

import logging
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Tuple,
    Union,
)

from galaxy import (
    exceptions,
    web,
)
from galaxy.managers.context import ProvidesUserContext
from galaxy.managers.workflows import (
    RefactorResponse,
    WorkflowContentsManager,
    WorkflowSerializer,
    WorkflowsManager,
)
from galaxy.model import StoredWorkflow
from galaxy.schema.invocation import WorkflowInvocationResponse
from galaxy.schema.schema import (
    InvocationsStateCounts,
    WorkflowIndexQueryPayload,
)
from galaxy.schema.workflows import (
    InvokeWorkflowPayload,
    StoredWorkflowDetailed,
)
from galaxy.util.tool_shed.tool_shed_registry import Registry
from galaxy.webapps.galaxy.services.base import ServiceBase
from galaxy.webapps.galaxy.services.notifications import NotificationService
from galaxy.webapps.galaxy.services.sharable import ShareableService
from galaxy.workflow.run import queue_invoke
from galaxy.workflow.run_request import build_workflow_run_configs

log = logging.getLogger(__name__)


class WorkflowIndexPayload(WorkflowIndexQueryPayload):
    missing_tools: bool = False
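
# Illustrative sketch (not part of the module): WorkflowIndexPayload only adds the
# ``missing_tools`` flag on top of the generic index query fields (for example
# ``skip_step_counts``, which ``index`` reads below). Assuming the inherited query
# fields all have defaults, a payload can be built with just the extra flag:
#
#     payload = WorkflowIndexPayload(missing_tools=True)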


class WorkflowsService(ServiceBase):
    def __init__(
        self,
        workflows_manager: WorkflowsManager,
        workflow_contents_manager: WorkflowContentsManager,
        serializer: WorkflowSerializer,
        tool_shed_registry: Registry,
        notification_service: NotificationService,
    ):
        self._workflows_manager = workflows_manager
        self._workflow_contents_manager = workflow_contents_manager
        self._serializer = serializer
        self.shareable_service = ShareableService(workflows_manager, serializer, notification_service)
        self._tool_shed_registry = tool_shed_registry

    def index(
        self,
        trans: ProvidesUserContext,
        payload: WorkflowIndexPayload,
        include_total_count: bool = False,
    ) -> Tuple[List[Dict[str, Any]], Optional[int]]:
        user = trans.user
        missing_tools = payload.missing_tools
        query, total_matches = self._workflows_manager.index_query(trans, payload, include_total_count)
        rval = []
        for wf in query.all():
            item = wf.to_dict(
                value_mapper={"id": trans.security.encode_id, "latest_workflow_id": trans.security.encode_id}
            )
            encoded_id = trans.security.encode_id(wf.id)
            item["annotations"] = [x.annotation for x in wf.annotations]
            item["url"] = web.url_for("workflow", id=encoded_id)
            item["owner"] = wf.user.username
            item["source_metadata"] = wf.latest_workflow.source_metadata
            if not payload.skip_step_counts:
                item["number_of_steps"] = wf.latest_workflow.step_count
            item["show_in_tool_panel"] = False
            if user is not None:
                item["show_in_tool_panel"] = wf.show_in_tool_panel(user_id=user.id)
            rval.append(item)
        if missing_tools:
            workflows_missing_tools = []
            workflows = []
            workflows_by_toolshed = {}
            for value in rval:
                stored_workflow = self._workflows_manager.get_stored_workflow(trans, value["id"], by_stored_id=True)
                tools = self._workflow_contents_manager.get_all_tools(stored_workflow.latest_workflow)
                missing_tool_ids = [
                    tool["tool_id"] for tool in tools if trans.app.toolbox.is_missing_shed_tool(tool["tool_id"])
                ]
                if len(missing_tool_ids) > 0:
                    value["missing_tools"] = missing_tool_ids
                    workflows_missing_tools.append(value)
            for workflow in workflows_missing_tools:
                for tool_id in workflow["missing_tools"]:
                    toolshed, _, owner, name, tool, version = tool_id.split("/")
                    shed_url = self.__get_full_shed_url(toolshed)
                    repo_identifier = "/".join((toolshed, owner, name))
                    if repo_identifier not in workflows_by_toolshed:
                        workflows_by_toolshed[repo_identifier] = dict(
                            shed=shed_url.rstrip("/"),
                            repository=name,
                            owner=owner,
                            tools=[tool_id],
                            workflows=[workflow["name"]],
                        )
                    else:
                        if tool_id not in workflows_by_toolshed[repo_identifier]["tools"]:
                            workflows_by_toolshed[repo_identifier]["tools"].append(tool_id)
                        if workflow["name"] not in workflows_by_toolshed[repo_identifier]["workflows"]:
                            workflows_by_toolshed[repo_identifier]["workflows"].append(workflow["name"])
            for repo_tag in workflows_by_toolshed:
                workflows.append(workflows_by_toolshed[repo_tag])
            return workflows, total_matches
        return rval, total_matches
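
    # Illustrative sketch (not part of the module): each element of the first
    # return value is a plain dict assembled above, roughly of the form
    #
    #     {
    #         "id": "<encoded id>",
    #         "latest_workflow_id": "<encoded id>",
    #         "annotations": ["..."],
    #         "url": "...",                    # built via web.url_for("workflow", id=encoded_id)
    #         "owner": "<username>",
    #         "source_metadata": {...},
    #         "number_of_steps": 3,            # omitted when payload.skip_step_counts is set
    #         "show_in_tool_panel": False,
    #     }
    #
    # When payload.missing_tools is set, the return value is instead one summary
    # dict per tool shed repository, with the keys "shed", "repository", "owner",
    # "tools" and "workflows" built in the branch above.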

    def invoke_workflow(
        self,
        trans,
        workflow_id,
        payload: InvokeWorkflowPayload,
    ) -> Union[WorkflowInvocationResponse, List[WorkflowInvocationResponse]]:
        if trans.anonymous:
            raise exceptions.AuthenticationRequired("You need to be logged in to run workflows.")
        trans.check_user_activation()
        # Get workflow + accessibility check.
        by_stored_id = not payload.instance
        stored_workflow = self._workflows_manager.get_stored_accessible_workflow(trans, workflow_id, by_stored_id)
        version = payload.version
        if version is None and payload.instance:
            workflow = stored_workflow.get_internal_version_by_id(workflow_id)
        else:
            workflow = stored_workflow.get_internal_version(version)
        run_configs = build_workflow_run_configs(trans, workflow, payload.model_dump(exclude_unset=True))
        is_batch = payload.batch
        if not is_batch and len(run_configs) != 1:
            raise exceptions.RequestParameterInvalidException("Must specify 'batch' to use batch parameters.")
        require_exact_tool_versions = payload.require_exact_tool_versions
        tools = self._workflow_contents_manager.get_all_tools(workflow)
        missing_tools = [
            tool
            for tool in tools
            if not trans.app.toolbox.has_tool(
                tool["tool_id"], tool_version=tool["tool_version"], exact=require_exact_tool_versions
            )
        ]
        if missing_tools:
            missing_tools_message = "Workflow was not invoked; the following required tools are not installed: "
            if require_exact_tool_versions:
                missing_tools_message += ", ".join(
                    [f"{tool['tool_id']} (version {tool['tool_version']})" for tool in missing_tools]
                )
            else:
                missing_tools_message += ", ".join([tool["tool_id"] for tool in missing_tools])
            raise exceptions.MessageException(missing_tools_message)
        invocations = []
        for run_config in run_configs:
            workflow_scheduler_id = payload.scheduler
            # TODO: workflow scheduler hints
            work_request_params = dict(scheduler=workflow_scheduler_id)
            workflow_invocation = queue_invoke(
                trans=trans,
                workflow=workflow,
                workflow_run_config=run_config,
                request_params=work_request_params,
                flush=False,
            )
            invocations.append(workflow_invocation)
        trans.sa_session.commit()
        encoded_invocations = [WorkflowInvocationResponse(**invocation.to_dict()) for invocation in invocations]
        if is_batch:
            return encoded_invocations
        else:
            return encoded_invocations[0]
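
    # Illustrative sketch (not part of the module), assuming a wired-up service
    # instance ``workflows_service`` and a request transaction ``trans``:
    #
    #     result = workflows_service.invoke_workflow(trans, workflow_id, payload)
    #     invocations = result if payload.batch else [result]
    #
    # A list of WorkflowInvocationResponse objects is returned only when
    # ``payload.batch`` is set; otherwise the single response is returned directly.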

    def delete(self, trans, workflow_id):
        workflow_to_delete = self._workflows_manager.get_stored_workflow(trans, workflow_id)
        self._workflows_manager.check_security(trans, workflow_to_delete)
        self._workflows_manager.delete(workflow_to_delete)

    def undelete(self, trans, workflow_id):
        workflow_to_undelete = self._workflows_manager.get_stored_workflow(trans, workflow_id)
        self._workflows_manager.check_security(trans, workflow_to_undelete)
        self._workflows_manager.undelete(workflow_to_undelete)

    def get_versions(self, trans, workflow_id, instance: bool):
        stored_workflow: StoredWorkflow = self._workflows_manager.get_stored_accessible_workflow(
            trans, workflow_id, by_stored_id=not instance
        )
        return [
            {"version": i, "update_time": w.update_time.isoformat(), "steps": len(w.steps)}
            for i, w in enumerate(reversed(stored_workflow.workflows))
        ]
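
    # Illustrative sketch (not part of the module): the returned list has one
    # entry per stored version of the workflow, e.g.
    #
    #     [{"version": 0, "update_time": "2024-01-01T00:00:00", "steps": 3}, ...]
    #
    # "version" is the same index that show_workflow resolves for a workflow
    # instance below, using the identical enumerate(reversed(...)) traversal.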

    def invocation_counts(self, trans, workflow_id, instance: bool) -> InvocationsStateCounts:
        stored_workflow: StoredWorkflow = self._workflows_manager.get_stored_accessible_workflow(
            trans, workflow_id, by_stored_id=not instance
        )
        return stored_workflow.invocation_counts()

    def get_workflow_menu(self, trans, payload):
        ids_in_menu = [x.stored_workflow_id for x in trans.user.stored_workflow_menu_entries]
        workflows = self._get_workflows_list(
            trans,
            payload,
        )
        return {"ids_in_menu": ids_in_menu, "workflows": workflows}

    def refactor(
        self,
        trans,
        workflow_id,
        payload,
        instance: bool,
    ) -> RefactorResponse:
        stored_workflow = self._workflows_manager.get_stored_workflow(trans, workflow_id, by_stored_id=not instance)
        return self._workflow_contents_manager.refactor(trans, stored_workflow, payload)

    def show_workflow(self, trans, workflow_id, instance, legacy, version) -> StoredWorkflowDetailed:
        stored_workflow = self._workflows_manager.get_stored_workflow(trans, workflow_id, by_stored_id=not instance)
        if stored_workflow.importable is False and stored_workflow.user != trans.user and not trans.user_is_admin:
            wf_count = 0 if not trans.user else trans.user.count_stored_workflow_user_assocs(stored_workflow)
            if wf_count == 0:
                message = "Workflow is neither importable, nor owned by or shared with current user"
                raise exceptions.ItemAccessibilityException(message)
        if legacy:
            style = "legacy"
        else:
            style = "instance"
        if version is None and instance:
            # A Workflow instance may not be the latest workflow version attached to StoredWorkflow.
            # This figures out the correct version so that we return the correct Workflow and version.
            for i, workflow in enumerate(reversed(stored_workflow.workflows)):
                if workflow.id == workflow_id:
                    version = i
                    break
        detailed_workflow = StoredWorkflowDetailed(
            **self._workflow_contents_manager.workflow_to_dict(trans, stored_workflow, style=style, version=version)
        )
        return detailed_workflow

    def _get_workflows_list(
        self,
        trans: ProvidesUserContext,
        payload,
    ):
        workflows, _ = self.index(trans, payload)
        return workflows

    def __get_full_shed_url(self, url):
        for shed_url in self._tool_shed_registry.tool_sheds.values():
            if url in shed_url:
                return shed_url
        return None
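
    # Illustrative sketch (not part of the module): __get_full_shed_url performs a
    # simple substring match of a tool shed host (as parsed from a tool id such as
    # "toolshed.g2.bx.psu.edu/repos/<owner>/<name>/<tool>/<version>" in index above)
    # against the registered shed URLs, so e.g. a registry entry
    # "https://toolshed.g2.bx.psu.edu/" would be returned for that host.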