Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.util.workflow_util

"""Tool shed helper methods for dealing with workflows.

Only two of these methods are utilized outside of this module:
generate_workflow_image and import_workflow.
"""
import json
import logging
import os

import galaxy.tools
import galaxy.tools.parameters
from galaxy.tools.repositories import ValidationContext
from galaxy.util.sanitize_html import sanitize_html
from galaxy.workflow.modules import (
    module_types,
    ToolModule,
    WorkflowModuleFactory
)
from galaxy.workflow.render import WorkflowCanvas
from galaxy.workflow.steps import attach_ordered_steps
from tool_shed.tools import tool_validator
from tool_shed.util import (
    encoding_util,
    metadata_util,
    repository_util
)

# Module-level logger (used, e.g., to report tool parse failures in RepoToolModule.get_data_inputs).
log = logging.getLogger(__name__)


class RepoToolModule(ToolModule):
    """Workflow tool module whose tool is resolved for a tool shed repository revision.

    In the tool shed the tool is loaded from the repository's ``tools_metadata`` at
    ``changeset_revision``; in Galaxy it is looked up in the local toolbox.
    """

    type = "tool"

    def __init__(self, trans, repository_id, changeset_revision, tools_metadata, tool_id):
        """Resolve ``tool_id`` and initialise the module's bookkeeping attributes.

        :param trans: the current transaction; ``trans.webapp.name`` selects tool shed vs Galaxy lookup.
        :param repository_id: repository id passed to the tool validator (tool shed only).
        :param changeset_revision: revision the tool config is loaded from (tool shed only).
        :param tools_metadata: list of tool metadata dicts with 'id', 'guid' and 'tool_config' keys.
        :param tool_id: id or guid of the tool for this workflow step.
        """
        # NOTE(review): ToolModule.__init__ is not called; every attribute used by this
        # class is assigned explicitly below.
        self.trans = trans
        self.tools_metadata = tools_metadata
        self.tool_id = tool_id
        self.tool = None
        self.errors = None
        self.tool_version = None
        if trans.webapp.name == 'tool_shed':
            # We're in the tool shed.
            with ValidationContext.from_app(trans.app) as validation_context:
                tv = tool_validator.ToolValidator(validation_context)
                for tool_dict in tools_metadata:
                    if self.tool_id in (tool_dict['id'], tool_dict['guid']):
                        _repository, self.tool, message = tv.load_tool_from_changeset_revision(
                            repository_id, changeset_revision, tool_dict['tool_config'])
                        if message and self.tool is None:
                            self.errors = 'unavailable'
                        break
        else:
            # We're in Galaxy.
            self.tool = trans.app.toolbox.get_tool(self.tool_id, tool_version=self.tool_version)
            if self.tool is None:
                self.errors = 'unavailable'
        self.post_job_actions = {}
        self.workflow_outputs = []
        self.state = None

    @classmethod
    def from_dict(cls, trans, step_dict, repository_id, changeset_revision, tools_metadata, secure=True):
        """Build a module from an exported workflow step dictionary.

        ``secure`` is accepted for interface compatibility and is not used here.
        """
        tool_id = step_dict['tool_id']
        module = cls(trans, repository_id, changeset_revision, tools_metadata, tool_id)
        module.state = galaxy.tools.DefaultToolState()
        if module.tool is not None:
            # Tool state can only be decoded against a resolved tool.
            module.state.decode(step_dict["tool_state"], module.tool, module.trans.app)
        module.errors = step_dict.get("tool_errors", None)
        return module

    @classmethod
    def from_workflow_step(cls, trans, step, repository_id, changeset_revision, tools_metadata):
        """Build a module from an in-memory WorkflowStep object."""
        module = cls(trans, repository_id, changeset_revision, tools_metadata, step.tool_id)
        module.state = galaxy.tools.DefaultToolState()
        module.recover_state(step.tool_inputs)
        module.errors = module.get_errors()
        return module

    def get_data_inputs(self):
        """Return a list of dicts (name, label, extensions) for every data parameter of the tool.

        Returns an empty list when the tool is unavailable or its inputs cannot be visited.
        """
        data_inputs = []

        # NOTE(review): the parameter names below appear to be matched by keyword by
        # visit_input_values, and a non-None return value may replace the visited value --
        # keep the names and keep returning None.
        def callback(input, prefixed_name, prefixed_label, **kwargs):
            if isinstance(input, galaxy.tools.parameters.basic.DataToolParameter):
                data_inputs.append(dict(name=prefixed_name, label=prefixed_label, extensions=input.extensions))

        if self.tool:
            try:
                galaxy.tools.parameters.visit_input_values(self.tool.inputs, self.state.inputs, callback)
            except Exception:
                # TODO have this actually use default parameters?  Fix at
                # refactor, needs to be discussed wrt: reproducibility though.
                log.exception("Tool parse failed for %s -- this indicates incompatibility of local tool version with expected version by the workflow.", self.tool.id)
        return data_inputs

    def get_data_outputs(self):
        """Return a list of dicts (name, extensions) describing the tool's outputs.

        An output with ``format_source`` inherits the extensions of the referenced data
        input, or the special ``'input'`` format when that input cannot be found.  Formats
        named by ``change_format``/``when`` elements are appended.
        """
        data_outputs = []
        if self.tool:
            data_inputs = None
            for name, tool_output in self.tool.outputs.items():
                if tool_output.format_source is not None:
                    # Default to special name "input" which remove restrictions on connections
                    formats = ['input']
                    if data_inputs is None:
                        data_inputs = self.get_data_inputs()
                    # Find the input parameter referenced by format_source
                    for di in data_inputs:
                        # Input names come prefixed with conditional and repeat names separated by '|',
                        # so remove prefixes when comparing with format_source.
                        if di['name'] is not None and di['name'].split('|')[-1] == tool_output.format_source:
                            # Copy: di['extensions'] is the DataToolParameter's own list, and the
                            # appends below must not mutate the tool definition.
                            formats = list(di['extensions'])
                else:
                    formats = [tool_output.format]
                for change_elem in tool_output.change_format:
                    for when_elem in change_elem.findall('when'):
                        when_format = when_elem.get('format', None)
                        if when_format and when_format not in formats:
                            formats.append(when_format)
                data_outputs.append(dict(name=name, extensions=formats))
        return data_outputs
class RepoWorkflowModuleFactory(WorkflowModuleFactory):
    """Module factory that threads tool shed repository context into tool modules.

    Only steps of type ``"tool"`` receive the extra ``repository_id``,
    ``changeset_revision`` and ``tools_metadata`` keyword arguments; other step
    types are built from ``self.module_types`` with the remaining keywords only.
    """

    def __init__(self, module_types):
        # Mapping of step type name -> module class.
        self.module_types = module_types

    def from_dict(self, trans, repository_id, changeset_revision, step_dict, tools_metadata, **kwd):
        """Return module initialized from the data in dictionary `step_dict`."""
        step_type = step_dict['type']
        assert step_type in self.module_types
        method_kwargs = dict(kwd)
        if step_type == "tool":
            method_kwargs.update(
                repository_id=repository_id,
                changeset_revision=changeset_revision,
                tools_metadata=tools_metadata,
            )
        module_class = self.module_types[step_type]
        return module_class.from_dict(trans, step_dict, **method_kwargs)

    def from_workflow_step(self, trans, repository_id, changeset_revision, tools_metadata, step):
        """Return module initialized from the WorkflowStep object `step`."""
        step_type = step.type
        if step_type == "tool":
            method_kwargs = dict(
                repository_id=repository_id,
                changeset_revision=changeset_revision,
                tools_metadata=tools_metadata,
            )
        else:
            method_kwargs = {}
        module_class = self.module_types[step_type]
        return module_class.from_workflow_step(trans, step, **method_kwargs)
# Same module registry as Galaxy's, but "tool" steps are handled by RepoToolModule so that
# tools are resolved against the repository revision rather than only the local toolbox.
tool_shed_module_types = module_types.copy()
tool_shed_module_types['tool'] = RepoToolModule
# Shared factory used by the workflow helpers in this module.
module_factory = RepoWorkflowModuleFactory(tool_shed_module_types)
def generate_workflow_image(trans, workflow_name, repository_metadata_id=None, repository_id=None):
    """
    Return an svg image representation of a workflow dictionary created when the workflow was exported.

    This method is called from both Galaxy and the tool shed.  When called from the tool shed,
    repository_metadata_id will have a value and repository_id will be None.  When called from
    Galaxy, repository_metadata_id will be None and repository_id will have a value.

    :param workflow_name: encoded workflow name; decoded here with ``encoding_util.tool_shed_decode``.
    :returns: the SVG document as a string; the response content type is set to ``image/svg+xml``.
    :raises ValueError: if the revision's metadata contains no workflow named ``workflow_name``.
    """
    workflow_name = encoding_util.tool_shed_decode(workflow_name)
    if trans.webapp.name == 'tool_shed':
        # We're in the tool shed.
        repository_metadata = metadata_util.get_repository_metadata_by_id(trans.app, repository_metadata_id)
        repository_id = trans.security.encode_id(repository_metadata.repository_id)
        changeset_revision = repository_metadata.changeset_revision
        metadata = repository_metadata.metadata
    else:
        # We're in Galaxy.
        repository = repository_util.get_tool_shed_repository_by_id(trans.app, repository_id)
        changeset_revision = repository.changeset_revision
        metadata = repository.metadata
    # metadata[ 'workflows' ] is a list of tuples where each contained tuple is
    # [ <relative path to the .ga file in the repository>, <exported workflow dict> ]
    for workflow_tup in metadata.get('workflows', []):
        workflow_dict = workflow_tup[1]
        if workflow_dict['name'] == workflow_name:
            break
    else:
        # Without this, the loop would silently render the last workflow in the list (or
        # hit an UnboundLocalError when the list is empty).
        raise ValueError('Workflow "%s" is not included in the metadata for revision %s.' %
                         (workflow_name, changeset_revision))
    tools_metadata = metadata.get('tools', [])
    workflow, missing_tool_tups = get_workflow_from_dict(trans=trans,
                                                         workflow_dict=workflow_dict,
                                                         tools_metadata=tools_metadata,
                                                         repository_id=repository_id,
                                                         changeset_revision=changeset_revision)
    workflow_canvas = WorkflowCanvas()
    canvas = workflow_canvas.canvas
    for step in workflow.steps:
        step.upgrade_messages = {}
        module = module_factory.from_workflow_step(trans, repository_id, changeset_revision, tools_metadata, step)
        # A tool step whose tool could not be resolved is drawn with error highlighting.
        tool_errors = module.type == 'tool' and not module.tool
        module_data_inputs = get_workflow_data_inputs(step, module)
        module_data_outputs = get_workflow_data_outputs(step, module, workflow.steps)
        module_name = get_workflow_module_name(module, missing_tool_tups)
        workflow_canvas.populate_data_for_step(
            step,
            module_name,
            module_data_inputs,
            module_data_outputs,
            tool_errors=tool_errors
        )
    workflow_canvas.add_steps(highlight_errors=True)
    workflow_canvas.finish()
    trans.response.set_content_type("image/svg+xml")
    return canvas.tostring()
def get_workflow_data_inputs(step, module):
    """Describe the data inputs of ``step`` for rendering.

    Non-tool modules and tool modules with a resolved tool report their own inputs.
    For a tool step whose tool is unavailable, one placeholder entry (empty extension,
    label ``'Unknown'``) is synthesized per input connection of the step.
    """
    if module.type != 'tool' or module.tool:
        return module.get_data_inputs()
    return [
        {'extensions': [''], 'name': connection.input_name, 'label': 'Unknown'}
        for connection in step.input_connections
    ]
def get_workflow_data_outputs(step, module, steps):
    """Describe the data outputs of ``step`` for rendering.

    Non-tool modules and tool modules with a resolved tool report their own outputs.
    For a tool step whose tool is unavailable, a single placeholder output with the
    special ``'input'`` extension is synthesized.  Its name is taken from the first
    connection (in ``steps`` order) that consumes an output of ``step``, or ``'output'``
    when no step consumes it.

    :param step: the workflow step being rendered.
    :param module: the workflow module built for ``step``.
    :param steps: all steps of the workflow, searched for connections fed by ``step``.
    """
    if module.type != 'tool' or module.tool:
        return module.get_data_outputs()
    # Match producers by identity: step labels are optional (None), so comparing labels
    # made every unlabeled step look like the producer of any unlabeled connection.
    output_name = next(
        (connection.output_name
         for workflow_step in steps
         for connection in workflow_step.input_connections
         if connection.output_step is step),
        'output',  # No downstream consumer, e.g. the last step of the workflow.
    )
    return [{'extensions': ['input'], 'name': output_name}]
def get_workflow_from_dict(trans, workflow_dict, tools_metadata, repository_id, changeset_revision):
    """
    Return an in-memory Workflow object from the dictionary object created when it was exported.
    This method is called from both Galaxy and the tool shed to retrieve a Workflow object that
    can be displayed as an SVG image.  This method is also called from Galaxy to retrieve a Workflow
    object that can be used for saving to the Galaxy database.

    :returns: ``(workflow, missing_tool_tups)`` where ``missing_tool_tups`` lists, without
        duplicates, ``(tool_id, tool_name, tool_version)`` for every tool step whose tool could
        not be resolved.
    """
    trans.workflow_building_mode = True
    workflow = trans.model.Workflow()
    workflow.name = workflow_dict['name']
    workflow.has_errors = False
    steps = []
    # Keep ids for each step that we need to use to make connections.
    steps_by_external_id = {}
    # Keep track of tools required by the workflow that are not available in
    # the tool shed repository.  Each tuple in the list of missing_tool_tups
    # will be ( tool_id, tool_name, tool_version ).
    missing_tool_tups = []
    # First pass to build step objects and populate basic values
    for step_dict in workflow_dict['steps'].values():
        step, missing_tool_tup = _build_workflow_step(trans, step_dict, repository_id,
                                                      changeset_revision, tools_metadata)
        if missing_tool_tup is not None and missing_tool_tup not in missing_tool_tups:
            missing_tool_tups.append(missing_tool_tup)
        if step.tool_errors:
            workflow.has_errors = True
        steps.append(step)
        steps_by_external_id[step_dict['id']] = step
    # Second pass to deal with connections between steps.
    for step in steps:
        _connect_workflow_step(trans, step, steps_by_external_id)
    # Order the steps if possible.
    attach_ordered_steps(workflow, steps)
    # Return the in-memory Workflow object for display or later persistence to the Galaxy database.
    return workflow, missing_tool_tups


def _build_workflow_step(trans, step_dict, repository_id, changeset_revision, tools_metadata):
    """Create one WorkflowStep from an exported step dict; return ``(step, missing_tool_tup_or_None)``."""
    step = trans.model.WorkflowStep()
    step.label = step_dict.get('label', None)
    step.position = step_dict['position']
    module = module_factory.from_dict(trans, repository_id, changeset_revision, step_dict, tools_metadata=tools_metadata)
    missing_tool_tup = None
    if module.type == 'tool' and module.tool is None:
        # A required tool is not available in the current repository.
        step.tool_errors = 'unavailable'
        missing_tool_tup = (step_dict['tool_id'], step_dict['name'], step_dict['tool_version'])
    module.save_to_step(step)
    # Stick this in the step temporarily; _connect_workflow_step consumes and removes it.
    step.temp_input_connections = step_dict['input_connections']
    if trans.webapp.name == 'galaxy':
        annotation = step_dict.get('annotation', '')
        if annotation:
            annotation = sanitize_html(annotation, 'utf-8', 'text/html')
            new_step_annotation = trans.model.WorkflowStepAnnotationAssociation()
            new_step_annotation.annotation = annotation
            new_step_annotation.user = trans.user
            step.annotations.append(new_step_annotation)
    # Unpack and add post-job actions.
    post_job_actions = step_dict.get('post_job_actions', {})
    for pja_dict in post_job_actions.values():
        # The result is not kept here; presumably the PostJobAction constructor attaches
        # itself to `step` -- NOTE(review): confirm in the model.
        trans.model.PostJobAction(pja_dict['action_type'], step, pja_dict['output_name'], pja_dict['action_arguments'])
    return step, missing_tool_tup


def _connect_workflow_step(trans, step, steps_by_external_id):
    """Turn the step's stashed ``temp_input_connections`` into WorkflowStepConnection objects."""
    for input_name, connection_value in step.temp_input_connections.items():
        if not connection_value:
            continue
        # Accept a single connection dict or a list of them (several outputs feeding one
        # input); previously a list crashed with TypeError on conn_dict['id'].
        if isinstance(connection_value, list):
            conn_dicts = connection_value
        else:
            conn_dicts = [connection_value]
        for conn_dict in conn_dicts:
            output_step = steps_by_external_id[conn_dict['id']]
            conn = trans.model.WorkflowStepConnection()
            conn.input_step = step
            conn.input_name = input_name
            conn.output_step = output_step
            conn.output_name = conn_dict['output_name']
            step.input_connections.append(conn)
    del step.temp_input_connections
def get_workflow_module_name(module, missing_tool_tups):
    """Return the display name for ``module``.

    A tool module whose name is ``'unavailable'`` is given the tool name recorded for its
    ``tool_id`` in ``missing_tool_tups`` (``(tool_id, tool_name, tool_version)`` tuples),
    when one is present; otherwise the module's own name is returned.
    """
    name = module.get_name()
    if module.type != 'tool' or name != 'unavailable':
        return name
    for tool_id, tool_name, _tool_version in missing_tool_tups:
        if tool_id == module.tool_id:
            return '%s' % tool_name
    return name
def import_workflow(trans, repository, workflow_name):
    """Import a workflow contained in an installed tool shed repository into Galaxy (this method is called only from Galaxy).

    The workflow is read from its ``.ga`` file when that path exists on disk, otherwise from
    the exported dict stored in the repository metadata.

    :returns: ``(workflow, status, message)``.  ``workflow`` is the latest version of the newly
        stored workflow, or None when ``workflow_name`` is not in the repository metadata;
        ``status`` is ``'done'`` or ``'error'``; ``message`` is user feedback text.
    """
    status = 'done'
    message = ''
    changeset_revision = repository.changeset_revision
    metadata = repository.metadata
    workflows = metadata.get('workflows', [])
    tools_metadata = metadata.get('tools', [])
    workflow_dict = None
    # Each entry is ( relative_path_to_workflow_file, exported_workflow_dict ).
    for relative_path_to_workflow_file, exported_workflow_dict in workflows:
        if exported_workflow_dict['name'] == workflow_name:
            if os.path.exists(relative_path_to_workflow_file):
                # If the exported workflow is available on disk, import it.
                with open(relative_path_to_workflow_file, 'rb') as workflow_file:
                    workflow_dict = json.loads(workflow_file.read())
            else:
                # Use the current exported_workflow_dict.
                workflow_dict = exported_workflow_dict
            break
    if not workflow_dict:
        message += 'The workflow named %s is not included in the metadata for revision %s of repository %s' % \
            (str(workflow_name), str(changeset_revision), str(repository.name))
        return None, 'error', message
    # Create workflow if possible.
    workflow, missing_tool_tups = get_workflow_from_dict(trans=trans,
                                                         workflow_dict=workflow_dict,
                                                         tools_metadata=tools_metadata,
                                                         repository_id=repository.id,
                                                         changeset_revision=changeset_revision)
    # Save the workflow in the Galaxy database.  Pass workflow_dict along to create annotation at this point.
    stored_workflow = save_workflow(trans, workflow, workflow_dict)
    # Use the latest version of the saved workflow.
    workflow = stored_workflow.latest_workflow
    if workflow_name:
        workflow.name = workflow_name
    # Provide user feedback.
    if workflow.has_errors:
        message += "Imported, but some steps in this workflow have validation errors. "
        status = "error"
    if workflow.has_cycles:
        message += "Imported, but this workflow contains cycles. "
        status = "error"
    if not workflow.has_errors and not workflow.has_cycles:
        # Only claim success when neither problem was reported above.
        # NOTE(review): workflow.name is interpolated into HTML unescaped; confirm how
        # callers render `message`.
        message += "Workflow <b>%s</b> imported successfully. " % workflow.name
    if missing_tool_tups:
        name_and_id_str = '; '.join('name: %s, id: %s' % (str(tool_name), str(tool_id))
                                    for tool_id, tool_name, _tool_version in missing_tool_tups)
        message += "The following tools required by this workflow are missing from this Galaxy instance: %s. " % name_and_id_str
    return workflow, status, message
def save_workflow(trans, workflow, workflow_dict=None):
    """Persist the in-memory ``workflow`` as a new StoredWorkflow owned by ``trans.user``.

    If ``workflow_dict`` carries a non-empty ``'annotation'``, it is sanitized and attached
    to the stored workflow.  A menu entry for the stored workflow is added to the user's
    Workflows menu.  Returns the StoredWorkflow.
    """
    model = trans.model
    session = trans.sa_session

    stored = model.StoredWorkflow()
    stored.name = workflow.name
    workflow.stored_workflow = stored
    stored.latest_workflow = workflow
    stored.user = trans.user

    annotation_text = workflow_dict.get('annotation', '') if workflow_dict else ''
    if annotation_text:
        annotation_assoc = model.StoredWorkflowAnnotationAssociation()
        annotation_assoc.annotation = sanitize_html(annotation_text, 'utf-8', 'text/html')
        annotation_assoc.user = trans.user
        stored.annotations.append(annotation_assoc)

    session.add(stored)
    session.flush()

    # Add a new entry to the Workflows menu.  The collection is re-read from the user after
    # assignment so the append goes to the ORM-managed list, not a detached local one.
    if trans.user.stored_workflow_menu_entries is None:
        trans.user.stored_workflow_menu_entries = []
    menu_entry = model.StoredWorkflowMenuEntry()
    menu_entry.stored_workflow = stored
    trans.user.stored_workflow_menu_entries.append(menu_entry)
    session.flush()
    return stored