Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.services.workflows
import logging
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
)
from galaxy import web
from galaxy.managers.context import ProvidesUserContext
from galaxy.managers.notification import NotificationManager
from galaxy.managers.workflows import (
WorkflowContentsManager,
WorkflowSerializer,
WorkflowsManager,
)
from galaxy.schema.schema import WorkflowIndexQueryPayload
from galaxy.util.tool_shed.tool_shed_registry import Registry
from galaxy.webapps.galaxy.services.base import ServiceBase
from galaxy.webapps.galaxy.services.sharable import ShareableService
log = logging.getLogger(__name__)
[docs]class WorkflowsService(ServiceBase):
[docs] def __init__(
self,
workflows_manager: WorkflowsManager,
workflow_contents_manager: WorkflowContentsManager,
serializer: WorkflowSerializer,
tool_shed_registry: Registry,
notification_manager: NotificationManager,
):
self._workflows_manager = workflows_manager
self._workflow_contents_manager = workflow_contents_manager
self._serializer = serializer
self.shareable_service = ShareableService(workflows_manager, serializer, notification_manager)
self._tool_shed_registry = tool_shed_registry
[docs] def index(
self,
trans: ProvidesUserContext,
payload: WorkflowIndexPayload,
include_total_count: bool = False,
) -> Tuple[List[Dict[str, Any]], Optional[int]]:
user = trans.user
missing_tools = payload.missing_tools
query, total_matches = self._workflows_manager.index_query(trans, payload, include_total_count)
rval = []
for wf in query.all():
item = wf.to_dict(value_mapper={"id": trans.security.encode_id})
encoded_id = trans.security.encode_id(wf.id)
item["annotations"] = [x.annotation for x in wf.annotations]
item["url"] = web.url_for("workflow", id=encoded_id)
item["owner"] = wf.user.username
item["source_metadata"] = wf.latest_workflow.source_metadata
if not payload.skip_step_counts:
item["number_of_steps"] = wf.latest_workflow.step_count
item["show_in_tool_panel"] = False
if user is not None:
item["show_in_tool_panel"] = wf.show_in_tool_panel(user_id=user.id)
rval.append(item)
if missing_tools:
workflows_missing_tools = []
workflows = []
workflows_by_toolshed = dict()
for value in rval:
stored_workflow = self._workflows_manager.get_stored_workflow(trans, value["id"], by_stored_id=True)
tools = self._workflow_contents_manager.get_all_tools(stored_workflow.latest_workflow)
missing_tool_ids = [
tool["tool_id"] for tool in tools if trans.app.toolbox.is_missing_shed_tool(tool["tool_id"])
]
if len(missing_tool_ids) > 0:
value["missing_tools"] = missing_tool_ids
workflows_missing_tools.append(value)
for workflow in workflows_missing_tools:
for tool_id in workflow["missing_tools"]:
toolshed, _, owner, name, tool, version = tool_id.split("/")
shed_url = self.__get_full_shed_url(toolshed)
repo_identifier = "/".join((toolshed, owner, name))
if repo_identifier not in workflows_by_toolshed:
workflows_by_toolshed[repo_identifier] = dict(
shed=shed_url.rstrip("/"),
repository=name,
owner=owner,
tools=[tool_id],
workflows=[workflow["name"]],
)
else:
if tool_id not in workflows_by_toolshed[repo_identifier]["tools"]:
workflows_by_toolshed[repo_identifier]["tools"].append(tool_id)
if workflow["name"] not in workflows_by_toolshed[repo_identifier]["workflows"]:
workflows_by_toolshed[repo_identifier]["workflows"].append(workflow["name"])
for repo_tag in workflows_by_toolshed:
workflows.append(workflows_by_toolshed[repo_tag])
return workflows, total_matches
return rval, total_matches
[docs] def delete(self, trans, workflow_id):
workflow_to_delete = self._workflows_manager.get_stored_workflow(trans, workflow_id)
self._workflows_manager.check_security(trans, workflow_to_delete)
self._workflows_manager.delete(workflow_to_delete)
[docs] def undelete(self, trans, workflow_id):
workflow_to_undelete = self._workflows_manager.get_stored_workflow(trans, workflow_id)
self._workflows_manager.check_security(trans, workflow_to_undelete)
self._workflows_manager.undelete(workflow_to_undelete)
[docs] def get_versions(self, trans, workflow_id, instance):
stored_workflow = self._workflows_manager.get_stored_accessible_workflow(
trans, workflow_id, by_stored_id=not instance
)
return [
{"version": i, "update_time": w.update_time.isoformat(), "steps": len(w.steps)}
for i, w in enumerate(reversed(stored_workflow.workflows))
]
def _get_workflows_list(
self,
trans: ProvidesUserContext,
payload,
):
workflows, _ = self.index(trans, payload)
return workflows
def __get_full_shed_url(self, url):
for shed_url in self._tool_shed_registry.tool_sheds.values():
if url in shed_url:
return shed_url
return None