Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy_test.api.test_workflow_extraction
import functools
import operator
from collections import namedtuple
from json import (
dumps,
loads,
)
from typing import Optional
from galaxy_test.base.populators import (
skip_without_tool,
summarize_instance_history_on_error,
)
from .test_workflows import BaseWorkflowsApiTestCase
[docs]class TestWorkflowExtractionApi(BaseWorkflowsApiTestCase):
[docs] @skip_without_tool("cat1")
@summarize_instance_history_on_error
def test_extract_from_history(self, history_id):
# Run the simple test workflow and extract it back out from history
cat1_job_id = self.__setup_and_run_cat1_workflow(history_id=history_id)
contents = self._history_contents(history_id)
input_hids = [c["hid"] for c in contents[0:2]]
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="extract_from_history_basic",
dataset_ids=input_hids,
job_ids=[cat1_job_id],
)
assert downloaded_workflow["name"] == "test import from history"
self.__assert_looks_like_cat1_example_workflow(downloaded_workflow)
[docs] @summarize_instance_history_on_error
def test_extract_with_copied_inputs(self, history_id):
old_history_id = self.dataset_populator.new_history()
# Run the simple test workflow and extract it back out from history
self.__setup_and_run_cat1_workflow(history_id=old_history_id)
# Bug cannot mess up hids or these don't extract correctly. See Trello card here:
# https://trello.com/c/mKzLbM2P
# # create dummy dataset to complicate hid mapping
# self.dataset_populator.new_dataset( history_id, content="dummydataset" )
# offset = 1
offset = 0
old_contents = self._history_contents(old_history_id)
for old_dataset in old_contents:
self.__copy_content_to_history(history_id, old_dataset)
new_contents = self._history_contents(history_id)
input_hids = [c["hid"] for c in new_contents[(offset + 0) : (offset + 2)]]
cat1_job_id = self.__job_id(history_id, new_contents[(offset + 2)]["id"])
def reimport_jobs_ids(new_history_id):
return [j["id"] for j in self.dataset_populator.history_jobs(new_history_id) if j["tool_id"] == "cat1"]
downloaded_workflow = self._extract_and_download_workflow(
history_id,
dataset_ids=input_hids,
job_ids=[cat1_job_id],
)
self.__assert_looks_like_cat1_example_workflow(downloaded_workflow)
[docs] @summarize_instance_history_on_error
def test_extract_with_copied_inputs_reimported(self, history_id):
old_history_id = self.dataset_populator.new_history()
# Run the simple test workflow and extract it back out from history
self.__setup_and_run_cat1_workflow(history_id=old_history_id)
offset = 0
old_contents = self._history_contents(old_history_id)
for old_dataset in old_contents:
self.__copy_content_to_history(history_id, old_dataset)
new_contents = self._history_contents(history_id)
input_hids = [c["hid"] for c in new_contents[(offset + 0) : (offset + 2)]]
def reimport_jobs_ids(new_history_id):
return [j["id"] for j in self.dataset_populator.history_jobs(new_history_id) if j["tool_id"] == "cat1"]
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="test_extract_with_copied_inputs",
reimport_jobs_ids=reimport_jobs_ids,
dataset_ids=input_hids,
)
self.__assert_looks_like_cat1_example_workflow(downloaded_workflow)
[docs] @skip_without_tool("random_lines1")
@summarize_instance_history_on_error
def test_extract_mapping_workflow_from_history(self, history_id):
hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair(history_id)
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="extract_from_history_with_mapping",
dataset_collection_ids=[hdca["hid"]],
job_ids=[job_id1, job_id2],
)
self.__assert_looks_like_randomlines_mapping_workflow(downloaded_workflow)
[docs] def test_extract_copied_mapping_from_history(self, history_id):
hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair(history_id)
new_history_id = self.dataset_populator.copy_history(history_id).json()["id"]
# API test is somewhat contrived since there is no good way
# to retrieve job_id1, job_id2 like this for copied dataset
# collections I don't think.
downloaded_workflow = self._extract_and_download_workflow(
new_history_id,
dataset_collection_ids=[hdca["hid"]],
job_ids=[job_id1, job_id2],
)
self.__assert_looks_like_randomlines_mapping_workflow(downloaded_workflow)
[docs] def test_extract_copied_mapping_from_history_reimported(self, history_id):
import unittest
raise unittest.SkipTest(
"Mapping connection for copied collections not yet implemented in history import/export"
)
old_history_id = self.dataset_populator.new_history() # type: ignore[unreachable]
hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_singleton(old_history_id)
old_contents = self._history_contents(old_history_id)
for old_content in old_contents:
self.__copy_content_to_history(history_id, old_content)
def reimport_jobs_ids(new_history_id):
rval = [
j["id"] for j in self.dataset_populator.history_jobs(new_history_id) if j["tool_id"] == "random_lines1"
]
assert len(rval) == 2
print(rval)
return rval
# API test is somewhat contrived since there is no good way
# to retrieve job_id1, job_id2 like this for copied dataset
# collections I don't think.
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="test_extract_from_history_with_mapped_collection_reimport",
reimport_jobs_ids=reimport_jobs_ids,
reimport_wait_on_history_length=9, # see comments in _extract about eliminating this magic constant.
dataset_collection_ids=[hdca["hid"]],
)
self.__assert_looks_like_randomlines_mapping_workflow(downloaded_workflow)
[docs] @skip_without_tool("random_lines1")
@skip_without_tool("multi_data_param")
def test_extract_reduction_from_history(self, history_id):
hdca = self.dataset_collection_populator.create_pair_in_history(
history_id, contents=["1 2 3\n4 5 6", "7 8 9\n10 11 10"], wait=True
).json()["outputs"][0]
hdca_id = hdca["id"]
inputs1 = {"input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]}, "num_lines": 2}
implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id(history_id, "random_lines1", inputs1)
inputs2 = {
"f1": {"src": "hdca", "id": implicit_hdca1["id"]},
"f2": {"src": "hdca", "id": implicit_hdca1["id"]},
}
reduction_run_output = self.dataset_populator.run_tool(
tool_id="multi_data_param",
inputs=inputs2,
history_id=history_id,
)
job_id2 = reduction_run_output["jobs"][0]["id"]
self.dataset_populator.wait_for_job(job_id2, assert_ok=True)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="extract_from_history_with_reduction",
dataset_collection_ids=[hdca["hid"]],
job_ids=[job_id1, job_id2],
)
assert len(downloaded_workflow["steps"]) == 3
collect_step_idx = self._assert_first_step_is_paired_input(downloaded_workflow)
tool_steps = self._get_steps_of_type(downloaded_workflow, "tool", expected_len=2)
random_lines_map_step = tool_steps[0]
reduction_step = tool_steps[1]
assert "tool_id" in random_lines_map_step, random_lines_map_step
assert random_lines_map_step["tool_id"] == "random_lines1", random_lines_map_step
assert "input_connections" in random_lines_map_step, random_lines_map_step
random_lines_input_connections = random_lines_map_step["input_connections"]
assert "input" in random_lines_input_connections, random_lines_map_step
random_lines_input = random_lines_input_connections["input"]
assert random_lines_input["id"] == collect_step_idx
reduction_step_input = reduction_step["input_connections"]["f1"]
assert reduction_step_input["id"] == random_lines_map_step["id"]
[docs] @skip_without_tool("collection_paired_test")
def test_extract_workflows_with_dataset_collections(self, history_id):
jobs_summary = self._run_workflow(
"""
class: GalaxyWorkflow
steps:
- label: text_input1
type: input_collection
- tool_id: collection_paired_test
state:
f1:
$link: text_input1
test_data:
text_input1:
collection_type: paired
""",
history_id,
)
job_id = self._job_id_for_tool(jobs_summary.jobs, "collection_paired_test")
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="extract_from_history_with_basic_collections",
dataset_collection_ids=["1"],
job_ids=[job_id],
)
self.__check_workflow(
downloaded_workflow,
step_count=2,
verify_connected=True,
data_input_count=0,
data_collection_input_count=1,
tool_ids=["collection_paired_test"],
)
collection_step = self._get_steps_of_type(downloaded_workflow, "data_collection_input", expected_len=1)[0]
collection_step_state = loads(collection_step["tool_state"])
assert collection_step_state["collection_type"] == "paired"
[docs] def test_empty_collection_map_over_extract_workflow(self):
with self.dataset_populator.test_history() as history_id:
self._run_workflow(
"""class: GalaxyWorkflow
inputs:
input: collection
filter_file: data
steps:
filter_collection:
tool_id: __FILTER_FROM_FILE__
in:
input: input
how|filter_source: filter_file
state:
how:
how_filter: remove_if_present
concat:
tool_id: cat1
in:
input1: filter_collection/output_filtered
test_data:
input:
collection_type: list
elements:
- identifier: i1
content: "0"
filter_file: i1""",
history_id,
wait=True,
)
response = self._post(
"workflows", data={"from_history_id": history_id, "workflow_name": "extract with empty collection test"}
)
assert response.status_code == 200
workflow_id = response.json()["id"]
workflow = self.workflow_populator.download_workflow(workflow_id)
assert workflow
# TODO: after adding request models we should be able to recover implicit collection job requests.
# assert len(workflow["steps"]) == 4
[docs] @skip_without_tool("cat_collection")
def test_subcollection_mapping(self, history_id):
jobs_summary = self._run_workflow(
"""
class: GalaxyWorkflow
steps:
- label: text_input1
type: input_collection
- label: noop
tool_id: cat1
state:
input1:
$link: text_input1
- tool_id: cat_collection
state:
input1:
$link: noop/out_file1
test_data:
text_input1:
collection_type: "list:paired"
""",
history_id,
)
job1_id = self._job_id_for_tool(jobs_summary.jobs, "cat1")
job2_id = self._job_id_for_tool(jobs_summary.jobs, "cat_collection")
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="test_extract_workflows_with_subcollection_mapping",
dataset_collection_ids=["1"],
job_ids=[job1_id, job2_id],
)
self.__check_workflow(
downloaded_workflow,
step_count=3,
verify_connected=True,
data_input_count=0,
data_collection_input_count=1,
tool_ids=["cat_collection", "cat1"],
)
collection_step = self._get_steps_of_type(downloaded_workflow, "data_collection_input", expected_len=1)[0]
collection_step_state = loads(collection_step["tool_state"])
assert collection_step_state["collection_type"] == "list:paired"
[docs] @skip_without_tool("cat_list")
@skip_without_tool("collection_creates_dynamic_nested")
def test_subcollection_reduction(self, history_id):
jobs_summary = self._run_workflow(
"""
class: GalaxyWorkflow
steps:
creates_nested_list:
tool_id: collection_creates_dynamic_nested
reduce_nested_list:
tool_id: cat_list
in:
input1: creates_nested_list/list_output
""",
history_id,
)
job1_id = self._job_id_for_tool(jobs_summary.jobs, "cat_list")
job2_id = self._job_id_for_tool(jobs_summary.jobs, "collection_creates_dynamic_nested")
self._extract_and_download_workflow(
history_id,
reimport_as="test_extract_workflows_with_subcollection_reduction",
dataset_collection_ids=["1"],
job_ids=[job1_id, job2_id],
)
# TODO: refactor workflow extraction to not rely on HID, so we can actually properly connect
# this workflow
[docs] @skip_without_tool("collection_split_on_column")
def test_extract_workflow_with_output_collections(self, history_id):
jobs_summary = self._run_workflow(
"""
class: GalaxyWorkflow
steps:
- label: text_input1
type: input
- label: text_input2
type: input
- label: cat_inputs
tool_id: cat1
state:
input1:
$link: text_input1
queries:
- input2:
$link: text_input2
- label: split_up
tool_id: collection_split_on_column
state:
input1:
$link: cat_inputs/out_file1
- tool_id: cat_list
state:
input1:
$link: split_up/split_output
test_data:
text_input1: "samp1\t10.0\nsamp2\t20.0\n"
text_input2: "samp1\t30.0\nsamp2\t40.0\n"
""",
history_id,
)
tool_ids = ["cat1", "collection_split_on_column", "cat_list"]
job_ids = [functools.partial(self._job_id_for_tool, jobs_summary.jobs)(_) for _ in tool_ids]
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="test_extract_workflows_with_output_collections",
dataset_ids=["1", "2"],
job_ids=job_ids,
)
self.__check_workflow(
downloaded_workflow,
step_count=5,
verify_connected=True,
data_input_count=2,
data_collection_input_count=0,
tool_ids=tool_ids,
)
[docs] @skip_without_tool("collection_creates_pair")
@summarize_instance_history_on_error
def test_extract_with_mapped_output_collections(self, history_id):
jobs_summary = self._run_workflow(
"""
class: GalaxyWorkflow
steps:
- label: text_input1
type: input_collection
- label: cat_inputs
tool_id: cat1
state:
input1:
$link: text_input1
- label: pair_off
tool_id: collection_creates_pair
state:
input1:
$link: cat_inputs/out_file1
- label: cat_pairs
tool_id: cat_collection
state:
input1:
$link: pair_off/paired_output
- tool_id: cat_list
state:
input1:
$link: cat_pairs/out_file1
test_data:
text_input1:
collection_type: list
elements:
- identifier: samp1
content: "samp1\t10.0\nsamp2\t20.0\n"
- identifier: samp2
content: "samp1\t30.0\nsamp2\t40.0\n"
""",
history_id,
)
tool_ids = ["cat1", "collection_creates_pair", "cat_collection", "cat_list"]
job_ids = [functools.partial(self._job_id_for_tool, jobs_summary.jobs)(_) for _ in tool_ids]
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="test_extract_workflows_with_mapped_output_collections",
dataset_collection_ids=["1"],
job_ids=job_ids,
)
self.__check_workflow(
downloaded_workflow,
step_count=5,
verify_connected=True,
data_input_count=0,
data_collection_input_count=1,
tool_ids=tool_ids,
)
def _job_id_for_tool(self, jobs, tool_id):
return self._job_for_tool(jobs, tool_id)["id"]
def _job_for_tool(self, jobs, tool_id):
tool_jobs = [j for j in jobs if j["tool_id"] == tool_id]
if not tool_jobs:
raise ValueError(f"Failed to find job for tool {tool_id}")
# if len( tool_jobs ) > 1:
# assert False, "Found multiple jobs for tool %s" % tool_id
return tool_jobs[-1]
def __run_random_lines_mapped_over_pair(self, history_id):
hdca = self.dataset_collection_populator.create_pair_in_history(
history_id, contents=["1 2 3\n4 5 6", "7 8 9\n10 11 10"], wait=True
).json()["outputs"][0]
hdca_id = hdca["id"]
inputs1 = {"input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]}, "num_lines": 2}
implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id(history_id, "random_lines1", inputs1)
inputs2 = {"input": {"batch": True, "values": [{"src": "hdca", "id": implicit_hdca1["id"]}]}, "num_lines": 1}
_, job_id2 = self._run_tool_get_collection_and_job_id(history_id, "random_lines1", inputs2)
return hdca, job_id1, job_id2
def __run_random_lines_mapped_over_singleton(self, history_id):
hdca = self.dataset_collection_populator.create_list_in_history(history_id, contents=["1 2 3\n4 5 6"]).json()
hdca_id = hdca["id"]
inputs1 = {"input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]}, "num_lines": 2}
implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id(history_id, "random_lines1", inputs1)
inputs2 = {"input": {"batch": True, "values": [{"src": "hdca", "id": implicit_hdca1["id"]}]}, "num_lines": 1}
_, job_id2 = self._run_tool_get_collection_and_job_id(history_id, "random_lines1", inputs2)
return hdca, job_id1, job_id2
def __assert_looks_like_randomlines_mapping_workflow(self, downloaded_workflow):
# Assert workflow is input connected to a tool step with one output
# connected to another tool step.
assert len(downloaded_workflow["steps"]) == 3
collect_step_idx = self._assert_first_step_is_paired_input(downloaded_workflow)
tool_steps = self._get_steps_of_type(downloaded_workflow, "tool", expected_len=2)
tool_step_idxs = []
tool_input_step_idxs = []
for tool_step in tool_steps:
self._assert_has_key(tool_step["input_connections"], "input")
input_step_idx = tool_step["input_connections"]["input"]["id"]
tool_step_idxs.append(tool_step["id"])
tool_input_step_idxs.append(input_step_idx)
assert collect_step_idx not in tool_step_idxs
assert tool_input_step_idxs[0] == collect_step_idx
assert tool_input_step_idxs[1] == tool_step_idxs[0]
def __assert_looks_like_cat1_example_workflow(self, downloaded_workflow):
assert len(downloaded_workflow["steps"]) == 3
input_steps = self._get_steps_of_type(downloaded_workflow, "data_input", expected_len=2)
tool_step = self._get_steps_of_type(downloaded_workflow, "tool", expected_len=1)[0]
input1 = tool_step["input_connections"]["input1"]
input2 = tool_step["input_connections"]["queries_0|input2"]
assert input_steps[0]["id"] == input1["id"]
assert input_steps[1]["id"] == input2["id"]
def _history_contents(self, history_id: str):
return self._get(f"histories/{history_id}/contents").json()
def __copy_content_to_history(self, history_id, content):
if content["history_content_type"] == "dataset":
payload = dict(source="hda", content=content["id"])
response = self._post(f"histories/{history_id}/contents/datasets", payload, json=True)
else:
payload = dict(source="hdca", content=content["id"])
response = self._post(f"histories/{history_id}/contents/dataset_collections", payload, json=True)
self._assert_status_code_is(response, 200)
return response.json()
def __setup_and_run_cat1_workflow(self, history_id):
workflow = self.workflow_populator.load_workflow(name="test_for_extract")
workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow, history_id=history_id)
run_workflow_response = self._post(f"workflows/{workflow_id}/invocations", data=workflow_request, json=True)
self._assert_status_code_is(run_workflow_response, 200)
invocation_response = run_workflow_response.json()
self.workflow_populator.wait_for_invocation_and_jobs(
history_id=history_id, workflow_id=workflow_id, invocation_id=invocation_response["id"]
)
return self.__cat_job_id(history_id)
def _assert_first_step_is_paired_input(self, downloaded_workflow):
collection_steps = self._get_steps_of_type(downloaded_workflow, "data_collection_input", expected_len=1)
collection_step = collection_steps[0]
collection_step_state = loads(collection_step["tool_state"])
assert collection_step_state["collection_type"] == "paired"
collect_step_idx = collection_step["id"]
return collect_step_idx
def _extract_and_download_workflow(self, history_id: str, **extract_payload):
if reimport_as := extract_payload.get("reimport_as"):
history_name = reimport_as
self.dataset_populator.wait_for_history(history_id)
self.dataset_populator.rename_history(history_id, history_name)
history_length = extract_payload.get("reimport_wait_on_history_length")
if history_length is None:
# sometimes this won't be the same (i.e. datasets copied from outside the history
# that need to be included in target history for collections), but we can provide
# a reasonable default for fully in-history imports.
history_length = self.dataset_populator.history_length(history_id)
new_history_id = self.dataset_populator.reimport_history(
history_id,
history_name,
wait_on_history_length=history_length,
export_kwds={},
)
# wait a little more for those jobs, todo fix to wait for history imported false or
# for a specific number of jobs...
import time
time.sleep(1)
if "reimport_jobs_ids" in extract_payload:
new_history_job_ids = extract_payload["reimport_jobs_ids"](new_history_id)
extract_payload["job_ids"] = new_history_job_ids
else:
# Assume no copying or anything so just straight map job ids by index.
# Jobs are created after datasets, need to also wait on those...
history_jobs = [
j for j in self.dataset_populator.history_jobs(history_id) if j["tool_id"] != "__EXPORT_HISTORY__"
]
new_history_jobs = [
j
for j in self.dataset_populator.history_jobs(new_history_id)
if j["tool_id"] != "__EXPORT_HISTORY__"
]
history_job_ids = [j["id"] for j in history_jobs]
new_history_job_ids = [j["id"] for j in new_history_jobs]
assert len(history_job_ids) == len(new_history_job_ids)
if "job_ids" in extract_payload:
job_ids = extract_payload["job_ids"]
new_job_ids = []
for job_id in job_ids:
new_job_ids.append(new_history_job_ids[history_job_ids.index(job_id)])
extract_payload["job_ids"] = new_job_ids
history_id = new_history_id
if "from_history_id" not in extract_payload:
extract_payload["from_history_id"] = history_id
if "workflow_name" not in extract_payload:
extract_payload["workflow_name"] = "test import from history"
for key in "job_ids", "dataset_ids", "dataset_collection_ids":
if key in extract_payload:
value = extract_payload[key]
if isinstance(value, list):
extract_payload[key] = dumps(value)
create_workflow_response = self._post("workflows", data=extract_payload)
self._assert_status_code_is(create_workflow_response, 200)
new_workflow_id = create_workflow_response.json()["id"]
download_response = self._get(f"workflows/{new_workflow_id}/download")
self._assert_status_code_is(download_response, 200)
downloaded_workflow = download_response.json()
return downloaded_workflow
def _get_steps_of_type(self, downloaded_workflow, type: str, expected_len: Optional[int] = None):
steps = [s for s in downloaded_workflow["steps"].values() if s["type"] == type]
if expected_len is not None:
n = len(steps)
assert n == expected_len, f"Expected {expected_len} steps of type {type}, found {n}"
return sorted(steps, key=operator.itemgetter("id"))
def __job_id(self, history_id, dataset_id):
url = f"histories/{history_id}/contents/{dataset_id}/provenance"
prov_response = self._get(url, data=dict(follow=False))
self._assert_status_code_is(prov_response, 200)
return prov_response.json()["job_id"]
def __cat_job_id(self, history_id: str):
data = dict(history_id=history_id, tool_id="cat1")
jobs_response = self._get("jobs", data=data)
self._assert_status_code_is(jobs_response, 200)
cat1_job_id = jobs_response.json()[0]["id"]
return cat1_job_id
def _run_tool_get_collection_and_job_id(self, history_id: str, tool_id, inputs):
run_output1 = self.dataset_populator.run_tool(
tool_id=tool_id,
inputs=inputs,
history_id=history_id,
)
implicit_hdca = run_output1["implicit_collections"][0]
job_id = run_output1["jobs"][0]["id"]
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
return implicit_hdca, job_id
def __check_workflow(
self,
workflow,
step_count: Optional[int] = None,
verify_connected: bool = False,
data_input_count: Optional[int] = None,
data_collection_input_count: Optional[int] = None,
tool_ids=None,
):
steps = workflow["steps"]
if step_count is not None:
assert len(steps) == step_count
if verify_connected:
self.__assert_connected(workflow, steps)
if tool_ids is not None:
tool_steps = self._get_steps_of_type(workflow, "tool")
found_steps = set(map(operator.itemgetter("tool_id"), tool_steps))
expected_steps = set(tool_ids)
assert found_steps == expected_steps
if data_input_count is not None:
self._get_steps_of_type(workflow, "data_input", expected_len=data_input_count)
if data_collection_input_count is not None:
self._get_steps_of_type(workflow, "data_collection_input", expected_len=data_collection_input_count)
def __assert_connected(self, workflow, steps):
disconnected_inputs = []
for value in steps.values():
if value["type"] == "tool":
input_connections = value["input_connections"]
if not input_connections:
disconnected_inputs.append(value)
if disconnected_inputs:
template = "%d steps disconnected in extracted workflow - disconnectect steps are %s - workflow is %s"
message = template % (len(disconnected_inputs), disconnected_inputs, workflow)
raise AssertionError(message)
RunJobsSummary = namedtuple("RunJobsSummary", ["history_id", "workflow_id", "inputs", "jobs"])