Source code for galaxy_test.api.test_workflow_extraction
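"""Test the Galaxy API's extraction of workflows from histories."""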
import functools
import operator
from collections import namedtuple
from json import (
dumps,
loads,
)
from galaxy_test.base.populators import (
skip_without_tool,
summarize_instance_history_on_error,
)
from .test_workflows import BaseWorkflowsApiTestCase
class TestWorkflowExtractionApi(BaseWorkflowsApiTestCase):
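    """Test cases for workflow extraction, including extraction from reimported
    histories and from histories containing dataset collections."""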
    @skip_without_tool("cat1")
@summarize_instance_history_on_error
def test_extract_from_history(self, history_id):
# Run the simple test workflow and extract it back out from history
cat1_job_id = self.__setup_and_run_cat1_workflow(history_id=history_id)
contents = self._history_contents(history_id)
input_hids = [c["hid"] for c in contents[0:2]]
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="extract_from_history_basic",
dataset_ids=input_hids,
job_ids=[cat1_job_id],
)
assert downloaded_workflow["name"] == "test import from history"
self.__assert_looks_like_cat1_example_workflow(downloaded_workflow)
    @summarize_instance_history_on_error
def test_extract_with_copied_inputs(self, history_id):
old_history_id = self.dataset_populator.new_history()
# Run the simple test workflow and extract it back out from history
self.__setup_and_run_cat1_workflow(history_id=old_history_id)
        # Bug: we cannot mess up hids or these don't extract correctly. See Trello card here:
# https://trello.com/c/mKzLbM2P
# # create dummy dataset to complicate hid mapping
# self.dataset_populator.new_dataset( history_id, content="dummydataset" )
# offset = 1
offset = 0
old_contents = self._history_contents(old_history_id)
for old_dataset in old_contents:
self.__copy_content_to_history(history_id, old_dataset)
new_contents = self._history_contents(history_id)
input_hids = [c["hid"] for c in new_contents[(offset + 0) : (offset + 2)]]
cat1_job_id = self.__job_id(history_id, new_contents[(offset + 2)]["id"])
downloaded_workflow = self._extract_and_download_workflow(
history_id,
dataset_ids=input_hids,
job_ids=[cat1_job_id],
)
self.__assert_looks_like_cat1_example_workflow(downloaded_workflow)
    @summarize_instance_history_on_error
def test_extract_with_copied_inputs_reimported(self, history_id):
old_history_id = self.dataset_populator.new_history()
# Run the simple test workflow and extract it back out from history
self.__setup_and_run_cat1_workflow(history_id=old_history_id)
offset = 0
old_contents = self._history_contents(old_history_id)
for old_dataset in old_contents:
self.__copy_content_to_history(history_id, old_dataset)
new_contents = self._history_contents(history_id)
input_hids = [c["hid"] for c in new_contents[(offset + 0) : (offset + 2)]]
def reimport_jobs_ids(new_history_id):
return [j["id"] for j in self.dataset_populator.history_jobs(new_history_id) if j["tool_id"] == "cat1"]
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="test_extract_with_copied_inputs",
reimport_jobs_ids=reimport_jobs_ids,
dataset_ids=input_hids,
)
self.__assert_looks_like_cat1_example_workflow(downloaded_workflow)
    @skip_without_tool("random_lines1")
@summarize_instance_history_on_error
def test_extract_mapping_workflow_from_history(self, history_id):
hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair(history_id)
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="extract_from_history_with_mapping",
dataset_collection_ids=[hdca["hid"]],
job_ids=[job_id1, job_id2],
)
self.__assert_looks_like_randomlines_mapping_workflow(downloaded_workflow)
    def test_extract_copied_mapping_from_history(self, history_id):
hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair(history_id)
new_history_id = self.dataset_populator.copy_history(history_id).json()["id"]
        # This API test is somewhat contrived: there is probably no good way
        # to retrieve job_id1 and job_id2 like this for copied dataset
        # collections.
downloaded_workflow = self._extract_and_download_workflow(
new_history_id,
dataset_collection_ids=[hdca["hid"]],
job_ids=[job_id1, job_id2],
)
self.__assert_looks_like_randomlines_mapping_workflow(downloaded_workflow)
    def test_extract_copied_mapping_from_history_reimported(self, history_id):
import unittest
raise unittest.SkipTest(
"Mapping connection for copied collections not yet implemented in history import/export"
)
old_history_id = self.dataset_populator.new_history()
hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_singleton(old_history_id)
old_contents = self._history_contents(old_history_id)
for old_content in old_contents:
self.__copy_content_to_history(history_id, old_content)
def reimport_jobs_ids(new_history_id):
rval = [
j["id"] for j in self.dataset_populator.history_jobs(new_history_id) if j["tool_id"] == "random_lines1"
]
assert len(rval) == 2
print(rval)
return rval
        # This API test is somewhat contrived: there is probably no good way
        # to retrieve job_id1 and job_id2 like this for copied dataset
        # collections.
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="test_extract_from_history_with_mapped_collection_reimport",
reimport_jobs_ids=reimport_jobs_ids,
            reimport_wait_on_history_length=9,  # see comments in _extract_and_download_workflow about eliminating this magic constant.
dataset_collection_ids=[hdca["hid"]],
)
self.__assert_looks_like_randomlines_mapping_workflow(downloaded_workflow)
    @skip_without_tool("random_lines1")
@skip_without_tool("multi_data_param")
def test_extract_reduction_from_history(self, history_id):
hdca = self.dataset_collection_populator.create_pair_in_history(
history_id, contents=["1 2 3\n4 5 6", "7 8 9\n10 11 10"], wait=True
).json()["outputs"][0]
hdca_id = hdca["id"]
inputs1 = {"input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]}, "num_lines": 2}
implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id(history_id, "random_lines1", inputs1)
inputs2 = {
"f1": {"src": "hdca", "id": implicit_hdca1["id"]},
"f2": {"src": "hdca", "id": implicit_hdca1["id"]},
}
reduction_run_output = self.dataset_populator.run_tool(
tool_id="multi_data_param",
inputs=inputs2,
history_id=history_id,
)
job_id2 = reduction_run_output["jobs"][0]["id"]
self.dataset_populator.wait_for_job(job_id2, assert_ok=True)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="extract_from_history_with_reduction",
dataset_collection_ids=[hdca["hid"]],
job_ids=[job_id1, job_id2],
)
assert len(downloaded_workflow["steps"]) == 3
collect_step_idx = self._assert_first_step_is_paired_input(downloaded_workflow)
tool_steps = self._get_steps_of_type(downloaded_workflow, "tool", expected_len=2)
random_lines_map_step = tool_steps[0]
reduction_step = tool_steps[1]
assert "tool_id" in random_lines_map_step, random_lines_map_step
assert random_lines_map_step["tool_id"] == "random_lines1", random_lines_map_step
assert "input_connections" in random_lines_map_step, random_lines_map_step
random_lines_input_connections = random_lines_map_step["input_connections"]
assert "input" in random_lines_input_connections, random_lines_map_step
random_lines_input = random_lines_input_connections["input"]
assert random_lines_input["id"] == collect_step_idx
reduction_step_input = reduction_step["input_connections"]["f1"]
assert reduction_step_input["id"] == random_lines_map_step["id"]
    @skip_without_tool("collection_paired_test")
def test_extract_workflows_with_dataset_collections(self, history_id):
jobs_summary = self._run_workflow(
"""
class: GalaxyWorkflow
steps:
- label: text_input1
type: input_collection
- tool_id: collection_paired_test
state:
f1:
$link: text_input1
test_data:
text_input1:
collection_type: paired
""",
history_id,
)
job_id = self._job_id_for_tool(jobs_summary.jobs, "collection_paired_test")
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="extract_from_history_with_basic_collections",
dataset_collection_ids=["1"],
job_ids=[job_id],
)
self.__check_workflow(
downloaded_workflow,
step_count=2,
verify_connected=True,
data_input_count=0,
data_collection_input_count=1,
tool_ids=["collection_paired_test"],
)
collection_step = self._get_steps_of_type(downloaded_workflow, "data_collection_input", expected_len=1)[0]
collection_step_state = loads(collection_step["tool_state"])
assert collection_step_state["collection_type"] == "paired"
    @skip_without_tool("cat_collection")
def test_subcollection_mapping(self, history_id):
jobs_summary = self._run_workflow(
"""
class: GalaxyWorkflow
steps:
- label: text_input1
type: input_collection
- label: noop
tool_id: cat1
state:
input1:
$link: text_input1
- tool_id: cat_collection
state:
input1:
$link: noop/out_file1
test_data:
text_input1:
collection_type: "list:paired"
""",
history_id,
)
job1_id = self._job_id_for_tool(jobs_summary.jobs, "cat1")
job2_id = self._job_id_for_tool(jobs_summary.jobs, "cat_collection")
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="test_extract_workflows_with_subcollection_mapping",
dataset_collection_ids=["1"],
job_ids=[job1_id, job2_id],
)
self.__check_workflow(
downloaded_workflow,
step_count=3,
verify_connected=True,
data_input_count=0,
data_collection_input_count=1,
tool_ids=["cat_collection", "cat1"],
)
collection_step = self._get_steps_of_type(downloaded_workflow, "data_collection_input", expected_len=1)[0]
collection_step_state = loads(collection_step["tool_state"])
assert collection_step_state["collection_type"] == "list:paired"
    @skip_without_tool("cat_list")
@skip_without_tool("collection_creates_dynamic_nested")
def test_subcollection_reduction(self, history_id):
jobs_summary = self._run_workflow(
"""
class: GalaxyWorkflow
steps:
creates_nested_list:
tool_id: collection_creates_dynamic_nested
reduce_nested_list:
tool_id: cat_list
in:
input1: creates_nested_list/list_output
""",
history_id,
)
job1_id = self._job_id_for_tool(jobs_summary.jobs, "cat_list")
job2_id = self._job_id_for_tool(jobs_summary.jobs, "collection_creates_dynamic_nested")
self._extract_and_download_workflow(
history_id,
reimport_as="test_extract_workflows_with_subcollection_reduction",
dataset_collection_ids=["1"],
job_ids=[job1_id, job2_id],
)
# TODO: refactor workflow extraction to not rely on HID, so we can actually properly connect
# this workflow
    @skip_without_tool("collection_split_on_column")
def test_extract_workflow_with_output_collections(self, history_id):
jobs_summary = self._run_workflow(
"""
class: GalaxyWorkflow
steps:
- label: text_input1
type: input
- label: text_input2
type: input
- label: cat_inputs
tool_id: cat1
state:
input1:
$link: text_input1
queries:
- input2:
$link: text_input2
- label: split_up
tool_id: collection_split_on_column
state:
input1:
$link: cat_inputs/out_file1
- tool_id: cat_list
state:
input1:
$link: split_up/split_output
test_data:
text_input1: "samp1\t10.0\nsamp2\t20.0\n"
text_input2: "samp1\t30.0\nsamp2\t40.0\n"
""",
history_id,
)
tool_ids = ["cat1", "collection_split_on_column", "cat_list"]
job_ids = [functools.partial(self._job_id_for_tool, jobs_summary.jobs)(_) for _ in tool_ids]
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="test_extract_workflows_with_output_collections",
dataset_ids=["1", "2"],
job_ids=job_ids,
)
self.__check_workflow(
downloaded_workflow,
step_count=5,
verify_connected=True,
data_input_count=2,
data_collection_input_count=0,
tool_ids=tool_ids,
)
    @skip_without_tool("collection_creates_pair")
@summarize_instance_history_on_error
def test_extract_with_mapped_output_collections(self, history_id):
jobs_summary = self._run_workflow(
"""
class: GalaxyWorkflow
steps:
- label: text_input1
type: input_collection
- label: cat_inputs
tool_id: cat1
state:
input1:
$link: text_input1
- label: pair_off
tool_id: collection_creates_pair
state:
input1:
$link: cat_inputs/out_file1
- label: cat_pairs
tool_id: cat_collection
state:
input1:
$link: pair_off/paired_output
- tool_id: cat_list
state:
input1:
$link: cat_pairs/out_file1
test_data:
text_input1:
collection_type: list
elements:
- identifier: samp1
content: "samp1\t10.0\nsamp2\t20.0\n"
- identifier: samp2
content: "samp1\t30.0\nsamp2\t40.0\n"
""",
history_id,
)
tool_ids = ["cat1", "collection_creates_pair", "cat_collection", "cat_list"]
job_ids = [functools.partial(self._job_id_for_tool, jobs_summary.jobs)(_) for _ in tool_ids]
downloaded_workflow = self._extract_and_download_workflow(
history_id,
reimport_as="test_extract_workflows_with_mapped_output_collections",
dataset_collection_ids=["1"],
job_ids=job_ids,
)
self.__check_workflow(
downloaded_workflow,
step_count=5,
verify_connected=True,
data_input_count=0,
data_collection_input_count=1,
tool_ids=tool_ids,
)
def _job_id_for_tool(self, jobs, tool_id):
return self._job_for_tool(jobs, tool_id)["id"]
def _job_for_tool(self, jobs, tool_id):
tool_jobs = [j for j in jobs if j["tool_id"] == tool_id]
if not tool_jobs:
raise ValueError(f"Failed to find job for tool {tool_id}")
# if len( tool_jobs ) > 1:
# assert False, "Found multiple jobs for tool %s" % tool_id
return tool_jobs[-1]
def __run_random_lines_mapped_over_pair(self, history_id):
hdca = self.dataset_collection_populator.create_pair_in_history(
history_id, contents=["1 2 3\n4 5 6", "7 8 9\n10 11 10"], wait=True
).json()["outputs"][0]
hdca_id = hdca["id"]
inputs1 = {"input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]}, "num_lines": 2}
implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id(history_id, "random_lines1", inputs1)
inputs2 = {"input": {"batch": True, "values": [{"src": "hdca", "id": implicit_hdca1["id"]}]}, "num_lines": 1}
_, job_id2 = self._run_tool_get_collection_and_job_id(history_id, "random_lines1", inputs2)
return hdca, job_id1, job_id2
def __run_random_lines_mapped_over_singleton(self, history_id):
hdca = self.dataset_collection_populator.create_list_in_history(history_id, contents=["1 2 3\n4 5 6"]).json()
hdca_id = hdca["id"]
inputs1 = {"input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]}, "num_lines": 2}
implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id(history_id, "random_lines1", inputs1)
inputs2 = {"input": {"batch": True, "values": [{"src": "hdca", "id": implicit_hdca1["id"]}]}, "num_lines": 1}
_, job_id2 = self._run_tool_get_collection_and_job_id(history_id, "random_lines1", inputs2)
return hdca, job_id1, job_id2
def __assert_looks_like_randomlines_mapping_workflow(self, downloaded_workflow):
# Assert workflow is input connected to a tool step with one output
# connected to another tool step.
assert len(downloaded_workflow["steps"]) == 3
collect_step_idx = self._assert_first_step_is_paired_input(downloaded_workflow)
tool_steps = self._get_steps_of_type(downloaded_workflow, "tool", expected_len=2)
tool_step_idxs = []
tool_input_step_idxs = []
for tool_step in tool_steps:
self._assert_has_key(tool_step["input_connections"], "input")
input_step_idx = tool_step["input_connections"]["input"]["id"]
tool_step_idxs.append(tool_step["id"])
tool_input_step_idxs.append(input_step_idx)
assert collect_step_idx not in tool_step_idxs
assert tool_input_step_idxs[0] == collect_step_idx
assert tool_input_step_idxs[1] == tool_step_idxs[0]
def __assert_looks_like_cat1_example_workflow(self, downloaded_workflow):
assert len(downloaded_workflow["steps"]) == 3
input_steps = self._get_steps_of_type(downloaded_workflow, "data_input", expected_len=2)
tool_step = self._get_steps_of_type(downloaded_workflow, "tool", expected_len=1)[0]
input1 = tool_step["input_connections"]["input1"]
input2 = tool_step["input_connections"]["queries_0|input2"]
assert input_steps[0]["id"] == input1["id"]
assert input_steps[1]["id"] == input2["id"]
def _history_contents(self, history_id: str):
return self._get(f"histories/{history_id}/contents").json()
def __copy_content_to_history(self, history_id, content):
if content["history_content_type"] == "dataset":
payload = dict(source="hda", content=content["id"])
response = self._post(f"histories/{history_id}/contents/datasets", payload, json=True)
else:
payload = dict(source="hdca", content=content["id"])
response = self._post(f"histories/{history_id}/contents/dataset_collections", payload, json=True)
self._assert_status_code_is(response, 200)
return response.json()
def __setup_and_run_cat1_workflow(self, history_id):
workflow = self.workflow_populator.load_workflow(name="test_for_extract")
workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow, history_id=history_id)
run_workflow_response = self._post(f"workflows/{workflow_id}/invocations", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
invocation_response = run_workflow_response.json()
self.workflow_populator.wait_for_invocation_and_jobs(
history_id=history_id, workflow_id=workflow_id, invocation_id=invocation_response["id"]
)
return self.__cat_job_id(history_id)
def _assert_first_step_is_paired_input(self, downloaded_workflow):
collection_steps = self._get_steps_of_type(downloaded_workflow, "data_collection_input", expected_len=1)
collection_step = collection_steps[0]
collection_step_state = loads(collection_step["tool_state"])
assert collection_step_state["collection_type"] == "paired"
collect_step_idx = collection_step["id"]
return collect_step_idx
def _extract_and_download_workflow(self, history_id: str, **extract_payload):
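        """Extract a workflow from history_id and return the downloaded workflow JSON.

        If reimport_as is set, the history is renamed, exported, and reimported
        first, and the job ids in the payload are remapped onto the new history
        (via the reimport_jobs_ids callable when given, otherwise by index).
        The remaining keyword arguments form the extraction payload POSTed to
        the workflows endpoint.
        """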
reimport_as = extract_payload.get("reimport_as")
if reimport_as:
history_name = reimport_as
self.dataset_populator.wait_for_history(history_id)
self.dataset_populator.rename_history(history_id, history_name)
history_length = extract_payload.get("reimport_wait_on_history_length")
if history_length is None:
# sometimes this won't be the same (i.e. datasets copied from outside the history
# that need to be included in target history for collections), but we can provide
# a reasonable default for fully in-history imports.
history_length = self.dataset_populator.history_length(history_id)
new_history_id = self.dataset_populator.reimport_history(
history_id,
history_name,
wait_on_history_length=history_length,
export_kwds={},
)
            # Wait a little longer for those jobs. TODO: fix this to wait for the
            # history's importing flag to become false, or for a specific number of jobs...
import time
time.sleep(1)
if "reimport_jobs_ids" in extract_payload:
new_history_job_ids = extract_payload["reimport_jobs_ids"](new_history_id)
extract_payload["job_ids"] = new_history_job_ids
else:
# Assume no copying or anything so just straight map job ids by index.
# Jobs are created after datasets, need to also wait on those...
history_jobs = [
j for j in self.dataset_populator.history_jobs(history_id) if j["tool_id"] != "__EXPORT_HISTORY__"
]
new_history_jobs = [
j
for j in self.dataset_populator.history_jobs(new_history_id)
if j["tool_id"] != "__EXPORT_HISTORY__"
]
history_job_ids = [j["id"] for j in history_jobs]
new_history_job_ids = [j["id"] for j in new_history_jobs]
assert len(history_job_ids) == len(new_history_job_ids)
if "job_ids" in extract_payload:
job_ids = extract_payload["job_ids"]
new_job_ids = []
for job_id in job_ids:
new_job_ids.append(new_history_job_ids[history_job_ids.index(job_id)])
extract_payload["job_ids"] = new_job_ids
history_id = new_history_id
if "from_history_id" not in extract_payload:
extract_payload["from_history_id"] = history_id
if "workflow_name" not in extract_payload:
extract_payload["workflow_name"] = "test import from history"
for key in "job_ids", "dataset_ids", "dataset_collection_ids":
if key in extract_payload:
value = extract_payload[key]
if isinstance(value, list):
extract_payload[key] = dumps(value)
create_workflow_response = self._post("workflows", data=extract_payload)
self._assert_status_code_is(create_workflow_response, 200)
new_workflow_id = create_workflow_response.json()["id"]
download_response = self._get(f"workflows/{new_workflow_id}/download")
self._assert_status_code_is(download_response, 200)
downloaded_workflow = download_response.json()
return downloaded_workflow
def _get_steps_of_type(self, downloaded_workflow, type, expected_len=None):
steps = [s for s in downloaded_workflow["steps"].values() if s["type"] == type]
if expected_len is not None:
n = len(steps)
            assert n == expected_len, f"Expected {expected_len} steps of type {type}, found {n}"
return sorted(steps, key=operator.itemgetter("id"))
def __job_id(self, history_id, dataset_id):
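        """Return the id of the job that created the given dataset, via its provenance."""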
url = f"histories/{history_id}/contents/{dataset_id}/provenance"
prov_response = self._get(url, data=dict(follow=False))
self._assert_status_code_is(prov_response, 200)
return prov_response.json()["job_id"]
def __cat_job_id(self, history_id: str):
data = dict(history_id=history_id, tool_id="cat1")
jobs_response = self._get("jobs", data=data)
self._assert_status_code_is(jobs_response, 200)
cat1_job_id = jobs_response.json()[0]["id"]
return cat1_job_id
def _run_tool_get_collection_and_job_id(self, history_id: str, tool_id, inputs):
run_output1 = self.dataset_populator.run_tool(
tool_id=tool_id,
inputs=inputs,
history_id=history_id,
)
implicit_hdca = run_output1["implicit_collections"][0]
job_id = run_output1["jobs"][0]["id"]
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
return implicit_hdca, job_id
def __check_workflow(
self,
workflow,
step_count=None,
verify_connected=False,
data_input_count=None,
data_collection_input_count=None,
tool_ids=None,
):
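        """Assert structural properties of an extracted workflow: its step
        count, connectedness, input step counts, and set of tool ids."""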
steps = workflow["steps"]
if step_count is not None:
assert len(steps) == step_count
if verify_connected:
self.__assert_connected(workflow, steps)
if tool_ids is not None:
tool_steps = self._get_steps_of_type(workflow, "tool")
found_steps = set(map(operator.itemgetter("tool_id"), tool_steps))
expected_steps = set(tool_ids)
assert found_steps == expected_steps
if data_input_count is not None:
self._get_steps_of_type(workflow, "data_input", expected_len=data_input_count)
if data_collection_input_count is not None:
self._get_steps_of_type(workflow, "data_collection_input", expected_len=data_collection_input_count)
def __assert_connected(self, workflow, steps):
disconnected_inputs = []
for value in steps.values():
if value["type"] == "tool":
input_connections = value["input_connections"]
if not input_connections:
disconnected_inputs.append(value)
if disconnected_inputs:
            template = "%d steps disconnected in extracted workflow - disconnected steps are %s - workflow is %s"
message = template % (len(disconnected_inputs), disconnected_inputs, workflow)
raise AssertionError(message)
RunJobsSummary = namedtuple("RunJobsSummary", ["history_id", "workflow_id", "inputs", "jobs"])