Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for tool_shed.util.workflow_util
""" Tool shed helper methods for dealing with workflows - only two methods are
utilized outside of this modules - generate_workflow_image and import_workflow.
"""
import json
import logging
import os
import galaxy.tools
import galaxy.tools.parameters
from galaxy.tools.repositories import ValidationContext
from galaxy.util.sanitize_html import sanitize_html
from galaxy.workflow.modules import (
module_types,
ToolModule,
WorkflowModuleFactory
)
from galaxy.workflow.render import WorkflowCanvas
from galaxy.workflow.steps import attach_ordered_steps
from tool_shed.tools import tool_validator
from tool_shed.util import (
encoding_util,
metadata_util,
repository_util
)
log = logging.getLogger(__name__)
[docs]class RepoToolModule(ToolModule):
type = "tool"
[docs] def __init__(self, trans, repository_id, changeset_revision, tools_metadata, tool_id):
self.trans = trans
self.tools_metadata = tools_metadata
self.tool_id = tool_id
self.tool = None
self.errors = None
self.tool_version = None
if trans.webapp.name == 'tool_shed':
# We're in the tool shed.
with ValidationContext.from_app(trans.app) as validation_context:
tv = tool_validator.ToolValidator(validation_context)
for tool_dict in tools_metadata:
if self.tool_id in [tool_dict['id'], tool_dict['guid']]:
repository, self.tool, valid, message = tv.load_tool_from_changeset_revision(repository_id,
changeset_revision,
tool_dict['tool_config'])
if self.tool is None and message or not valid:
self.errors = 'unavailable'
break
else:
# We're in Galaxy.
self.tool = trans.app.toolbox.get_tool(self.tool_id, tool_version=self.tool_version)
if self.tool is None:
self.errors = 'unavailable'
self.post_job_actions = {}
self.workflow_outputs = []
self.state = None
[docs] @classmethod
def from_dict(Class, trans, step_dict, repository_id, changeset_revision, tools_metadata, secure=True):
tool_id = step_dict['tool_id']
module = Class(trans, repository_id, changeset_revision, tools_metadata, tool_id)
module.state = galaxy.tools.DefaultToolState()
if module.tool is not None:
module.state.decode(step_dict["tool_state"], module.tool, module.trans.app)
module.errors = step_dict.get("tool_errors", None)
return module
[docs] @classmethod
def from_workflow_step(Class, trans, step, repository_id, changeset_revision, tools_metadata):
module = Class(trans, repository_id, changeset_revision, tools_metadata, step.tool_id)
module.state = galaxy.tools.DefaultToolState()
module.recover_state(step.tool_inputs)
module.errors = module.get_errors()
return module
[docs] def get_data_inputs(self):
data_inputs = []
def callback(input, prefixed_name, prefixed_label, **kwargs):
if isinstance(input, galaxy.tools.parameters.basic.DataToolParameter):
data_inputs.append(dict(name=prefixed_name,
label=prefixed_label,
extensions=input.extensions))
if self.tool:
try:
galaxy.tools.parameters.visit_input_values(self.tool.inputs, self.state.inputs, callback)
except Exception:
# TODO have this actually use default parameters? Fix at
# refactor, needs to be discussed wrt: reproducibility though.
log.exception("Tool parse failed for %s -- this indicates incompatibility of local tool version with expected version by the workflow.", self.tool.id)
return data_inputs
[docs] def get_data_outputs(self):
data_outputs = []
if self.tool:
data_inputs = None
for name, tool_output in self.tool.outputs.items():
if tool_output.format_source is not None:
# Default to special name "input" which remove restrictions on connections
formats = ['input']
if data_inputs is None:
data_inputs = self.get_data_inputs()
# Find the input parameter referenced by format_source
for di in data_inputs:
# Input names come prefixed with conditional and repeat names separated by '|',
# so remove prefixes when comparing with format_source.
if di['name'] is not None and di['name'].split('|')[-1] == tool_output.format_source:
formats = di['extensions']
else:
formats = [tool_output.format]
for change_elem in tool_output.change_format:
for when_elem in change_elem.findall('when'):
format = when_elem.get('format', None)
if format and format not in formats:
formats.append(format)
data_outputs.append(dict(name=name, extensions=formats))
return data_outputs
[docs]class RepoWorkflowModuleFactory(WorkflowModuleFactory):
[docs] def from_dict(self, trans, repository_id, changeset_revision, step_dict, tools_metadata, **kwd):
"""Return module initialized from the data in dictionary `step_dict`."""
type = step_dict['type']
assert type in self.module_types
module_method_kwds = dict(**kwd)
if type == "tool":
module_method_kwds['repository_id'] = repository_id
module_method_kwds['changeset_revision'] = changeset_revision
module_method_kwds['tools_metadata'] = tools_metadata
return self.module_types[type].from_dict(trans, step_dict, **module_method_kwds)
[docs] def from_workflow_step(self, trans, repository_id, changeset_revision, tools_metadata, step):
"""Return module initialized from the WorkflowStep object `step`."""
type = step.type
module_method_kwds = dict()
if type == "tool":
module_method_kwds['repository_id'] = repository_id
module_method_kwds['changeset_revision'] = changeset_revision
module_method_kwds['tools_metadata'] = tools_metadata
return self.module_types[type].from_workflow_step(trans, step, **module_method_kwds)
tool_shed_module_types = module_types.copy()
tool_shed_module_types['tool'] = RepoToolModule
module_factory = RepoWorkflowModuleFactory(tool_shed_module_types)
[docs]def generate_workflow_image(trans, workflow_name, repository_metadata_id=None, repository_id=None):
"""
Return an svg image representation of a workflow dictionary created when the workflow was exported. This method is called
from both Galaxy and the tool shed. When called from the tool shed, repository_metadata_id will have a value and repository_id
will be None. When called from Galaxy, repository_metadata_id will be None and repository_id will have a value.
"""
workflow_name = encoding_util.tool_shed_decode(workflow_name)
if trans.webapp.name == 'tool_shed':
# We're in the tool shed.
repository_metadata = metadata_util.get_repository_metadata_by_id(trans.app, repository_metadata_id)
repository_id = trans.security.encode_id(repository_metadata.repository_id)
changeset_revision = repository_metadata.changeset_revision
metadata = repository_metadata.metadata
else:
# We're in Galaxy.
repository = repository_util.get_tool_shed_repository_by_id(trans.app, repository_id)
changeset_revision = repository.changeset_revision
metadata = repository.metadata
# metadata[ 'workflows' ] is a list of tuples where each contained tuple is
# [ <relative path to the .ga file in the repository>, <exported workflow dict> ]
for workflow_tup in metadata['workflows']:
workflow_dict = workflow_tup[1]
if workflow_dict['name'] == workflow_name:
break
if 'tools' in metadata:
tools_metadata = metadata['tools']
else:
tools_metadata = []
workflow, missing_tool_tups = get_workflow_from_dict(trans=trans,
workflow_dict=workflow_dict,
tools_metadata=tools_metadata,
repository_id=repository_id,
changeset_revision=changeset_revision)
workflow_canvas = WorkflowCanvas()
canvas = workflow_canvas.canvas
# Store px width for boxes of each step.
for step in workflow.steps:
step.upgrade_messages = {}
module = module_factory.from_workflow_step(trans, repository_id, changeset_revision, tools_metadata, step)
tool_errors = module.type == 'tool' and not module.tool
module_data_inputs = get_workflow_data_inputs(step, module)
module_data_outputs = get_workflow_data_outputs(step, module, workflow.steps)
module_name = get_workflow_module_name(module, missing_tool_tups)
workflow_canvas.populate_data_for_step(
step,
module_name,
module_data_inputs,
module_data_outputs,
tool_errors=tool_errors
)
workflow_canvas.add_steps(highlight_errors=True)
workflow_canvas.finish()
trans.response.set_content_type("image/svg+xml")
return canvas.tostring()
[docs]def get_workflow_data_inputs(step, module):
if module.type == 'tool':
if module.tool:
return module.get_data_inputs()
else:
data_inputs = []
for wfsc in step.input_connections:
data_inputs_dict = {}
data_inputs_dict['extensions'] = ['']
data_inputs_dict['name'] = wfsc.input_name
data_inputs_dict['label'] = 'Unknown'
data_inputs.append(data_inputs_dict)
return data_inputs
return module.get_data_inputs()
[docs]def get_workflow_data_outputs(step, module, steps):
if module.type == 'tool':
if module.tool:
return module.get_data_outputs()
else:
data_outputs = []
data_outputs_dict = {}
data_outputs_dict['extensions'] = ['input']
found = False
for workflow_step in steps:
for wfsc in workflow_step.input_connections:
if step.label == wfsc.output_step.label:
data_outputs_dict['name'] = wfsc.output_name
found = True
break
if found:
break
if not found:
# We're at the last step of the workflow.
data_outputs_dict['name'] = 'output'
data_outputs.append(data_outputs_dict)
return data_outputs
return module.get_data_outputs()
[docs]def get_workflow_from_dict(trans, workflow_dict, tools_metadata, repository_id, changeset_revision):
"""
Return an in-memory Workflow object from the dictionary object created when it was exported. This method is called from
both Galaxy and the tool shed to retrieve a Workflow object that can be displayed as an SVG image. This method is also
called from Galaxy to retrieve a Workflow object that can be used for saving to the Galaxy database.
"""
trans.workflow_building_mode = True
workflow = trans.model.Workflow()
workflow.name = workflow_dict['name']
workflow.has_errors = False
steps = []
# Keep ids for each step that we need to use to make connections.
steps_by_external_id = {}
# Keep track of tools required by the workflow that are not available in
# the tool shed repository. Each tuple in the list of missing_tool_tups
# will be ( tool_id, tool_name, tool_version ).
missing_tool_tups = []
# First pass to build step objects and populate basic values
for step_dict in workflow_dict['steps'].values():
# Create the model class for the step
step = trans.model.WorkflowStep()
step.label = step_dict.get('label', None)
step.position = step_dict['position']
module = module_factory.from_dict(trans, repository_id, changeset_revision, step_dict, tools_metadata=tools_metadata)
if module.type == 'tool' and module.tool is None:
# A required tool is not available in the current repository.
step.tool_errors = 'unavailable'
missing_tool_tup = (step_dict['tool_id'], step_dict['name'], step_dict['tool_version'])
if missing_tool_tup not in missing_tool_tups:
missing_tool_tups.append(missing_tool_tup)
module.save_to_step(step)
if step.tool_errors:
workflow.has_errors = True
# Stick this in the step temporarily.
step.temp_input_connections = step_dict['input_connections']
if trans.webapp.name == 'galaxy':
annotation = step_dict.get('annotation', '')
if annotation:
annotation = sanitize_html(annotation)
new_step_annotation = trans.model.WorkflowStepAnnotationAssociation()
new_step_annotation.annotation = annotation
new_step_annotation.user = trans.user
step.annotations.append(new_step_annotation)
# Unpack and add post-job actions.
post_job_actions = step_dict.get('post_job_actions', {})
for pja_dict in post_job_actions.values():
trans.model.PostJobAction(pja_dict['action_type'],
step,
pja_dict['output_name'],
pja_dict['action_arguments'])
steps.append(step)
steps_by_external_id[step_dict['id']] = step
# Second pass to deal with connections between steps.
for step in steps:
# Input connections.
for input_name, conn_dict in step.temp_input_connections.items():
if conn_dict:
step_input = step.get_or_add_input(input_name)
output_step = steps_by_external_id[conn_dict['id']]
conn = trans.model.WorkflowStepConnection()
conn.input_step_input = step_input
conn.output_step = output_step
conn.output_name = conn_dict['output_name']
del step.temp_input_connections
# Order the steps if possible.
attach_ordered_steps(workflow, steps)
# Return the in-memory Workflow object for display or later persistence to the Galaxy database.
return workflow, missing_tool_tups
[docs]def get_workflow_module_name(module, missing_tool_tups):
module_name = module.get_name()
if module.type == 'tool' and module_name == 'unavailable':
for missing_tool_tup in missing_tool_tups:
missing_tool_id, missing_tool_name, missing_tool_version = missing_tool_tup
if missing_tool_id == module.tool_id:
module_name = '%s' % missing_tool_name
break
return module_name
[docs]def import_workflow(trans, repository, workflow_name):
"""Import a workflow contained in an installed tool shed repository into Galaxy (this method is called only from Galaxy)."""
status = 'done'
message = ''
changeset_revision = repository.changeset_revision
metadata = repository.metadata
workflows = metadata.get('workflows', [])
tools_metadata = metadata.get('tools', [])
workflow_dict = None
for workflow_data_tuple in workflows:
# The value of workflow_data_tuple is ( relative_path_to_workflow_file, exported_workflow_dict ).
relative_path_to_workflow_file, exported_workflow_dict = workflow_data_tuple
if exported_workflow_dict['name'] == workflow_name:
# If the exported workflow is available on disk, import it.
if os.path.exists(relative_path_to_workflow_file):
workflow_file = open(relative_path_to_workflow_file, 'rb')
workflow_data = workflow_file.read()
workflow_file.close()
workflow_dict = json.loads(workflow_data)
else:
# Use the current exported_workflow_dict.
workflow_dict = exported_workflow_dict
break
if workflow_dict:
# Create workflow if possible.
workflow, missing_tool_tups = get_workflow_from_dict(trans=trans,
workflow_dict=workflow_dict,
tools_metadata=tools_metadata,
repository_id=repository.id,
changeset_revision=changeset_revision)
# Save the workflow in the Galaxy database. Pass workflow_dict along to create annotation at this point.
stored_workflow = save_workflow(trans, workflow, workflow_dict)
# Use the latest version of the saved workflow.
workflow = stored_workflow.latest_workflow
if workflow_name:
workflow.name = workflow_name
# Provide user feedback and show workflow list.
if workflow.has_errors:
message += "Imported, but some steps in this workflow have validation errors. "
status = "error"
if workflow.has_cycles:
message += "Imported, but this workflow contains cycles. "
status = "error"
else:
message += "Workflow <b>%s</b> imported successfully. " % workflow.name
if missing_tool_tups:
name_and_id_str = ''
for missing_tool_tup in missing_tool_tups:
tool_id, tool_name, other = missing_tool_tup
name_and_id_str += 'name: %s, id: %s' % (str(tool_id), str(tool_name))
message += "The following tools required by this workflow are missing from this Galaxy instance: %s. " % name_and_id_str
else:
workflow = None
message += 'The workflow named %s is not included in the metadata for revision %s of repository %s' % \
(str(workflow_name), str(changeset_revision), str(repository.name))
status = 'error'
return workflow, status, message
[docs]def save_workflow(trans, workflow, workflow_dict=None):
"""Use the received in-memory Workflow object for saving to the Galaxy database."""
stored = trans.model.StoredWorkflow()
stored.name = workflow.name
workflow.stored_workflow = stored
stored.latest_workflow = workflow
stored.user = trans.user
if workflow_dict and workflow_dict.get('annotation', ''):
annotation = sanitize_html(workflow_dict['annotation'])
new_annotation = trans.model.StoredWorkflowAnnotationAssociation()
new_annotation.annotation = annotation
new_annotation.user = trans.user
stored.annotations.append(new_annotation)
trans.sa_session.add(stored)
trans.sa_session.flush()
# Add a new entry to the Workflows menu.
if trans.user.stored_workflow_menu_entries is None:
trans.user.stored_workflow_menu_entries = []
menuEntry = trans.model.StoredWorkflowMenuEntry()
menuEntry.stored_workflow = stored
trans.user.stored_workflow_menu_entries.append(menuEntry)
trans.sa_session.flush()
return stored