Warning
This document is for an old release of Galaxy. If this page exists in the latest release, you can view it there; otherwise, start from the top of the latest release's documentation.
Source code for galaxy_test.api.test_workflows_from_yaml
import json
import os
from galaxy_test.base.workflow_fixtures import (
WORKFLOW_PARAMETER_INPUT_INTEGER_DEFAULT,
WORKFLOW_RUNTIME_PARAMETER_SIMPLE,
WORKFLOW_SIMPLE_CAT_AND_RANDOM_LINES,
WORKFLOW_SIMPLE_CAT_TWICE,
WORKFLOW_WITH_OUTPUT_ACTIONS,
WORKFLOW_WITH_OUTPUTS,
)
from .test_workflows import BaseWorkflowsApiTestCase
# Absolute path of the directory containing this test module; used to locate
# on-disk workflow fixtures (e.g. "embed_test_1.gxwf.yml") referenced by
# path-based tests below.
WORKFLOWS_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
class TestWorkflowsFromYamlApi(BaseWorkflowsApiTestCase):
    """API tests for uploading and running workflows authored in YAML (gxformat2).

    Workflows are supplied three ways: inline YAML literals, fixture constants
    imported from ``workflow_fixtures``, and paths to ``.gxwf.yml`` files that
    live next to this module.

    NOTE(review): this block was recovered from a rendered documentation page
    with indentation stripped; the indentation of the embedded YAML literals
    was reconstructed from their semantics and should be confirmed against the
    original source file.
    """

    def _upload_and_download(self, yaml_workflow: str, **kwds):
        """Upload ``yaml_workflow`` and return the downloaded workflow dict.

        A ``style`` keyword, if present, is popped and used as the download
        style (e.g. ``"format2"``); all remaining keywords are forwarded to
        ``_upload_yaml_workflow``.
        """
        style = None
        if "style" in kwds:
            style = kwds.pop("style")
        workflow_id = self._upload_yaml_workflow(yaml_workflow, **kwds)
        return self.workflow_populator.download_workflow(workflow_id, style=style)

    def test_simple_upload(self):
        """Round-trip a simple workflow and verify its steps, labels and annotation."""
        workflow = self._upload_and_download(WORKFLOW_SIMPLE_CAT_AND_RANDOM_LINES, client_convert=False)

        assert workflow["annotation"].startswith("Simple workflow that ")

        # The fixture contains one data input, one random_lines1 step and two
        # cat1 steps; count tool steps while checking the expected labels.
        tool_count = {"random_lines1": 0, "cat1": 0}
        input_found = False
        for step in workflow["steps"].values():
            step_type = step["type"]
            if step_type == "data_input":
                assert step["label"] == "the_input"
                input_found = True
            else:
                tool_id = step["tool_id"]
                tool_count[tool_id] += 1
                if tool_id == "random_lines1":
                    assert step["label"] == "random_line_label"

        assert input_found
        assert tool_count["random_lines1"] == 1
        assert tool_count["cat1"] == 2

        # Re-download in Format 2 — the annotation surfaces as "doc" there.
        workflow_as_format2 = self._upload_and_download(
            WORKFLOW_SIMPLE_CAT_AND_RANDOM_LINES, client_convert=False, style="format2"
        )
        assert workflow_as_format2["doc"].startswith("Simple workflow that")

    def test_simple_output_actions(self):
        """Verify post-job output actions: one output hidden and renamed, one visible."""
        history_id = self.dataset_populator.new_history()
        self._run_jobs(
            WORKFLOW_WITH_OUTPUT_ACTIONS,
            test_data="""
input1: "hello world"
""",
            history_id=history_id,
        )
        # hid 2 is acted on (hidden + renamed); hid 3 is left untouched.
        details1 = self.dataset_populator.get_history_dataset_details(history_id, hid=2)
        assert not details1["visible"]
        assert details1["name"] == "the new value", details1
        details2 = self.dataset_populator.get_history_dataset_details(history_id, hid=3)
        assert details2["visible"]

    def test_inputs_to_steps(self):
        """Run a workflow that cats one input with itself and check the result."""
        history_id = self.dataset_populator.new_history()
        self._run_jobs(
            WORKFLOW_SIMPLE_CAT_TWICE,
            test_data={"input1": "hello world"},
            history_id=history_id,
            round_trip_format_conversion=True,
        )
        contents1 = self.dataset_populator.get_history_dataset_content(history_id)
        assert contents1.strip() == "hello world\nhello world"

    def test_outputs(self):
        """Verify labeled workflow outputs survive upload and download."""
        workflow_id = self._upload_yaml_workflow(WORKFLOW_WITH_OUTPUTS, round_trip_format_conversion=True)
        workflow = self._get(f"workflows/{workflow_id}/download").json()
        assert workflow["steps"]["1"]["workflow_outputs"][0]["output_name"] == "out_file1"
        assert workflow["steps"]["1"]["workflow_outputs"][0]["label"] == "wf_output_1"
        # Smoke check only: the workflow must also download cleanly as Format 2.
        workflow = self.workflow_populator.download_workflow(workflow_id, style="format2")

    def test_runtime_inputs(self):
        """Verify runtime (deferred) tool parameters are represented as such."""
        workflow = self._upload_and_download(WORKFLOW_RUNTIME_PARAMETER_SIMPLE)
        assert len(workflow["steps"]) == 2
        runtime_step = workflow["steps"]["1"]
        # Locate the "num_lines" input; after "break" the loop variable holds it.
        for runtime_input in runtime_step["inputs"]:
            if runtime_input["name"] == "num_lines":
                break

        assert runtime_input["description"].startswith("runtime parameter for tool")

        tool_state = json.loads(runtime_step["tool_state"])
        assert "num_lines" in tool_state
        self._assert_is_runtime_input(tool_state["num_lines"])

    def test_subworkflow_simple(self):
        """Upload a workflow with a nested subworkflow; verify the structure
        survives both the initial upload and a re-import of the download."""
        workflow_id = self._upload_yaml_workflow(
            """
class: GalaxyWorkflow
inputs:
  outer_input: data
steps:
  first_cat:
    tool_id: cat1
    in:
      input1: outer_input
  nested_workflow:
    run:
      class: GalaxyWorkflow
      inputs:
        inner_input: data
      steps:
        - tool_id: random_lines1
          state:
            num_lines: 1
            input:
              $link: inner_input
            seed_source:
              seed_source_selector: set_seed
              seed: asdf
    in:
      inner_input: first_cat/out_file1
""",
            client_convert=False,
        )
        workflow = self.workflow_populator.download_workflow(workflow_id)
        by_label = self._steps_by_label(workflow)
        if "nested_workflow" not in by_label:
            template = "Workflow [%s] does not contain label 'nested_workflow'."
            message = template % workflow
            raise AssertionError(message)

        subworkflow_step = by_label["nested_workflow"]

        assert subworkflow_step["type"] == "subworkflow"
        assert len(subworkflow_step["subworkflow"]["steps"]) == 2

        subworkflow_connections = subworkflow_step["input_connections"]
        assert len(subworkflow_connections) == 1
        subworkflow_connection = subworkflow_connections["inner_input"]
        assert subworkflow_connection["input_subworkflow_step_id"] == 0

        # Re-import the downloaded representation and repeat the structural
        # assertions to prove the subworkflow survives a full round trip.
        workflow_reupload_id = self.import_workflow(workflow)["id"]
        workflow_reupload = self._get(f"workflows/{workflow_reupload_id}/download").json()
        by_label = self._steps_by_label(workflow_reupload)
        subworkflow_step = by_label["nested_workflow"]

        assert subworkflow_step["type"] == "subworkflow"
        assert len(subworkflow_step["subworkflow"]["steps"]) == 2

        subworkflow_connections = subworkflow_step["input_connections"]
        assert len(subworkflow_connections) == 1
        subworkflow_connection = subworkflow_connections["inner_input"]
        assert subworkflow_connection["input_subworkflow_step_id"] == 0

        # content = self.dataset_populator.get_history_dataset_content( history_id )
        # assert content == "chr5\t131424298\t131424460\tCCDS4149.1_cds_0_0_chr5_131424299_f\t0\t+\n"

    def test_subworkflow_duplicate(self):
        """Invoke the same $graph-referenced subworkflow twice in one workflow run."""
        duplicate_subworkflow_invocate_wf = """
format-version: "v2.0"
$graph:
- id: nested
  class: GalaxyWorkflow
  inputs:
    inner_input: data
  outputs:
    inner_output:
      outputSource: inner_cat/out_file1
  steps:
    inner_cat:
      tool_id: cat
      in:
        input1: inner_input
        queries_0|input2: inner_input
- id: main
  class: GalaxyWorkflow
  inputs:
    outer_input: data
  steps:
    outer_cat:
      tool_id: cat
      in:
        input1: outer_input
    nested_workflow_1:
      run: '#nested'
      in:
        inner_input: outer_cat/out_file1
    nested_workflow_2:
      run: '#nested'
      in:
        inner_input: nested_workflow_1/inner_output
"""
        history_id = self.dataset_populator.new_history()
        self._run_jobs(
            duplicate_subworkflow_invocate_wf,
            test_data={"outer_input": "hello world"},
            history_id=history_id,
            client_convert=False,
        )
        # inner_cat feeds inner_input to both of its inputs, doubling the
        # lines; two nested invocations take 1 copy -> 2 -> 4.
        content = self.dataset_populator.get_history_dataset_content(history_id)
        assert content == "hello world\nhello world\nhello world\nhello world\n"

    def test_pause(self):
        """Upload a workflow containing a pause step; only check that it dumps."""
        workflow_id = self._upload_yaml_workflow(
            """
class: GalaxyWorkflow
steps:
  test_input:
    type: input
  first_cat:
    tool_id: cat1
    state:
      input1:
        $link: test_input
  the_pause:
    type: pause
    in:
      input: first_cat/out_file1
  second_cat:
    tool_id: cat1
    in:
      input1: the_pause
"""
        )
        self.workflow_populator.dump_workflow(workflow_id)

    def test_implicit_connections(self):
        """Upload a workflow whose third step orders after another via "connect: $step"."""
        workflow_id = self._upload_yaml_workflow(
            """
class: GalaxyWorkflow
inputs:
  test_input: data
steps:
  first_cat:
    tool_id: cat1
    in:
      input1: test_input
  the_pause:
    type: pause
    in:
      input: first_cat/out_file1
  second_cat:
    tool_id: cat1
    in:
      input1: the_pause
  third_cat:
    tool_id: cat1
    connect:
      $step: second_cat
    state:
      input1:
        $link: test_input
"""
        )
        self.workflow_populator.dump_workflow(workflow_id)

    def test_conditional_ints(self, history_id):
        """Run a tool with conditional parameters, without and with an explicit value.

        ``history_id`` is injected as a pytest fixture (unlike the sibling tests
        that create a history via the populator).
        """
        self._run_jobs(
            """
class: GalaxyWorkflow
steps:
  test_input:
    tool_id: disambiguate_cond
    state:
      p3:
        use: true
      files:
        attach_files: false
""",
            test_data={},
            history_id=history_id,
            round_trip_format_conversion=True,
        )
        content = self.dataset_populator.get_history_dataset_content(history_id)
        assert "no file specified" in content
        assert "7 7 4" in content

        # Same workflow, but explicitly setting p3v replaces the value that
        # produced "4" above with "5".
        self._run_jobs(
            """
class: GalaxyWorkflow
steps:
  test_input:
    tool_id: disambiguate_cond
    state:
      p3:
        use: true
        p3v: 5
      files:
        attach_files: false
""",
            test_data={},
            history_id=history_id,
            round_trip_format_conversion=True,
        )
        content = self.dataset_populator.get_history_dataset_content(history_id)
        assert "no file specified" in content
        assert "7 7 5" in content

    def test_workflow_embed_tool(self):
        """Run a workflow embedding an inline GalaxyTool definition (step "embed1")."""
        history_id = self.dataset_populator.new_history()
        self._run_jobs(
            """
class: GalaxyWorkflow
steps:
- type: input
  label: input1
- tool_id: cat1
  label: first_cat
  state:
    input1:
      $link: 0
- label: embed1
  run:
    class: GalaxyTool
    command: echo 'hello world 2' > $output1
    outputs:
      output1:
        format: txt
- tool_id: cat1
  state:
    input1:
      $link: first_cat/out_file1
    queries:
    - input2:
        $link: embed1/output1
test_data:
  input1: "hello world"
""",
            history_id=history_id,
        )
        content = self.dataset_populator.get_history_dataset_content(history_id)
        assert content == "hello world\nhello world 2\n"

    def test_workflow_import_tool(self):
        """Run a workflow loaded from a .gxwf.yml file next to this module."""
        history_id = self.dataset_populator.new_history()
        workflow_path = os.path.join(WORKFLOWS_DIRECTORY, "embed_test_1.gxwf.yml")
        jobs_descriptions = {"test_data": {"input1": "hello world"}}
        self._run_jobs(workflow_path, source_type="path", jobs_descriptions=jobs_descriptions, history_id=history_id)
        content = self.dataset_populator.get_history_dataset_content(history_id)
        assert content == "hello world\nhello world 2\n"

    def test_parameter_default_rep(self):
        """Check the tool-state representation of an integer parameter input."""
        workflow = self._upload_and_download(WORKFLOW_PARAMETER_INPUT_INTEGER_DEFAULT)
        int_input = self._steps_by_label(workflow)["int_input"]
        int_input_state = json.loads(int_input["tool_state"])
        assert int_input_state["default"] == 3
        assert int_input_state["optional"] is True
        assert int_input_state["parameter_type"] == "integer"

    def _steps_by_label(self, workflow_as_dict: dict) -> dict:
        """Return a mapping of step label -> step dict for a downloaded workflow."""
        by_label = {}
        assert "steps" in workflow_as_dict, workflow_as_dict
        for step in workflow_as_dict["steps"].values():
            by_label[step["label"]] = step
        return by_label