Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy_test.api.test_workflows_from_yaml

import json
import os

from galaxy_test.base.workflow_fixtures import (
    WORKFLOW_PARAMETER_INPUT_INTEGER_DEFAULT,
    WORKFLOW_RUNTIME_PARAMETER_SIMPLE,
    WORKFLOW_SIMPLE_CAT_AND_RANDOM_LINES,
    WORKFLOW_SIMPLE_CAT_TWICE,
    WORKFLOW_WITH_OUTPUT_ACTIONS,
    WORKFLOW_WITH_OUTPUTS,
)
from .test_workflows import BaseWorkflowsApiTestCase

# Absolute path of the directory that contains this test module. Workflow
# files loaded by path (e.g. "embed_test_1.gxwf.yml") are resolved against it.
WORKFLOWS_DIRECTORY = os.path.abspath(os.path.dirname(__file__))


class TestWorkflowsFromYamlApi(BaseWorkflowsApiTestCase):
    """API tests that upload, download and run workflows written as YAML.

    The workflows come either from the shared ``workflow_fixtures`` constants
    or from YAML documents embedded in the individual tests.
    """

    def setUp(self):
        """Run the base-class set-up; this class adds no fixtures of its own."""
        super().setUp()

    def _upload_and_download(self, yaml_workflow, **kwds):
        """Upload ``yaml_workflow`` and return the workflow as downloaded again.

        :param yaml_workflow: workflow description handed to
            ``_upload_yaml_workflow``.
        :param kwds: forwarded to ``_upload_yaml_workflow``, except for the
            optional ``style`` key, which is removed and passed to
            ``workflow_populator.download_workflow`` instead.
        :returns: whatever ``download_workflow`` returns for the new workflow.
        """
        style = kwds.pop("style", None)
        workflow_id = self._upload_yaml_workflow(yaml_workflow, **kwds)
        return self.workflow_populator.download_workflow(workflow_id, style=style)

    def test_simple_upload(self):
        """Check step labels, tool counts and annotation after a plain upload."""
        workflow = self._upload_and_download(WORKFLOW_SIMPLE_CAT_AND_RANDOM_LINES, client_convert=False)

        assert workflow["annotation"].startswith("Simple workflow that ")

        # Only these two tools are expected; any other tool_id raises KeyError.
        tool_count = {"random_lines1": 0, "cat1": 0}
        input_found = False
        for step in workflow["steps"].values():
            step_type = step["type"]
            if step_type == "data_input":
                assert step["label"] == "the_input"
                input_found = True
            else:
                tool_id = step["tool_id"]
                tool_count[tool_id] += 1
                if tool_id == "random_lines1":
                    assert step["label"] == "random_line_label"

        assert input_found
        assert tool_count["random_lines1"] == 1
        assert tool_count["cat1"] == 2

        # The same workflow downloaded in "format2" style exposes the text as "doc".
        workflow_as_format2 = self._upload_and_download(
            WORKFLOW_SIMPLE_CAT_AND_RANDOM_LINES, client_convert=False, style="format2"
        )
        assert workflow_as_format2["doc"].startswith("Simple workflow that")

    def test_simple_output_actions(self):
        """Run the output-actions workflow and check visibility and renaming."""
        history_id = self.dataset_populator.new_history()
        self._run_jobs(
            WORKFLOW_WITH_OUTPUT_ACTIONS,
            test_data="""
input1: "hello world"
""",
            history_id=history_id,
        )

        details1 = self.dataset_populator.get_history_dataset_details(history_id, hid=2)
        assert not details1["visible"]
        assert details1["name"] == "the new value", details1

        details2 = self.dataset_populator.get_history_dataset_details(history_id, hid=3)
        assert details2["visible"]

    def test_inputs_to_steps(self):
        """Run the cat-twice workflow with round-trip format conversion enabled."""
        history_id = self.dataset_populator.new_history()
        self._run_jobs(
            WORKFLOW_SIMPLE_CAT_TWICE,
            test_data={"input1": "hello world"},
            history_id=history_id,
            round_trip_format_conversion=True,
        )
        contents1 = self.dataset_populator.get_history_dataset_content(history_id)
        assert contents1.strip() == "hello world\nhello world"

    def test_outputs(self):
        """Check that workflow output name and label survive upload."""
        workflow_id = self._upload_yaml_workflow(WORKFLOW_WITH_OUTPUTS, round_trip_format_conversion=True)
        workflow = self._get(f"workflows/{workflow_id}/download").json()
        first_output = workflow["steps"]["1"]["workflow_outputs"][0]
        assert first_output["output_name"] == "out_file1"
        assert first_output["label"] == "wf_output_1"
        # Smoke check only: the format2 download must succeed; its content is
        # not inspected here.
        self.workflow_populator.download_workflow(workflow_id, style="format2")

    def test_runtime_inputs(self):
        """Check that ``num_lines`` is exposed as a runtime input of step 1."""
        workflow = self._upload_and_download(WORKFLOW_RUNTIME_PARAMETER_SIMPLE)
        assert len(workflow["steps"]) == 2
        runtime_step = workflow["steps"]["1"]
        for runtime_input in runtime_step["inputs"]:
            if runtime_input["name"] == "num_lines":
                break
        else:
            # Without this branch the assertion below would silently inspect
            # whichever input happened to be last in the list.
            raise AssertionError(f"No runtime input named 'num_lines' in step inputs: {runtime_step['inputs']}")

        assert runtime_input["description"].startswith("runtime parameter for tool")

        tool_state = json.loads(runtime_step["tool_state"])
        assert "num_lines" in tool_state
        self._assert_is_runtime_input(tool_state["num_lines"])

    def test_subworkflow_simple(self):
        """Upload a workflow with an inline subworkflow and re-import it.

        The nested step is checked both on the first download and after the
        downloaded workflow has been imported again.
        """
        workflow_id = self._upload_yaml_workflow(
            """
class: GalaxyWorkflow
inputs:
  outer_input: data
steps:
  first_cat:
    tool_id: cat1
    in:
      input1: outer_input
  nested_workflow:
    run:
      class: GalaxyWorkflow
      inputs:
        inner_input: data
      steps:
        - tool_id: random_lines1
          state:
            num_lines: 1
            input:
              $link: inner_input
            seed_source:
              seed_source_selector: set_seed
              seed: asdf
    in:
      inner_input: first_cat/out_file1
""",
            client_convert=False,
        )
        workflow = self.workflow_populator.download_workflow(workflow_id)
        self._assert_nested_workflow_step(workflow)

        workflow_reupload_id = self.import_workflow(workflow)["id"]
        workflow_reupload = self._get(f"workflows/{workflow_reupload_id}/download").json()
        self._assert_nested_workflow_step(workflow_reupload)
        # NOTE: this test only inspects the stored workflow structure; the
        # workflow is never invoked, so no dataset content is checked.

    def test_subworkflow_duplicate(self):
        """Run a ``$graph`` workflow that invokes the same subworkflow twice."""
        duplicate_subworkflow_invocate_wf = """
format-version: "v2.0"
$graph:
- id: nested
  class: GalaxyWorkflow
  inputs:
    inner_input: data
  outputs:
    inner_output:
      outputSource: inner_cat/out_file1
  steps:
    inner_cat:
      tool_id: cat
      in:
        input1: inner_input
        queries_0|input2: inner_input

- id: main
  class: GalaxyWorkflow
  inputs:
    outer_input: data
  steps:
    outer_cat:
      tool_id: cat
      in:
        input1: outer_input
    nested_workflow_1:
      run: '#nested'
      in:
        inner_input: outer_cat/out_file1
    nested_workflow_2:
      run: '#nested'
      in:
        inner_input: nested_workflow_1/inner_output
"""
        history_id = self.dataset_populator.new_history()
        self._run_jobs(
            duplicate_subworkflow_invocate_wf,
            test_data={"outer_input": "hello world"},
            history_id=history_id,
            client_convert=False,
        )
        content = self.dataset_populator.get_history_dataset_content(history_id)
        # Each nested invocation feeds its input to cat twice, so one line
        # becomes two and then four.
        assert content == "hello world\nhello world\nhello world\nhello world\n"

    def test_pause(self):
        """Upload a workflow containing a ``pause`` step and dump it.

        There are no assertions; the test passes when upload and dump succeed.
        """
        workflow_id = self._upload_yaml_workflow(
            """
class: GalaxyWorkflow
steps:
  test_input:
    type: input
  first_cat:
    tool_id: cat1
    state:
      input1:
        $link: test_input
  the_pause:
    type: pause
    in:
      input: first_cat/out_file1
  second_cat:
    tool_id: cat1
    in:
      input1: the_pause
"""
        )
        self.workflow_populator.dump_workflow(workflow_id)

    def test_implicit_connections(self):
        """Upload a workflow that uses a ``connect: $step`` link and dump it.

        There are no assertions; the test passes when upload and dump succeed.
        """
        workflow_id = self._upload_yaml_workflow(
            """
class: GalaxyWorkflow
inputs:
  test_input: data
steps:
  first_cat:
    tool_id: cat1
    in:
      input1: test_input
  the_pause:
    type: pause
    in:
      input: first_cat/out_file1
  second_cat:
    tool_id: cat1
    in:
      input1: the_pause
  third_cat:
    tool_id: cat1
    connect:
      $step: second_cat
    state:
      input1:
        $link: test_input
"""
        )
        self.workflow_populator.dump_workflow(workflow_id)

    def test_conditional_ints(self, history_id):
        """Run ``disambiguate_cond`` with and without an explicit ``p3v``.

        :param history_id: history to run both invocations in; supplied by
            the caller (presumably a test fixture -- confirm in the test
            configuration).
        """
        self._run_jobs(
            """
class: GalaxyWorkflow
steps:
  test_input:
    tool_id: disambiguate_cond
    state:
      p3:
        use: true
      files:
        attach_files: false
""",
            test_data={},
            history_id=history_id,
            round_trip_format_conversion=True,
        )
        content = self.dataset_populator.get_history_dataset_content(history_id)
        assert "no file specified" in content
        assert "7 7 4" in content

        # Same workflow, but with p3v set to 5 explicitly.
        self._run_jobs(
            """
class: GalaxyWorkflow
steps:
  test_input:
    tool_id: disambiguate_cond
    state:
      p3:
        use: true
        p3v: 5
      files:
        attach_files: false
""",
            test_data={},
            history_id=history_id,
            round_trip_format_conversion=True,
        )
        content = self.dataset_populator.get_history_dataset_content(history_id)
        assert "no file specified" in content
        assert "7 7 5" in content

    def test_workflow_embed_tool(self):
        """Run a workflow whose ``embed1`` step defines a tool inline."""
        history_id = self.dataset_populator.new_history()
        self._run_jobs(
            """
class: GalaxyWorkflow
steps:
  - type: input
    label: input1
  - tool_id: cat1
    label: first_cat
    state:
      input1:
        $link: 0
  - label: embed1
    run:
      class: GalaxyTool
      command: echo 'hello world 2' > $output1
      outputs:
        output1:
          format: txt
  - tool_id: cat1
    state:
      input1:
        $link: first_cat/out_file1
      queries:
        - input2:
            $link: embed1/output1
test_data:
  input1: "hello world"
""",
            history_id=history_id,
        )
        content = self.dataset_populator.get_history_dataset_content(history_id)
        assert content == "hello world\nhello world 2\n"

    def test_workflow_import_tool(self):
        """Run ``embed_test_1.gxwf.yml`` loaded by path from this directory."""
        history_id = self.dataset_populator.new_history()
        workflow_path = os.path.join(WORKFLOWS_DIRECTORY, "embed_test_1.gxwf.yml")
        jobs_descriptions = {"test_data": {"input1": "hello world"}}
        self._run_jobs(workflow_path, source_type="path", jobs_descriptions=jobs_descriptions, history_id=history_id)
        content = self.dataset_populator.get_history_dataset_content(history_id)
        assert content == "hello world\nhello world 2\n"

    def test_parameter_default_rep(self):
        """Check the stored tool state of an integer parameter with a default."""
        workflow = self._upload_and_download(WORKFLOW_PARAMETER_INPUT_INTEGER_DEFAULT)
        int_input = self._steps_by_label(workflow)["int_input"]
        int_input_state = json.loads(int_input["tool_state"])
        assert int_input_state["default"] == 3
        assert int_input_state["optional"] is True
        assert int_input_state["parameter_type"] == "integer"

    def _assert_nested_workflow_step(self, workflow):
        """Assert that ``workflow`` has the expected ``nested_workflow`` step.

        The step must be a subworkflow with two inner steps and exactly one
        input connection, ``inner_input``, wired to inner step 0.
        """
        by_label = self._steps_by_label(workflow)
        if "nested_workflow" not in by_label:
            raise AssertionError(f"Workflow [{workflow}] does not contain label 'nested_workflow'.")

        subworkflow_step = by_label["nested_workflow"]
        assert subworkflow_step["type"] == "subworkflow"
        assert len(subworkflow_step["subworkflow"]["steps"]) == 2

        subworkflow_connections = subworkflow_step["input_connections"]
        assert len(subworkflow_connections) == 1
        subworkflow_connection = subworkflow_connections["inner_input"]
        assert subworkflow_connection["input_subworkflow_step_id"] == 0

    def _steps_by_label(self, workflow_as_dict):
        """Return the steps of ``workflow_as_dict`` keyed by their ``label``.

        If several steps share a label, the last one iterated wins.
        """
        assert "steps" in workflow_as_dict, workflow_as_dict
        return {step["label"]: step for step in workflow_as_dict["steps"].values()}