import logging
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
Union,
)
from galaxy import (
exceptions,
web,
)
from galaxy.managers.context import ProvidesUserContext
from galaxy.managers.workflows import (
RefactorResponse,
WorkflowContentsManager,
WorkflowSerializer,
WorkflowsManager,
)
from galaxy.model import StoredWorkflow
from galaxy.schema.invocation import WorkflowInvocationResponse
from galaxy.schema.schema import (
InvocationsStateCounts,
WorkflowIndexQueryPayload,
)
from galaxy.schema.workflows import (
InvokeWorkflowPayload,
StoredWorkflowDetailed,
)
from galaxy.util.tool_shed.tool_shed_registry import Registry
from galaxy.webapps.galaxy.services.base import ServiceBase
from galaxy.webapps.galaxy.services.notifications import NotificationService
from galaxy.webapps.galaxy.services.sharable import ShareableService
from galaxy.workflow.run import queue_invoke
from galaxy.workflow.run_request import build_workflow_run_configs
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
[docs]
class WorkflowIndexPayload(WorkflowIndexQueryPayload):
    """Workflow index query payload extended with a ``missing_tools`` switch."""

    # When true, ``WorkflowsService.index`` returns a per-repository summary of
    # the missing tool shed tools instead of the plain workflow listing.
    missing_tools: bool = False
[docs]
class WorkflowsService(ServiceBase):
    """Service-layer entry points for listing, running and managing workflows.

    The service delegates lookups and persistence to the workflow managers it
    is constructed with and exposes sharing operations through
    ``shareable_service``.
    """

    def __init__(
        self,
        workflows_manager: WorkflowsManager,
        workflow_contents_manager: WorkflowContentsManager,
        serializer: WorkflowSerializer,
        tool_shed_registry: Registry,
        notification_service: NotificationService,
    ):
        """Store the collaborators and build the sharing sub-service."""
        self._workflows_manager = workflows_manager
        self._workflow_contents_manager = workflow_contents_manager
        self._serializer = serializer
        self.shareable_service = ShareableService(workflows_manager, serializer, notification_service)
        self._tool_shed_registry = tool_shed_registry

    def index(
        self,
        trans: ProvidesUserContext,
        payload: WorkflowIndexPayload,
        include_total_count: bool = False,
    ) -> Tuple[List[Dict[str, Any]], Optional[int]]:
        """List the workflows selected by ``payload``.

        :param trans: request context; supplies the current user, id encoding
            and (for the missing-tools report) the toolbox.
        :param payload: index query; ``skip_step_counts`` suppresses the
            ``number_of_steps`` key and ``missing_tools`` switches the result
            to a missing-tools report.
        :param include_total_count: forwarded to the manager's ``index_query``.
        :returns: ``(items, total_matches)``. ``items`` is one dict per
            workflow, or, when ``payload.missing_tools`` is set, one dict per
            tool shed repository that provides missing tools.
            ``total_matches`` is whatever ``index_query`` reported.
        """
        user = trans.user
        missing_tools = payload.missing_tools
        query, total_matches = self._workflows_manager.index_query(trans, payload, include_total_count)
        encode_id = trans.security.encode_id
        rval = []
        for wf in query.all():
            item = wf.to_dict(value_mapper={"id": encode_id, "latest_workflow_id": encode_id})
            encoded_id = encode_id(wf.id)
            item["annotations"] = [x.annotation for x in wf.annotations]
            item["url"] = web.url_for("workflow", id=encoded_id)
            item["owner"] = wf.user.username
            item["source_metadata"] = wf.latest_workflow.source_metadata
            if not payload.skip_step_counts:
                item["number_of_steps"] = wf.latest_workflow.step_count
            # Anonymous requests have no user to look the panel setting up for.
            item["show_in_tool_panel"] = False
            if user is not None:
                item["show_in_tool_panel"] = wf.show_in_tool_panel(user_id=user.id)
            rval.append(item)
        if missing_tools:
            return self._summarize_missing_tools(trans, rval), total_matches
        return rval, total_matches

    def _summarize_missing_tools(
        self,
        trans: ProvidesUserContext,
        workflow_items: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Group the missing tool shed tools of ``workflow_items`` by repository.

        Each returned dict has the keys ``shed``, ``repository``, ``owner``,
        ``tools`` (missing tool ids) and ``workflows`` (names of the workflows
        that need them). Items with missing tools additionally get a
        ``missing_tools`` key set in place.
        """
        workflows_by_toolshed: Dict[str, Dict[str, Any]] = {}
        is_missing = trans.app.toolbox.is_missing_shed_tool
        for item in workflow_items:
            stored_workflow = self._workflows_manager.get_stored_workflow(trans, item["id"], by_stored_id=True)
            tools = self._workflow_contents_manager.get_all_tools(stored_workflow.latest_workflow)
            missing_tool_ids = [tool["tool_id"] for tool in tools if is_missing(tool["tool_id"])]
            if not missing_tool_ids:
                continue
            item["missing_tools"] = missing_tool_ids
            for tool_id in missing_tool_ids:
                # The unpacking below expects six "/"-separated segments
                # (shed host, a path segment, owner, repository, tool, version).
                # Skip anything else instead of failing the whole listing.
                parts = tool_id.split("/")
                if len(parts) != 6:
                    log.warning("Ignoring missing tool with unexpected id format: %s", tool_id)
                    continue
                toolshed, _, owner, name, _tool, _version = parts
                repo_identifier = "/".join((toolshed, owner, name))
                entry = workflows_by_toolshed.get(repo_identifier)
                if entry is None:
                    # The registry lookup returns None for an unregistered
                    # shed; fall back to the host taken from the tool id so
                    # the report is still produced.
                    shed_url = self.__get_full_shed_url(toolshed) or toolshed
                    workflows_by_toolshed[repo_identifier] = dict(
                        shed=shed_url.rstrip("/"),
                        repository=name,
                        owner=owner,
                        tools=[tool_id],
                        workflows=[item["name"]],
                    )
                    continue
                if tool_id not in entry["tools"]:
                    entry["tools"].append(tool_id)
                if item["name"] not in entry["workflows"]:
                    entry["workflows"].append(item["name"])
        return list(workflows_by_toolshed.values())

    def invoke_workflow(
        self,
        trans,
        workflow_id,
        payload: InvokeWorkflowPayload,
    ) -> Union[WorkflowInvocationResponse, List[WorkflowInvocationResponse]]:
        """Queue one invocation per run configuration built from ``payload``.

        :param trans: request context of the invoking user.
        :param workflow_id: stored workflow id, or a workflow instance id when
            ``payload.instance`` is set.
        :param payload: invocation request.
        :returns: a list of invocation responses when ``payload.batch`` is
            set, otherwise the single invocation response.
        :raises exceptions.AuthenticationRequired: for anonymous requests.
        :raises exceptions.RequestParameterInvalidException: when the payload
            does not yield exactly one run configuration and ``batch`` is not
            set.
        :raises exceptions.MessageException: when the toolbox lacks a tool
            used by the workflow.
        """
        if trans.anonymous:
            raise exceptions.AuthenticationRequired("You need to be logged in to run workflows.")
        trans.check_user_activation()
        # Get workflow + accessibility check.
        by_stored_id = not payload.instance
        stored_workflow = self._workflows_manager.get_stored_accessible_workflow(trans, workflow_id, by_stored_id)
        version = payload.version
        if version is None and payload.instance:
            workflow = stored_workflow.get_internal_version_by_id(workflow_id)
        else:
            workflow = stored_workflow.get_internal_version(version)
        run_configs = build_workflow_run_configs(trans, workflow, payload.model_dump(exclude_unset=True))
        is_batch = payload.batch
        if not is_batch and len(run_configs) != 1:
            raise exceptions.RequestParameterInvalidException("Must specify 'batch' to use batch parameters.")
        require_exact_tool_versions = payload.require_exact_tool_versions
        tools = self._workflow_contents_manager.get_all_tools(workflow)
        missing_tools = [
            tool
            for tool in tools
            if not trans.app.toolbox.has_tool(
                tool["tool_id"],
                tool_version=tool["tool_version"],
                tool_uuid=tool["tool_uuid"],
                exact=require_exact_tool_versions,
                user=trans.user,
            )
        ]
        if missing_tools:
            missing_tools_message = "Workflow was not invoked; the following required tools are not installed: "
            if require_exact_tool_versions:
                missing_tools_message += ", ".join(
                    f"{tool['tool_id']} (version {tool['tool_version']})" for tool in missing_tools
                )
            else:
                missing_tools_message += ", ".join(tool["tool_id"] for tool in missing_tools)
            raise exceptions.MessageException(missing_tools_message)
        workflow_scheduler_id = payload.scheduler
        invocations = []
        for run_config in run_configs:
            # TODO: workflow scheduler hints
            # A fresh dict per invocation so the requests never share state.
            work_request_params = dict(scheduler=workflow_scheduler_id)
            workflow_invocation = queue_invoke(
                trans=trans,
                workflow=workflow,
                workflow_run_config=run_config,
                request_params=work_request_params,
                flush=False,
            )
            invocations.append(workflow_invocation)
        # Invocations are queued with flush=False; commit once for all of them.
        trans.sa_session.commit()
        encoded_invocations = [WorkflowInvocationResponse(**invocation.to_dict()) for invocation in invocations]
        if is_batch:
            return encoded_invocations
        return encoded_invocations[0]

    def delete(self, trans, workflow_id):
        """Delete the stored workflow ``workflow_id`` after the manager's security check."""
        workflow_to_delete = self._workflows_manager.get_stored_workflow(trans, workflow_id)
        self._workflows_manager.check_security(trans, workflow_to_delete)
        self._workflows_manager.delete(workflow_to_delete)

    def undelete(self, trans, workflow_id):
        """Undelete the stored workflow ``workflow_id`` after the manager's security check."""
        workflow_to_undelete = self._workflows_manager.get_stored_workflow(trans, workflow_id)
        self._workflows_manager.check_security(trans, workflow_to_undelete)
        self._workflows_manager.undelete(workflow_to_undelete)

    def get_versions(self, trans, workflow_id, instance: bool):
        """Describe every version of an accessible stored workflow.

        :param instance: when true, ``workflow_id`` is a workflow instance id
            rather than a stored workflow id.
        :returns: a list of dicts with ``version`` (position in the reversed
            ``stored_workflow.workflows`` sequence), ``update_time`` (ISO
            formatted) and ``steps`` (step count).
        """
        stored_workflow: StoredWorkflow = self._workflows_manager.get_stored_accessible_workflow(
            trans, workflow_id, by_stored_id=not instance
        )
        return [
            {"version": i, "update_time": w.update_time.isoformat(), "steps": len(w.steps)}
            for i, w in enumerate(reversed(stored_workflow.workflows))
        ]

    def invocation_counts(self, trans, workflow_id, instance: bool) -> InvocationsStateCounts:
        """Return the invocation state counts of an accessible stored workflow.

        :param instance: when true, ``workflow_id`` is a workflow instance id
            rather than a stored workflow id.
        """
        stored_workflow: StoredWorkflow = self._workflows_manager.get_stored_accessible_workflow(
            trans, workflow_id, by_stored_id=not instance
        )
        return stored_workflow.invocation_counts()

    def refactor(
        self,
        trans,
        workflow_id,
        payload,
        instance: bool,
    ) -> RefactorResponse:
        """Apply the refactoring described by ``payload`` via the contents manager.

        :param instance: when true, ``workflow_id`` is a workflow instance id
            rather than a stored workflow id.
        """
        stored_workflow = self._workflows_manager.get_stored_workflow(trans, workflow_id, by_stored_id=not instance)
        return self._workflow_contents_manager.refactor(trans, stored_workflow, payload)

    def show_workflow(self, trans, workflow_id, instance, legacy, version) -> StoredWorkflowDetailed:
        """Return the detailed representation of a workflow.

        :param instance: when true, ``workflow_id`` is a workflow instance id
            rather than a stored workflow id.
        :param legacy: selects the ``"legacy"`` serialization style instead of
            ``"instance"``.
        :param version: version to serialize; when ``None`` and ``instance``
            is set, the version matching ``workflow_id`` is looked up.
        :raises exceptions.ItemAccessibilityException: when the workflow is not
            importable and the current user is not its owner, not an admin,
            and has no user association with it.
        """
        stored_workflow = self._workflows_manager.get_stored_workflow(trans, workflow_id, by_stored_id=not instance)
        if stored_workflow.importable is False and stored_workflow.user != trans.user and not trans.user_is_admin:
            wf_count = 0 if not trans.user else trans.user.count_stored_workflow_user_assocs(stored_workflow)
            if wf_count == 0:
                message = "Workflow is neither importable, nor owned by or shared with current user"
                raise exceptions.ItemAccessibilityException(message)
        style = "legacy" if legacy else "instance"
        if version is None and instance:
            # A Workflow instance may not be the latest workflow version attached to StoredWorkflow.
            # This figures out the correct version so that we return the correct Workflow and version.
            for i, workflow in enumerate(reversed(stored_workflow.workflows)):
                if workflow.id == workflow_id:
                    version = i
                    break
        return StoredWorkflowDetailed(
            **self._workflow_contents_manager.workflow_to_dict(trans, stored_workflow, style=style, version=version)
        )

    def _get_workflows_list(
        self,
        trans: ProvidesUserContext,
        payload,
    ):
        """Return only the item list produced by :meth:`index`, dropping the total count."""
        workflows, _ = self.index(trans, payload)
        return workflows

    def __get_full_shed_url(self, url: str) -> Optional[str]:
        """Return the first registered tool shed URL containing ``url``, or ``None`` if none does."""
        for shed_url in self._tool_shed_registry.tool_sheds.values():
            if url in shed_url:
                return shed_url
        return None