Source code for galaxy_test.api.test_workflow_extraction
import functools
import operator
from collections import namedtuple
from json import dumps, loads
from galaxy_test.base.populators import skip_without_tool, summarize_instance_history_on_error
from .test_workflows import BaseWorkflowsApiTestCase
class WorkflowExtractionApiTestCase(BaseWorkflowsApiTestCase):
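    """Test cases for extracting workflows from histories via the API, including optional export/re-import round trips."""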
[docs] @skip_without_tool("cat1")
@summarize_instance_history_on_error
def test_extract_from_history(self):
# Run the simple test workflow and extract it back out from history
cat1_job_id = self.__setup_and_run_cat1_workflow(history_id=self.history_id)
contents = self._history_contents()
input_hids = [c["hid"] for c in contents[0:2]]
downloaded_workflow = self._extract_and_download_workflow(
reimport_as="extract_from_history_basic",
dataset_ids=input_hids,
job_ids=[cat1_job_id],
)
self.assertEqual(downloaded_workflow["name"], "test import from history")
self.__assert_looks_like_cat1_example_workflow(downloaded_workflow)
    @summarize_instance_history_on_error
def test_extract_with_copied_inputs(self):
old_history_id = self.dataset_populator.new_history()
# Run the simple test workflow and extract it back out from history
self.__setup_and_run_cat1_workflow(history_id=old_history_id)
        # Bug: we cannot mess up the hids here or these datasets don't extract correctly. See the Trello card:
# https://trello.com/c/mKzLbM2P
# # create dummy dataset to complicate hid mapping
# self.dataset_populator.new_dataset( history_id, content="dummydataset" )
# offset = 1
offset = 0
old_contents = self._history_contents(old_history_id)
for old_dataset in old_contents:
self.__copy_content_to_history(self.history_id, old_dataset)
new_contents = self._history_contents()
input_hids = [c["hid"] for c in new_contents[(offset + 0):(offset + 2)]]
cat1_job_id = self.__job_id(self.history_id, new_contents[(offset + 2)]["id"])
def reimport_jobs_ids(new_history_id):
return [j["id"] for j in self.dataset_populator.history_jobs(new_history_id) if j["tool_id"] == "cat1"]
downloaded_workflow = self._extract_and_download_workflow(
dataset_ids=input_hids,
job_ids=[cat1_job_id],
)
self.__assert_looks_like_cat1_example_workflow(downloaded_workflow)
    @summarize_instance_history_on_error
def test_extract_with_copied_inputs_reimported(self):
old_history_id = self.dataset_populator.new_history()
# Run the simple test workflow and extract it back out from history
self.__setup_and_run_cat1_workflow(history_id=old_history_id)
offset = 0
old_contents = self._history_contents(old_history_id)
for old_dataset in old_contents:
self.__copy_content_to_history(self.history_id, old_dataset)
new_contents = self._history_contents()
input_hids = [c["hid"] for c in new_contents[(offset + 0):(offset + 2)]]
def reimport_jobs_ids(new_history_id):
return [j["id"] for j in self.dataset_populator.history_jobs(new_history_id) if j["tool_id"] == "cat1"]
downloaded_workflow = self._extract_and_download_workflow(
reimport_as="test_extract_with_copied_inputs",
reimport_jobs_ids=reimport_jobs_ids,
dataset_ids=input_hids,
)
self.__assert_looks_like_cat1_example_workflow(downloaded_workflow)
[docs] @skip_without_tool("random_lines1")
@summarize_instance_history_on_error
def test_extract_mapping_workflow_from_history(self):
hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair(self.history_id)
downloaded_workflow = self._extract_and_download_workflow(
reimport_as="extract_from_history_with_mapping",
dataset_collection_ids=[hdca["hid"]],
job_ids=[job_id1, job_id2],
)
self.__assert_looks_like_randomlines_mapping_workflow(downloaded_workflow)
    def test_extract_copied_mapping_from_history(self):
old_history_id = self.dataset_populator.new_history()
hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_pair(old_history_id)
old_contents = self._history_contents(old_history_id)
for old_content in old_contents:
self.__copy_content_to_history(self.history_id, old_content)
        # This API test is somewhat contrived, since there does not seem to be a good way
        # to retrieve job_id1 and job_id2 like this for copied dataset collections.
downloaded_workflow = self._extract_and_download_workflow(
dataset_collection_ids=[hdca["hid"]],
job_ids=[job_id1, job_id2],
)
self.__assert_looks_like_randomlines_mapping_workflow(downloaded_workflow)
    def test_extract_copied_mapping_from_history_reimported(self):
import unittest
raise unittest.SkipTest("Mapping connection for copied collections not yet implemented in history import/export")
old_history_id = self.dataset_populator.new_history()
hdca, job_id1, job_id2 = self.__run_random_lines_mapped_over_singleton(old_history_id)
old_contents = self._history_contents(old_history_id)
for old_content in old_contents:
self.__copy_content_to_history(self.history_id, old_content)
def reimport_jobs_ids(new_history_id):
rval = [j["id"] for j in self.dataset_populator.history_jobs(new_history_id) if j["tool_id"] == "random_lines1"]
assert len(rval) == 2
print(rval)
return rval
        # This API test is somewhat contrived, since there does not seem to be a good way
        # to retrieve job_id1 and job_id2 like this for copied dataset collections.
downloaded_workflow = self._extract_and_download_workflow(
reimport_as="test_extract_from_history_with_mapped_collection_reimport",
reimport_jobs_ids=reimport_jobs_ids,
reimport_wait_on_history_length=9, # see comments in _extract about eliminating this magic constant.
dataset_collection_ids=[hdca["hid"]],
)
self.__assert_looks_like_randomlines_mapping_workflow(downloaded_workflow)
[docs] @skip_without_tool("random_lines1")
@skip_without_tool("multi_data_param")
def test_extract_reduction_from_history(self):
hdca = self.dataset_collection_populator.create_pair_in_history(self.history_id, contents=["1 2 3\n4 5 6", "7 8 9\n10 11 10"]).json()
hdca_id = hdca["id"]
inputs1 = {
"input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
"num_lines": 2
}
implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id(self.history_id, "random_lines1", inputs1)
inputs2 = {
"f1": {"src": "hdca", "id": implicit_hdca1["id"]},
"f2": {"src": "hdca", "id": implicit_hdca1["id"]},
}
reduction_run_output = self.dataset_populator.run_tool(
tool_id="multi_data_param",
inputs=inputs2,
history_id=self.history_id,
)
job_id2 = reduction_run_output["jobs"][0]["id"]
self.dataset_populator.wait_for_job(job_id2, assert_ok=True)
self.dataset_populator.wait_for_history(self.history_id, assert_ok=True)
downloaded_workflow = self._extract_and_download_workflow(
reimport_as="extract_from_history_with_reduction",
dataset_collection_ids=[hdca["hid"]],
job_ids=[job_id1, job_id2],
)
assert len(downloaded_workflow["steps"]) == 3
collect_step_idx = self._assert_first_step_is_paired_input(downloaded_workflow)
tool_steps = self._get_steps_of_type(downloaded_workflow, "tool", expected_len=2)
random_lines_map_step = tool_steps[0]
reduction_step = tool_steps[1]
assert "tool_id" in random_lines_map_step, random_lines_map_step
assert random_lines_map_step["tool_id"] == "random_lines1", random_lines_map_step
assert "input_connections" in random_lines_map_step, random_lines_map_step
random_lines_input_connections = random_lines_map_step["input_connections"]
assert "input" in random_lines_input_connections, random_lines_map_step
random_lines_input = random_lines_input_connections["input"]
assert random_lines_input["id"] == collect_step_idx
reduction_step_input = reduction_step["input_connections"]["f1"]
assert reduction_step_input["id"] == random_lines_map_step["id"]
[docs] @skip_without_tool("collection_paired_test")
def test_extract_workflows_with_dataset_collections(self):
jobs_summary = self._run_jobs("""
class: GalaxyWorkflow
steps:
- label: text_input1
type: input_collection
- tool_id: collection_paired_test
state:
f1:
$link: text_input1
test_data:
text_input1:
type: paired
""")
job_id = self._job_id_for_tool(jobs_summary.jobs, "collection_paired_test")
downloaded_workflow = self._extract_and_download_workflow(
reimport_as="extract_from_history_with_basic_collections",
dataset_collection_ids=["1"],
job_ids=[job_id],
)
self.__check_workflow(
downloaded_workflow,
step_count=2,
verify_connected=True,
data_input_count=0,
data_collection_input_count=1,
tool_ids=["collection_paired_test"]
)
collection_step = self._get_steps_of_type(downloaded_workflow, "data_collection_input", expected_len=1)[0]
collection_step_state = loads(collection_step["tool_state"])
self.assertEqual(collection_step_state["collection_type"], "paired")
[docs] @skip_without_tool("cat_collection")
def test_subcollection_mapping(self):
jobs_summary = self._run_jobs("""
class: GalaxyWorkflow
steps:
- label: text_input1
type: input_collection
- label: noop
tool_id: cat1
state:
input1:
$link: text_input1
- tool_id: cat_collection
state:
input1:
$link: noop/out_file1
test_data:
text_input1:
type: "list:paired"
""")
job1_id = self._job_id_for_tool(jobs_summary.jobs, "cat1")
job2_id = self._job_id_for_tool(jobs_summary.jobs, "cat_collection")
downloaded_workflow = self._extract_and_download_workflow(
reimport_as="test_extract_workflows_with_subcollection_mapping",
dataset_collection_ids=["1"],
job_ids=[job1_id, job2_id],
)
self.__check_workflow(
downloaded_workflow,
step_count=3,
verify_connected=True,
data_input_count=0,
data_collection_input_count=1,
tool_ids=["cat_collection", "cat1"],
)
collection_step = self._get_steps_of_type(downloaded_workflow, "data_collection_input", expected_len=1)[0]
collection_step_state = loads(collection_step["tool_state"])
self.assertEqual(collection_step_state["collection_type"], "list:paired")
[docs] @skip_without_tool("cat_list")
@skip_without_tool("collection_creates_dynamic_nested")
def test_subcollection_reduction(self):
jobs_summary = self._run_jobs("""
class: GalaxyWorkflow
steps:
creates_nested_list:
tool_id: collection_creates_dynamic_nested
reduce_nested_list:
tool_id: cat_list
in:
input1: creates_nested_list/list_output
""")
job1_id = self._job_id_for_tool(jobs_summary.jobs, "cat_list")
job2_id = self._job_id_for_tool(jobs_summary.jobs, "collection_creates_dynamic_nested")
self._extract_and_download_workflow(
reimport_as="test_extract_workflows_with_subcollection_reduction",
dataset_collection_ids=["1"],
job_ids=[job1_id, job2_id],
)
# TODO: refactor workflow extraction to not rely on HID, so we can actually properly connect
# this workflow
[docs] @skip_without_tool("collection_split_on_column")
def test_extract_workflow_with_output_collections(self):
jobs_summary = self._run_jobs("""
class: GalaxyWorkflow
steps:
- label: text_input1
type: input
- label: text_input2
type: input
- label: cat_inputs
tool_id: cat1
state:
input1:
$link: text_input1
queries:
- input2:
$link: text_input2
- label: split_up
tool_id: collection_split_on_column
state:
input1:
$link: cat_inputs/out_file1
- tool_id: cat_list
state:
input1:
$link: split_up/split_output
test_data:
text_input1: "samp1\t10.0\nsamp2\t20.0\n"
text_input2: "samp1\t30.0\nsamp2\t40.0\n"
""")
tool_ids = ["cat1", "collection_split_on_column", "cat_list"]
        job_ids = list(map(functools.partial(self._job_id_for_tool, jobs_summary.jobs), tool_ids))
downloaded_workflow = self._extract_and_download_workflow(
reimport_as="test_extract_workflows_with_output_collections",
dataset_ids=["1", "2"],
job_ids=job_ids,
)
self.__check_workflow(
downloaded_workflow,
step_count=5,
verify_connected=True,
data_input_count=2,
data_collection_input_count=0,
tool_ids=tool_ids,
)
[docs] @skip_without_tool("collection_creates_pair")
@summarize_instance_history_on_error
def test_extract_with_mapped_output_collections(self):
jobs_summary = self._run_jobs("""
class: GalaxyWorkflow
steps:
- label: text_input1
type: input_collection
- label: cat_inputs
tool_id: cat1
state:
input1:
$link: text_input1
- label: pair_off
tool_id: collection_creates_pair
state:
input1:
$link: cat_inputs/out_file1
- label: cat_pairs
tool_id: cat_collection
state:
input1:
$link: pair_off/paired_output
- tool_id: cat_list
state:
input1:
$link: cat_pairs/out_file1
test_data:
text_input1:
type: list
elements:
- identifier: samp1
content: "samp1\t10.0\nsamp2\t20.0\n"
- identifier: samp2
content: "samp1\t30.0\nsamp2\t40.0\n"
""")
tool_ids = ["cat1", "collection_creates_pair", "cat_collection", "cat_list"]
        job_ids = list(map(functools.partial(self._job_id_for_tool, jobs_summary.jobs), tool_ids))
downloaded_workflow = self._extract_and_download_workflow(
reimport_as="test_extract_workflows_with_mapped_output_collections",
dataset_collection_ids=["1"],
job_ids=job_ids,
)
self.__check_workflow(
downloaded_workflow,
step_count=5,
verify_connected=True,
data_input_count=0,
data_collection_input_count=1,
tool_ids=tool_ids,
)
def _job_id_for_tool(self, jobs, tool_id):
return self._job_for_tool(jobs, tool_id)["id"]
def _job_for_tool(self, jobs, tool_id):
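        """Return the last job in ``jobs`` that was run with ``tool_id``, failing the test if none exists."""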
tool_jobs = [j for j in jobs if j["tool_id"] == tool_id]
if not tool_jobs:
assert False, "Failed to find job for tool %s" % tool_id
# if len( tool_jobs ) > 1:
# assert False, "Found multiple jobs for tool %s" % tool_id
return tool_jobs[-1]
def __run_random_lines_mapped_over_pair(self, history_id):
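        """Map random_lines1 over a paired collection and then over its implicit output; return the input HDCA and both job ids."""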
hdca = self.dataset_collection_populator.create_pair_in_history(history_id, contents=["1 2 3\n4 5 6", "7 8 9\n10 11 10"]).json()
hdca_id = hdca["id"]
inputs1 = {
"input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
"num_lines": 2
}
implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id(history_id, "random_lines1", inputs1)
inputs2 = {
"input": {"batch": True, "values": [{"src": "hdca", "id": implicit_hdca1["id"]}]},
"num_lines": 1
}
_, job_id2 = self._run_tool_get_collection_and_job_id(history_id, "random_lines1", inputs2)
return hdca, job_id1, job_id2
def __run_random_lines_mapped_over_singleton(self, history_id):
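        """Like the paired variant above, but maps random_lines1 over a one-element list collection."""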
hdca = self.dataset_collection_populator.create_list_in_history(history_id, contents=["1 2 3\n4 5 6"]).json()
hdca_id = hdca["id"]
inputs1 = {
"input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
"num_lines": 2
}
implicit_hdca1, job_id1 = self._run_tool_get_collection_and_job_id(history_id, "random_lines1", inputs1)
inputs2 = {
"input": {"batch": True, "values": [{"src": "hdca", "id": implicit_hdca1["id"]}]},
"num_lines": 1
}
_, job_id2 = self._run_tool_get_collection_and_job_id(history_id, "random_lines1", inputs2)
return hdca, job_id1, job_id2
def __assert_looks_like_randomlines_mapping_workflow(self, downloaded_workflow):
# Assert workflow is input connected to a tool step with one output
# connected to another tool step.
assert len(downloaded_workflow["steps"]) == 3
collect_step_idx = self._assert_first_step_is_paired_input(downloaded_workflow)
tool_steps = self._get_steps_of_type(downloaded_workflow, "tool", expected_len=2)
tool_step_idxs = []
tool_input_step_idxs = []
for tool_step in tool_steps:
self._assert_has_key(tool_step["input_connections"], "input")
input_step_idx = tool_step["input_connections"]["input"]["id"]
tool_step_idxs.append(tool_step["id"])
tool_input_step_idxs.append(input_step_idx)
assert collect_step_idx not in tool_step_idxs
assert tool_input_step_idxs[0] == collect_step_idx
assert tool_input_step_idxs[1] == tool_step_idxs[0]
def __assert_looks_like_cat1_example_workflow(self, downloaded_workflow):
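        """Assert the workflow has two data input steps wired to the two inputs of a single cat1 tool step."""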
assert len(downloaded_workflow["steps"]) == 3
input_steps = self._get_steps_of_type(downloaded_workflow, "data_input", expected_len=2)
tool_step = self._get_steps_of_type(downloaded_workflow, "tool", expected_len=1)[0]
input1 = tool_step["input_connections"]["input1"]
input2 = tool_step["input_connections"]["queries_0|input2"]
self.assertEqual(input_steps[0]["id"], input1["id"])
self.assertEqual(input_steps[1]["id"], input2["id"])
def _history_contents(self, history_id=None):
if history_id is None:
history_id = self.history_id
return self._get("histories/%s/contents" % history_id).json()
def __copy_content_to_history(self, history_id, content):
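        """Copy an HDA or HDCA into ``history_id`` through the history contents API and return the response JSON."""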
if content["history_content_type"] == "dataset":
payload = dict(
source="hda",
content=content["id"]
)
response = self._post("histories/%s/contents/datasets" % history_id, payload)
else:
payload = dict(
source="hdca",
content=content["id"]
)
response = self._post("histories/%s/contents/dataset_collections" % history_id, payload)
self._assert_status_code_is(response, 200)
return response.json()
def __setup_and_run_cat1_workflow(self, history_id):
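        """Run the canned ``test_for_extract`` workflow in ``history_id`` and return the resulting cat1 job id."""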
workflow = self.workflow_populator.load_workflow(name="test_for_extract")
workflow_request, history_id = self._setup_workflow_run(workflow, history_id=history_id)
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
return self.__cat_job_id(history_id)
def _assert_first_step_is_paired_input(self, downloaded_workflow):
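        """Assert the workflow's single collection input step has collection type ``paired`` and return its step id."""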
collection_steps = self._get_steps_of_type(downloaded_workflow, "data_collection_input", expected_len=1)
collection_step = collection_steps[0]
collection_step_state = loads(collection_step["tool_state"])
self.assertEqual(collection_step_state["collection_type"], "paired")
collect_step_idx = collection_step["id"]
return collect_step_idx
def _extract_and_download_workflow(self, **extract_payload):
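        """Extract a workflow from the current history and return the downloaded workflow as a dict.

        If ``reimport_as`` is supplied, the source history is first exported and re-imported under
        that name, and any ``job_ids`` in the payload are remapped to the corresponding jobs in the
        re-imported history (via ``reimport_jobs_ids`` when provided, otherwise by index).
        """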
reimport_as = extract_payload.get("reimport_as")
if reimport_as:
history_name = reimport_as
history_id = self.history_id
self.dataset_populator.wait_for_history(history_id)
self.dataset_populator.rename_history(history_id, history_name)
history_length = extract_payload.get("reimport_wait_on_history_length")
if history_length is None:
                # Sometimes this won't be the same (e.g. datasets copied from outside the history
                # that need to be included in the target history for collections), but it is a
                # reasonable default for fully in-history imports.
history_length = self.dataset_populator.history_length(history_id)
new_history_id = self.dataset_populator.reimport_history(
history_id, history_name, wait_on_history_length=history_length, export_kwds={}, url=self.url, api_key=self.galaxy_interactor.api_key
)
            # Wait a little longer for those jobs; TODO: fix this to wait until the history is no
            # longer marked as importing, or for a specific number of jobs...
import time
time.sleep(1)
if "reimport_jobs_ids" in extract_payload:
new_history_job_ids = extract_payload["reimport_jobs_ids"](new_history_id)
extract_payload["job_ids"] = new_history_job_ids
else:
                # Assume nothing was copied, so map job ids to the new history by index.
                # Jobs are created after datasets, so those need to be waited on as well...
history_jobs = [j for j in self.dataset_populator.history_jobs(history_id) if j["tool_id"] != "__EXPORT_HISTORY__"]
new_history_jobs = [j for j in self.dataset_populator.history_jobs(new_history_id) if j["tool_id"] != "__EXPORT_HISTORY__"]
history_job_ids = [j["id"] for j in history_jobs]
new_history_job_ids = [j["id"] for j in new_history_jobs]
assert len(history_job_ids) == len(new_history_job_ids)
if "job_ids" in extract_payload:
job_ids = extract_payload["job_ids"]
new_job_ids = []
for i, job_id in enumerate(job_ids):
new_job_ids.append(new_history_job_ids[history_job_ids.index(job_id)])
extract_payload["job_ids"] = new_job_ids
self.history_id = new_history_id
if "from_history_id" not in extract_payload:
extract_payload["from_history_id"] = self.history_id
if "workflow_name" not in extract_payload:
extract_payload["workflow_name"] = "test import from history"
for key in "job_ids", "dataset_ids", "dataset_collection_ids":
if key in extract_payload:
value = extract_payload[key]
if isinstance(value, list):
extract_payload[key] = dumps(value)
create_workflow_response = self._post("workflows", data=extract_payload)
self._assert_status_code_is(create_workflow_response, 200)
new_workflow_id = create_workflow_response.json()["id"]
download_response = self._get("workflows/%s/download" % new_workflow_id)
self._assert_status_code_is(download_response, 200)
downloaded_workflow = download_response.json()
return downloaded_workflow
def _get_steps_of_type(self, downloaded_workflow, type, expected_len=None):
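        """Return the workflow steps of the given type, sorted by step id, optionally asserting how many were found."""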
steps = [s for s in downloaded_workflow["steps"].values() if s["type"] == type]
if expected_len is not None:
n = len(steps)
assert n == expected_len, "Expected %d steps of type %s, found %d" % (expected_len, type, n)
return sorted(steps, key=operator.itemgetter("id"))
def __job_id(self, history_id, dataset_id):
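        """Look up the id of the job that produced ``dataset_id`` using the dataset provenance API."""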
url = "histories/{}/contents/{}/provenance".format(history_id, dataset_id)
prov_response = self._get(url, data=dict(follow=False))
self._assert_status_code_is(prov_response, 200)
return prov_response.json()["job_id"]
def __cat_job_id(self, history_id):
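        """Return the id of a cat1 job recorded for ``history_id`` via the jobs API."""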
data = dict(history_id=history_id, tool_id="cat1")
jobs_response = self._get("jobs", data=data)
self._assert_status_code_is(jobs_response, 200)
cat1_job_id = jobs_response.json()[0]["id"]
return cat1_job_id
def _run_tool_get_collection_and_job_id(self, history_id, tool_id, inputs):
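        """Run ``tool_id`` with ``inputs`` in ``history_id`` and return the first implicit output collection and its job id."""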
run_output1 = self.dataset_populator.run_tool(
tool_id=tool_id,
inputs=inputs,
history_id=history_id,
)
implicit_hdca = run_output1["implicit_collections"][0]
job_id = run_output1["jobs"][0]["id"]
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
return implicit_hdca, job_id
def __check_workflow(
self,
workflow,
step_count=None,
verify_connected=False,
data_input_count=None,
data_collection_input_count=None,
tool_ids=None,
):
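        """Verify step count, connectivity, and the expected numbers of input steps and set of tool ids in an extracted workflow."""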
steps = workflow['steps']
if step_count is not None:
assert len(steps) == step_count
if verify_connected:
self.__assert_connected(workflow, steps)
if tool_ids is not None:
tool_steps = self._get_steps_of_type(workflow, "tool")
found_steps = set(map(operator.itemgetter("tool_id"), tool_steps))
expected_steps = set(tool_ids)
assert found_steps == expected_steps
if data_input_count is not None:
self._get_steps_of_type(workflow, "data_input", expected_len=data_input_count)
if data_collection_input_count is not None:
self._get_steps_of_type(workflow, "data_collection_input", expected_len=data_collection_input_count)
def __assert_connected(self, workflow, steps):
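        """Fail with a descriptive error if any tool step in the extracted workflow has no input connections."""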
disconnected_inputs = []
for key, value in steps.items():
if value['type'] == "tool":
input_connections = value["input_connections"]
if not input_connections:
disconnected_inputs.append(value)
if disconnected_inputs:
template = "%d steps disconnected in extracted workflow - disconnectect steps are %s - workflow is %s"
message = template % (len(disconnected_inputs), disconnected_inputs, workflow)
raise AssertionError(message)
RunJobsSummary = namedtuple('RunJobsSummary', ['history_id', 'workflow_id', 'inputs', 'jobs'])