import json
import os
from galaxy_test.base.workflow_fixtures import (
WORKFLOW_PARAMETER_INPUT_INTEGER_DEFAULT,
WORKFLOW_RUNTIME_PARAMETER_SIMPLE,
WORKFLOW_SIMPLE_CAT_AND_RANDOM_LINES,
WORKFLOW_SIMPLE_CAT_TWICE,
WORKFLOW_WITH_OUTPUT_ACTIONS,
WORKFLOW_WITH_OUTPUTS,
)
from .test_workflows import BaseWorkflowsApiTestCase
# Absolute path of the directory that contains this module.
WORKFLOWS_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
class TestWorkflowsFromYamlApi(BaseWorkflowsApiTestCase):
    """Tests that upload YAML-described workflows and inspect or run the result."""

    def setUp(self):
        super().setUp()

    def _upload_and_download(self, yaml_workflow, **kwds):
        """Upload ``yaml_workflow`` and return the workflow downloaded afterwards.

        An optional ``style`` keyword is removed from ``kwds`` and forwarded to
        the download call; every remaining keyword is passed on to
        ``_upload_yaml_workflow``.
        """
        style = kwds.pop("style", None)
        workflow_id = self._upload_yaml_workflow(yaml_workflow, **kwds)
        return self.workflow_populator.download_workflow(workflow_id, style=style)

    def test_simple_upload(self):
        """Check labels, tool counts and annotation of an uploaded simple workflow."""
        workflow = self._upload_and_download(WORKFLOW_SIMPLE_CAT_AND_RANDOM_LINES, client_convert=False)

        assert workflow["annotation"].startswith("Simple workflow that ")

        # Only these two tools are expected; any other tool id raises KeyError below.
        tool_count = {"random_lines1": 0, "cat1": 0}
        input_found = False
        for step in workflow["steps"].values():
            step_type = step["type"]
            if step_type == "data_input":
                assert step["label"] == "the_input"
                input_found = True
            else:
                tool_id = step["tool_id"]
                tool_count[tool_id] += 1
                if tool_id == "random_lines1":
                    assert step["label"] == "random_line_label"

        assert input_found
        assert tool_count["random_lines1"] == 1
        assert tool_count["cat1"] == 2

        # The same workflow downloaded with style="format2" carries the text under "doc".
        workflow_as_format2 = self._upload_and_download(
            WORKFLOW_SIMPLE_CAT_AND_RANDOM_LINES, client_convert=False, style="format2"
        )
        assert workflow_as_format2["doc"].startswith("Simple workflow that")

    def test_simple_output_actions(self):
        """Run a workflow with output actions and check the resulting datasets."""
        history_id = self.dataset_populator.new_history()
        self._run_jobs(
            WORKFLOW_WITH_OUTPUT_ACTIONS,
            test_data="""
input1: "hello world"
""",
            history_id=history_id,
        )

        # Dataset at hid 2 is expected to be hidden and renamed.
        details1 = self.dataset_populator.get_history_dataset_details(history_id, hid=2)
        assert not details1["visible"]
        assert details1["name"] == "the new value", details1

        # Dataset at hid 3 is expected to remain visible.
        details2 = self.dataset_populator.get_history_dataset_details(history_id, hid=3)
        assert details2["visible"]

    def test_outputs(self):
        """Check that a workflow output keeps its name and label after upload."""
        workflow_id = self._upload_yaml_workflow(WORKFLOW_WITH_OUTPUTS, round_trip_format_conversion=True)
        workflow = self._get(f"workflows/{workflow_id}/download").json()
        first_output = workflow["steps"]["1"]["workflow_outputs"][0]
        assert first_output["output_name"] == "out_file1"
        assert first_output["label"] == "wf_output_1"
        # Also request the format2 download; nothing is asserted on its content.
        self.workflow_populator.download_workflow(workflow_id, style="format2")

    def test_subworkflow_simple(self):
        """Upload a workflow with an inline subworkflow and verify it round-trips."""
        workflow_id = self._upload_yaml_workflow(
            """
class: GalaxyWorkflow
inputs:
  outer_input: data
steps:
  first_cat:
    tool_id: cat1
    in:
      input1: outer_input
  nested_workflow:
    run:
      class: GalaxyWorkflow
      inputs:
        inner_input: data
      steps:
        - tool_id: random_lines1
          state:
            num_lines: 1
            input:
              $link: inner_input
            seed_source:
              seed_source_selector: set_seed
              seed: asdf
    in:
      inner_input: first_cat/out_file1
""",
            client_convert=False,
        )
        workflow = self.workflow_populator.download_workflow(workflow_id)
        self._assert_nested_workflow_step(workflow)

        # Re-import the downloaded workflow and make sure the structure is unchanged.
        workflow_reupload_id = self.import_workflow(workflow)["id"]
        workflow_reupload = self._get(f"workflows/{workflow_reupload_id}/download").json()
        self._assert_nested_workflow_step(workflow_reupload)

        # content = self.dataset_populator.get_history_dataset_content( history_id )
        # assert content == "chr5\t131424298\t131424460\tCCDS4149.1_cds_0_0_chr5_131424299_f\t0\t+\n"

    def _assert_nested_workflow_step(self, workflow):
        """Assert ``workflow`` has a well-formed step labelled ``nested_workflow``.

        The step must be of type ``subworkflow``, contain two inner steps and
        have exactly one input connection, ``inner_input``, whose
        ``input_subworkflow_step_id`` is 0.
        """
        by_label = self._steps_by_label(workflow)
        if "nested_workflow" not in by_label:
            raise AssertionError(f"Workflow [{workflow}] does not contain label 'nested_workflow'.")

        subworkflow_step = by_label["nested_workflow"]
        assert subworkflow_step["type"] == "subworkflow"
        assert len(subworkflow_step["subworkflow"]["steps"]) == 2

        subworkflow_connections = subworkflow_step["input_connections"]
        assert len(subworkflow_connections) == 1
        subworkflow_connection = subworkflow_connections["inner_input"]
        assert subworkflow_connection["input_subworkflow_step_id"] == 0

    def test_subworkflow_duplicate(self):
        """Run a ``$graph`` workflow that references the same subworkflow twice."""
        duplicate_subworkflow_invocate_wf = """
format-version: "v2.0"
$graph:
- id: nested
  class: GalaxyWorkflow
  inputs:
    inner_input: data
  outputs:
    inner_output:
      outputSource: inner_cat/out_file1
  steps:
    inner_cat:
      tool_id: cat
      in:
        input1: inner_input
        queries_0|input2: inner_input
- id: main
  class: GalaxyWorkflow
  inputs:
    outer_input: data
  steps:
    outer_cat:
      tool_id: cat
      in:
        input1: outer_input
    nested_workflow_1:
      run: '#nested'
      in:
        inner_input: outer_cat/out_file1
    nested_workflow_2:
      run: '#nested'
      in:
        inner_input: nested_workflow_1/inner_output
"""
        history_id = self.dataset_populator.new_history()
        self._run_jobs(
            duplicate_subworkflow_invocate_wf,
            test_data={"outer_input": "hello world"},
            history_id=history_id,
            client_convert=False,
        )
        content = self.dataset_populator.get_history_dataset_content(history_id)
        # Each nested run feeds its input to the tool twice, so one line becomes four.
        assert content == "hello world\nhello world\nhello world\nhello world\n"

    def test_pause(self):
        """Upload a workflow containing a ``pause`` step and dump it."""
        workflow_id = self._upload_yaml_workflow(
            """
class: GalaxyWorkflow
steps:
  test_input:
    type: input
  first_cat:
    tool_id: cat1
    state:
      input1:
        $link: test_input
  the_pause:
    type: pause
    in:
      input: first_cat/out_file1
  second_cat:
    tool_id: cat1
    in:
      input1: the_pause
"""
        )
        self.workflow_populator.dump_workflow(workflow_id)

    def test_implicit_connections(self):
        """Upload a workflow using a ``connect``/``$step`` entry and dump it."""
        workflow_id = self._upload_yaml_workflow(
            """
class: GalaxyWorkflow
inputs:
  test_input: data
steps:
  first_cat:
    tool_id: cat1
    in:
      input1: test_input
  the_pause:
    type: pause
    in:
      input: first_cat/out_file1
  second_cat:
    tool_id: cat1
    in:
      input1: the_pause
  third_cat:
    tool_id: cat1
    connect:
      $step: second_cat
    state:
      input1:
        $link: test_input
"""
        )
        self.workflow_populator.dump_workflow(workflow_id)

    def test_conditional_ints(self, history_id):
        """Run ``disambiguate_cond`` with and without an explicit ``p3v`` value.

        ``history_id`` is supplied by the caller (presumably a test fixture --
        confirm against the base class / conftest).
        """
        self._run_jobs(
            """
class: GalaxyWorkflow
steps:
  test_input:
    tool_id: disambiguate_cond
    state:
      p3:
        use: true
      files:
        attach_files: false
""",
            test_data={},
            history_id=history_id,
            round_trip_format_conversion=True,
        )
        content = self.dataset_populator.get_history_dataset_content(history_id)
        assert "no file specified" in content
        assert "7 7 4" in content

        # Same workflow, but with p3v set to 5 in the step state.
        self._run_jobs(
            """
class: GalaxyWorkflow
steps:
  test_input:
    tool_id: disambiguate_cond
    state:
      p3:
        use: true
        p3v: 5
      files:
        attach_files: false
""",
            test_data={},
            history_id=history_id,
            round_trip_format_conversion=True,
        )
        content = self.dataset_populator.get_history_dataset_content(history_id)
        assert "no file specified" in content
        assert "7 7 5" in content

    def test_parameter_default_rep(self):
        """Check the tool state stored for an integer parameter input with a default."""
        workflow = self._upload_and_download(WORKFLOW_PARAMETER_INPUT_INTEGER_DEFAULT)
        int_input = self._steps_by_label(workflow)["int_input"]
        # tool_state is stored as a JSON string and must be decoded first.
        int_input_state = json.loads(int_input["tool_state"])
        assert int_input_state["default"] == 3
        assert int_input_state["optional"] is True
        assert int_input_state["parameter_type"] == "integer"

    def _steps_by_label(self, workflow_as_dict):
        """Return the workflow's steps as a dict keyed by each step's ``label``.

        If several steps share a label, the one iterated last wins.
        """
        assert "steps" in workflow_as_dict, workflow_as_dict
        return {step["label"]: step for step in workflow_as_dict["steps"].values()}