Source code for galaxy_test.api.test_workflows
from __future__ import print_function
import json
import time
from json import dumps
from uuid import uuid4
import pytest
from requests import delete, get, put
from galaxy.exceptions import error_codes
from galaxy_test.base import rules_test_data
from galaxy_test.base.populators import (
DatasetCollectionPopulator,
DatasetPopulator,
skip_without_tool,
wait_on,
WorkflowPopulator
)
from galaxy_test.base.workflow_fixtures import (
WORKFLOW_NESTED_REPLACEMENT_PARAMETER,
WORKFLOW_NESTED_RUNTIME_PARAMETER,
WORKFLOW_NESTED_SIMPLE,
WORKFLOW_ONE_STEP_DEFAULT,
WORKFLOW_OPTIONAL_FALSE_INPUT_COLLECTION,
WORKFLOW_OPTIONAL_FALSE_INPUT_DATA,
WORKFLOW_OPTIONAL_TRUE_INPUT_COLLECTION,
WORKFLOW_OPTIONAL_TRUE_INPUT_DATA,
WORKFLOW_PARAMETER_INPUT_INTEGER_DEFAULT,
WORKFLOW_PARAMETER_INPUT_INTEGER_OPTIONAL,
WORKFLOW_PARAMETER_INPUT_INTEGER_REQUIRED,
WORKFLOW_RENAME_ON_INPUT,
WORKFLOW_RUNTIME_PARAMETER_AFTER_PAUSE,
WORKFLOW_WITH_CUSTOM_REPORT_1,
WORKFLOW_WITH_CUSTOM_REPORT_1_TEST_DATA,
WORKFLOW_WITH_DYNAMIC_OUTPUT_COLLECTION,
WORKFLOW_WITH_OUTPUT_COLLECTION,
WORKFLOW_WITH_OUTPUT_COLLECTION_MAPPING,
WORKFLOW_WITH_RULES_1,
)
from ._framework import ApiTestCase
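# A Format 2 ("modern syntax") workflow with an anonymous nested subworkflow.
# Used by test_run_subworkflow_auto_labels below to check that auto-generated
# step labels let outer steps reference nested outputs (e.g. "nested_workflow/1:out_file1").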
NESTED_WORKFLOW_AUTO_LABELS_MODERN_SYNTAX = """
class: GalaxyWorkflow
inputs:
outer_input: data
outputs:
outer_output:
outputSource: second_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: outer_input
nested_workflow:
run:
class: GalaxyWorkflow
inputs:
- id: inner_input
outputs:
- outputSource: 1/out_file1
steps:
random:
tool_id: random_lines1
state:
num_lines: 1
input:
$link: inner_input
seed_source:
seed_source_selector: set_seed
seed: asdf
in:
inner_input: first_cat/out_file1
second_cat:
tool_id: cat1
in:
input1: nested_workflow/1:out_file1
queries_0|input2: nested_workflow/1:out_file1
"""
class BaseWorkflowsApiTestCase(ApiTestCase):
# TODO: Find a new file for this class.
def setUp(self):
super(BaseWorkflowsApiTestCase, self).setUp()
self.workflow_populator = WorkflowPopulator(self.galaxy_interactor)
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
def _assert_user_has_workflow_with_name(self, name):
names = self._workflow_names()
assert name in names, "No workflows with name %s in user's workflows <%s>" % (name, names)
def _workflow_names(self):
index_response = self._get("workflows")
self._assert_status_code_is(index_response, 200)
names = [w["name"] for w in index_response.json()]
return names
def import_workflow(self, workflow, **kwds):
upload_response = self.workflow_populator.import_workflow(workflow, **kwds)
return upload_response
def _upload_yaml_workflow(self, has_yaml, **kwds):
return self.workflow_populator.upload_yaml_workflow(has_yaml, **kwds)
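# Build a run request for the standard two-input test workflow. The inputs_by
# argument selects the mapping style: 'step_id' (legacy ds_map), 'step_index',
# 'name' (input labels), 'step_uuid', or 'uuid_implicitly' (UUID keys without
# an explicit inputs_by in the request).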
def _setup_workflow_run(self, workflow=None, inputs_by='step_id', history_id=None, workflow_id=None):
if not workflow_id:
workflow_id = self.workflow_populator.create_workflow(workflow)
if not history_id:
history_id = self.dataset_populator.new_history()
hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3")
hda2 = self.dataset_populator.new_dataset(history_id, content="4 5 6")
workflow_request = dict(
history="hist_id=%s" % history_id,
workflow_id=workflow_id,
)
label_map = {
'WorkflowInput1': self._ds_entry(hda1),
'WorkflowInput2': self._ds_entry(hda2)
}
if inputs_by == 'step_id':
ds_map = self._build_ds_map(workflow_id, label_map)
workflow_request["ds_map"] = ds_map
elif inputs_by == "step_index":
index_map = {
'0': self._ds_entry(hda1),
'1': self._ds_entry(hda2)
}
workflow_request["inputs"] = dumps(index_map)
workflow_request["inputs_by"] = 'step_index'
elif inputs_by == "name":
workflow_request["inputs"] = dumps(label_map)
workflow_request["inputs_by"] = 'name'
elif inputs_by in ["step_uuid", "uuid_implicitly"]:
uuid_map = {
workflow["steps"]["0"]["uuid"]: self._ds_entry(hda1),
workflow["steps"]["1"]["uuid"]: self._ds_entry(hda2),
}
workflow_request["inputs"] = dumps(uuid_map)
if inputs_by == "step_uuid":
workflow_request["inputs_by"] = "step_uuid"
return workflow_request, history_id
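# Translate a label -> dataset mapping into the legacy ds_map payload, keyed
# by the workflow's input step IDs.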
def _build_ds_map(self, workflow_id, label_map):
workflow_inputs = self._workflow_inputs(workflow_id)
ds_map = {}
for key, value in workflow_inputs.items():
label = value["label"]
if label in label_map:
ds_map[key] = label_map[label]
return dumps(ds_map)
def _ds_entry(self, history_content):
return self.dataset_populator.ds_entry(history_content)
def _workflow_inputs(self, uploaded_workflow_id):
workflow_show_response = self._get("workflows/%s" % uploaded_workflow_id)
self._assert_status_code_is(workflow_show_response, 200)
workflow_inputs = workflow_show_response.json()["inputs"]
return workflow_inputs
def _invocation_details(self, workflow_id, invocation_id, **kwds):
invocation_details_response = self._get("workflows/%s/usage/%s" % (workflow_id, invocation_id), data=kwds)
self._assert_status_code_is(invocation_details_response, 200)
invocation_details = invocation_details_response.json()
return invocation_details
def _run_jobs(self, has_workflow, history_id=None, **kwds):
if history_id is None:
history_id = self.history_id
return self.workflow_populator.run_workflow(has_workflow, history_id=history_id, **kwds)
def _history_jobs(self, history_id):
return self._get("jobs", {"history_id": history_id, "order_by": "create_time"}).json()
def _assert_history_job_count(self, history_id, n):
jobs = self._history_jobs(history_id)
self.assertEqual(len(jobs), n)
def _download_workflow(self, workflow_id, style=None):
return self.workflow_populator.download_workflow(workflow_id, style=style)
def wait_for_invocation_and_jobs(self, history_id, workflow_id, invocation_id, assert_ok=True):
state = self.workflow_populator.wait_for_invocation(workflow_id, invocation_id)
if assert_ok:
assert state == "scheduled", state
self.workflow_populator.wait_for_invocation(workflow_id, invocation_id)
time.sleep(.5)
self.dataset_populator.wait_for_history_jobs(history_id, assert_ok=assert_ok)
time.sleep(.5)
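# Assert a (possibly JSON-encoded) tool state value is the marker dictionary
# Galaxy uses for parameters deferred to runtime ("RuntimeValue").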
def _assert_is_runtime_input(self, tool_state_value):
if not isinstance(tool_state_value, dict):
tool_state_value = json.loads(tool_state_value)
assert isinstance(tool_state_value, dict)
assert "__class__" in tool_state_value
assert tool_state_value["__class__"] == "RuntimeValue"
# Workflow API TODO:
# - Allow history_id as param to workflow run action. (hist_id)
# - Allow post to workflows/<workflow_id>/run in addition to posting to
# /workflows with id in payload.
# - Much more testing obviously, always more testing.
class WorkflowsApiTestCase(BaseWorkflowsApiTestCase):
def test_show_valid(self):
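# Create a throwaway workflow first so the second workflow's legacy step IDs
# are non-zero (checked below).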
workflow_id = self.workflow_populator.simple_workflow("dummy")
workflow_id = self.workflow_populator.simple_workflow("test_regular")
show_response = self._get("workflows/%s" % workflow_id, {"style": "instance"})
workflow = show_response.json()
self._assert_looks_like_instance_workflow_representation(workflow)
assert len(workflow["steps"]) == 3
self.assertEqual(sorted(step["id"] for step in workflow["steps"].values()), [0, 1, 2])
show_response = self._get("workflows/%s" % workflow_id, {"legacy": True})
workflow = show_response.json()
self._assert_looks_like_instance_workflow_representation(workflow)
assert len(workflow["steps"]) == 3
# Can't really say what the legacy IDs are, but they must be greater than 3 because the dummy
# workflow was created first in this instance.
self.assertNotEqual(sorted(step["id"] for step in workflow["steps"].values()), [0, 1, 2])
def test_show_invalid_key_is_400(self):
show_response = self._get("workflows/%s" % self._random_key())
self._assert_status_code_is(show_response, 400)
def test_cannot_show_private_workflow(self):
workflow_id = self.workflow_populator.simple_workflow("test_not_importable")
with self._different_user():
show_response = self._get("workflows/%s" % workflow_id)
self._assert_status_code_is(show_response, 403)
# Try as anonymous user
workflows_url = self._api_url("workflows/%s" % workflow_id)
assert get(workflows_url).status_code == 403
def test_cannot_download_private_workflow(self):
workflow_id = self.workflow_populator.simple_workflow("test_not_downloadable")
with self._different_user():
with pytest.raises(AssertionError) as excinfo:
self._download_workflow(workflow_id)
assert '403' in str(excinfo.value)
workflows_url = self._api_url("workflows/%s/download" % workflow_id)
assert get(workflows_url).status_code == 403
def test_anon_can_download_public_workflow(self):
workflow_id = self.workflow_populator.simple_workflow("test_downloadable", publish=True)
workflows_url = self._api_url("workflows/%s/download" % workflow_id)
response = get(workflows_url)
response.raise_for_status()
assert response.json()['a_galaxy_workflow'] == 'true'
def test_delete(self):
workflow_id = self.workflow_populator.simple_workflow("test_delete")
workflow_name = "test_delete"
self._assert_user_has_workflow_with_name(workflow_name)
workflow_url = self._api_url("workflows/%s" % workflow_id, use_key=True)
delete_response = delete(workflow_url)
self._assert_status_code_is(delete_response, 200)
# Make sure workflow is no longer in index by default.
assert workflow_name not in self._workflow_names()
def test_other_cannot_delete(self):
workflow_id = self.workflow_populator.simple_workflow("test_other_delete")
with self._different_user():
workflow_url = self._api_url("workflows/%s" % workflow_id, use_key=True)
delete_response = delete(workflow_url)
self._assert_status_code_is(delete_response, 403)
def test_index(self):
index_response = self._get("workflows")
self._assert_status_code_is(index_response, 200)
assert isinstance(index_response.json(), list)
def test_import_tools_requires_admin(self):
response = self.__test_upload(import_tools=True, assert_ok=False)
assert response.status_code == 403
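# Shared upload helper: POST the workflow either to the modern 'workflows'
# route or the deprecated 'workflows/upload' route, optionally requesting
# tool import (which requires an admin user).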
def __test_upload(self, use_deprecated_route=False, name="test_import", workflow=None, assert_ok=True, import_tools=False):
if workflow is None:
workflow = self.workflow_populator.load_workflow(name=name)
data = dict(
workflow=dumps(workflow),
)
if import_tools:
data["import_tools"] = import_tools
if use_deprecated_route:
route = "workflows/upload"
else:
route = "workflows"
upload_response = self._post(route, data=data)
if assert_ok:
self._assert_status_code_is(upload_response, 200)
self._assert_user_has_workflow_with_name(name)
return upload_response
def test_get_tool_predictions(self):
request = {"tool_sequence": "Cut1", "remote_model_url": "https://github.com/galaxyproject/galaxy-test-data/raw/master/tool_recommendation_model.hdf5"}
actual_recommendations = ['Filter1', 'cat1', 'addValue', 'comp1', 'Grep1']
route = "workflows/get_tool_predictions"
response = self._post(route, data=request)
recommendation_response = response.json()
has_current_tool = bool(recommendation_response["current_tool"])
if not has_current_tool:
self._assert_status_code_is(response, 400)
else:
# check Ok response from the API
self._assert_status_code_is(response, 200)
recommendation_response = response.json()
# check the input tool sequence
assert recommendation_response["current_tool"] == request["tool_sequence"]
# check non-empty predictions list
predicted_tools = recommendation_response["predicted_data"]["children"]
assert len(predicted_tools) > 0
# check for the correct predictions
for tool in predicted_tools:
assert tool["tool_id"] in actual_recommendations
break
def test_update(self):
original_workflow = self.workflow_populator.load_workflow(name="test_import")
uuids = {}
labels = {}
for order_index, step_dict in original_workflow["steps"].items():
uuid = str(uuid4())
step_dict["uuid"] = uuid
uuids[order_index] = uuid
label = "label_%s" % order_index
step_dict["label"] = label
labels[order_index] = label
def check_label_and_uuid(order_index, step_dict):
assert order_index in uuids
assert order_index in labels
self.assertEqual(uuids[order_index], step_dict["uuid"])
self.assertEqual(labels[order_index], step_dict["label"])
upload_response = self.__test_upload(workflow=original_workflow)
workflow_id = upload_response.json()["id"]
def update(workflow_object):
put_response = self._update_workflow(workflow_id, workflow_object)
self._assert_status_code_is(put_response, 200)
return put_response
workflow_content = self._download_workflow(workflow_id)
steps = workflow_content["steps"]
def tweak_step(step):
order_index, step_dict = step
check_label_and_uuid(order_index, step_dict)
assert step_dict['position']['top'] != 1
assert step_dict['position']['left'] != 1
step_dict['position'] = {'top': 1, 'left': 1}
list(map(tweak_step, steps.items()))  # map() is lazy on Python 3; consume it so the tweaks actually run
update(workflow_content)
def check_step(step):
order_index, step_dict = step
check_label_and_uuid(order_index, step_dict)
assert step_dict['position']['top'] == 1
assert step_dict['position']['left'] == 1
updated_workflow_content = self._download_workflow(workflow_id)
list(map(check_step, updated_workflow_content['steps'].items()))  # force evaluation of the lazy map
# Re-update against original workflow...
update(original_workflow)
updated_workflow_content = self._download_workflow(workflow_id)
# Make sure the positions have been updated.
list(map(tweak_step, updated_workflow_content['steps'].items()))  # force evaluation of the lazy map
def test_update_tags(self):
workflow_object = self.workflow_populator.load_workflow(name="test_import")
upload_response = self.__test_upload(workflow=workflow_object)
workflow = upload_response.json()
workflow['tags'] = ['a_tag', 'b_tag']
update_response = self._update_workflow(workflow['id'], workflow).json()
assert update_response['tags'] == ['a_tag', 'b_tag']
del workflow['tags']
update_response = self._update_workflow(workflow['id'], workflow).json()
assert update_response['tags'] == ['a_tag', 'b_tag']
workflow['tags'] = []
update_response = self._update_workflow(workflow['id'], workflow).json()
assert update_response['tags'] == []
def test_update_no_tool_id(self):
workflow_object = self.workflow_populator.load_workflow(name="test_import")
upload_response = self.__test_upload(workflow=workflow_object)
workflow_id = upload_response.json()["id"]
del workflow_object["steps"]["2"]["tool_id"]
put_response = self._update_workflow(workflow_id, workflow_object)
self._assert_status_code_is(put_response, 400)
def test_update_missing_tool(self):
# Create allows missing tools, update doesn't currently...
workflow_object = self.workflow_populator.load_workflow(name="test_import")
upload_response = self.__test_upload(workflow=workflow_object)
workflow_id = upload_response.json()["id"]
workflow_object["steps"]["2"]["tool_id"] = "cat-not-found"
put_response = self._update_workflow(workflow_id, workflow_object)
self._assert_status_code_is(put_response, 400)
def test_require_unique_step_uuids(self):
workflow_dup_uuids = self.workflow_populator.load_workflow(name="test_import")
uuid0 = str(uuid4())
for step_dict in workflow_dup_uuids["steps"].values():
step_dict["uuid"] = uuid0
response = self.workflow_populator.create_workflow_response(workflow_dup_uuids)
self._assert_status_code_is(response, 400)
def test_require_unique_step_labels(self):
workflow_dup_label = self.workflow_populator.load_workflow(name="test_import")
for step_dict in workflow_dup_label["steps"].values():
step_dict["label"] = "my duplicated label"
response = self.workflow_populator.create_workflow_response(workflow_dup_label)
self._assert_status_code_is(response, 400)
def test_import_deprecated(self):
workflow_id = self.workflow_populator.simple_workflow("test_import_published_deprecated", publish=True)
with self._different_user():
other_import_response = self.__import_workflow(workflow_id)
self._assert_status_code_is(other_import_response, 200)
self._assert_user_has_workflow_with_name("imported: test_import_published_deprecated")
def test_import_export_dynamic(self):
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
steps:
- type: input
label: input1
- tool_id: cat1
label: first_cat
state:
input1:
$link: 0
- label: embed1
run:
class: GalaxyTool
command: echo 'hello world 2' > $output1
outputs:
output1:
format: txt
- tool_id: cat1
state:
input1:
$link: first_cat/out_file1
queries:
input2:
$link: embed1/output1
test_data:
input1: "hello world"
""")
downloaded_workflow = self._download_workflow(workflow_id)
# The _upload_yaml_workflow entry point uses an admin key, but if we try to
# do the raw re-import as a regular user we expect a 403 error.
response = self.workflow_populator.create_workflow_response(downloaded_workflow)
self._assert_status_code_is(response, 403)
def test_import_annotations(self):
workflow_id = self.workflow_populator.simple_workflow("test_import_annotations", publish=True)
with self._different_user():
other_import_response = self.__import_workflow(workflow_id)
self._assert_status_code_is(other_import_response, 200)
# Test annotations preserved during upload and copied over during
# import.
other_id = other_import_response.json()["id"]
imported_workflow = self._show_workflow(other_id)
assert imported_workflow["annotation"] == "simple workflow"
step_annotations = set(step["annotation"] for step in imported_workflow["steps"].values())
assert "input1 description" in step_annotations
def test_import_subworkflows(self):
def get_subworkflow_content_id(workflow_id):
workflow_contents = self._download_workflow(workflow_id, style="editor")
steps = workflow_contents['steps']
subworkflow_step = next(s for s in steps.values() if s["type"] == "subworkflow")
return subworkflow_step['content_id']
workflow_id = self._upload_yaml_workflow(WORKFLOW_NESTED_SIMPLE, publish=True)
subworkflow_content_id = get_subworkflow_content_id(workflow_id)
with self._different_user():
other_import_response = self.__import_workflow(workflow_id)
self._assert_status_code_is(other_import_response, 200)
imported_workflow_id = other_import_response.json()["id"]
imported_subworkflow_content_id = get_subworkflow_content_id(imported_workflow_id)
assert subworkflow_content_id != imported_subworkflow_content_id
def test_not_importable_prevents_import(self):
workflow_id = self.workflow_populator.simple_workflow("test_not_importportable")
with self._different_user():
other_import_response = self.__import_workflow(workflow_id)
self._assert_status_code_is(other_import_response, 403)
def test_anonymous_published(self):
def anonymous_published_workflows():
workflows_url = self._api_url("workflows?show_published=True")
return get(workflows_url).json()
names = [w["name"] for w in anonymous_published_workflows()]
assert "test published example" not in names
workflow_id = self.workflow_populator.simple_workflow("test published example", publish=True)
names = [w["name"] for w in anonymous_published_workflows()]
assert "test published example" in names
ids = [w["id"] for w in anonymous_published_workflows()]
assert workflow_id in ids
def test_import_published(self):
workflow_id = self.workflow_populator.simple_workflow("test_import_published", publish=True)
with self._different_user():
other_import_response = self.__import_workflow(workflow_id, deprecated_route=True)
self._assert_status_code_is(other_import_response, 200)
self._assert_user_has_workflow_with_name("imported: test_import_published")
def test_export(self):
uploaded_workflow_id = self.workflow_populator.simple_workflow("test_for_export")
downloaded_workflow = self._download_workflow(uploaded_workflow_id)
assert downloaded_workflow["name"] == "test_for_export"
steps = downloaded_workflow["steps"]
assert len(steps) == 3
assert "0" in steps
first_step = steps["0"]
self._assert_has_keys(first_step, "inputs", "outputs")
inputs = first_step["inputs"]
assert len(inputs) > 0, first_step
first_input = inputs[0]
assert first_input["name"] == "WorkflowInput1"
assert first_input["description"] == "input1 description"
self._assert_has_keys(downloaded_workflow, "a_galaxy_workflow", "format-version", "annotation", "uuid", "steps")
for step in downloaded_workflow["steps"].values():
self._assert_has_keys(
step,
'id',
'type',
'tool_id',
'tool_version',
'name',
'tool_state',
'annotation',
'inputs',
'workflow_outputs',
'outputs'
)
if step['type'] == "tool":
self._assert_has_keys(step, "post_job_actions")
def test_export_format2(self):
uploaded_workflow_id = self.workflow_populator.simple_workflow("test_for_export_format2")
downloaded_workflow = self._download_workflow(uploaded_workflow_id, style="format2")
assert downloaded_workflow["class"] == "GalaxyWorkflow"
def test_export_editor(self):
uploaded_workflow_id = self.workflow_populator.simple_workflow("test_for_export")
downloaded_workflow = self._download_workflow(uploaded_workflow_id, style="editor")
self._assert_has_keys(downloaded_workflow, "name", "steps", "upgrade_messages")
for step in downloaded_workflow["steps"].values():
self._assert_has_keys(
step,
'id',
'type',
'content_id',
'name',
'tool_state',
'tooltip',
'inputs',
'outputs',
'config_form',
'annotation',
'post_job_actions',
'workflow_outputs',
'uuid',
'label',
)
@skip_without_tool('output_filter_with_input')
def test_export_editor_filtered_outputs(self):
template = """
class: GalaxyWorkflow
steps:
- tool_id: output_filter_with_input
state:
produce_out_1: {produce_out_1}
filter_text_1: {filter_text_1}
produce_collection: false
produce_paired_collection: false
"""
workflow_id = self._upload_yaml_workflow(template.format(produce_out_1='false', filter_text_1='false'))
downloaded_workflow = self._download_workflow(workflow_id, style="editor")
outputs = downloaded_workflow['steps']['0']['outputs']
assert len(outputs) == 1
assert outputs[0]['name'] == 'out_3'
workflow_id = self._upload_yaml_workflow(template.format(produce_out_1='true', filter_text_1='false'))
downloaded_workflow = self._download_workflow(workflow_id, style="editor")
outputs = downloaded_workflow['steps']['0']['outputs']
assert len(outputs) == 2
assert outputs[0]['name'] == 'out_1'
assert outputs[1]['name'] == 'out_3'
workflow_id = self._upload_yaml_workflow(template.format(produce_out_1='true', filter_text_1='foo'))
downloaded_workflow = self._download_workflow(workflow_id, style="editor")
outputs = downloaded_workflow['steps']['0']['outputs']
assert len(outputs) == 3
assert outputs[0]['name'] == 'out_1'
assert outputs[1]['name'] == 'out_2'
assert outputs[2]['name'] == 'out_3'
@skip_without_tool('output_filter_exception_1')
def test_export_editor_filtered_outputs_exception_handling(self):
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
steps:
- tool_id: output_filter_exception_1
""")
downloaded_workflow = self._download_workflow(workflow_id, style="editor")
outputs = downloaded_workflow['steps']['0']['outputs']
assert len(outputs) == 2
assert outputs[0]['name'] == 'out_1'
assert outputs[1]['name'] == 'out_2'
@skip_without_tool('collection_type_source')
def test_export_editor_collection_type_source(self):
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
inputs:
- id: text_input1
type: collection
collection_type: "list:paired"
steps:
- tool_id: collection_type_source
in:
input_collect: text_input1
""")
downloaded_workflow = self._download_workflow(workflow_id, style="editor")
steps = downloaded_workflow['steps']
assert len(steps) == 2
# Non-subworkflow collection_type_source tools will be handled by the client,
# so collection_type should be None here.
assert steps['1']['outputs'][0]['collection_type'] is None
@skip_without_tool('collection_type_source')
def test_export_editor_subworkflow_collection_type_source(self):
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
inputs:
outer_input: data
steps:
inner_workflow:
run:
class: GalaxyWorkflow
inputs:
inner_input:
type: collection
collection_type: "list:paired"
outputs:
workflow_output:
outputSource: collection_type_source/list_output
steps:
collection_type_source:
tool_id: collection_type_source
in:
input_collect: inner_input
in:
inner_input: outer_input
""")
downloaded_workflow = self._download_workflow(workflow_id, style="editor")
steps = downloaded_workflow['steps']
assert len(steps) == 2
assert steps['1']['type'] == 'subworkflow'
assert steps['1']['outputs'][0]['collection_type'] == 'list:paired'
def test_import_missing_tool(self):
workflow = self.workflow_populator.load_workflow_from_resource(name="test_workflow_missing_tool")
workflow_id = self.workflow_populator.create_workflow(workflow)
workflow_description = self._show_workflow(workflow_id)
steps = workflow_description["steps"]
missing_tool_steps = [v for v in steps.values() if v['tool_id'] == 'cat_missing_tool']
assert len(missing_tool_steps) == 1
def test_import_no_tool_id(self):
# Import works with missing tools, but not with absent content/tool id.
workflow = self.workflow_populator.load_workflow_from_resource(name="test_workflow_missing_tool")
del workflow["steps"]["2"]["tool_id"]
create_response = self.__test_upload(workflow=workflow, assert_ok=False)
self._assert_status_code_is(create_response, 400)
def test_import_export_with_runtime_inputs(self):
workflow = self.workflow_populator.load_workflow_from_resource(name="test_workflow_with_runtime_input")
workflow_id = self.workflow_populator.create_workflow(workflow)
downloaded_workflow = self._download_workflow(workflow_id)
assert len(downloaded_workflow["steps"]) == 2
runtime_step = downloaded_workflow["steps"]["1"]
for runtime_input in runtime_step["inputs"]:
if runtime_input["name"] == "num_lines":
break
assert runtime_input["description"].startswith("runtime parameter for tool")
tool_state = json.loads(runtime_step["tool_state"])
assert "num_lines" in tool_state
self._assert_is_runtime_input(tool_state["num_lines"])
[docs] @skip_without_tool("cat1")
def test_run_workflow_by_index(self):
self.__run_cat_workflow(inputs_by='step_index')
[docs] @skip_without_tool("cat1")
def test_run_workflow_by_uuid(self):
self.__run_cat_workflow(inputs_by='step_uuid')
[docs] @skip_without_tool("cat1")
def test_run_workflow_by_uuid_implicitly(self):
self.__run_cat_workflow(inputs_by='uuid_implicitly')
[docs] @skip_without_tool("cat1")
def test_run_workflow_by_name(self):
self.__run_cat_workflow(inputs_by='name')
[docs] @skip_without_tool("cat1")
def test_run_workflow(self):
self.__run_cat_workflow(inputs_by='step_id')
[docs] @skip_without_tool("multiple_versions")
def test_run_versioned_tools(self):
with self.dataset_populator.test_history() as history_01_id:
workflow_version_01 = self._upload_yaml_workflow("""
class: GalaxyWorkflow
steps:
multiple:
tool_id: multiple_versions
tool_version: "0.1"
state:
inttest: 0
""")
self.__invoke_workflow(history_01_id, workflow_version_01)
self.dataset_populator.wait_for_history(history_01_id, assert_ok=True)
with self.dataset_populator.test_history() as history_02_id:
workflow_version_02 = self._upload_yaml_workflow("""
class: GalaxyWorkflow
steps:
multiple:
tool_id: multiple_versions
tool_version: "0.2"
state:
inttest: 1
""")
self.__invoke_workflow(history_02_id, workflow_version_02)
self.dataset_populator.wait_for_history(history_02_id, assert_ok=True)
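# Run the standard two-input cat workflow using whichever input-mapping style
# the caller requests (see _setup_workflow_run for supported inputs_by values).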
def __run_cat_workflow(self, inputs_by):
workflow = self.workflow_populator.load_workflow(name="test_for_run")
workflow["steps"]["0"]["uuid"] = str(uuid4())
workflow["steps"]["1"]["uuid"] = str(uuid4())
workflow_request, history_id = self._setup_workflow_run(workflow, inputs_by=inputs_by)
# TODO: This should really be a post to workflows/<workflow_id>/run or
# something like that.
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
invocation_id = run_workflow_response.json()["id"]
invocation = self._invocation_details(workflow_request["workflow_id"], invocation_id)
assert invocation["state"] == "scheduled", invocation
self._assert_status_code_is(run_workflow_response, 200)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
[docs] @skip_without_tool("collection_creates_pair")
def test_workflow_run_output_collections(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs(WORKFLOW_WITH_OUTPUT_COLLECTION, history_id=history_id, assert_ok=True, wait=True)
self.assertEqual("a\nc\nb\nd\n", self.dataset_populator.get_history_dataset_content(history_id, hid=0))
[docs] @skip_without_tool("job_properties")
@skip_without_tool("identifier_multiple_in_conditional")
def test_workflow_resume_from_failed_step(self):
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
steps:
job_props:
tool_id: job_properties
state:
thebool: true
failbool: true
identifier:
tool_id: identifier_multiple_in_conditional
state:
outer_cond:
cond_param_outer: true
inner_cond:
cond_param_inner: true
input1:
$link: 0/out_file1
cat:
tool_id: cat1
in:
input1: identifier/output1
queries_0|input2: identifier/output1
""")
with self.dataset_populator.test_history() as history_id:
invocation_id = self.__invoke_workflow(history_id, workflow_id)
self.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id, assert_ok=False)
failed_dataset_one = self.dataset_populator.get_history_dataset_details(history_id, hid=1, wait=True, assert_ok=False)
assert failed_dataset_one['state'] == 'error', failed_dataset_one
paused_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=5, wait=True, assert_ok=False)
assert paused_dataset['state'] == 'paused', paused_dataset
inputs = {"thebool": "false",
"failbool": "false",
"rerun_remap_job_id": failed_dataset_one['creating_job']}
self.dataset_populator.run_tool(tool_id='job_properties',
inputs=inputs,
history_id=history_id,
assert_ok=True)
unpaused_dataset_1 = self.dataset_populator.get_history_dataset_details(history_id, hid=5, wait=True, assert_ok=False)
assert unpaused_dataset_1['state'] == 'ok'
self.dataset_populator.wait_for_history(history_id, assert_ok=False)
unpaused_dataset_2 = self.dataset_populator.get_history_dataset_details(history_id, hid=6, wait=True, assert_ok=False)
assert unpaused_dataset_2['state'] == 'ok'
[docs] @skip_without_tool("job_properties")
@skip_without_tool("collection_creates_list")
def test_workflow_resume_from_failed_step_with_hdca_input(self):
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
steps:
job_props:
tool_id: job_properties
state:
thebool: true
failbool: true
list_in_list_out:
tool_id: collection_creates_list
in:
input1: job_props/list_output
identifier:
tool_id: identifier_collection
in:
input1: list_in_list_out/list_output
""")
with self.dataset_populator.test_history() as history_id:
invocation_id = self.__invoke_workflow(history_id, workflow_id)
self.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id, assert_ok=False)
failed_dataset_one = self.dataset_populator.get_history_dataset_details(history_id, hid=1, wait=True, assert_ok=False)
assert failed_dataset_one['state'] == 'error', failed_dataset_one
paused_collection = self.dataset_populator.get_history_collection_details(history_id, hid=7, wait=True, assert_ok=False)
first_paused_element = paused_collection['elements'][0]['object']
assert first_paused_element['state'] == 'paused', first_paused_element
dependent_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=8, wait=True,
assert_ok=False)
assert dependent_dataset['state'] == 'paused'
inputs = {"thebool": "false",
"failbool": "false",
"rerun_remap_job_id": failed_dataset_one['creating_job']}
self.dataset_populator.run_tool(tool_id='job_properties',
inputs=inputs,
history_id=history_id,
assert_ok=True)
paused_collection = self.dataset_populator.get_history_collection_details(history_id, hid=7, wait=True, assert_ok=False)
first_paused_element = paused_collection['elements'][0]['object']
assert first_paused_element['state'] == 'ok'
self.dataset_populator.wait_for_history(history_id, assert_ok=False)
dependent_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=8, wait=True, assert_ok=False)
assert dependent_dataset['name'].startswith('identifier_collection')
assert dependent_dataset['state'] == 'ok'
[docs] @skip_without_tool("fail_identifier")
@skip_without_tool("identifier_collection")
def test_workflow_resume_with_mapped_over_input(self):
with self.dataset_populator.test_history() as history_id:
job_summary = self._run_jobs("""
class: GalaxyWorkflow
inputs:
input_datasets: collection
steps:
fail_identifier_1:
tool_id: fail_identifier
state:
failbool: true
in:
input1: input_datasets
identifier:
tool_id: identifier_collection
in:
input1: fail_identifier_1/out_file1
test_data:
input_datasets:
type: list
elements:
- identifier: fail
value: 1.fastq
type: File
- identifier: success
value: 1.fastq
type: File
""", history_id=history_id, assert_ok=False, wait=False)
self.wait_for_invocation_and_jobs(history_id, job_summary.workflow_id, job_summary.invocation_id, assert_ok=False)
history_contents = self.dataset_populator._get_contents_request(history_id=history_id).json()
first_input = history_contents[1]
assert first_input['history_content_type'] == 'dataset'
paused_dataset = history_contents[-1]
failed_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=5, assert_ok=False)
assert paused_dataset['state'] == 'paused', paused_dataset
assert failed_dataset['state'] == 'error', failed_dataset
inputs = {"input1": {'values': [{'src': 'hda',
'id': first_input['id']}]
},
"failbool": "false",
"rerun_remap_job_id": failed_dataset['creating_job']}
run_dict = self.dataset_populator.run_tool(tool_id='fail_identifier',
inputs=inputs,
history_id=history_id,
assert_ok=True)
unpaused_dataset = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=False)
assert unpaused_dataset['state'] == 'ok'
contents = self.dataset_populator.get_history_dataset_content(history_id, hid=7, assert_ok=False)
assert contents == 'fail\nsuccess\n', contents
replaced_hda_id = run_dict['outputs'][0]['id']
replaced_hda = self.dataset_populator.get_history_dataset_details(history_id, dataset_id=replaced_hda_id, wait=True, assert_ok=False)
assert not replaced_hda['visible'], replaced_hda
@skip_without_tool('multi_data_optional')
def test_workflow_list_list_multi_data_map_over(self):
# Test that a list:list is reduced to list with a multiple="true" data input
with self.dataset_populator.test_history() as history_id:
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
inputs:
input_datasets: collection
steps:
multi_data_optional:
tool_id: multi_data_optional
in:
input1: input_datasets
""")
with self.dataset_populator.test_history() as history_id:
hdca_id = self.dataset_collection_populator.create_list_of_list_in_history(history_id).json()
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
inputs = {
'0': self._ds_entry(hdca_id),
}
invocation_id = self.__invoke_workflow(history_id, workflow_id, inputs)
self.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
output_collection = self.dataset_populator.get_history_collection_details(history_id, hid=6)
assert output_collection['collection_type'] == 'list'
assert output_collection['job_source_type'] == 'ImplicitCollectionJobs'
[docs] @skip_without_tool("cat_list")
@skip_without_tool("collection_creates_pair")
def test_workflow_run_output_collection_mapping(self):
workflow_id = self._upload_yaml_workflow(WORKFLOW_WITH_OUTPUT_COLLECTION_MAPPING)
with self.dataset_populator.test_history() as history_id:
hdca1 = self.dataset_collection_populator.create_list_in_history(history_id, contents=["a\nb\nc\nd\n", "e\nf\ng\nh\n"]).json()
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
inputs = {
'0': self._ds_entry(hdca1),
}
invocation_id = self.__invoke_workflow(history_id, workflow_id, inputs)
self.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
self.assertEqual("a\nc\nb\nd\ne\ng\nf\nh\n", self.dataset_populator.get_history_dataset_content(history_id, hid=0))
[docs] @skip_without_tool("cat_list")
@skip_without_tool("collection_split_on_column")
def test_workflow_run_dynamic_output_collections(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs(WORKFLOW_WITH_DYNAMIC_OUTPUT_COLLECTION, history_id=history_id, assert_ok=True, wait=True)
details = self.dataset_populator.get_history_dataset_details(history_id, hid=0)
last_item_hid = details["hid"]
assert last_item_hid == 7, "Expected 7 history items, got %s" % last_item_hid
content = self.dataset_populator.get_history_dataset_content(history_id, hid=0)
self.assertEqual("10.0\n30.0\n20.0\n40.0\n", content)
[docs] @skip_without_tool("collection_split_on_column")
@skip_without_tool("min_repeat")
def test_workflow_run_dynamic_output_collections_2(self):
# A more advanced output collection workflow, testing regression of
# https://github.com/galaxyproject/galaxy/issues/776
with self.dataset_populator.test_history() as history_id:
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
inputs:
test_input_1: data
test_input_2: data
test_input_3: data
steps:
split_up:
tool_id: collection_split_on_column
in:
input1: test_input_2
min_repeat:
tool_id: min_repeat
in:
queries_0|input: test_input_1
queries2_0|input2: split_up/split_output
""")
hda1 = self.dataset_populator.new_dataset(history_id, content="samp1\t10.0\nsamp2\t20.0\n")
hda2 = self.dataset_populator.new_dataset(history_id, content="samp1\t20.0\nsamp2\t40.0\n")
hda3 = self.dataset_populator.new_dataset(history_id, content="samp1\t30.0\nsamp2\t60.0\n")
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
inputs = {
'0': self._ds_entry(hda1),
'1': self._ds_entry(hda2),
'2': self._ds_entry(hda3),
}
invocation_id = self.__invoke_workflow(history_id, workflow_id, inputs)
self.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
collection_details = self.dataset_populator.get_history_collection_details(history_id, hid=7)
assert collection_details["populated_state"] == "ok"
content = self.dataset_populator.get_history_dataset_content(history_id, hid=11)
self.assertEqual(content.strip(), "samp1\t10.0\nsamp2\t20.0")
[docs] @skip_without_tool("cat")
@skip_without_tool("collection_split_on_column")
def test_workflow_run_dynamic_output_collections_3(self):
# Test a workflow that creates a list:list:list followed by a mapping step.
with self.dataset_populator.test_history() as history_id:
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
inputs:
text_input1: data
text_input2: data
steps:
cat_inputs:
tool_id: cat1
in:
input1: text_input1
queries_0|input2: text_input2
split_up_1:
tool_id: collection_split_on_column
in:
input1: cat_inputs/out_file1
split_up_2:
tool_id: collection_split_on_column
in:
input1: split_up_1/split_output
cat_output:
tool_id: cat
in:
input1: split_up_2/split_output
""")
hda1 = self.dataset_populator.new_dataset(history_id, content="samp1\t10.0\nsamp2\t20.0\n")
hda2 = self.dataset_populator.new_dataset(history_id, content="samp1\t30.0\nsamp2\t40.0\n")
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
inputs = {
'0': self._ds_entry(hda1),
'1': self._ds_entry(hda2),
}
invocation_id = self.__invoke_workflow(history_id, workflow_id, inputs)
self.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
[docs] @skip_without_tool("mapper")
@skip_without_tool("pileup")
def test_workflow_metadata_validation_0(self):
# Testing regression of
# https://github.com/galaxyproject/galaxy/issues/1514
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input_fastqs: collection
reference: data
steps:
map_over_mapper:
tool_id: mapper
in:
input1: input_fastqs
reference: reference
pileup:
tool_id: pileup
in:
input1: map_over_mapper/out_file1
reference: reference
test_data:
input_fastqs:
type: list
elements:
- identifier: samp1
value: 1.fastq
type: File
- identifier: samp2
value: 1.fastq
type: File
reference:
value: 1.fasta
type: File
""", history_id=history_id)
def test_run_subworkflow_simple(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs(WORKFLOW_NESTED_SIMPLE, test_data="""
outer_input:
value: 1.bed
type: File
""", history_id=history_id)
content = self.dataset_populator.get_history_dataset_content(history_id)
self.assertEqual("chrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n", content)
[docs] @skip_without_tool("random_lines1")
def test_run_subworkflow_runtime_parameters(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs(WORKFLOW_NESTED_RUNTIME_PARAMETER, test_data="""
step_parameters:
'1':
'1|num_lines': 2
outer_input:
value: 1.bed
type: File
""", history_id=history_id)
content = self.dataset_populator.get_history_dataset_content(history_id)
assert len([x for x in content.split("\n") if x]) == 2
[docs] @skip_without_tool("cat")
def test_run_subworkflow_replacement_parameters(self):
with self.dataset_populator.test_history() as history_id:
test_data = """
replacement_parameters:
replaceme: moocow
outer_input:
value: 1.bed
type: File
"""
self._run_jobs(WORKFLOW_NESTED_REPLACEMENT_PARAMETER, test_data=test_data, history_id=history_id)
details = self.dataset_populator.get_history_dataset_details(history_id)
assert details["name"] == "moocow suffix"
[docs] @skip_without_tool("random_lines1")
def test_run_runtime_parameters_after_pause(self):
with self.dataset_populator.test_history() as history_id:
workflow_run_description = """%s
test_data:
step_parameters:
'2':
'num_lines': 2
input1:
value: 1.bed
type: File
""" % WORKFLOW_RUNTIME_PARAMETER_AFTER_PAUSE
job_summary = self._run_jobs(workflow_run_description, history_id=history_id, wait=False)
uploaded_workflow_id, invocation_id = job_summary.workflow_id, job_summary.invocation_id
# Wait for at least one scheduling step.
self._wait_for_invocation_non_new(uploaded_workflow_id, invocation_id)
# Make sure the history didn't enter a failed state in there.
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
# Assert the workflow hasn't finished scheduling; if so, we can be pretty sure
# we are at the pause step at this point.
self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id)
# Review the paused steps to allow the workflow to continue.
self.__review_paused_steps(uploaded_workflow_id, invocation_id, order_index=1, action=True)
# Wait for the workflow to finish scheduling and ensure both the invocation
# and the history are in valid states.
invocation_scheduled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, 'scheduled')
assert invocation_scheduled, "Workflow state is not scheduled..."
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
content = self.dataset_populator.get_history_dataset_content(history_id)
assert len([x for x in content.split("\n") if x]) == 2
def test_run_subworkflow_auto_labels(self):
def run_test(workflow_text):
with self.dataset_populator.test_history() as history_id:
test_data = """
outer_input:
value: 1.bed
type: File
"""
job_summary = self._run_jobs(workflow_text, test_data=test_data, history_id=history_id)
assert len(job_summary.jobs) == 4, "4 jobs expected, got %d jobs" % len(job_summary.jobs)
content = self.dataset_populator.get_history_dataset_content(history_id)
self.assertEqual(
"chrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n",
content)
run_test(NESTED_WORKFLOW_AUTO_LABELS_MODERN_SYNTAX)
[docs] @skip_without_tool("cat1")
@skip_without_tool("collection_paired_test")
def test_workflow_run_zip_collections(self):
with self.dataset_populator.test_history() as history_id:
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
inputs:
test_input_1: data
test_input_2: data
steps:
first_cat:
tool_id: cat1
in:
input1: test_input_1
zip_it:
tool_id: "__ZIP_COLLECTION__"
in:
input_forward: first_cat/out_file1
input_reverse: test_input_2
concat_pair:
tool_id: collection_paired_test
in:
f1: zip_it/output
""")
hda1 = self.dataset_populator.new_dataset(history_id, content="samp1\t10.0\nsamp2\t20.0\n")
hda2 = self.dataset_populator.new_dataset(history_id, content="samp1\t20.0\nsamp2\t40.0\n")
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
inputs = {
'0': self._ds_entry(hda1),
'1': self._ds_entry(hda2),
}
invocation_id = self.__invoke_workflow(history_id, workflow_id, inputs)
self.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
content = self.dataset_populator.get_history_dataset_content(history_id)
self.assertEqual(content.strip(), "samp1\t10.0\nsamp2\t20.0\nsamp1\t20.0\nsamp2\t40.0")
[docs] @skip_without_tool("collection_paired_test")
def test_workflow_flatten(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
steps:
nested:
tool_id: collection_creates_dynamic_nested
state:
sleep_time: 0
foo: 'dummy'
flatten:
tool_id: '__FLATTEN__'
state:
input:
$link: nested/list_output
join_identifier: '-'
""", test_data={}, history_id=history_id)
details = self.dataset_populator.get_history_collection_details(history_id, hid=14)
assert details['collection_type'] == "list"
elements = details["elements"]
identifiers = [e['element_identifier'] for e in elements]
assert len(identifiers) == 6
assert "oe1-ie1" in identifiers
[docs] @skip_without_tool("collection_paired_test")
def test_workflow_flatten_with_mapped_over_execution(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs(r"""
class: GalaxyWorkflow
inputs:
input_fastqs: collection
steps:
split_up:
tool_id: collection_split_on_column
in:
input1: input_fastqs
flatten:
tool_id: '__FLATTEN__'
in:
input: split_up/split_output
join_identifier: '-'
test_data:
input_fastqs:
type: list
elements:
- identifier: samp1
content: "0\n1"
""", history_id=history_id)
history = self._get('histories/%s/contents' % history_id).json()
flattened_collection = history[-1]
assert flattened_collection['history_content_type'] == 'dataset_collection'
assert flattened_collection['collection_type'] == 'list'
assert flattened_collection['element_count'] == 2
nested_collection = self.dataset_populator.get_history_collection_details(history_id, hid=3)
assert nested_collection['collection_type'] == 'list:list'
assert nested_collection['element_count'] == 1
assert nested_collection['elements'][0]['object']['populated']
assert nested_collection['elements'][0]['object']['element_count'] == 2
[docs] @skip_without_tool("cat")
def test_workflow_invocation_report_1(self):
test_data = """
input_1:
value: 1.bed
type: File
"""
with self.dataset_populator.test_history() as history_id:
summary = self._run_jobs(r"""
class: GalaxyWorkflow
inputs:
input_1: data
outputs:
output_1:
outputSource: first_cat/out_file1
steps:
first_cat:
tool_id: cat
in:
input1: input_1
""", test_data=test_data, history_id=history_id)
workflow_id = summary.workflow_id
invocation_id = summary.invocation_id
report_json = self.workflow_populator.workflow_report_json(workflow_id, invocation_id)
assert "markdown" in report_json
self._assert_has_keys(report_json, "markdown", "render_format")
assert report_json["render_format"] == "markdown"
markdown_content = report_json["markdown"]
assert "## Workflow Outputs" in markdown_content
assert "## Workflow Inputs" in markdown_content
assert "## About This Report" not in markdown_content
[docs] @skip_without_tool("cat")
def test_workflow_invocation_report_custom(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_jobs(
WORKFLOW_WITH_CUSTOM_REPORT_1,
test_data=WORKFLOW_WITH_CUSTOM_REPORT_1_TEST_DATA,
history_id=history_id
)
workflow_id = summary.workflow_id
invocation_id = summary.invocation_id
downloaded_workflow = self._download_workflow(workflow_id)
assert "report" in downloaded_workflow
report_config = downloaded_workflow["report"]
assert "markdown" in report_config
report_json = self.workflow_populator.workflow_report_json(workflow_id, invocation_id)
assert "markdown" in report_json, "markdown not in report json %s" % report_json
self._assert_has_keys(report_json, "markdown", "render_format")
assert report_json["render_format"] == "markdown"
markdown_content = report_json["markdown"]
assert "## Workflow Outputs" in markdown_content
assert "\n```galaxy\nhistory_dataset_display(history_dataset_id=" in markdown_content
assert "## Workflow Inputs" in markdown_content
assert "## About This Report" in markdown_content
[docs] @skip_without_tool("__APPLY_RULES__")
def test_workflow_run_apply_rules(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs(WORKFLOW_WITH_RULES_1, history_id=history_id, wait=True, assert_ok=True, round_trip_format_conversion=True)
output_content = self.dataset_populator.get_history_collection_details(history_id, hid=6)
rules_test_data.check_example_2(output_content, self.dataset_populator)
def test_filter_failed_mapping(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_jobs("""
class: GalaxyWorkflow
inputs:
input_c: collection
steps:
mixed_collection:
tool_id: exit_code_from_file
state:
input:
$link: input_c
filtered_collection:
tool_id: "__FILTER_FAILED_DATASETS__"
state:
input:
$link: mixed_collection/out_file1
cat:
tool_id: cat1
state:
input1:
$link: filtered_collection
""", test_data="""
input_c:
type: list
elements:
- identifier: i1
content: "0"
- identifier: i2
content: "1"
""", history_id=history_id, wait=True, assert_ok=False)
jobs = summary.jobs
def filter_jobs_by_tool(tool_id):
return [j for j in summary.jobs if j["tool_id"] == tool_id]
assert len(filter_jobs_by_tool("__DATA_FETCH__")) == 1, jobs
assert len(filter_jobs_by_tool("exit_code_from_file")) == 2, jobs
assert len(filter_jobs_by_tool("__FILTER_FAILED_DATASETS__")) == 1, jobs
# The following shows one dataset was filtered out before cat1, so only one cat1 job ran.
assert len(filter_jobs_by_tool("cat1")) == 1, jobs
def test_workflow_request(self):
workflow = self.workflow_populator.load_workflow(name="test_for_queue")
workflow_request, history_id = self._setup_workflow_run(workflow)
url = "workflows/%s/usage" % (workflow_request["workflow_id"])
del workflow_request["workflow_id"]
run_workflow_response = self._post(url, data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
# Give some time for workflow to get scheduled before scanning the history.
time.sleep(5)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
def test_workflow_output_dataset(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
outputs:
wf_output_1:
outputSource: first_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: input1
""", test_data={"input1": "hello world"}, history_id=history_id)
workflow_id = summary.workflow_id
invocation_id = summary.invocation_id
invocation_response = self._get("workflows/%s/invocations/%s" % (workflow_id, invocation_id))
self._assert_status_code_is(invocation_response, 200)
invocation = invocation_response.json()
self._assert_has_keys(invocation, "id", "outputs", "output_collections")
assert len(invocation["output_collections"]) == 0
assert len(invocation["outputs"]) == 1
output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset_id=invocation["outputs"]["wf_output_1"]["id"])
assert "hello world" == output_content.strip()
[docs] @skip_without_tool("cat")
def test_workflow_output_dataset_collection(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1:
type: data_collection_input
collection_type: list
outputs:
wf_output_1:
outputSource: first_cat/out_file1
steps:
first_cat:
tool_id: cat
in:
input1: input1
""", test_data="""
input1:
type: list
name: the_dataset_list
elements:
- identifier: el1
value: 1.fastq
type: File
""", history_id=history_id, round_trip_format_conversion=True)
workflow_id = summary.workflow_id
invocation_id = summary.invocation_id
invocation_response = self._get("workflows/%s/invocations/%s" % (workflow_id, invocation_id))
self._assert_status_code_is(invocation_response, 200)
invocation = invocation_response.json()
self._assert_has_keys(invocation, "id", "outputs", "output_collections")
assert len(invocation["output_collections"]) == 1
assert len(invocation["outputs"]) == 0
output_content = self.dataset_populator.get_history_collection_details(history_id, content_id=invocation["output_collections"]["wf_output_1"]["id"])
self._assert_has_keys(output_content, "id", "elements")
assert output_content["collection_type"] == "list"
elements = output_content["elements"]
assert len(elements) == 1
elements0 = elements[0]
assert elements0["element_identifier"] == "el1"
def test_workflow_input_as_output(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
text_input: text
outputs:
wf_output_1:
outputSource: input1
wf_output_param:
outputSource: text_input
steps: []
""", test_data={"input1": "hello world", "text_input": {"value": "A text variable", "type": "raw"}}, history_id=history_id)
workflow_id = summary.workflow_id
invocation_id = summary.invocation_id
invocation_response = self._get("workflows/%s/invocations/%s" % (workflow_id, invocation_id))
self._assert_status_code_is(invocation_response, 200)
invocation = invocation_response.json()
self._assert_has_keys(invocation, "id", "outputs", "output_collections")
assert len(invocation["output_collections"]) == 0
assert len(invocation["outputs"]) == 1
assert len(invocation["output_values"]) == 1
assert "wf_output_param" in invocation["output_values"]
assert invocation["output_values"]["wf_output_param"] == "A text variable", invocation["output_values"]
output_content = self.dataset_populator.get_history_dataset_content(history_id, content_id=invocation["outputs"]["wf_output_1"]["id"])
assert output_content == "hello world\n"
[docs] @skip_without_tool("cat")
def test_workflow_input_mapping(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
outputs:
wf_output_1:
outputSource: first_cat/out_file1
steps:
first_cat:
tool_id: cat
in:
input1: input1
""", test_data="""
input1:
type: list
name: the_dataset_list
elements:
- identifier: el1
value: 1.fastq
type: File
- identifier: el2
value: 1.fastq
type: File
""", history_id=history_id)
workflow_id = summary.workflow_id
invocation_id = summary.invocation_id
invocation_response = self._get("workflows/%s/invocations/%s" % (workflow_id, invocation_id))
self._assert_status_code_is(invocation_response, 200)
invocation = invocation_response.json()
self._assert_has_keys(invocation, "id", "outputs", "output_collections")
assert len(invocation["output_collections"]) == 1
assert len(invocation["outputs"]) == 0
output_content = self.dataset_populator.get_history_collection_details(history_id, content_id=invocation["output_collections"]["wf_output_1"]["id"])
self._assert_has_keys(output_content, "id", "elements")
elements = output_content["elements"]
assert len(elements) == 2
elements0 = elements[0]
assert elements0["element_identifier"] == "el1"
[docs] @skip_without_tool("collection_creates_pair")
def test_workflow_run_input_mapping_with_output_collections(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_jobs("""
class: GalaxyWorkflow
inputs:
text_input: data
outputs:
wf_output_1:
outputSource: split_up/paired_output
steps:
split_up:
tool_id: collection_creates_pair
in:
input1: text_input
""", test_data="""
text_input:
type: list
name: the_dataset_list
elements:
- identifier: el1
value: 1.fastq
type: File
- identifier: el2
value: 1.fastq
type: File
""", history_id=history_id)
workflow_id = summary.workflow_id
invocation_id = summary.invocation_id
invocation_response = self._get("workflows/%s/invocations/%s" % (workflow_id, invocation_id))
self._assert_status_code_is(invocation_response, 200)
invocation = invocation_response.json()
self._assert_has_keys(invocation, "id", "outputs", "output_collections")
assert len(invocation["output_collections"]) == 1
assert len(invocation["outputs"]) == 0
output_content = self.dataset_populator.get_history_collection_details(history_id, content_id=invocation["output_collections"]["wf_output_1"]["id"])
self._assert_has_keys(output_content, "id", "elements")
assert output_content["collection_type"] == "list:paired", output_content
elements = output_content["elements"]
assert len(elements) == 2
elements0 = elements[0]
assert elements0["element_identifier"] == "el1"
self.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
jobs_summary_response = self._get("workflows/%s/invocations/%s/jobs_summary" % (workflow_id, invocation_id))
self._assert_status_code_is(jobs_summary_response, 200)
jobs_summary = jobs_summary_response.json()
assert 'states' in jobs_summary
invocation_states = jobs_summary['states']
assert invocation_states and 'ok' in invocation_states, jobs_summary
assert invocation_states['ok'] == 2, jobs_summary
assert jobs_summary['model'] == 'WorkflowInvocation', jobs_summary
jobs_summary_response = self._get("workflows/%s/invocations/%s/step_jobs_summary" % (workflow_id, invocation_id))
self._assert_status_code_is(jobs_summary_response, 200)
jobs_summary = jobs_summary_response.json()
assert len(jobs_summary) == 1
collection_summary = jobs_summary[0]
assert 'states' in collection_summary
collection_states = collection_summary['states']
assert collection_states and 'ok' in collection_states, collection_states
assert collection_states['ok'] == 2, collection_summary
assert collection_summary['model'] == 'ImplicitCollectionJobs', collection_summary
[docs] def test_workflow_run_input_mapping_with_subworkflows(self):
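# Map a two-element list input over a nested workflow and verify the outer
# output is materialized as a list collection preserving element identifiers.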
with self.dataset_populator.test_history() as history_id:
test_data = """
outer_input:
type: list
name: the_dataset_list
elements:
- identifier: el1
value: 1.fastq
type: File
- identifier: el2
value: 1.fastq
type: File
"""
summary = self._run_jobs(WORKFLOW_NESTED_SIMPLE, test_data=test_data, history_id=history_id)
workflow_id = summary.workflow_id
invocation_id = summary.invocation_id
invocation_response = self._get("workflows/%s/invocations/%s" % (workflow_id, invocation_id))
self._assert_status_code_is(invocation_response, 200)
invocation = invocation_response.json()
self._assert_has_keys(invocation, "id", "outputs", "output_collections")
assert len(invocation["output_collections"]) == 1, invocation
assert len(invocation["outputs"]) == 0
output_content = self.dataset_populator.get_history_collection_details(history_id, content_id=invocation["output_collections"]["outer_output"]["id"])
self._assert_has_keys(output_content, "id", "elements")
assert output_content["collection_type"] == "list", output_content
elements = output_content["elements"]
assert len(elements) == 2
elements0 = elements[0]
assert elements0["element_identifier"] == "el1"
[docs] @skip_without_tool("cat_list")
@skip_without_tool("random_lines1")
@skip_without_tool("split")
def test_subworkflow_recover_mapping_1(self):
# This test case tests that an outer workflow continues to schedule and handles
# collection mapping properly after the last step of a subworkflow requires delayed
# evaluation. Testing rescheduling and propagating connections within a subworkflow
# is handled by the next test case.
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
outer_input: data
outputs:
outer_output:
outputSource: second_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: outer_input
nested_workflow:
run:
class: GalaxyWorkflow
inputs:
inner_input: data
outputs:
workflow_output:
outputSource: random_lines/out_file1
steps:
random_lines:
tool_id: random_lines1
state:
num_lines: 2
input:
$link: inner_input
seed_source:
seed_source_selector: set_seed
seed: asdf
in:
inner_input: first_cat/out_file1
split:
tool_id: split
in:
input1: nested_workflow/workflow_output
second_cat:
tool_id: cat_list
in:
input1: split/output
test_data:
outer_input:
value: 1.bed
type: File
""", history_id=history_id, wait=True, round_trip_format_conversion=True)
self.assertEqual("chr6\t108722976\t108723115\tCCDS5067.1_cds_0_0_chr6_108722977_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n", self.dataset_populator.get_history_dataset_content(history_id))
# self.assertEqual("chr16\t142908\t143003\tCCDS10397.1_cds_0_0_chr16_142909_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("cat_list")
@skip_without_tool("random_lines1")
@skip_without_tool("split")
def test_subworkflow_recover_mapping_2(self):
# Like the above test case, this test case tests that an outer workflow continues to
# schedule and handles collection mapping properly after a subworkflow needs to be
# delayed, but it also tests recovering and handling scheduling within the subworkflow,
# since the delayed step (split) isn't the last step of the subworkflow.
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
outer_input: data
outputs:
outer_output:
outputSource: second_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: outer_input
nested_workflow:
run:
class: GalaxyWorkflow
inputs:
inner_input: data
outputs:
workflow_output:
outputSource: inner_cat/out_file1
steps:
random_lines:
tool_id: random_lines1
in:
input: inner_input
num_lines:
default: 2
seed_source|seed_source_selector:
default: set_seed
seed_source|seed:
default: asdf
split:
tool_id: split
in:
input1: random_lines/out_file1
inner_cat:
tool_id: cat1
in:
input1: split/output
in:
inner_input: first_cat/out_file1
second_cat:
tool_id: cat_list
in:
input1: nested_workflow/workflow_output
""", test_data="""
outer_input:
value: 1.bed
type: File
""", history_id=history_id, wait=True, round_trip_format_conversion=True)
self.assertEqual("chr6\t108722976\t108723115\tCCDS5067.1_cds_0_0_chr6_108722977_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("cat_list")
@skip_without_tool("random_lines1")
@skip_without_tool("split")
def test_recover_mapping_in_subworkflow(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
outer_input: data
outputs:
outer_output:
outputSource: second_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: outer_input
nested_workflow:
run:
class: GalaxyWorkflow
inputs:
inner_input: data
outputs:
workflow_output:
outputSource: split/output
steps:
random_lines:
tool_id: random_lines1
state:
num_lines: 2
input:
$link: inner_input
seed_source:
seed_source_selector: set_seed
seed: asdf
split:
tool_id: split
in:
input1: random_lines/out_file1
in:
inner_input: first_cat/out_file1
second_cat:
tool_id: cat_list
in:
input1: nested_workflow/workflow_output
""", test_data="""
outer_input:
value: 1.bed
type: File
""", history_id=history_id, wait=True, round_trip_format_conversion=True)
self.assertEqual("chr6\t108722976\t108723115\tCCDS5067.1_cds_0_0_chr6_108722977_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("empty_list")
@skip_without_tool("count_list")
@skip_without_tool("random_lines1")
def test_empty_list_mapping(self):
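# Mapping over an empty list should still schedule the downstream reduction
# step, which counts zero elements ("0\n") rather than failing.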
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
outputs:
count_list:
outputSource: count_list/out_file1
steps:
empty_list:
tool_id: empty_list
in:
input1: input1
random_lines:
tool_id: random_lines1
state:
num_lines: 2
input:
$link: empty_list/output
seed_source:
seed_source_selector: set_seed
seed: asdf
count_list:
tool_id: count_list
in:
input1: random_lines/out_file1
""", test_data="""
input1:
value: 1.bed
type: File
""", history_id=history_id, wait=True)
self.assertEqual("0\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("random_lines1")
def test_change_datatype_collection_map_over(self):
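# A change_datatype action on a mapped-over output should apply to every
# element of the resulting list:paired collection (both pair members become csv).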
with self.dataset_populator.test_history() as history_id:
jobs_summary = self._run_jobs("""
class: GalaxyWorkflow
inputs:
text_input1: collection
steps:
map_over:
tool_id: random_lines1
in:
input: text_input1
outputs:
out_file1:
change_datatype: csv
""", test_data="""
text_input1:
type: "list:paired"
""", history_id=history_id)
hdca = self.dataset_populator.get_history_collection_details(history_id=jobs_summary.history_id, hid=4)
assert hdca['collection_type'] == 'list:paired'
assert len(hdca['elements'][0]['object']["elements"]) == 2
forward, reverse = hdca['elements'][0]['object']["elements"]
assert forward['object']['file_ext'] == 'csv'
assert reverse['object']['file_ext'] == 'csv'
[docs] @skip_without_tool("collection_type_source_map_over")
def test_mapping_and_subcollection_mapping(self):
with self.dataset_populator.test_history() as history_id:
jobs_summary = self._run_jobs("""
class: GalaxyWorkflow
inputs:
text_input1: collection
steps:
map_over:
tool_id: collection_type_source_map_over
in:
input_collect: text_input1
""", test_data="""
text_input1:
type: "list:paired"
""", history_id=history_id)
hdca = self.dataset_populator.get_history_collection_details(history_id=jobs_summary.history_id, hid=1)
assert hdca['collection_type'] == 'list:paired'
assert len(hdca['elements'][0]['object']["elements"]) == 2
[docs] @skip_without_tool("empty_list")
@skip_without_tool("count_multi_file")
@skip_without_tool("random_lines1")
def test_empty_list_reduction(self):
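# Reducing an empty mapped-over collection through a multi-input tool should
# produce a count of 0 rather than failing the invocation.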
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
outputs:
count_multi_file:
outputSource: count_multi_file/out_file1
steps:
empty_list:
tool_id: empty_list
in:
input1: input1
random_lines:
tool_id: random_lines1
state:
num_lines: 2
input:
$link: empty_list/output
seed_source:
seed_source_selector: set_seed
seed: asdf
count_multi_file:
tool_id: count_multi_file
in:
input1: random_lines/out_file1
""", test_data="""
input1:
value: 1.bed
type: File
""", history_id=history_id, wait=True, round_trip_format_conversion=True)
self.assertEqual("0\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("cat")
def test_cancel_new_workflow_when_history_deleted(self):
with self.dataset_populator.test_history() as history_id:
# Invoke a workflow with a pause step.
uploaded_workflow_id, invocation_id = self._invoke_paused_workflow(history_id)
# There is no pause of anything in here, so the invocation is likely
# still in a new state. If it isn't, that is fine; the test will just
# happen to check the same thing as the test below.
# Wait for all the datasets to complete and make sure the workflow
# invocation is not complete.
self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id)
self._delete("histories/%s" % history_id)
invocation_cancelled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, 'cancelled')
assert invocation_cancelled, "Workflow state is not cancelled..."
[docs] @skip_without_tool("cat")
def test_cancel_ready_workflow_when_history_deleted(self):
# Same as the previous test but make sure the invocation isn't in a new
# state before cancelling.
with self.dataset_populator.test_history() as history_id:
# Invoke a workflow with a pause step.
uploaded_workflow_id, invocation_id = self._invoke_paused_workflow(history_id)
# Wait for at least one scheduling step.
self._wait_for_invocation_non_new(uploaded_workflow_id, invocation_id)
# Wait for all the datasets to complete and make sure the workflow
# invocation is not complete.
self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id)
self._delete("histories/%s" % history_id)
invocation_cancelled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, 'cancelled')
assert invocation_cancelled, "Workflow state is not cancelled..."
[docs] @skip_without_tool("cat")
def test_workflow_pause(self):
with self.dataset_populator.test_history() as history_id:
# Invoke a workflow with a pause step.
uploaded_workflow_id, invocation_id = self._invoke_paused_workflow(history_id)
# Wait for at least one scheduling step.
self._wait_for_invocation_non_new(uploaded_workflow_id, invocation_id)
# Make sure the history didn't enter a failed state in there.
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
# Assert the workflow hasn't finished scheduling; in that case we can be
# pretty sure we are at the pause step.
self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id)
# Review the paused steps to allow the workflow to continue.
self.__review_paused_steps(uploaded_workflow_id, invocation_id, order_index=2, action=True)
# Wait for the workflow to finish scheduling and ensure both the invocation
# and the history are in valid states.
invocation_scheduled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, 'scheduled')
assert invocation_scheduled, "Workflow state is not scheduled..."
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
[docs] @skip_without_tool("cat")
def test_workflow_pause_cancel(self):
with self.dataset_populator.test_history() as history_id:
# Invoke a workflow with a pause step.
uploaded_workflow_id, invocation_id = self._invoke_paused_workflow(history_id)
# Wait for at least one scheduling step.
self._wait_for_invocation_non_new(uploaded_workflow_id, invocation_id)
# Make sure the history didn't enter a failed state in there.
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
# Assert the workflow hasn't finished scheduling; in that case we can be
# pretty sure we are at the pause step.
self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id)
# Review the paused workflow and cancel it at the paused step.
self.__review_paused_steps(uploaded_workflow_id, invocation_id, order_index=2, action=False)
# Ensure the workflow eventually becomes cancelled.
invocation_cancelled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, 'cancelled')
assert invocation_cancelled, "Workflow state is not cancelled..."
[docs] @skip_without_tool("head")
def test_workflow_map_reduce_pause(self):
with self.dataset_populator.test_history() as history_id:
workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_map_reduce_pause")
uploaded_workflow_id = self.workflow_populator.create_workflow(workflow)
hda1 = self.dataset_populator.new_dataset(history_id, content="reviewed\nunreviewed")
hdca1 = self.dataset_collection_populator.create_list_in_history(history_id, contents=["1\n2\n3", "4\n5\n6"]).json()
index_map = {
'0': self._ds_entry(hda1),
'1': self._ds_entry(hdca1),
}
invocation_id = self.__invoke_workflow(history_id, uploaded_workflow_id, index_map)
# Wait for at least one scheduling step.
self._wait_for_invocation_non_new(uploaded_workflow_id, invocation_id)
# Make sure the history didn't enter a failed state in there.
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
# Assert the workflow hasn't finished scheduling; in that case we can be
# pretty sure we are at the pause step.
self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id)
self.__review_paused_steps(uploaded_workflow_id, invocation_id, order_index=4, action=True)
self.wait_for_invocation_and_jobs(history_id, uploaded_workflow_id, invocation_id)
invocation = self._invocation_details(uploaded_workflow_id, invocation_id)
assert invocation['state'] == 'scheduled'
self.assertEqual("reviewed\n1\nreviewed\n4\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("cat")
def test_cancel_workflow_invocation(self):
with self.dataset_populator.test_history() as history_id:
# Invoke a workflow with a pause step.
uploaded_workflow_id, invocation_id = self._invoke_paused_workflow(history_id)
# Wait for at least one scheduling step.
self._wait_for_invocation_non_new(uploaded_workflow_id, invocation_id)
# Make sure the history didn't enter a failed state in there.
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
# Assert the workflow hasn't finished scheduling; in that case we can be
# pretty sure we are at the pause step.
self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id)
invocation_url = self._api_url("workflows/%s/usage/%s" % (uploaded_workflow_id, invocation_id), use_key=True)
delete_response = delete(invocation_url)
self._assert_status_code_is(delete_response, 200)
invocation = self._invocation_details(uploaded_workflow_id, invocation_id)
assert invocation['state'] == 'cancelled'
[docs] @skip_without_tool("cat")
def test_pause_outputs_with_deleted_inputs(self):
self._deleted_inputs_workflow(purge=False)
[docs] @skip_without_tool("cat")
def test_error_outputs_with_purged_inputs(self):
self._deleted_inputs_workflow(purge=True)
def _deleted_inputs_workflow(self, purge):
# We run a workflow on a collection with a deleted element.
with self.dataset_populator.test_history() as history_id:
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
inputs:
input1:
type: collection
collection_type: list
steps:
first_cat:
tool_id: cat
in:
input1: input1
second_cat:
tool_id: cat
in:
input1: first_cat/out_file1
""")
DELETED = 0
PAUSED_1 = 3
PAUSED_2 = 5
hdca1 = self.dataset_collection_populator.create_list_in_history(history_id,
contents=[("sample1-1", "1 2 3")]).json()
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
deleted_id = hdca1['elements'][DELETED]['object']['id']
r = self._delete("histories/%s/contents/%s?purge=%s" % (history_id, deleted_id, purge))
label_map = {"input1": self._ds_entry(hdca1)}
workflow_request = dict(
history="hist_id=%s" % history_id,
workflow_id=workflow_id,
ds_map=self._build_ds_map(workflow_id, label_map),
)
r = self._post("workflows", data=workflow_request)
self._assert_status_code_is(r, 200)
# If this starts failing we may have prevented running workflows on collections with deleted members,
# in which case we can disable this test.
self.dataset_populator.wait_for_history_jobs(history_id, assert_ok=False)
contents = self.__history_contents(history_id)
assert contents[DELETED]['deleted']
state = 'error' if purge else 'paused'
assert contents[PAUSED_1]['state'] == state
assert contents[PAUSED_2]['state'] == 'paused'
[docs] def test_run_with_implicit_connection(self):
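# A $step reference creates an implicit ordering dependency: the random_lines
# step must not run until second_cat completes, even though no data flows
# between them.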
with self.dataset_populator.test_history() as history_id:
run_summary = self._run_jobs("""
class: GalaxyWorkflow
inputs:
test_input: data
steps:
first_cat:
tool_id: cat1
in:
input1: test_input
the_pause:
type: pause
in:
input: first_cat/out_file1
second_cat:
tool_id: cat1
in:
input1: the_pause
third_cat:
tool_id: random_lines1
in:
$step: second_cat
state:
num_lines: 1
input:
$link: test_input
seed_source:
seed_source_selector: set_seed
seed: asdf
""", test_data={"test_input": "hello world"}, history_id=history_id, wait=False, round_trip_format_conversion=True)
history_id = run_summary.history_id
workflow_id = run_summary.workflow_id
invocation_id = run_summary.invocation_id
# Wait for first two jobs to be scheduled - upload and first cat.
wait_on(lambda: len(self._history_jobs(history_id)) >= 2 or None, "history jobs")
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
invocation = self._invocation_details(workflow_id, invocation_id)
assert invocation['state'] != 'scheduled', invocation
# Expect two jobs - the upload and the first cat. random_lines shouldn't run
# since it is implicitly dependent on the second cat.
self._assert_history_job_count(history_id, 2)
self.__review_paused_steps(workflow_id, invocation_id, order_index=2, action=True)
self.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
self._assert_history_job_count(history_id, 4)
[docs] def test_run_with_optional_data_specified_to_multi_data(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs(WORKFLOW_OPTIONAL_TRUE_INPUT_DATA, test_data="""
input1:
value: 1.bed
type: File
""", history_id=history_id, wait=True, assert_ok=True)
content = self.dataset_populator.get_history_dataset_content(history_id)
assert "CCDS989.1_cds_0_0_chr1_147962193_r" in content
[docs] def test_run_with_optional_data_unspecified_to_multi_data(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs(WORKFLOW_OPTIONAL_TRUE_INPUT_DATA, test_data={}, history_id=history_id, wait=True, assert_ok=True)
content = self.dataset_populator.get_history_dataset_content(history_id)
assert "No input selected" in content
[docs] def test_run_with_non_optional_data_unspecified_fails_invocation(self):
with self.dataset_populator.test_history() as history_id:
error = self._run_jobs(WORKFLOW_OPTIONAL_FALSE_INPUT_DATA, test_data={}, history_id=history_id, wait=False, assert_ok=False, expected_response=400)
self._assert_failed_on_non_optional_input(error, "input1")
[docs] def test_run_with_optional_collection_specified(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs(WORKFLOW_OPTIONAL_TRUE_INPUT_COLLECTION, test_data="""
input1:
type: paired
name: the_dataset_pair
elements:
- identifier: forward
value: 1.fastq
type: File
- identifier: reverse
value: 1.fastq
type: File
""", history_id=history_id, wait=True, assert_ok=True)
content = self.dataset_populator.get_history_dataset_content(history_id)
assert "GAATTGATCAGGACATAGGACAACTGTAGGCACCAT" in content
[docs] def test_run_with_optional_collection_unspecified(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs(WORKFLOW_OPTIONAL_TRUE_INPUT_COLLECTION, test_data={}, history_id=history_id, wait=True, assert_ok=True)
content = self.dataset_populator.get_history_dataset_content(history_id)
assert "No input specified." in content
[docs] def test_run_with_non_optional_collection_unspecified_fails_invocation(self):
with self.dataset_populator.test_history() as history_id:
error = self._run_jobs(WORKFLOW_OPTIONAL_FALSE_INPUT_COLLECTION, test_data={}, history_id=history_id, wait=False, assert_ok=False, expected_response=400)
self._assert_failed_on_non_optional_input(error, "input1")
def _assert_failed_on_non_optional_input(self, error, input_name):
assert "err_msg" in error
err_msg = error["err_msg"]
assert input_name in err_msg
assert "is not optional and no input" in err_msg
[docs] def test_run_with_validated_parameter_connection_optional(self):
with self.dataset_populator.test_history() as history_id:
run_summary = self._run_jobs("""
class: GalaxyWorkflow
inputs:
text_input: text
steps:
validation:
tool_id: validation_repeat
state:
r2:
- text:
$link: text_input
""", test_data="""
text_input:
value: "abd"
type: raw
""", history_id=history_id, wait=True, round_trip_format_conversion=True)
self.wait_for_invocation_and_jobs(history_id, run_summary.workflow_id, run_summary.invocation_id)
jobs = self._history_jobs(history_id)
assert len(jobs) == 1
[docs] def test_run_with_int_parameter(self):
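# A required integer parameter left unset should fail the invocation with an
# "is not optional" error; supplying it as a raw value (or using the optional
# variant of the workflow) should succeed.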
with self.dataset_populator.test_history() as history_id:
failed = False
try:
self._run_jobs(WORKFLOW_PARAMETER_INPUT_INTEGER_REQUIRED, test_data="""
data_input:
value: 1.bed
type: File
""", history_id=history_id, wait=True, assert_ok=True)
except AssertionError as e:
assert '(int_input) is not optional' in str(e)
failed = True
assert failed
self._run_jobs(WORKFLOW_PARAMETER_INPUT_INTEGER_REQUIRED, test_data="""
data_input:
value: 1.bed
type: File
int_input:
value: 1
type: raw
""", history_id=history_id, wait=True, assert_ok=True)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
content = self.dataset_populator.get_history_dataset_content(history_id)
assert len(content.splitlines()) == 1, content
self._run_jobs(WORKFLOW_PARAMETER_INPUT_INTEGER_OPTIONAL, test_data="""
data_input:
value: 1.bed
type: File
""", history_id=history_id, wait=True, assert_ok=True)
[docs] def test_run_with_int_parameter_nested(self):
with self.dataset_populator.test_history() as history_id:
workflow = self.workflow_populator.load_workflow_from_resource("test_subworkflow_with_integer_input")
workflow_id = self.workflow_populator.create_workflow(workflow)
hda = self.dataset_populator.new_dataset(history_id, content="1 2 3")
workflow_request = {
'history_id': history_id,
'workflow_id': workflow_id,
'inputs_by': 'name',
'inputs': json.dumps({
'input_dataset': {'src': 'hda', 'id': hda['id']},
'int_parameter': 1,
})
}
invocation_response = self.workflow_populator.invoke_workflow_raw(workflow_id, workflow_request)
assert invocation_response.status_code == 200, invocation_response.text
self.workflow_populator.wait_for_invocation(workflow_id, invocation_response.json()['id'])
[docs] def test_run_with_validated_parameter_connection_default_values(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs(WORKFLOW_PARAMETER_INPUT_INTEGER_DEFAULT, test_data="""
data_input:
value: 1.bed
type: File
""", history_id=history_id, wait=True, assert_ok=True)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
content = self.dataset_populator.get_history_dataset_content(history_id)
assert len(content.splitlines()) == 3, content
[docs] def test_run_with_validated_parameter_connection_invalid(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
text_input: text
steps:
validation:
tool_id: validation_repeat
state:
r2:
- text:
$link: text_input
""", test_data="""
text_input:
value: ""
type: raw
""", history_id=history_id, wait=True, assert_ok=False)
[docs] def test_run_with_text_input_connection(self):
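# A text parameter input connected via $link into the tool's seed value should
# behave like a normal runtime parameter, giving a deterministic
# random_lines result.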
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
data_input: data
text_input: text
steps:
randomlines:
tool_id: random_lines1
state:
num_lines: 1
input:
$link: data_input
seed_source:
seed_source_selector: set_seed
seed:
$link: text_input
""", test_data="""
data_input:
value: 1.bed
type: File
text_input:
value: asdf
type: raw
""", history_id=history_id)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
content = self.dataset_populator.get_history_dataset_content(history_id)
self.assertEqual("chrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n", content)
[docs] def test_run_with_numeric_input_connection(self):
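# Connect an expression tool's integer output into a tool that performs
# arithmetic on it; the asserted output values (43 and 4.14) show the
# parameters arrive as numbers rather than strings.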
history_id = self.dataset_populator.new_history()
self._run_jobs("""
class: GalaxyWorkflow
steps:
- label: forty_two
tool_id: expression_forty_two
state: {}
- label: consume_expression_parameter
tool_id: cheetah_casting
state:
floattest: 3.14
inttest:
$link: forty_two/out1
test_data: {}
""", history_id=history_id)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
content = self.dataset_populator.get_history_dataset_content(history_id)
lines = content.split("\n")
assert len(lines) == 4
str_43 = lines[0]
str_4point14 = lines[2]
assert lines[3] == ""
assert int(str_43) == 43
assert abs(float(str_4point14) - 4.14) < .0001
[docs] @skip_without_tool("param_value_from_file")
def test_expression_tool_map_over(self):
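# Map an expression tool over a list and feed its text parameter output into a
# downstream tool; the renamed output collection should preserve the element
# identifiers and contents.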
history_id = self.dataset_populator.new_history()
self._run_jobs("""
class: GalaxyWorkflow
inputs:
text_input1: collection
steps:
- label: param_out
tool_id: param_value_from_file
in:
input1: text_input1
- label: consume_expression_parameter
tool_id: validation_default
in:
input1: param_out/text_param
outputs:
out_file1:
rename: "replaced_param_collection"
test_data:
text_input1:
type: list
elements:
- identifier: A
content: A
- identifier: B
content: B
""", history_id=history_id)
history_contents = self._get('histories/{history_id}/contents'.format(history_id=history_id)).json()
collection = [c for c in history_contents if c['history_content_type'] == 'dataset_collection' and c['name'] == 'replaced_param_collection'][0]
collection_details = self._get(collection['url']).json()
assert collection_details['element_count'] == 2
elements = collection_details['elements']
assert elements[0]['element_identifier'] == 'A'
assert elements[1]['element_identifier'] == 'B'
element_a_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=elements[0]['object'])
element_b_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=elements[1]['object'])
assert element_a_content.strip() == 'A'
assert element_b_content.strip() == 'B'
[docs] @skip_without_tool('cat1')
def test_workflow_rerun_with_use_cached_job(self):
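# With use_cached_job enabled, rerunning the workflow on copied inputs in a new
# history should reuse the original jobs, so both runs point at the same
# underlying files.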
workflow = self.workflow_populator.load_workflow(name="test_for_run")
# We launch a workflow
with self.dataset_populator.test_history() as history_id_one, self.dataset_populator.test_history() as history_id_two:
workflow_request, _ = self._setup_workflow_run(workflow, history_id=history_id_one)
run_workflow_response = self._post("workflows", data=workflow_request).json()
# We copy the workflow inputs to a new history
new_workflow_request = workflow_request.copy()
new_ds_map = json.loads(new_workflow_request['ds_map'])
for key, input_values in run_workflow_response['inputs'].items():
copy_payload = {"content": input_values['id'], "source": "hda", "type": "dataset"}
copy_response = self._post("histories/%s/contents" % history_id_two, data=copy_payload).json()
new_ds_map[key]['id'] = copy_response['id']
new_workflow_request['ds_map'] = json.dumps(new_ds_map, sort_keys=True)
new_workflow_request['history'] = "hist_id=%s" % history_id_two
new_workflow_request['use_cached_job'] = True
# We run the workflow again; it should not produce any new outputs
new_workflow_response = self._post("workflows", data=new_workflow_request).json()
first_wf_output = self._get("datasets/%s" % run_workflow_response['outputs'][0]).json()
second_wf_output = self._get("datasets/%s" % new_workflow_response['outputs'][0]).json()
assert first_wf_output['file_name'] == second_wf_output['file_name'], \
"first output:\n%s\nsecond output:\n%s" % (first_wf_output, second_wf_output)
[docs] @skip_without_tool('cat1')
def test_nested_workflow_rerun_with_use_cached_job(self):
with self.dataset_populator.test_history() as history_id_one, self.dataset_populator.test_history() as history_id_two:
test_data = """
outer_input:
value: 1.bed
type: File
"""
run_jobs_summary = self._run_jobs(WORKFLOW_NESTED_SIMPLE, test_data=test_data, history_id=history_id_one)
workflow_request = run_jobs_summary.workflow_request
# We copy the inputs to a new history and re-run the workflow
inputs = json.loads(workflow_request['inputs'])
dataset_type = inputs['outer_input']['src']
dataset_id = inputs['outer_input']['id']
copy_payload = {"content": dataset_id, "source": dataset_type, "type": "dataset"}
copy_response = self._post("histories/%s/contents" % history_id_two, data=copy_payload)
self._assert_status_code_is(copy_response, 200)
new_dataset_id = copy_response.json()['id']
inputs['outer_input']['id'] = new_dataset_id
workflow_request['use_cached_job'] = True
workflow_request['history'] = "hist_id={history_id_two}".format(history_id_two=history_id_two)
workflow_request['inputs'] = json.dumps(inputs)
run_workflow_response = self._post("workflows", data=run_jobs_summary.workflow_request).json()
self.workflow_populator.wait_for_workflow(workflow_request['workflow_id'],
run_workflow_response['id'],
history_id_two,
assert_ok=True)
# Now make sure that the HDAs in each history point to the same dataset instances
history_one_contents = self.__history_contents(history_id_one)
history_two_contents = self.__history_contents(history_id_two)
assert len(history_one_contents) == len(history_two_contents)
for i, (item_one, item_two) in enumerate(zip(history_one_contents, history_two_contents)):
assert item_one['dataset_id'] == item_two['dataset_id'], \
'Dataset ids should match, but "%s" and "%s" are not the same for History item %i.' % (item_one['dataset_id'],
item_two['dataset_id'],
i + 1)
[docs] def test_cannot_run_inaccessible_workflow(self):
workflow = self.workflow_populator.load_workflow(name="test_for_run_cannot_access")
workflow_request, history_id = self._setup_workflow_run(workflow)
with self._different_user():
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 403)
[docs] def test_400_on_invalid_workflow_id(self):
workflow = self.workflow_populator.load_workflow(name="test_for_run_does_not_exist")
workflow_request, history_id = self._setup_workflow_run(workflow)
workflow_request["workflow_id"] = self._random_key()
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 400)
[docs] def test_cannot_run_against_other_users_history(self):
workflow = self.workflow_populator.load_workflow(name="test_for_run_does_not_exist")
workflow_request, history_id = self._setup_workflow_run(workflow)
with self._different_user():
other_history_id = self.dataset_populator.new_history()
workflow_request["history"] = "hist_id=%s" % other_history_id
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 403)
[docs] @skip_without_tool("cat")
@skip_without_tool("cat_list")
def test_workflow_run_with_matching_lists(self):
workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_matching_lists")
workflow_id = self.workflow_populator.create_workflow(workflow)
with self.dataset_populator.test_history() as history_id:
hdca1 = self.dataset_collection_populator.create_list_in_history(history_id, contents=[("sample1-1", "1 2 3"), ("sample2-1", "7 8 9")]).json()
hdca2 = self.dataset_collection_populator.create_list_in_history(history_id, contents=[("sample1-2", "4 5 6"), ("sample2-2", "0 a b")]).json()
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
label_map = {"list1": self._ds_entry(hdca1), "list2": self._ds_entry(hdca2)}
workflow_request = dict(
history="hist_id=%s" % history_id,
workflow_id=workflow_id,
ds_map=self._build_ds_map(workflow_id, label_map),
)
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
self.assertEqual("1 2 3\n4 5 6\n7 8 9\n0 a b\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] def test_workflow_stability(self):
# Run this index stability test with the following command:
# ./run_tests.sh test/api/test_workflows.py:WorkflowsApiTestCase.test_workflow_stability
num_tests = 1
for workflow_file in ["test_workflow_topoambigouity", "test_workflow_topoambigouity_auto_laidout"]:
workflow = self.workflow_populator.load_workflow_from_resource(workflow_file)
last_step_map = self._step_map(workflow)
for i in range(num_tests):
uploaded_workflow_id = self.workflow_populator.create_workflow(workflow)
downloaded_workflow = self._download_workflow(uploaded_workflow_id)
step_map = self._step_map(downloaded_workflow)
assert step_map == last_step_map
last_step_map = step_map
def _step_map(self, workflow):
# Build a dict mapping step index to input name.
step_map = {}
for step_index, step in workflow["steps"].items():
if step["type"] == "data_input":
step_map[step_index] = step["inputs"][0]["name"]
return step_map
[docs] def test_empty_create(self):
response = self._post("workflows")
self._assert_status_code_is(response, 400)
self._assert_error_code_is(response, error_codes.USER_REQUEST_MISSING_PARAMETER)
[docs] def test_invalid_create_multiple_types(self):
data = {
'shared_workflow_id': '1234567890abcdef',
'from_history_id': '1234567890abcdef'
}
response = self._post("workflows", data)
self._assert_status_code_is(response, 400)
self._assert_error_code_is(response, error_codes.USER_REQUEST_INVALID_PARAMETER)
[docs] @skip_without_tool("cat1")
def test_run_with_pja(self):
workflow = self.workflow_populator.load_workflow(name="test_for_pja_run", add_pja=True)
workflow_request, history_id = self._setup_workflow_run(workflow, inputs_by='step_index')
workflow_request["replacement_params"] = dumps(dict(replaceme="was replaced"))
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True)
assert content["name"] == "foo was replaced"
[docs] @skip_without_tool("output_filter")
def test_optional_workflow_output(self):
with self.dataset_populator.test_history() as history_id:
run_object = self._run_jobs("""
class: GalaxyWorkflow
inputs: []
outputs:
wf_output_1:
outputSource: output_filter/out_1
steps:
output_filter:
tool_id: output_filter
state:
produce_out_1: False
filter_text_1: '1'
produce_collection: False
""", test_data={}, history_id=history_id, wait=False)
self.wait_for_invocation_and_jobs(history_id, run_object.workflow_id, run_object.invocation_id)
contents = self.__history_contents(history_id)
assert len(contents) == 1
okay_dataset = contents[0]
assert okay_dataset["state"] == "ok"
[docs] @skip_without_tool("cat")
def test_run_rename_on_mapped_over_collection(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1:
type: collection
collection_type: list
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
rename: "my new name"
""", test_data="""
input1:
type: list
name: the_dataset_list
elements:
- identifier: el1
value: 1.fastq
type: File
""", history_id=history_id)
content = self.dataset_populator.get_history_dataset_details(history_id, hid=4, wait=True, assert_ok=True)
name = content["name"]
assert name == "my new name", name
assert content["history_content_type"] == "dataset"
content = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True, assert_ok=True)
name = content["name"]
assert content["history_content_type"] == "dataset_collection", content
assert name == "my new name", name
[docs] @skip_without_tool("cat")
def test_run_rename_based_on_inputs_on_mapped_over_collection(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1:
type: collection
collection_type: list
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
rename: "#{input1} suffix"
""", test_data="""
input1:
type: list
name: the_dataset_list
elements:
- identifier: el1
value: 1.fastq
type: File
""", history_id=history_id)
content = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True, assert_ok=True)
name = content["name"]
assert content["history_content_type"] == "dataset_collection", content
assert name == "the_dataset_list suffix", name
[docs] @skip_without_tool("collection_creates_pair")
def test_run_rename_collection_output(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
steps:
- tool_id: collection_creates_pair
in:
input1: input1
outputs:
paired_output:
rename: "my new name"
""", test_data="""
input1:
value: 1.fasta
type: File
name: fasta1
""", history_id=history_id)
details1 = self.dataset_populator.get_history_collection_details(history_id, hid=4, wait=True, assert_ok=True)
assert details1['elements'][0]['object']['visible'] is False
assert details1["name"] == "my new name", details1
assert details1["history_content_type"] == "dataset_collection"
[docs] @skip_without_tool("__BUILD_LIST__")
def test_run_build_list_hide_collection_output(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
steps:
- tool_id: __BUILD_LIST__
in:
datasets_0|input: input1
outputs:
output:
hide: true
""", test_data="""
input1:
value: 1.fasta
type: File
name: fasta1
""", history_id=history_id)
details1 = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True, assert_ok=True)
assert details1['elements'][0]['object']['visible'] is False
assert details1["name"] == "data 1 (as list)", details1
assert details1["visible"] is False
[docs] @skip_without_tool("__BUILD_LIST__")
def test_run_build_list_delete_intermediate_collection_output(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
steps:
- tool_id: __BUILD_LIST__
in:
datasets_0|input: input1
outputs:
output:
delete_intermediate_datasets: true
""", test_data="""
input1:
value: 1.fasta
type: File
name: fasta1
""", history_id=history_id)
details1 = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True,
assert_ok=True)
assert details1['elements'][0]['object']['visible'] is False
assert details1["name"] == "data 1 (as list)", details1
# FIXME: this doesn't work because the workflow is still being scheduled
# TODO: Implement a way to run PJAs that couldn't be run during/after the job
# after the workflow has run to completion
assert details1["deleted"] is False
[docs] @skip_without_tool("__BUILD_LIST__")
def test_run_build_list_change_datatype_collection_output(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
steps:
- tool_id: __BUILD_LIST__
in:
datasets_0|input: input1
outputs:
output:
change_datatype: txt
- tool_id: __BUILD_LIST__
in:
datasets_0|input: input1
""", test_data="""
input1:
value: 1.fasta
type: File
file_type: fasta
name: fasta1
""", history_id=history_id)
details1 = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True,
assert_ok=True)
assert details1["name"] == "data 1 (as list)", details1
assert details1['elements'][0]['object']['visible'] is False
assert details1['elements'][0]['object']['file_ext'] == 'txt'
details2 = self.dataset_populator.get_history_collection_details(history_id, hid=5, wait=True,
assert_ok=True)
# Also check that we don't overwrite the original HDA's datatype
assert details2['elements'][0]['object']['file_ext'] == 'fasta'
[docs] @skip_without_tool("__BUILD_LIST__")
def test_run_build_list_rename_collection_output(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
steps:
- tool_id: __BUILD_LIST__
in:
datasets_0|input: input1
outputs:
output:
rename: "my new name"
""", test_data="""
input1:
value: 1.fasta
type: File
name: fasta1
""", history_id=history_id)
details1 = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True,
assert_ok=True)
assert details1['elements'][0]['object']['visible'] is False
assert details1["name"] == "my new name", details1
assert details1["history_content_type"] == "dataset_collection"
[docs] @skip_without_tool("create_2")
def test_run_rename_multiple_outputs(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs: []
steps:
create_2:
tool_id: create_2
state:
sleep_time: 0
outputs:
out_file1:
rename: "my new name"
out_file2:
rename: "my other new name"
""", test_data={}, history_id=history_id)
details1 = self.dataset_populator.get_history_dataset_details(history_id, hid=1, wait=True, assert_ok=True)
details2 = self.dataset_populator.get_history_dataset_details(history_id, hid=2)
assert details1["name"] == "my new name"
assert details2["name"] == "my other new name"
[docs] @skip_without_tool("cat")
def test_run_rename_based_on_input(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs(WORKFLOW_RENAME_ON_INPUT, history_id=history_id)
content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True)
name = content["name"]
assert name == "fasta1 suffix", name
[docs] @skip_without_tool("fail_identifier")
@skip_without_tool("cat")
def test_run_rename_when_resuming_jobs(self):
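# A rename action templated on an input name should still be applied when the
# failed upstream job is rerun via rerun_remap_job_id and the paused
# downstream job resumes.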
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
steps:
first_fail:
tool_id: fail_identifier
state:
failbool: true
input1:
$link: input1
outputs:
out_file1:
rename: "cat1 out"
cat:
tool_id: cat
in:
input1: first_fail/out_file1
outputs:
out_file1:
rename: "#{input1} suffix"
""", test_data="""
input1:
value: 1.fasta
type: File
name: fail
""", history_id=history_id, wait=True, assert_ok=False)
content = self.dataset_populator.get_history_dataset_details(history_id, hid=2, wait=True, assert_ok=False)
name = content["name"]
assert content['state'] == 'error', content
input1 = self.dataset_populator.get_history_dataset_details(history_id, hid=1, wait=True, assert_ok=False)
job_id = content['creating_job']
inputs = {"input1": {'values': [{'src': 'hda',
'id': input1['id']}]
},
"failbool": "false",
"rerun_remap_job_id": job_id}
self.dataset_populator.run_tool(tool_id='fail_identifier',
inputs=inputs,
history_id=history_id,
assert_ok=True)
unpaused_dataset = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=False)
assert unpaused_dataset['state'] == 'ok'
assert unpaused_dataset['name'] == "%s suffix" % name
[docs] @skip_without_tool("cat")
def test_run_rename_based_on_input_recursive(self):
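# Template values coming from dataset names must not be expanded again: a
# dataset literally named '#{input1}' survives un-substituted in the rename,
# with the upper filter applied to the literal text.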
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
rename: "#{input1} #{input1 | upper} suffix"
""", test_data="""
input1:
value: 1.fasta
type: File
name: '#{input1}'
""", history_id=history_id)
content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True)
name = content["name"]
assert name == "#{input1} #{INPUT1} suffix", name
[docs] @skip_without_tool("cat")
def test_run_rename_based_on_input_repeat(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
input2: data
steps:
first_cat:
tool_id: cat
state:
input1:
$link: input1
queries:
- input2:
$link: input2
outputs:
out_file1:
rename: "#{queries_0.input2| basename} suffix"
""", test_data="""
input1:
value: 1.fasta
type: File
name: fasta1
input2:
value: 1.fasta
type: File
name: fasta2
""", history_id=history_id)
content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True)
name = content["name"]
assert name == "fasta2 suffix", name
[docs] @skip_without_tool("mapper2")
def test_run_rename_based_on_input_conditional(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
fasta_input: data
fastq_input: data
steps:
mapping:
tool_id: mapper2
state:
fastq_input:
fastq_input_selector: single
fastq_input1:
$link: fastq_input
reference:
$link: fasta_input
outputs:
out_file1:
# Wish it was qualified for conditionals but it doesn't seem to be. -John
# rename: "#{fastq_input.fastq_input1 | basename} suffix"
rename: "#{fastq_input1 | basename} suffix"
""", test_data="""
fasta_input:
value: 1.fasta
type: File
name: fasta1
file_type: fasta
fastq_input:
value: 1.fastqsanger
type: File
name: fastq1
file_type: fastqsanger
""", history_id=history_id)
content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True)
name = content["name"]
assert name == "fastq1 suffix", name
[docs] @skip_without_tool("mapper2")
def test_run_rename_based_on_input_collection(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
fasta_input: data
fastq_inputs: data
steps:
mapping:
tool_id: mapper2
state:
fastq_input:
fastq_input_selector: paired_collection
fastq_input1:
$link: fastq_inputs
reference:
$link: fasta_input
outputs:
out_file1:
# Wish it was qualified for conditionals but it doesn't seem to be. -John
# rename: "#{fastq_input.fastq_input1 | basename} suffix"
rename: "#{fastq_input1} suffix"
""", test_data="""
fasta_input:
value: 1.fasta
type: File
name: fasta1
file_type: fasta
fastq_inputs:
type: list
name: the_dataset_pair
elements:
- identifier: forward
value: 1.fastq
type: File
- identifier: reverse
value: 1.fastq
type: File
""", history_id=history_id)
content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True)
name = content["name"]
assert name == "the_dataset_pair suffix", name
[docs] @skip_without_tool("collection_creates_pair")
def test_run_hide_on_collection_output(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
steps:
create_pair:
tool_id: collection_creates_pair
state:
input1:
$link: input1
outputs:
paired_output:
hide: true
""", test_data="""
input1:
value: 1.fasta
type: File
name: fasta1
""", history_id=history_id)
details1 = self.dataset_populator.get_history_collection_details(history_id, hid=4, wait=True, assert_ok=True)
assert details1["history_content_type"] == "dataset_collection"
assert not details1["visible"], details1
[docs] @skip_without_tool("cat")
def test_run_hide_on_mapped_over_collection(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
- id: input1
type: data_collection_input
collection_type: list
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
hide: true
""", test_data="""
input1:
type: list
name: the_dataset_list
elements:
- identifier: el1
value: 1.fastq
type: File
""", history_id=history_id)
content = self.dataset_populator.get_history_dataset_details(history_id, hid=4, wait=True, assert_ok=True)
assert content["history_content_type"] == "dataset"
assert not content["visible"]
content = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True, assert_ok=True)
assert content["history_content_type"] == "dataset_collection", content
assert not content["visible"]
[docs] @skip_without_tool("cat")
def test_tag_auto_propagation(self):
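# Only name: tags should propagate automatically from an upstream output to
# downstream datasets; the group: and machine: tags stay on the tagged dataset.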
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
add_tags:
- "name:treated1fb"
- "group:condition:treated"
- "group:type:single-read"
- "machine:illumina"
second_cat:
tool_id: cat
in:
input1: first_cat/out_file1
""", test_data="""
input1:
value: 1.fasta
type: File
name: fasta1
""", history_id=history_id, round_trip_format_conversion=True)
details0 = self.dataset_populator.get_history_dataset_details(history_id, hid=2, wait=True, assert_ok=True)
tags = details0["tags"]
assert len(tags) == 4, details0
assert "name:treated1fb" in tags, tags
assert "group:condition:treated" in tags, tags
assert "group:type:single-read" in tags, tags
assert "machine:illumina" in tags, tags
details1 = self.dataset_populator.get_history_dataset_details(history_id, hid=3, wait=True, assert_ok=True)
tags = details1["tags"]
assert len(tags) == 1, details1
assert "name:treated1fb" in tags, tags
[docs] @skip_without_tool("collection_creates_pair")
def test_run_add_tag_on_collection_output(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
steps:
create_pair:
tool_id: collection_creates_pair
in:
input1: input1
outputs:
paired_output:
add_tags:
- "name:foo"
""", test_data="""
input1:
value: 1.fasta
type: File
name: fasta1
""", history_id=history_id, round_trip_format_conversion=True)
details1 = self.dataset_populator.get_history_collection_details(history_id, hid=4, wait=True, assert_ok=True)
assert details1["history_content_type"] == "dataset_collection"
assert details1["tags"][0] == "name:foo", details1
[docs] @skip_without_tool("cat")
def test_run_add_tag_on_mapped_over_collection(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1:
type: collection
collection_type: list
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
add_tags:
- "name:foo"
""", test_data="""
input1:
type: list
name: the_dataset_list
elements:
- identifier: el1
value: 1.fastq
type: File
""", history_id=history_id, round_trip_format_conversion=True)
details1 = self.dataset_populator.get_history_collection_details(history_id, hid=3, wait=True, assert_ok=True)
assert details1["history_content_type"] == "dataset_collection"
assert details1["tags"][0] == "name:foo", details1
[docs] @skip_without_tool("cat")
def test_assign_column_pja(self):
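# A set_columns post-job action should assign the chrom/start/end column
# metadata on the re-typed bed output.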
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
change_datatype: bed
set_columns:
chromCol: 1
endCol: 2
startCol: 3
""", test_data="""
input1:
value: 1.bed
type: File
""", history_id=history_id)
details_dataset_new_col = self.dataset_populator.get_history_dataset_details(history_id, hid=2, wait=True, assert_ok=True)
assert details_dataset_new_col["history_content_type"] == "dataset", details_dataset_new_col
assert details_dataset_new_col['metadata_endCol'] == 2
assert details_dataset_new_col['metadata_startCol'] == 3
[docs] @skip_without_tool("collection_creates_pair")
@skip_without_tool("cat")
def test_run_remove_tag_on_collection_output(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
add_tags:
- "name:foo"
create_pair:
tool_id: collection_creates_pair
in:
input1: first_cat/out_file1
outputs:
paired_output:
remove_tags:
- "name:foo"
""", test_data="""
input1:
value: 1.fasta
type: File
name: fasta1
""", history_id=history_id, round_trip_format_conversion=True)
details_dataset_with_tag = self.dataset_populator.get_history_dataset_details(history_id, hid=2, wait=True, assert_ok=True)
assert details_dataset_with_tag["history_content_type"] == "dataset", details_dataset_with_tag
assert details_dataset_with_tag["tags"][0] == "name:foo", details_dataset_with_tag
details_collection_without_tag = self.dataset_populator.get_history_collection_details(history_id, hid=5, wait=True, assert_ok=True)
assert details_collection_without_tag["history_content_type"] == "dataset_collection", details_collection_without_tag
assert len(details_collection_without_tag["tags"]) == 0, details_collection_without_tag
[docs] @skip_without_tool("cat1")
def test_run_with_runtime_pja(self):
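# Post-job actions supplied at runtime through the parameters dict (keyed by
# step UUID under __POST_JOB_ACTIONS__) should apply to this invocation
# without being saved back to the stored workflow.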
workflow = self.workflow_populator.load_workflow(name="test_for_pja_runtime")
uuid0, uuid1, uuid2 = str(uuid4()), str(uuid4()), str(uuid4())
workflow["steps"]["0"]["uuid"] = uuid0
workflow["steps"]["1"]["uuid"] = uuid1
workflow["steps"]["2"]["uuid"] = uuid2
workflow_request, history_id = self._setup_workflow_run(workflow, inputs_by='step_index')
workflow_request["replacement_params"] = dumps(dict(replaceme="was replaced"))
pja_map = {
"RenameDatasetActionout_file1": dict(
action_type="RenameDatasetAction",
output_name="out_file1",
action_arguments=dict(newname="foo ${replaceme}"),
)
}
workflow_request["parameters"] = dumps({
uuid2: {"__POST_JOB_ACTIONS__": pja_map}
})
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True)
assert content["name"] == "foo was replaced", content["name"]
# Test for regression of previous behavior where runtime post job actions
# would be added to the original workflow post job actions.
workflow_id = workflow_request["workflow_id"]
downloaded_workflow = self._download_workflow(workflow_id)
pjas = list(downloaded_workflow["steps"]["2"]["post_job_actions"].values())
assert len(pjas) == 0, len(pjas)
[docs] @skip_without_tool("cat1")
def test_run_with_delayed_runtime_pja(self):
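# A runtime rename action targeting a step behind a pause should still fire
# once the paused step is reviewed and the delayed step finally runs.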
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
inputs:
test_input: data
steps:
first_cat:
tool_id: cat1
in:
input1: test_input
the_pause:
type: pause
in:
input: first_cat/out_file1
second_cat:
tool_id: cat1
in:
input1: the_pause
""", round_trip_format_conversion=True)
downloaded_workflow = self._download_workflow(workflow_id)
uuid_dict = dict((int(index), step["uuid"]) for index, step in downloaded_workflow["steps"].items())
with self.dataset_populator.test_history() as history_id:
hda = self.dataset_populator.new_dataset(history_id, content="1 2 3")
self.dataset_populator.wait_for_history(history_id)
inputs = {
'0': self._ds_entry(hda),
}
uuid2 = uuid_dict[3]
workflow_request = {}
workflow_request["replacement_params"] = dumps(dict(replaceme="was replaced"))
pja_map = {
"RenameDatasetActionout_file1": dict(
action_type="RenameDatasetAction",
output_name="out_file1",
action_arguments=dict(newname="foo ${replaceme}"),
)
}
workflow_request["parameters"] = dumps({
uuid2: {"__POST_JOB_ACTIONS__": pja_map}
})
invocation_id = self.__invoke_workflow(history_id, workflow_id, inputs=inputs, request=workflow_request)
time.sleep(2)
self.dataset_populator.wait_for_history(history_id)
self.__review_paused_steps(workflow_id, invocation_id, order_index=2, action=True)
self.workflow_populator.wait_for_workflow(workflow_id, invocation_id, history_id)
time.sleep(1)
content = self.dataset_populator.get_history_dataset_details(history_id)
assert content["name"] == "foo was replaced", content["name"]
[docs] @skip_without_tool("cat1")
def test_delete_intermediate_datasets_pja_1(self):
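# The "delete_intermediate_datasets: true" directive on a workflow
# output asks Galaxy to delete datasets that only feed intermediate
# steps once the workflow output has been produced.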
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
outputs:
wf_output_1:
outputSource: third_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: input1
second_cat:
tool_id: cat1
in:
input1: first_cat/out_file1
third_cat:
tool_id: cat1
in:
input1: second_cat/out_file1
outputs:
out_file1:
delete_intermediate_datasets: true
""", test_data={"input1": "hello world"}, history_id=history_id)
hda1 = self.dataset_populator.get_history_dataset_details(history_id, hid=1)
hda2 = self.dataset_populator.get_history_dataset_details(history_id, hid=2)
hda3 = self.dataset_populator.get_history_dataset_details(history_id, hid=3)
hda4 = self.dataset_populator.get_history_dataset_details(history_id, hid=4)
assert not hda1["deleted"]
assert hda2["deleted"]
# Arguably hda3 should be deleted as well, but inputs to steps that
# produce workflow outputs are currently not deleted.
# assert hda3["deleted"]
print(hda3["deleted"])
assert not hda4["deleted"]
[docs] @skip_without_tool("cat1")
def test_validated_post_job_action_validated(self):
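# A ValidateOutputsAction post job action validates the output against
# its datatype; matching content should yield validated_state "ok".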
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
outputs:
wf_output_1:
outputSource: first_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: input1
post_job_actions:
ValidateOutputsAction:
action_type: ValidateOutputsAction
""", test_data={"input1": {"type": "File", "file_type": "fastqsanger", "value": "1.fastqsanger"}}, history_id=history_id)
hda2 = self.dataset_populator.get_history_dataset_details(history_id, hid=2)
assert hda2["validated_state"] == "ok"
[docs] @skip_without_tool("cat1")
def test_validated_post_job_action_unvalidated_default(self):
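# Without an explicit ValidateOutputsAction the output is not validated
# and validated_state stays "unknown".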
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
outputs:
wf_output_1:
outputSource: first_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: input1
""", test_data={"input1": {"type": "File", "file_type": "fastqsanger", "value": "1.fastqsanger"}}, history_id=history_id)
hda2 = self.dataset_populator.get_history_dataset_details(history_id, hid=2)
assert hda2["validated_state"] == "unknown"
[docs] @skip_without_tool("cat1")
def test_validated_post_job_action_invalid(self):
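# The declared file_type (fastqcssanger) does not match the actual
# fastqsanger content, so validation marks the dataset "invalid".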
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
input1: data
outputs:
wf_output_1:
outputSource: first_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: input1
post_job_actions:
ValidateOutputsAction:
action_type: ValidateOutputsAction
""", test_data={"input1": {"type": "File", "file_type": "fastqcssanger", "value": "1.fastqsanger"}}, history_id=history_id)
hda2 = self.dataset_populator.get_history_dataset_details(history_id, hid=2)
assert hda2["validated_state"] == "invalid"
[docs] @skip_without_tool("random_lines1")
def test_run_replace_params_by_tool(self):
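# "parameters" entries keyed by tool id apply to every step running
# that tool, so both random_lines1 steps pick up num_lines=5 here.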
workflow_request, history_id = self._setup_random_x2_workflow("test_for_replace_tool_params")
workflow_request["parameters"] = dumps(dict(random_lines1=dict(num_lines=5)))
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
# Would be 8 and 6 without modification
self.__assert_lines_hid_line_count_is(history_id, 2, 5)
self.__assert_lines_hid_line_count_is(history_id, 3, 5)
[docs] @skip_without_tool("random_lines1")
def test_run_replace_params_by_uuid(self):
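# Parameters can also be keyed by step UUID to target individual steps;
# these UUIDs are presumably fixed in the workflow resource loaded by
# _setup_random_x2_workflow.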
workflow_request, history_id = self._setup_random_x2_workflow("test_for_replace_tool_params")
workflow_request["parameters"] = dumps({
"58dffcc9-bcb7-4117-a0e1-61513524b3b1": dict(num_lines=4),
"58dffcc9-bcb7-4117-a0e1-61513524b3b2": dict(num_lines=3),
})
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
# Would be 8 and 6 without modification
self.__assert_lines_hid_line_count_is(history_id, 2, 4)
self.__assert_lines_hid_line_count_is(history_id, 3, 3)
[docs] @skip_without_tool("cat1")
@skip_without_tool("addValue")
def test_run_batch(self):
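# With "batch" set, the batched input's values are expanded into one
# workflow run per value (four runs here) while non-batch inputs stay
# constant across the runs.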
workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_batch")
workflow_id = self.workflow_populator.create_workflow(workflow)
with self.dataset_populator.test_history() as history_id:
hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3")
hda2 = self.dataset_populator.new_dataset(history_id, content="4 5 6")
hda3 = self.dataset_populator.new_dataset(history_id, content="7 8 9")
hda4 = self.dataset_populator.new_dataset(history_id, content="10 11 12")
parameters = {
"0": {"input": {"batch": True, "values": [{"id" : hda1.get("id"), "hid": hda1.get("hid"), "src": "hda"},
{"id" : hda2.get("id"), "hid": hda2.get("hid"), "src": "hda"},
{"id" : hda3.get("id"), "hid": hda2.get("hid"), "src": "hda"},
{"id" : hda4.get("id"), "hid": hda2.get("hid"), "src": "hda"}]}},
"1": {"input": {"batch": False, "values": [{"id" : hda1.get("id"), "hid": hda1.get("hid"), "src": "hda"}]}, "exp": "2"}}
workflow_request = {
"history_id" : history_id,
"batch" : True,
"parameters_normalized": True,
"parameters" : dumps(parameters),
}
invocation_response = self._post("workflows/%s/usage" % workflow_id, data=workflow_request)
self._assert_status_code_is(invocation_response, 200)
time.sleep(5)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
r1 = "1 2 3\t1\n1 2 3\t2\n"
r2 = "4 5 6\t1\n1 2 3\t2\n"
r3 = "7 8 9\t1\n1 2 3\t2\n"
r4 = "10 11 12\t1\n1 2 3\t2\n"
t1 = self.dataset_populator.get_history_dataset_content(history_id, hid=7)
t2 = self.dataset_populator.get_history_dataset_content(history_id, hid=10)
t3 = self.dataset_populator.get_history_dataset_content(history_id, hid=13)
t4 = self.dataset_populator.get_history_dataset_content(history_id, hid=16)
self.assertEqual(r1, t1)
self.assertEqual(r2, t2)
self.assertEqual(r3, t3)
self.assertEqual(r4, t4)
[docs] @skip_without_tool("validation_default")
def test_parameter_substitution_sanitization(self):
substitutions = dict(input1="\" ; echo \"moo")
run_workflow_response, history_id = self._run_validation_workflow_with_substitutions(substitutions)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
self.assertEqual("__dq__ X echo __dq__moo\n", self.dataset_populator.get_history_dataset_content(history_id, hid=1))
[docs] @skip_without_tool("validation_repeat")
def test_parameter_substitution_validation_value_errors_0(self):
with self.dataset_populator.test_history() as history_id:
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
steps:
validation:
tool_id: validation_repeat
state:
r2:
- text: "abd"
""")
workflow_request = dict(
history="hist_id=%s" % history_id,
parameters=dumps(dict(validation_repeat={"r2_0|text": ""}))
)
url = "workflows/%s/invocations" % workflow_id
invocation_response = self._post(url, data=workflow_request)
# Take a valid state and make it invalid, then assert the workflow won't run.
self._assert_status_code_is(invocation_response, 400)
[docs] @skip_without_tool("validation_default")
def test_parameter_substitution_validation_value_errors_1(self):
substitutions = dict(select_param="\" ; echo \"moo")
run_workflow_response, history_id = self._run_validation_workflow_with_substitutions(substitutions)
self._assert_status_code_is(run_workflow_response, 400)
[docs] @skip_without_tool("validation_repeat")
def test_workflow_import_state_validation_1(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
steps:
validation:
tool_id: validation_repeat
state:
r2:
- text: ""
""", history_id=history_id, wait=False, expected_response=400, assert_ok=False)
def _run_validation_workflow_with_substitutions(self, substitutions):
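# Shared helper: run the test_workflow_validation_1 resource workflow
# with the given parameter substitutions and return the raw response and
# history id, leaving status assertions to the caller.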
workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_validation_1")
uploaded_workflow_id = self.workflow_populator.create_workflow(workflow)
history_id = self.dataset_populator.new_history()
workflow_request = dict(
history="hist_id=%s" % history_id,
workflow_id=uploaded_workflow_id,
parameters=dumps(dict(validation_default=substitutions))
)
run_workflow_response = self._post("workflows", data=workflow_request)
return run_workflow_response, history_id
[docs] @skip_without_tool("random_lines1")
def test_run_replace_params_by_steps(self):
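# Parameters may likewise be keyed by numeric step id, which targets a
# single step instead of every step running the tool.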
workflow_request, history_id, steps = self._setup_random_x2_workflow_steps("test_for_replace_step_params")
params = dumps({str(steps[1]["id"]): dict(num_lines=5)})
workflow_request["parameters"] = params
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
# Would be 8 and 6 without modification
self.__assert_lines_hid_line_count_is(history_id, 2, 8)
self.__assert_lines_hid_line_count_is(history_id, 3, 5)
[docs] @skip_without_tool("random_lines1")
def test_run_replace_params_nested(self):
workflow_request, history_id, steps = self._setup_random_x2_workflow_steps("test_for_replace_step_params_nested")
seed_source = dict(
seed_source_selector="set_seed",
seed="moo",
)
params = dumps({str(steps[0]["id"]): dict(num_lines=1, seed_source=seed_source),
str(steps[1]["id"]): dict(num_lines=1, seed_source=seed_source)})
workflow_request["parameters"] = params
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
self.assertEqual("2\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("random_lines1")
def test_run_replace_params_nested_normalized(self):
workflow_request, history_id, steps = self._setup_random_x2_workflow_steps("test_for_replace_step_normalized_params_nested")
parameters = {
"num_lines": 1,
"seed_source|seed_source_selector": "set_seed",
"seed_source|seed": "moo",
}
params = dumps({str(steps[0]["id"]): parameters,
str(steps[1]["id"]): parameters})
workflow_request["parameters"] = params
workflow_request["parameters_normalized"] = False
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
self.assertEqual("2\n", self.dataset_populator.get_history_dataset_content(history_id))
[docs] @skip_without_tool("random_lines1")
def test_run_replace_params_over_default(self):
with self.dataset_populator.test_history() as history_id:
self._run_jobs(WORKFLOW_ONE_STEP_DEFAULT, test_data="""
step_parameters:
'1':
num_lines: 4
input:
value: 1.bed
type: File
""", history_id=history_id, wait=True, assert_ok=True, round_trip_format_conversion=True)
result = self.dataset_populator.get_history_dataset_content(history_id)
assert result.count("\n") == 4
[docs] @skip_without_tool("random_lines1")
def test_defaults_editor(self):
workflow_id = self._upload_yaml_workflow(WORKFLOW_ONE_STEP_DEFAULT, publish=True)
workflow_object = self._download_workflow(workflow_id, style="editor")
put_response = self._update_workflow(workflow_id, workflow_object)
assert put_response.status_code == 200
[docs] @skip_without_tool("random_lines1")
def test_run_replace_params_over_default_delayed(self):
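# step_parameters supplied in test_data must override the step-level
# default (num_lines: 6) even though the target step is only scheduled
# after the pause step has been approved.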
with self.dataset_populator.test_history() as history_id:
run_summary = self._run_jobs("""
class: GalaxyWorkflow
inputs:
input: data
steps:
first_cat:
tool_id: cat1
in:
input1: input
the_pause:
type: pause
in:
input: first_cat/out_file1
randomlines:
tool_id: random_lines1
in:
input: the_pause
num_lines:
default: 6
""", test_data="""
step_parameters:
'3':
num_lines: 4
input:
value: 1.bed
type: File
""", history_id=history_id, wait=False)
wait_on(lambda: len(self._history_jobs(history_id)) >= 2 or None, "history jobs")
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
workflow_id = run_summary.workflow_id
invocation_id = run_summary.invocation_id
self.__review_paused_steps(workflow_id, invocation_id, order_index=2, action=True)
self.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
result = self.dataset_populator.get_history_dataset_content(history_id)
assert result.count("\n") == 4
[docs] def test_pja_import_export(self):
workflow = self.workflow_populator.load_workflow(name="test_for_pja_import", add_pja=True)
uploaded_workflow_id = self.workflow_populator.create_workflow(workflow)
downloaded_workflow = self._download_workflow(uploaded_workflow_id)
self._assert_has_keys(downloaded_workflow["steps"], "0", "1", "2")
pjas = list(downloaded_workflow["steps"]["2"]["post_job_actions"].values())
assert len(pjas) == 1, len(pjas)
pja = pjas[0]
self._assert_has_keys(pja, "action_type", "output_name", "action_arguments")
[docs] @skip_without_tool("cat1")
def test_only_own_invocations_indexed_and_accessible(self):
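# Invocation indexes and details must be scoped to the invoking user;
# another user sees a 403 for the details and an empty index.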
workflow_id, usage = self._run_workflow_once_get_invocation("test_usage")
with self._different_user():
usage_details_response = self._get("workflows/%s/usage/%s" % (workflow_id, usage["id"]))
self._assert_status_code_is(usage_details_response, 403)
index_response = self._get("workflows/%s/invocations" % workflow_id)
self._assert_status_code_is(index_response, 200)
assert len(index_response.json()) == 0
invocation_ids = self._all_user_invocation_ids()
assert usage["id"] in invocation_ids
with self._different_user():
invocation_ids = self._all_user_invocation_ids()
assert usage["id"] not in invocation_ids
[docs] @skip_without_tool("cat1")
def test_invocation_usage(self):
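# Exercises the invocation details API: overall structure, listing by
# history, listing across the user's invocations, and per-step details
# including the (deprecated) job_id field.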
workflow_id, usage = self._run_workflow_once_get_invocation("test_usage")
invocation_id = usage["id"]
usage_details = self._invocation_details(workflow_id, invocation_id)
# Assert some high-level things about the structure of data returned.
self._assert_has_keys(usage_details, "inputs", "steps", "workflow_id", "history_id")
# Check that this invocation is listed when filtering invocations by history.
history_invocations_response = self._get("invocations", {"history_id": usage_details["history_id"]})
self._assert_status_code_is(history_invocations_response, 200)
assert len(history_invocations_response.json()) == 1
assert history_invocations_response.json()[0]["id"] == invocation_id
# Check that this invocation is also listed across all of the user's invocations.
invocation_ids = self._all_user_invocation_ids()
assert invocation_id in invocation_ids
# Wait for the invocation to be fully scheduled, so we have details on all steps.
self._wait_for_invocation_state(workflow_id, invocation_id, 'scheduled')
usage_details = self._invocation_details(workflow_id, invocation_id)
invocation_steps = usage_details["steps"]
invocation_input_step, invocation_tool_step = None, None
for invocation_step in invocation_steps:
self._assert_has_keys(invocation_step, "workflow_step_id", "order_index", "id")
order_index = invocation_step["order_index"]
assert order_index in [0, 1, 2], order_index
if order_index == 0:
invocation_input_step = invocation_step
elif order_index == 2:
invocation_tool_step = invocation_step
# Tool steps have non-null job_ids (deprecated though they may be)
assert invocation_input_step.get("job_id", None) is None
job_id = invocation_tool_step.get("job_id", None)
assert job_id is not None
invocation_tool_step_id = invocation_tool_step["id"]
invocation_tool_step_response = self._get("workflows/%s/invocations/%s/steps/%s" % (workflow_id, invocation_id, invocation_tool_step_id))
self._assert_status_code_is(invocation_tool_step_response, 200)
self._assert_has_keys(invocation_tool_step_response.json(), "id", "order_index", "job_id")
assert invocation_tool_step_response.json()["job_id"] == job_id
[docs] def test_invocation_with_collection_mapping(self):
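# For a tool step mapped over a collection the default view reports a
# single step (state "scheduled", no job_id), while the legacy_job_state
# view expands it into one entry per mapped job.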
workflow_id, invocation_id = self._run_mapping_workflow()
usage_details = self._invocation_details(workflow_id, invocation_id)
# Assert some high-level things about the structure of data returned.
self._assert_has_keys(usage_details, "inputs", "steps", "workflow_id")
invocation_steps = usage_details["steps"]
invocation_input_step, invocation_tool_step = None, None
for invocation_step in invocation_steps:
self._assert_has_keys(invocation_step, "workflow_step_id", "order_index", "id")
order_index = invocation_step["order_index"]
assert order_index in [0, 1]
if invocation_step["order_index"] == 0:
assert invocation_input_step is None
invocation_input_step = invocation_step
else:
assert invocation_tool_step is None
invocation_tool_step = invocation_step
# Neither the input step nor the collection-mapped tool step has a single top-level job_id.
assert invocation_input_step.get("job_id", None) is None
assert invocation_tool_step.get("job_id", None) is None
assert invocation_tool_step["state"] == "scheduled"
usage_details = self._invocation_details(workflow_id, invocation_id, legacy_job_state="true")
# Assert some high-level things about the structure of data returned.
self._assert_has_keys(usage_details, "inputs", "steps", "workflow_id")
invocation_steps = usage_details["steps"]
invocation_input_step = None
invocation_tool_steps = []
for invocation_step in invocation_steps:
self._assert_has_keys(invocation_step, "workflow_step_id", "order_index", "id")
order_index = invocation_step["order_index"]
assert order_index in [0, 1]
if invocation_step["order_index"] == 0:
assert invocation_input_step is None
invocation_input_step = invocation_step
else:
invocation_tool_steps.append(invocation_step)
assert len(invocation_tool_steps) == 2
assert invocation_tool_steps[0]["state"] == "ok"
def _run_mapping_workflow(self):
history_id = self.dataset_populator.new_history()
summary = self._run_jobs("""
class: GalaxyWorkflow
inputs:
input_c: collection
steps:
cat1:
tool_id: cat1
in:
input1: input_c
""", test_data="""
input_c:
type: list
elements:
- identifier: i1
content: "0"
- identifier: i2
content: "1"
""", history_id=history_id, wait=True, assert_ok=True)
workflow_id = summary.workflow_id
invocation_id = summary.invocation_id
return workflow_id, invocation_id
[docs] @skip_without_tool("cat1")
def test_invocations_accessible_imported_workflow(self):
workflow_id = self.workflow_populator.simple_workflow("test_usage", publish=True)
with self._different_user():
other_import_response = self.__import_workflow(workflow_id)
self._assert_status_code_is(other_import_response, 200)
other_id = other_import_response.json()["id"]
workflow_request, history_id = self._setup_workflow_run(workflow_id=other_id)
response = self._get("workflows/%s/usage" % other_id)
self._assert_status_code_is(response, 200)
assert len(response.json()) == 0
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
run_workflow_response = run_workflow_response.json()
invocation_id = run_workflow_response['id']
usage_details_response = self._get("workflows/%s/usage/%s" % (other_id, invocation_id))
self._assert_status_code_is(usage_details_response, 200)
[docs] @skip_without_tool("cat1")
def test_invocations_accessible_published_workflow(self):
workflow_id = self.workflow_populator.simple_workflow("test_usage", publish=True)
with self._different_user():
workflow_request, history_id = self._setup_workflow_run(workflow_id=workflow_id)
response = self._get("workflows/%s/usage" % workflow_id)
self._assert_status_code_is(response, 200)
assert len(response.json()) == 0
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
run_workflow_response = run_workflow_response.json()
invocation_id = run_workflow_response['id']
usage_details_response = self._get("workflows/%s/usage/%s" % (workflow_id, invocation_id))
self._assert_status_code_is(usage_details_response, 200)
[docs] @skip_without_tool("cat1")
def test_invocations_not_accessible_by_different_user_for_published_workflow(self):
workflow_id = self.workflow_populator.simple_workflow("test_usage", publish=True)
workflow_request, history_id = self._setup_workflow_run(workflow_id=workflow_id)
response = self._get("workflows/%s/usage" % workflow_id)
self._assert_status_code_is(response, 200)
assert len(response.json()) == 0
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
run_workflow_response = run_workflow_response.json()
invocation_id = run_workflow_response['id']
with self._different_user():
usage_details_response = self._get("workflows/%s/usage/%s" % (workflow_id, invocation_id))
self._assert_status_code_is(usage_details_response, 403)
def _invoke_paused_workflow(self, history_id):
workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_pause")
workflow_id = self.workflow_populator.create_workflow(workflow)
hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3")
index_map = {
'0': self._ds_entry(hda1),
}
invocation_id = self.__invoke_workflow(
history_id,
workflow_id,
index_map,
)
return workflow_id, invocation_id
def _wait_for_invocation_non_new(self, workflow_id, invocation_id):
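# Poll until the invocation leaves the "new" state (up to roughly 12.5
# seconds) and report whether the transition was observed.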
target_state_reached = False
for i in range(50):
invocation = self._invocation_details(workflow_id, invocation_id)
if invocation['state'] != 'new':
target_state_reached = True
break
time.sleep(.25)
return target_state_reached
def _assert_invocation_non_terminal(self, workflow_id, invocation_id):
invocation = self._invocation_details(workflow_id, invocation_id)
assert invocation['state'] in ['ready', 'new'], invocation
def _wait_for_invocation_state(self, workflow_id, invocation_id, target_state):
target_state_reached = False
for i in range(25):
invocation = self._invocation_details(workflow_id, invocation_id)
if invocation['state'] == target_state:
target_state_reached = True
break
time.sleep(.5)
return target_state_reached
def _update_workflow(self, workflow_id, workflow_object):
return self.workflow_populator.update_workflow(workflow_id, workflow_object)
def _invocation_step_details(self, workflow_id, invocation_id, step_id):
invocation_step_response = self._get("workflows/%s/usage/%s/steps/%s" % (workflow_id, invocation_id, step_id))
self._assert_status_code_is(invocation_step_response, 200)
invocation_step_details = invocation_step_response.json()
return invocation_step_details
def _execute_invocation_step_action(self, workflow_id, invocation_id, step_id, action):
raw_url = "workflows/%s/usage/%s/steps/%s" % (workflow_id, invocation_id, step_id)
url = self._api_url(raw_url, use_key=True)
payload = dumps(dict(action=action))
action_response = put(url, data=payload)
self._assert_status_code_is(action_response, 200)
invocation_step_details = action_response.json()
return invocation_step_details
def _run_workflow_once_get_invocation(self, name):
workflow = self.workflow_populator.load_workflow(name=name)
workflow_request, history_id = self._setup_workflow_run(workflow)
workflow_id = workflow_request["workflow_id"]
response = self._get("workflows/%s/usage" % workflow_id)
self._assert_status_code_is(response, 200)
assert len(response.json()) == 0
run_workflow_response = self._post("workflows", data=workflow_request)
self._assert_status_code_is(run_workflow_response, 200)
response = self._get("workflows/%s/usage" % workflow_id)
self._assert_status_code_is(response, 200)
usages = response.json()
assert len(usages) == 1
return workflow_id, usages[0]
def _setup_random_x2_workflow_steps(self, name):
workflow_request, history_id = self._setup_random_x2_workflow("test_for_replace_step_params")
random_line_steps = self._random_lines_steps(workflow_request)
return workflow_request, history_id, random_line_steps
def _random_lines_steps(self, workflow_request):
workflow_summary_response = self._get("workflows/%s" % workflow_request["workflow_id"])
self._assert_status_code_is(workflow_summary_response, 200)
steps = workflow_summary_response.json()["steps"]
return sorted((step for step in steps.values() if step["tool_id"] == "random_lines1"), key=lambda step: step["id"])
def _setup_random_x2_workflow(self, name):
workflow = self.workflow_populator.load_random_x2_workflow(name)
uploaded_workflow_id = self.workflow_populator.create_workflow(workflow)
workflow_inputs = self._workflow_inputs(uploaded_workflow_id)
key = next(iter(workflow_inputs.keys()))
history_id = self.dataset_populator.new_history()
ten_lines = "\n".join(str(_) for _ in range(10))
hda1 = self.dataset_populator.new_dataset(history_id, content=ten_lines)
workflow_request = dict(
history="hist_id=%s" % history_id,
workflow_id=uploaded_workflow_id,
ds_map=dumps({
key: self._ds_entry(hda1),
}),
)
return workflow_request, history_id
def __review_paused_steps(self, uploaded_workflow_id, invocation_id, order_index, action=True):
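# Find the paused step(s) at the given order_index within the invocation
# and submit the review action (action=True lets the workflow continue).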
invocation = self._invocation_details(uploaded_workflow_id, invocation_id)
invocation_steps = invocation["steps"]
pause_steps = [s for s in invocation_steps if s['order_index'] == order_index]
for pause_step in pause_steps:
pause_step_id = pause_step['id']
self._execute_invocation_step_action(uploaded_workflow_id, invocation_id, pause_step_id, action=action)
def __assert_lines_hid_line_count_is(self, history, hid, lines):
contents_url = "histories/%s/contents" % history
history_contents = self.__history_contents(history)
hda_summary = next(hc for hc in history_contents if hc["hid"] == hid)
hda_info_response = self._get("%s/%s" % (contents_url, hda_summary["id"]))
self._assert_status_code_is(hda_info_response, 200)
self.assertEqual(hda_info_response.json()["metadata_data_lines"], lines)
def __history_contents(self, history_id):
contents_url = "histories/%s/contents" % history_id
history_contents_response = self._get(contents_url)
self._assert_status_code_is(history_contents_response, 200)
return history_contents_response.json()
def __invoke_workflow(self, *args, **kwds):
return self.workflow_populator.invoke_workflow(*args, **kwds)
def __import_workflow(self, workflow_id, deprecated_route=False):
if deprecated_route:
route = "workflows/import"
import_data = dict(
workflow_id=workflow_id,
)
else:
route = "workflows"
import_data = dict(
shared_workflow_id=workflow_id,
)
return self._post(route, import_data)
def _show_workflow(self, workflow_id):
show_response = self._get("workflows/%s" % workflow_id)
self._assert_status_code_is(show_response, 200)
return show_response.json()
def _assert_looks_like_instance_workflow_representation(self, workflow):
self._assert_has_keys(
workflow,
'url',
'owner',
'inputs',
'annotation',
'steps'
)
for step in workflow["steps"].values():
self._assert_has_keys(
step,
'id',
'type',
'tool_id',
'tool_version',
'annotation',
'tool_inputs',
'input_steps',
)
def _all_user_invocation_ids(self):
all_invocations_for_user = self._get("invocations")
self._assert_status_code_is(all_invocations_for_user, 200)
invocation_ids = [i["id"] for i in all_invocations_for_user.json()]
return invocation_ids
[docs]class AdminWorkflowsApiTestCase(BaseWorkflowsApiTestCase):
require_admin_user = True
[docs] def test_import_export_dynamic_tools(self):
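# Workflows can embed tool definitions inline (class: GalaxyTool); this
# dynamic-tool support is exercised under an admin account, hence
# require_admin_user = True on this test case.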
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
steps:
- type: input
label: input1
- tool_id: cat1
label: first_cat
state:
input1:
$link: 0
- label: embed1
run:
class: GalaxyTool
command: echo 'hello world 2' > $output1
outputs:
output1:
format: txt
- tool_id: cat1
state:
input1:
$link: first_cat/out_file1
queries:
- input2:
$link: embed1/output1
test_data:
input1: "hello world"
""")
downloaded_workflow = self._download_workflow(workflow_id)
response = self.workflow_populator.create_workflow_response(downloaded_workflow)
workflow_id = response.json()["id"]
history_id = self.dataset_populator.new_history()
hda1 = self.dataset_populator.new_dataset(history_id, content="Hello World Second!")
workflow_request = dict(
inputs_by="name",
inputs=json.dumps({'input1': self._ds_entry(hda1)}),
)
invocation_id = self.workflow_populator.invoke_workflow(history_id, workflow_id, request=workflow_request, assert_ok=True)
self.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
self.assertEqual("Hello World Second!\nhello world 2\n", self.dataset_populator.get_history_dataset_content(history_id, hid=4))