Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy_test.api.test_workflows
import base64
import json
import os
import shutil
import time
from json import dumps
from tempfile import mkdtemp
from typing import (
Any,
cast,
Dict,
Optional,
Tuple,
Union,
)
from uuid import uuid4
import pytest
import yaml
from requests import (
delete,
get,
post,
put,
)
from galaxy.exceptions import error_codes
from galaxy.util import UNKNOWN
from galaxy_test.base import rules_test_data
from galaxy_test.base.populators import (
DatasetCollectionPopulator,
DatasetPopulator,
RunJobsSummary,
skip_without_tool,
wait_on,
workflow_str,
WorkflowPopulator,
)
from galaxy_test.base.workflow_fixtures import (
NESTED_WORKFLOW_WITH_CONDITIONAL_SUBWORKFLOW_AND_DISCONNECTED_MAP_OVER_SOURCE,
WORKFLOW_INPUTS_AS_OUTPUTS,
WORKFLOW_NESTED_REPLACEMENT_PARAMETER,
WORKFLOW_NESTED_RUNTIME_PARAMETER,
WORKFLOW_NESTED_SIMPLE,
WORKFLOW_ONE_STEP_DEFAULT,
WORKFLOW_OPTIONAL_FALSE_INPUT_COLLECTION,
WORKFLOW_OPTIONAL_FALSE_INPUT_DATA,
WORKFLOW_OPTIONAL_INPUT_DELAYED_SCHEDULING,
WORKFLOW_OPTIONAL_TRUE_INPUT_COLLECTION,
WORKFLOW_OPTIONAL_TRUE_INPUT_DATA,
WORKFLOW_PARAMETER_INPUT_INTEGER_DEFAULT,
WORKFLOW_PARAMETER_INPUT_INTEGER_OPTIONAL,
WORKFLOW_PARAMETER_INPUT_INTEGER_REQUIRED,
WORKFLOW_RENAME_ON_INPUT,
WORKFLOW_RUNTIME_PARAMETER_AFTER_PAUSE,
WORKFLOW_WITH_BAD_COLUMN_PARAMETER,
WORKFLOW_WITH_BAD_COLUMN_PARAMETER_GOOD_TEST_DATA,
WORKFLOW_WITH_CUSTOM_REPORT_1,
WORKFLOW_WITH_CUSTOM_REPORT_1_TEST_DATA,
WORKFLOW_WITH_DEFAULT_FILE_DATASET_INPUT,
WORKFLOW_WITH_DYNAMIC_OUTPUT_COLLECTION,
WORKFLOW_WITH_MAPPED_OUTPUT_COLLECTION,
WORKFLOW_WITH_OUTPUT_COLLECTION,
WORKFLOW_WITH_OUTPUT_COLLECTION_MAPPING,
WORKFLOW_WITH_RULES_1,
WORKFLOW_WITH_STEP_DEFAULT_FILE_DATASET_INPUT,
)
from ._framework import ApiTestCase
from .sharable import SharingApiTests
# YAML workflow fixture: one data input (``input1``) feeds a single ``cat1`` step
# (``first_cat``) and that step's ``out_file1`` is exposed as ``wf_output_1``.
# NOTE(review): the YAML nesting indentation looks flattened in this copy of the
# file -- confirm the literal against the upstream source before relying on it.
WORKFLOW_SIMPLE = """
class: GalaxyWorkflow
name: Simple Workflow
inputs:
input1: data
outputs:
wf_output_1:
outputSource: first_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: input1
"""
# YAML workflow fixture with an embedded subworkflow:
#   * ``first_cat`` (``cat1``) consumes ``outer_input``;
#   * ``nested_workflow`` runs an inline ``GalaxyWorkflow`` whose ``random`` step
#     (``random_lines1``, fixed seed) reads ``inner_input``; the inner output is
#     declared with only an ``outputSource`` and no explicit id;
#   * ``second_cat`` (``cat1``) reads ``nested_workflow/1:out_file1`` twice and its
#     ``out_file1`` is exposed as ``outer_output``.
# NOTE(review): the YAML nesting indentation looks flattened in this copy of the
# file -- confirm the literal against the upstream source before relying on it.
NESTED_WORKFLOW_AUTO_LABELS_MODERN_SYNTAX = """
class: GalaxyWorkflow
inputs:
outer_input: data
outputs:
outer_output:
outputSource: second_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: outer_input
nested_workflow:
run:
class: GalaxyWorkflow
inputs:
- id: inner_input
outputs:
- outputSource: 1/out_file1
steps:
random:
tool_id: random_lines1
state:
num_lines: 1
input:
$link: inner_input
seed_source:
seed_source_selector: set_seed
seed: asdf
in:
inner_input: first_cat/out_file1
second_cat:
tool_id: cat1
in:
input1: nested_workflow/1:out_file1
queries_0|input2: nested_workflow/1:out_file1
"""
class RunsWorkflowFixtures:
    """Mixin with helpers that run fixture workflows through ``self.workflow_populator``."""

    workflow_populator: WorkflowPopulator

    def _run_workflow_with_inputs_as_outputs(self, history_id: str) -> RunJobsSummary:
        """Run ``WORKFLOW_INPUTS_AS_OUTPUTS`` in ``history_id`` and return the run summary."""
        inputs = {
            "input1": "hello world",
            "text_input": {"value": "A text variable", "type": "raw"},
        }
        return self.workflow_populator.run_workflow(
            WORKFLOW_INPUTS_AS_OUTPUTS,
            test_data=inputs,
            history_id=history_id,
        )

    def _run_workflow_with_output_collections(self, history_id: str) -> RunJobsSummary:
        """Run ``WORKFLOW_WITH_MAPPED_OUTPUT_COLLECTION`` against a one-element list input."""
        # NOTE(review): the YAML nesting indentation looks flattened in this copy of
        # the file -- confirm the literal against the upstream source.
        list_input_yaml = """
input1:
collection_type: list
name: the_dataset_list
elements:
- identifier: el1
value: 1.fastq
type: File
"""
        return self.workflow_populator.run_workflow(
            WORKFLOW_WITH_MAPPED_OUTPUT_COLLECTION,
            test_data=list_input_yaml,
            history_id=history_id,
            round_trip_format_conversion=True,
        )

    def _run_workflow_with_runtime_data_column_parameter(self, history_id: str) -> RunJobsSummary:
        """Run ``WORKFLOW_WITH_BAD_COLUMN_PARAMETER`` with its "good" test data."""
        populator = self.workflow_populator
        return populator.run_workflow(
            WORKFLOW_WITH_BAD_COLUMN_PARAMETER,
            test_data=WORKFLOW_WITH_BAD_COLUMN_PARAMETER_GOOD_TEST_DATA,
            history_id=history_id,
        )

    def _run_workflow_once_get_invocation(self, name: str):
        """Load workflow ``name``, invoke it once and return ``(workflow_id, invocation)``.

        Asserts that the workflow has no invocations before the run and exactly
        one afterwards.
        """
        populator = self.workflow_populator
        loaded_workflow = populator.load_workflow(name=name)
        request, _history_id, workflow_id = populator.setup_workflow_run(loaded_workflow)

        invocations_before = populator.workflow_invocations(workflow_id)
        assert len(invocations_before) == 0

        populator.invoke_workflow_raw(workflow_id, request, assert_ok=True)

        invocations_after = populator.workflow_invocations(workflow_id)
        assert len(invocations_after) == 1
        return workflow_id, invocations_after[0]
class BaseWorkflowsApiTestCase(ApiTestCase, RunsWorkflowFixtures):
    """API test case base that wires up workflow, dataset and collection populators."""

    # TODO: Find a new file for this class.
    dataset_populator: DatasetPopulator

    def setUp(self):
        """Create the populators used by the helper methods of this class."""
        super().setUp()
        interactor = self.galaxy_interactor
        self.workflow_populator = WorkflowPopulator(interactor)
        self.dataset_populator = DatasetPopulator(interactor)
        self.dataset_collection_populator = DatasetCollectionPopulator(interactor)

    def _assert_user_has_workflow_with_name(self, name):
        """Assert that ``name`` is among the names returned by the workflows index."""
        names = self._workflow_names()
        assert name in names, f"No workflows with name {name} in users workflows <{names}>"

    def _workflow_names(self):
        """Return the ``name`` of every entry in the ``workflows`` index (asserts HTTP 200)."""
        response = self._get("workflows")
        self._assert_status_code_is(response, 200)
        return [entry["name"] for entry in response.json()]

    def import_workflow(self, workflow, **kwds):
        """Delegate to ``WorkflowPopulator.import_workflow`` and return its response."""
        return self.workflow_populator.import_workflow(workflow, **kwds)

    def _upload_yaml_workflow(self, has_yaml, **kwds) -> str:
        """Delegate to ``WorkflowPopulator.upload_yaml_workflow``."""
        return self.workflow_populator.upload_yaml_workflow(has_yaml, **kwds)

    def _setup_workflow_run(
        self,
        workflow: Optional[Dict[str, Any]] = None,
        inputs_by: str = "step_id",
        history_id: Optional[str] = None,
        workflow_id: Optional[str] = None,
    ) -> Tuple[Dict[str, Any], str, str]:
        """Delegate to ``WorkflowPopulator.setup_workflow_run`` with the same arguments."""
        populator = self.workflow_populator
        return populator.setup_workflow_run(workflow, inputs_by, history_id, workflow_id)

    def _ds_entry(self, history_content):
        """Delegate to ``DatasetPopulator.ds_entry``."""
        return self.dataset_populator.ds_entry(history_content)

    def _invocation_details(self, workflow_id, invocation_id, **kwds):
        """GET ``workflows/<workflow_id>/usage/<invocation_id>`` and return the JSON (asserts 200)."""
        response = self._get(f"workflows/{workflow_id}/usage/{invocation_id}", data=kwds)
        self._assert_status_code_is(response, 200)
        return response.json()

    def _run_jobs(self, has_workflow, history_id: str, **kwds) -> Union[Dict[str, Any], RunJobsSummary]:
        """Run ``has_workflow`` in ``history_id`` via the workflow populator."""
        populator = self.workflow_populator
        return populator.run_workflow(has_workflow, history_id=history_id, **kwds)

    def _run_workflow(self, has_workflow, history_id: str, **kwds) -> RunJobsSummary:
        """Like ``_run_jobs`` but rejects ``expected_response`` and casts to ``RunJobsSummary``."""
        assert "expected_response" not in kwds
        result = self.workflow_populator.run_workflow(has_workflow, history_id=history_id, **kwds)
        return cast(RunJobsSummary, result)

    def _history_jobs(self, history_id):
        """Return the JSON of the ``jobs`` index for ``history_id`` ordered by ``create_time``."""
        query = {"history_id": history_id, "order_by": "create_time"}
        return self._get("jobs", query).json()

    def _assert_history_job_count(self, history_id, n):
        """Assert that the history has exactly ``n`` jobs."""
        assert len(self._history_jobs(history_id)) == n

    def _download_workflow(self, workflow_id, style=None, history_id=None):
        """Delegate to ``WorkflowPopulator.download_workflow``."""
        populator = self.workflow_populator
        return populator.download_workflow(workflow_id, style=style, history_id=history_id)

    def _assert_is_runtime_input(self, tool_state_value):
        """Assert the value (dict or JSON string) is a dict whose ``__class__`` is ``RuntimeValue``."""
        if isinstance(tool_state_value, dict):
            state = tool_state_value
        else:
            state = json.loads(tool_state_value)
        assert isinstance(state, dict)
        assert "__class__" in state
        assert state["__class__"] == "RuntimeValue"
class ChangeDatatypeTests:
    """Mixin test for the ``change_datatype`` / ``set_columns`` step output actions."""

    dataset_populator: DatasetPopulator
    workflow_populator: WorkflowPopulator

    def test_assign_column_pja(self):
        """Run a ``cat`` step that changes datatype and sets columns, then check the metadata."""
        # NOTE(review): the YAML nesting indentation of both literals looks flattened
        # in this copy of the file -- confirm against the upstream source.
        workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
change_datatype: bed
set_columns:
chromCol: 1
endCol: 2
startCol: 3
"""
        test_data_yaml = """
input1:
value: 1.bed
type: File
"""
        with self.dataset_populator.test_history() as history_id:
            self.workflow_populator.run_workflow(
                workflow_yaml,
                test_data=test_data_yaml,
                history_id=history_id,
            )
            output_details = self.dataset_populator.get_history_dataset_details(
                history_id, hid=2, wait=True, assert_ok=True
            )
            assert output_details["history_content_type"] == "dataset", output_details
            assert output_details["metadata_endCol"] == 2
            assert output_details["metadata_startCol"] == 3
class TestWorkflowSharingApi(ApiTestCase, SharingApiTests):
    """Runs the shared ``SharingApiTests`` suite against the ``workflows`` API."""

    api_name = "workflows"

    def create(self, name: str) -> str:
        """Create a shareable workflow with the given name and return its ID.

        :param name: The name of the shareable resource to create.
        :return: The ID of the resource.
        """
        loaded_workflow = self.workflow_populator.load_workflow(name=name)
        payload = {"workflow": dumps(loaded_workflow)}
        response = self._post("workflows", data=payload)
        self._assert_status_code_is(response, 200)
        return response.json()["id"]

    def setUp(self):
        """Create the workflow populator used by :meth:`create`."""
        super().setUp()
        self.workflow_populator = WorkflowPopulator(self.galaxy_interactor)
# Workflow API TODO:
# - Allow history_id as param to workflow run action. (hist_id)
# - Allow post to workflows/<workflow_id>/run in addition to posting to
# /workflows with id in payload.
# - Much more testing obviously, always more testing.
[docs]class TestWorkflowsApi(BaseWorkflowsApiTestCase, ChangeDatatypeTests):
dataset_populator: DatasetPopulator
def test_show_valid(self):
    """Show a workflow with ``style=instance`` and with ``legacy=True`` and compare step ids."""
    # Created only so that another workflow exists before the one under test.
    self.workflow_populator.simple_workflow("dummy")
    workflow_id = self.workflow_populator.simple_workflow("test_regular")

    def sorted_step_ids(representation):
        return sorted(step["id"] for step in representation["steps"].values())

    instance_view = self._get(f"workflows/{workflow_id}", {"style": "instance"}).json()
    self._assert_looks_like_instance_workflow_representation(instance_view)
    assert len(instance_view["steps"]) == 3
    assert sorted_step_ids(instance_view) == [0, 1, 2]

    legacy_view = self._get(f"workflows/{workflow_id}", {"legacy": True}).json()
    self._assert_looks_like_instance_workflow_representation(legacy_view)
    assert len(legacy_view["steps"]) == 3
    # Can't really say what the legacy IDs are but must be greater than 3 because dummy
    # workflow was created first in this instance.
    assert sorted_step_ids(legacy_view) != [0, 1, 2]
def test_show_invalid_key_is_400(self):
    """Showing a workflow by a random key yields HTTP 400."""
    bogus_key = self._random_key()
    response = self._get(f"workflows/{bogus_key}")
    self._assert_status_code_is(response, 400)
def test_cannot_show_private_workflow(self):
    """A workflow is not showable (HTTP 403) by another user or by an anonymous request."""
    workflow_id = self.workflow_populator.simple_workflow("test_not_importable")
    show_route = f"workflows/{workflow_id}"

    with self._different_user():
        other_user_response = self._get(show_route)
        self._assert_status_code_is(other_user_response, 403)

    # Try as anonymous user
    anonymous_response = get(self._api_url(show_route))
    assert anonymous_response.status_code == 403
def test_cannot_download_private_workflow(self):
    """Downloading someone else's workflow fails with 403, also for anonymous requests."""
    workflow_id = self.workflow_populator.simple_workflow("test_not_downloadable")

    with self._different_user():
        # The download helper is expected to fail with an AssertionError mentioning 403.
        with pytest.raises(AssertionError) as raised:
            self._download_workflow(workflow_id)
        failure_message = str(raised.value)
        assert "403" in failure_message

    download_url = self._api_url(f"workflows/{workflow_id}/download")
    anonymous_response = get(download_url)
    assert anonymous_response.status_code == 403
def test_anon_can_download_importable_workflow(self):
    """An anonymous request can download a workflow created with ``importable=True``."""
    workflow_id = self.workflow_populator.simple_workflow("test_downloadable", importable=True)
    download_url = self._api_url(f"workflows/{workflow_id}/download")
    anonymous_response = get(download_url)
    anonymous_response.raise_for_status()
    downloaded = anonymous_response.json()
    assert downloaded["a_galaxy_workflow"] == "true"
def test_anon_can_download_public_workflow(self):
    """An anonymous request can download a workflow created with ``publish=True``."""
    workflow_id = self.workflow_populator.simple_workflow("test_downloadable", publish=True)
    download_url = self._api_url(f"workflows/{workflow_id}/download")
    anonymous_response = get(download_url)
    anonymous_response.raise_for_status()
    downloaded = anonymous_response.json()
    assert downloaded["a_galaxy_workflow"] == "true"
def test_anon_can_see_workflow_preview(self):
    """An anonymous request can fetch the ``preview`` style of an importable workflow."""
    workflow_id = self.workflow_populator.simple_workflow(name="test_preview", importable=True)
    preview_url = self._api_url(f"workflows/{workflow_id}/download", params={"style": "preview"})
    anonymous_response = get(preview_url)
    anonymous_response.raise_for_status()
    preview = anonymous_response.json()
    assert preview["name"] == "test_preview"
def test_delete(self):
    """Deleting a workflow returns 204 and removes it from the default index."""
    workflow_name = "test_delete"
    workflow_id = self.workflow_populator.simple_workflow(workflow_name)
    self._assert_user_has_workflow_with_name(workflow_name)

    response = delete(self._api_url(f"workflows/{workflow_id}", use_key=True))
    self._assert_status_code_is(response, 204)

    # Make sure workflow is no longer in index by default.
    remaining_names = self._workflow_names()
    assert workflow_name not in remaining_names
def test_other_cannot_delete(self):
    """A different user deleting the workflow gets HTTP 403."""
    workflow_id = self.workflow_populator.simple_workflow("test_other_delete")
    with self._different_user():
        # The URL is built inside the context so it carries the other user's key.
        other_user_url = self._api_url(f"workflows/{workflow_id}", use_key=True)
        response = delete(other_user_url)
        self._assert_status_code_is(response, 403)
def test_undelete(self):
    """Undeleting a deleted workflow returns 204 and puts it back in the index."""
    workflow_name = "test_undelete"
    workflow_id = self.workflow_populator.simple_workflow(workflow_name)
    self._assert_user_has_workflow_with_name(workflow_name)

    workflow_delete_url = self._api_url(f"workflows/{workflow_id}", use_key=True)
    delete_response = delete(workflow_delete_url)
    # Verify the delete really happened; otherwise the undelete assertions below
    # could pass without the workflow ever having been deleted.
    self._assert_status_code_is(delete_response, 204)

    workflow_undelete_url = self._api_url(f"workflows/{workflow_id}/undelete", use_key=True)
    undelete_response = post(workflow_undelete_url)
    self._assert_status_code_is(undelete_response, 204)
    assert workflow_name in self._workflow_names()
def test_other_cannot_undelete(self):
    """A different user undeleting the owner's deleted workflow gets HTTP 403."""
    workflow_id = self.workflow_populator.simple_workflow("test_other_undelete")

    workflow_delete_url = self._api_url(f"workflows/{workflow_id}", use_key=True)
    delete_response = delete(workflow_delete_url)
    # Make sure the precondition (workflow deleted by its owner) actually holds.
    self._assert_status_code_is(delete_response, 204)

    with self._different_user():
        # The URL is built inside the context so it carries the other user's key.
        workflow_undelete_url = self._api_url(f"workflows/{workflow_id}/undelete", use_key=True)
        undelete_response = post(workflow_undelete_url)
        self._assert_status_code_is(undelete_response, 403)
def test_index(self):
    """The ``workflows`` index responds with 200 and a JSON list."""
    response = self._get("workflows")
    self._assert_status_code_is(response, 200)
    listing = response.json()
    assert isinstance(listing, list)
def test_index_deleted(self):
    """A deleted workflow is listed only when ``show_deleted=true`` is requested."""
    workflow_id = self.workflow_populator.simple_workflow("test_delete")

    def is_listed(route):
        # True when the index returned for ``route`` contains the workflow under test.
        return any(entry["id"] == workflow_id for entry in self._get(route).json())

    assert is_listed("workflows")

    delete_url = self._api_url(f"workflows/{workflow_id}", use_key=True)
    self._assert_status_code_is(delete(delete_url), 204)

    assert not is_listed("workflows")
    assert is_listed("workflows?show_deleted=true")
    assert not is_listed("workflows?show_deleted=false")
def test_index_ordering(self):
    """Check index order: own workflows first, most recently updated first within each group."""
    # ordered by update_time on the stored workflows with all user's workflows
    # before workflows shared with user.
    my_workflow_id_1 = self.workflow_populator.simple_workflow("mine_1")
    my_workflow_id_2 = self.workflow_populator.simple_workflow("mine_2")
    my_email = self.dataset_populator.user_email()
    with self._different_user():
        their_workflow_id_1 = self.workflow_populator.simple_workflow("theirs_1")
        their_workflow_id_2 = self.workflow_populator.simple_workflow("theirs_2")
        self.workflow_populator.share_with_user(their_workflow_id_1, my_email)
        self.workflow_populator.share_with_user(their_workflow_id_2, my_email)

    index_ids = self.workflow_populator.index_ids()
    # Membership checks: ``list.index`` raises ValueError for a missing id and never
    # returns a negative number, so ``index(...) >= 0`` could not fail as an assertion.
    for expected_id in (my_workflow_id_1, my_workflow_id_2, their_workflow_id_1, their_workflow_id_2):
        assert expected_id in index_ids, f"{expected_id} missing from workflow index {index_ids}"

    # ordered by update time...
    assert index_ids.index(my_workflow_id_2) < index_ids.index(my_workflow_id_1)
    assert index_ids.index(their_workflow_id_2) < index_ids.index(their_workflow_id_1)

    # my workflows before theirs...
    for mine in (my_workflow_id_1, my_workflow_id_2):
        for theirs in (their_workflow_id_1, their_workflow_id_2):
            assert index_ids.index(mine) < index_ids.index(theirs)

    actions = [
        {"action_type": "update_name", "name": "mine_1(updated)"},
    ]
    refactor_response = self.workflow_populator.refactor_workflow(my_workflow_id_1, actions)
    refactor_response.raise_for_status()

    index_ids = self.workflow_populator.index_ids()
    # after an update to workflow 1, it now comes before workflow 2
    assert index_ids.index(my_workflow_id_1) < index_ids.index(my_workflow_id_2)
def test_index_sort_by(self):
    """Check index order for the default sort and for ``create_time`` / ``name`` sorts."""
    workflow_y = self.workflow_populator.simple_workflow("y_1")
    workflow_z = self.workflow_populator.simple_workflow("z_2")

    def assert_listed_before(first, second, **index_kwds):
        # Fetch a fresh index with the given options and compare the two positions.
        ids = self.workflow_populator.index_ids(**index_kwds)
        assert ids.index(first) < ids.index(second)

    # Default ordering: the workflow created last is listed first.
    assert_listed_before(workflow_z, workflow_y)

    assert_listed_before(workflow_z, workflow_y, sort_by="create_time", sort_desc=True)
    assert_listed_before(workflow_y, workflow_z, sort_by="create_time", sort_desc=False)

    assert_listed_before(workflow_y, workflow_z, sort_by="name")
    assert_listed_before(workflow_y, workflow_z, sort_by="name", sort_desc=False)
    assert_listed_before(workflow_z, workflow_y, sort_by="name", sort_desc=True)
def test_index_limit_and_offset(self):
    """``limit=1`` returns one id and ``offset=1`` moves on to a different one."""
    for workflow_name in ("y_1", "z_2"):
        self.workflow_populator.simple_workflow(workflow_name)

    first_page = self.workflow_populator.index_ids(limit=1)
    assert len(first_page) == 1

    second_page = self.workflow_populator.index_ids(limit=1, offset=1)
    assert len(second_page) == 1

    assert first_page[0] != second_page[0]
def test_index_skip_step_counts(self):
    """``number_of_steps`` is present by default and absent with ``skip_step_counts=True``."""
    self.workflow_populator.simple_workflow("mine_1")

    first_entry = self.workflow_populator.index()[0]
    assert "number_of_steps" in first_entry
    assert first_entry["number_of_steps"]

    first_entry_without_counts = self.workflow_populator.index(skip_step_counts=True)[0]
    assert "number_of_steps" not in first_entry_without_counts
def test_index_search(self):
    """Searching by a workflow's random name returns exactly that workflow."""
    searched_name = self.dataset_populator.get_random_name()
    other_name = self.dataset_populator.get_random_name()
    searched_id = self.workflow_populator.simple_workflow(searched_name)
    self.workflow_populator.simple_workflow(other_name)

    matches = self.workflow_populator.index_ids(search=searched_name)
    assert len(matches) == 1
    assert searched_id in matches
def test_index_search_name(self):
    """A free-text search also matches tags while a ``name:`` search matches names only."""
    first_name = self.dataset_populator.get_random_name()
    second_name = self.dataset_populator.get_random_name()
    first_id = self.workflow_populator.simple_workflow(first_name)
    self.workflow_populator.simple_workflow(second_name)
    # Tag the first workflow with the second workflow's name.
    self.workflow_populator.set_tags(first_id, [second_name])

    free_text_matches = self.workflow_populator.index_ids(search=second_name)
    # one found by tag and one found by name...
    assert len(free_text_matches) == 2
    assert first_id in free_text_matches

    name_matches = self.workflow_populator.index_ids(search=f"name:{second_name}")
    assert len(name_matches) == 1
    assert first_id not in name_matches
def test_index_search_name_exact_vs_inexact(self):
    """An unquoted ``name:`` search matches by prefix; a quoted one matches exactly."""
    name_prefix = self.dataset_populator.get_random_name()
    short_id = self.workflow_populator.simple_workflow(name_prefix)
    longer_name = f"{name_prefix}_some_stuff_on_it"
    long_id = self.workflow_populator.simple_workflow(longer_name)

    inexact_matches = self.workflow_populator.index_ids(search=f"name:{name_prefix}")
    assert len(inexact_matches) == 2
    assert short_id in inexact_matches
    assert long_id in inexact_matches

    # quoting it will ensure the name matches exactly.
    exact_matches = self.workflow_populator.index_ids(search=f"name:'{name_prefix}'")
    assert len(exact_matches) == 1
    assert short_id in exact_matches
    assert long_id not in exact_matches
def test_index_search_tags(self):
    """A tagged workflow is found by free-text search and by a ``tag:`` search."""
    tagged_name = self.dataset_populator.get_random_name()
    untagged_name = self.dataset_populator.get_random_name()
    tagged_id = self.workflow_populator.simple_workflow(tagged_name)
    self.workflow_populator.simple_workflow(untagged_name)

    moocowtag = f"moocowatag {uuid4()}"
    # Nothing carries the tag yet, so the search comes back empty.
    assert len(self.workflow_populator.index_ids(search=moocowtag)) == 0

    self.workflow_populator.set_tags(tagged_id, [moocowtag, f"another{moocowtag}"])

    free_text_matches = self.workflow_populator.index_ids(search=moocowtag)
    assert tagged_id in free_text_matches
    tag_matches = self.workflow_populator.index_ids(search=f"tag:{moocowtag}")
    assert tagged_id in tag_matches
def test_index_search_tags_multiple(self):
    """Search by one or two tags (free text, partial ``tag:``, quoted ``tag:``)."""
    tags_per_workflow = (
        ["multipletagfilter1", "multipletagfilter2", "decoy1"],
        ["multipletagfilter1", "decoy2"],
        ["multipletagfilter2", "decoy3"],
    )
    # Random names are drawn first, then the workflows are created, then tagged.
    names = [self.dataset_populator.get_random_name() for _ in tags_per_workflow]
    workflow_ids = [self.workflow_populator.simple_workflow(name) for name in names]
    for workflow_id, tags in zip(workflow_ids, tags_per_workflow):
        self.workflow_populator.set_tags(workflow_id, tags)

    # (searches, expected membership of workflow 1 / 2 / 3 in the result)
    cases = [
        (
            ["multipletagfilter1", "tag:ipletagfilter1", "tag:'multipletagfilter1'"],
            (True, True, False),
        ),
        (
            ["multipletagfilter2", "tag:ipletagfilter2", "tag:'multipletagfilter2'"],
            (True, False, True),
        ),
        (
            [
                "multipletagfilter2 multipletagfilter1",
                "tag:filter2 tag:tagfilter1",
                "tag:'multipletagfilter2' tag:'multipletagfilter1'",
            ],
            (True, False, False),
        ),
    ]
    for searches, expected_membership in cases:
        for search in searches:
            index_ids = self.workflow_populator.index_ids(search=search)
            for workflow_id, expected in zip(workflow_ids, expected_membership):
                assert (workflow_id in index_ids) == expected
def test_search_casing(self):
    """Searches match regardless of the case of the workflow name or tag."""
    upper_name = self.dataset_populator.get_random_name().upper()
    other_upper_name = self.dataset_populator.get_random_name().upper()
    workflow_id = self.workflow_populator.simple_workflow(upper_name)
    self.workflow_populator.simple_workflow(other_upper_name)

    searchcasingtag = f"searchcasingtag{uuid4()}"
    self.workflow_populator.set_tags(workflow_id, [searchcasingtag, f"another{searchcasingtag}"])

    # Upper-case name searched in lower case, then lower-case tag searched in upper case.
    for query in (upper_name.lower(), searchcasingtag.upper()):
        matches = self.workflow_populator.index_ids(search=query)
        assert len(matches) == 1
        assert workflow_id in matches
def test_index_search_tags_exact(self):
    """Only a quoted ``tag:`` search excludes a workflow whose tag merely starts with the term."""
    exact_name = self.dataset_populator.get_random_name()
    longer_name = self.dataset_populator.get_random_name()
    exact_id = self.workflow_populator.simple_workflow(exact_name)
    longer_id = self.workflow_populator.simple_workflow(longer_name)

    exact_tag_to_search = f"exacttagtosearch{uuid4()}"
    # Nothing carries the tag yet.
    assert len(self.workflow_populator.index_ids(search=exact_tag_to_search)) == 0

    self.workflow_populator.set_tags(exact_id, [exact_tag_to_search])
    self.workflow_populator.set_tags(longer_id, [f"{exact_tag_to_search}longer"])

    # Free-text and unquoted ``tag:`` searches both match the longer tag as well.
    for query in (exact_tag_to_search, f"tag:{exact_tag_to_search}"):
        matches = self.workflow_populator.index_ids(search=query)
        assert exact_id in matches
        assert longer_id in matches

    quoted_matches = self.workflow_populator.index_ids(search=f"tag:'{exact_tag_to_search}'")
    assert exact_id in quoted_matches
    assert longer_id not in quoted_matches
def test_index_published(self):
    """Another user's published workflow is listed only with ``show_published=True``."""
    # published workflows are also the default of what is displayed for anonymous API requests
    # this is tested in test_anonymous_published.
    workflow_name = f"test_pubished_anon_{uuid4()}"
    with self._different_user():
        published_id = self.workflow_populator.simple_workflow(workflow_name, publish=True)

    default_ids = self.workflow_populator.index_ids()
    assert published_id not in default_ids
    with_published_ids = self.workflow_populator.index_ids(show_published=True)
    assert published_id in with_published_ids
    without_published_ids = self.workflow_populator.index_ids(show_published=False)
    assert published_id not in without_published_ids
def test_index_search_is_tags(self):
    """``is:published`` and ``is:shared_with_me`` each select only the matching workflow."""
    mine_id = self.workflow_populator.simple_workflow("sitags_m_1")
    my_email = self.dataset_populator.user_email()
    with self._different_user():
        shared_id = self.workflow_populator.simple_workflow("sitags_shwm_1")
        self.workflow_populator.share_with_user(shared_id, my_email)
        published_id = self.workflow_populator.simple_workflow("sitags_p_1", publish=True)

    published_matches = self.workflow_populator.index_ids(search="is:published", show_published=True)
    assert published_id in published_matches
    assert shared_id not in published_matches
    assert mine_id not in published_matches

    shared_matches = self.workflow_populator.index_ids(search="is:shared_with_me")
    assert published_id not in shared_matches
    assert shared_id in shared_matches
    assert mine_id not in shared_matches
def test_index_owner(self):
    """Published-workflow searches can be narrowed to one owner (``u:``, quoted ``u:``, bare)."""
    mine_id = self.workflow_populator.simple_workflow("ownertags_m_1")

    email_1 = f"{uuid4()}@test.com"
    with self._different_user(email=email_1):
        published_id_1 = self.workflow_populator.simple_workflow("ownertags_p_1", publish=True)
        owner_1 = self._show_workflow(published_id_1)["owner"]

    email_2 = f"{uuid4()}@test.com"
    with self._different_user(email=email_2):
        published_id_2 = self.workflow_populator.simple_workflow("ownertags_p_2", publish=True)

    # (search string, whether the second owner's workflow is expected in the result)
    cases = [
        ("is:published", True),
        (f"is:published u:{owner_1}", False),
        (f"is:published u:'{owner_1}'", False),
        (f"is:published {owner_1}", False),
    ]
    for search, second_owner_listed in cases:
        index_ids = self.workflow_populator.index_ids(search=search, show_published=True)
        assert published_id_1 in index_ids
        assert (published_id_2 in index_ids) == second_owner_listed
        assert mine_id not in index_ids
def test_index_parameter_invalid_combinations(self):
    """``show_shared`` combined with ``show_hidden`` or ``show_deleted`` is rejected with 400."""
    # these can all be called by themselves and return 200...
    for flag in ("show_hidden", "show_deleted", "show_shared"):
        single_flag_response = self._get(f"workflows?{flag}=true")
        self._assert_status_code_is(single_flag_response, 200)

    # but showing shared workflows along with deleted or hidden results in an error
    for flag in ("show_hidden", "show_deleted"):
        combined_response = self._get(f"workflows?{flag}=true&show_shared=true")
        self._assert_status_code_is(combined_response, 400)
        self._assert_error_code_is(
            combined_response, error_codes.error_codes_by_name["USER_REQUEST_INVALID_PARAMETER"]
        )
def test_index_total_matches(self):
    """The ``Total_matches`` header and the index length both count the user's four workflows."""
    # NOTE(review): the nesting of the two ``with`` blocks was reconstructed from a
    # copy of the file without indentation -- confirm against the upstream source.
    with self._different_user("isolated.wf.user@test.email"):
        own_tagged_id = self.workflow_populator.simple_workflow("mine_1")
        self.workflow_populator.simple_workflow("mine_2")
        my_email = self.dataset_populator.user_email()

        with self._different_user():
            shared_id = self.workflow_populator.simple_workflow("theirs_1")
            self.workflow_populator.share_with_user(shared_id, my_email)
            importable_id = self.workflow_populator.simple_workflow("theirs_2", publish=True)
            self.workflow_populator.set_tags(importable_id, ["theirs_2", "test"])

        import_response = self.__import_workflow(importable_id)
        self._assert_status_code_is(import_response, 200)
        imported_id = import_response.json()["id"]

        # add tags to my workflows
        self.workflow_populator.set_tags(own_tagged_id, ["mine_1", "test"])
        self.workflow_populator.set_tags(imported_id, ["imported", "test"])

        # We should have 4 workflows now (2 mine, 1 shared with me, 1 imported)
        expected_total = 4
        index_response = self._get("workflows")
        self._assert_status_code_is(index_response, 200)
        assert index_response.headers["Total_matches"] == f"{expected_total}"
        listed = index_response.json()
        assert len(listed) == expected_total
def test_import_tools_requires_admin(self):
    """Uploading with ``import_tools=True`` as this user is rejected with 403."""
    upload_response = self.__test_upload(import_tools=True, assert_ok=False)
    status = upload_response.status_code
    assert status == 403
def __test_upload(
    self, use_deprecated_route=False, name="test_import", workflow=None, assert_ok=True, import_tools=False
):
    """POST a workflow to the upload route and return the response.

    :param use_deprecated_route: post to ``workflows/upload`` instead of ``workflows``.
    :param name: name used to load the workflow when ``workflow`` is None, and the
        name looked up in the index when ``assert_ok`` is true.
    :param workflow: workflow object to upload; loaded by ``name`` when None.
    :param assert_ok: assert HTTP 200 and that a workflow called ``name`` is listed.
    :param import_tools: when truthy, sent along as ``import_tools`` in the payload.
    :return: the response of the POST request.
    """
    to_upload = self.workflow_populator.load_workflow(name=name) if workflow is None else workflow
    payload = {"workflow": dumps(to_upload)}
    if import_tools:
        payload["import_tools"] = import_tools

    route = "workflows/upload" if use_deprecated_route else "workflows"
    response = self._post(route, data=payload)
    if not assert_ok:
        return response

    self._assert_status_code_is(response, 200)
    self._assert_user_has_workflow_with_name(name)
    return response
def test_update(self):
    """Round-trip a workflow through PUT updates and check step labels, UUIDs and positions.

    The workflow is uploaded with a fresh UUID and label on every step, its step
    positions are moved to ``{"top": 1, "left": 1}`` and saved, and finally the
    original definition is saved again.
    """
    original_workflow = self.workflow_populator.load_workflow(name="test_import")
    uuids = {}
    labels = {}

    # Stamp each step with a known UUID and label so they can be tracked across updates.
    for order_index, step_dict in original_workflow["steps"].items():
        uuid = str(uuid4())
        step_dict["uuid"] = uuid
        uuids[order_index] = uuid
        label = f"label_{order_index}"
        step_dict["label"] = label
        labels[order_index] = label

    def check_label_and_uuid(order_index, step_dict):
        assert order_index in uuids
        assert order_index in labels

        assert uuids[order_index] == step_dict["uuid"]
        assert labels[order_index] == step_dict["label"]

    upload_response = self.__test_upload(workflow=original_workflow)
    workflow_id = upload_response.json()["id"]

    def update(workflow_object):
        put_response = self._update_workflow(workflow_id, workflow_object)
        self._assert_status_code_is(put_response, 200)
        return put_response

    def tweak_step(order_index, step_dict):
        # Verify the step is not yet at (1, 1), then move it there.
        check_label_and_uuid(order_index, step_dict)
        assert step_dict["position"]["top"] != 1
        assert step_dict["position"]["left"] != 1
        step_dict["position"] = {"top": 1, "left": 1}

    def check_step(order_index, step_dict):
        check_label_and_uuid(order_index, step_dict)
        assert step_dict["position"]["top"] == 1
        assert step_dict["position"]["left"] == 1

    workflow_content = self._download_workflow(workflow_id)
    # Explicit loops are required here: ``map()`` returns a lazy iterator in
    # Python 3, so the previous ``map(tweak_step, ...)`` calls never ran.
    # NOTE(review): the step assertions were therefore never exercised before --
    # confirm they hold against a live server.
    for order_index, step_dict in workflow_content["steps"].items():
        tweak_step(order_index, step_dict)

    update(workflow_content)

    updated_workflow_content = self._download_workflow(workflow_id)
    for order_index, step_dict in updated_workflow_content["steps"].items():
        check_step(order_index, step_dict)

    # Re-update against original workflow...
    update(original_workflow)

    updated_workflow_content = self._download_workflow(workflow_id)

    # Make sure the positions have been updated.
    for order_index, step_dict in updated_workflow_content["steps"].items():
        tweak_step(order_index, step_dict)
def test_update_tags(self):
    """Check how the ``tags`` payload field behaves across workflow updates.

    Tags are set, then an update without a ``tags`` key is expected to leave
    them untouched, and finally an empty list is expected to clear them.
    """
    workflow_object = self.workflow_populator.load_workflow(name="test_import")
    upload_response = self.__test_upload(workflow=workflow_object)
    workflow_id = upload_response.json()["id"]

    payload = {"tags": ["a_tag", "b_tag"]}
    updated = self._update_workflow(workflow_id, payload).json()
    assert updated["tags"] == ["a_tag", "b_tag"]

    # Same payload, now without any "tags" key.
    del payload["tags"]
    updated = self._update_workflow(workflow_id, payload).json()
    assert updated["tags"] == ["a_tag", "b_tag"]

    payload["tags"] = []
    updated = self._update_workflow(workflow_id, payload).json()
    assert updated["tags"] == []
def test_update_name(self):
    """Rename a workflow via update and check its ``license`` value is kept."""
    original_name = "test update name"
    workflow_object = self.workflow_populator.load_workflow(name=original_name)
    workflow_object["license"] = "AAL"

    workflow = self.__test_upload(workflow=workflow_object, name=original_name).json()
    workflow_id = workflow["id"]
    assert workflow["name"] == original_name
    downloaded = self.workflow_populator.download_workflow(workflow_id)
    assert downloaded["license"] == "AAL"

    rename_payload = {"name": "my cool new name"}
    renamed = self._update_workflow(workflow["id"], rename_payload).json()
    assert renamed["name"] == "my cool new name"
    # The license set before upload must still be present after the rename.
    downloaded = self.workflow_populator.download_workflow(workflow_id)
    assert downloaded["license"] == "AAL"
def test_update_name_empty(self):
    """Check that updating a workflow to an empty name is rejected with a 400."""
    # Update doesn't allow empty names.

    # Load a workflow with a given name.
    original_name = "test update name"
    workflow_object = self.workflow_populator.load_workflow(name=original_name)
    workflow = self.__test_upload(workflow=workflow_object, name=original_name).json()
    assert workflow["name"] == original_name

    # Try to update the name to an empty string (also change steps to force an update).
    bad_payload = {"name": "", "steps": {}}
    update_response = self._update_workflow(workflow["id"], bad_payload)
    assert update_response.json()["err_msg"] == "Workflow must have a valid name"
    self._assert_status_code_is(update_response, 400)

    # The stored workflow keeps its original name.
    downloaded = self.workflow_populator.download_workflow(workflow["id"])
    assert downloaded["name"] == original_name
def test_refactor(self):
    """Relabel step 0 through the refactor API, as dry runs and then for real.

    Both dry runs (default and ``style="editor"``) must report the new label
    in their response while the stored workflow keeps ``test_input``; the
    final non-dry-run call must change the stored label too.
    """
    workflow_id = self.workflow_populator.upload_yaml_workflow(
        """
class: GalaxyWorkflow
inputs:
test_input: data
steps:
first_cat:
tool_id: cat
in:
input1: test_input
"""
    )
    actions = [
        {"action_type": "update_step_label", "step": {"order_index": 0}, "label": "new_label"},
    ]

    def reported_label(response):
        # Fail on an error status, then read step 0's label from the response body.
        response.raise_for_status()
        return response.json()["workflow"]["steps"]["0"]["label"]

    def stored_label():
        return self.workflow_populator.download_workflow(workflow_id)["steps"]["0"]["label"]

    # perform refactoring as dry run
    dry_run_response = self.workflow_populator.refactor_workflow(workflow_id, actions, dry_run=True)
    assert reported_label(dry_run_response) == "new_label"

    # perform refactoring as dry run but specify editor style response
    editor_response = self.workflow_populator.refactor_workflow(
        workflow_id, actions, dry_run=True, style="editor"
    )
    assert reported_label(editor_response) == "new_label"

    # download the original workflow and make sure the dry run didn't modify that label
    assert stored_label() == "test_input"

    real_response = self.workflow_populator.refactor_workflow(workflow_id, actions)
    assert reported_label(real_response) == "new_label"

    # this time dry_run was default of False, so the label is indeed changed
    assert stored_label() == "new_label"
def test_update_no_tool_id(self):
    """Check that an update whose step "2" lacks a ``tool_id`` key gets a 400."""
    workflow_object = self.workflow_populator.load_workflow(name="test_import")
    workflow_id = self.__test_upload(workflow=workflow_object).json()["id"]
    # Strip the tool id from the step only after the upload succeeded.
    del workflow_object["steps"]["2"]["tool_id"]
    update_response = self._update_workflow(workflow_id, workflow_object)
    self._assert_status_code_is(update_response, 400)
def test_update_missing_tool(self):
    """Check that an update pointing step "2" at ``cat-not-found`` gets a 400."""
    # Create allows missing tools, update doesn't currently...
    workflow_object = self.workflow_populator.load_workflow(name="test_import")
    workflow_id = self.__test_upload(workflow=workflow_object).json()["id"]
    workflow_object["steps"]["2"]["tool_id"] = "cat-not-found"
    update_response = self._update_workflow(workflow_id, workflow_object)
    self._assert_status_code_is(update_response, 400)
def test_require_unique_step_uuids(self):
    """Check that creating a workflow whose steps all share one uuid gets a 400."""
    workflow = self.workflow_populator.load_workflow(name="test_import")
    shared_uuid = str(uuid4())
    for step in workflow["steps"].values():
        step["uuid"] = shared_uuid
    create_response = self.workflow_populator.create_workflow_response(workflow)
    self._assert_status_code_is(create_response, 400)
def test_require_unique_step_labels(self):
    """Check that creating a workflow whose steps all share one label gets a 400."""
    workflow = self.workflow_populator.load_workflow(name="test_import")
    for step in workflow["steps"].values():
        step["label"] = "my duplicated label"
    create_response = self.workflow_populator.create_workflow_response(workflow)
    self._assert_status_code_is(create_response, 400)
def test_import_deprecated(self):
    """Import a published workflow as a different user and look it up by name."""
    published_id = self.workflow_populator.simple_workflow("test_import_published_deprecated", publish=True)
    with self._different_user():
        import_response = self.__import_workflow(published_id)
        self._assert_status_code_is(import_response, 200)
        # The copy shows up under the importing user with an "imported: " prefix.
        self._assert_user_has_workflow_with_name("imported: test_import_published_deprecated")
def test_import_export_dynamic(self):
    """Re-import a downloaded workflow that embeds a ``GalaxyTool`` and expect a 403."""
    workflow_id = self._upload_yaml_workflow(
        """
class: GalaxyWorkflow
steps:
- type: input
label: input1
- tool_id: cat1
label: first_cat
state:
input1:
$link: 0
- label: embed1
run:
class: GalaxyTool
command: echo 'hello world 2' > $output1
outputs:
output1:
format: txt
- tool_id: cat1
state:
input1:
$link: first_cat/out_file1
queries:
input2:
$link: embed1/output1
test_data:
input1: "hello world"
"""
    )
    exported = self._download_workflow(workflow_id)

    # The _upload_yaml_workflow entry point uses an admin key, but if we try to
    # do the raw re-import as a regular user we expect a 403 error.
    reimport_response = self.workflow_populator.create_workflow_response(exported)
    self._assert_status_code_is(reimport_response, 403)
def test_import_annotations(self):
    """Check workflow and step annotations on a copy imported by another user."""
    published_id = self.workflow_populator.simple_workflow("test_import_annotations", publish=True)
    with self._different_user():
        import_response = self.__import_workflow(published_id)
        self._assert_status_code_is(import_response, 200)

        # Test annotations preserved during upload and copied over during
        # import.
        imported = self._show_workflow(import_response.json()["id"])
        assert imported["annotation"] == "simple workflow"
        annotations = {step["annotation"] for step in imported["steps"].values()}
        assert "input1 description" in annotations
def test_import_subworkflows(self):
    """Check subworkflow details, and that an import gets its own subworkflow id."""

    def get_subworkflow_content_id(workflow_id):
        # content_id of the first step typed "subworkflow" in the editor-style download.
        editor_steps = self._download_workflow(workflow_id, style="editor")["steps"]
        first_subworkflow = next(step for step in editor_steps.values() if step["type"] == "subworkflow")
        return first_subworkflow["content_id"]

    workflow_id = self._upload_yaml_workflow(WORKFLOW_NESTED_SIMPLE, publish=True)
    subworkflow_content_id = get_subworkflow_content_id(workflow_id)

    instance_response = self._get(f"workflows/{subworkflow_content_id}?instance=true")
    self._assert_status_code_is(instance_response, 200)
    subworkflow = instance_response.json()
    assert subworkflow["inputs"]["0"]["label"] == "inner_input"
    assert subworkflow["name"] == "Workflow"
    assert subworkflow["hidden"]

    with self._different_user():
        import_response = self.__import_workflow(workflow_id)
        self._assert_status_code_is(import_response, 200)
        imported_id = import_response.json()["id"]
        imported_content_id = get_subworkflow_content_id(imported_id)
        assert subworkflow_content_id != imported_content_id
def test_subworkflow_inputs_optional_editor(self):
    """Check the editor download reports the nested step's first input as optional."""
    workflow_id = self._upload_yaml_workflow(
        """
class: GalaxyWorkflow
steps:
nested_workflow:
run:
class: GalaxyWorkflow
inputs:
- id: inner_input
optional: true
outputs:
- outputSource: inner_input/output
steps: []
"""
    )
    editor_view = self._download_workflow(workflow_id, style="editor")
    first_input = editor_view["steps"]["0"]["inputs"][0]
    assert first_input["optional"]
def test_not_importable_prevents_import(self):
    """Check another user gets a 403 importing a workflow created without ``publish``."""
    private_id = self.workflow_populator.simple_workflow("test_not_importportable")
    with self._different_user():
        import_response = self.__import_workflow(private_id)
        self._assert_status_code_is(import_response, 403)
def test_url_import(self):
    """Import a workflow from a URL and check the URL is kept in ``source_metadata``."""
    url = "https://raw.githubusercontent.com/galaxyproject/galaxy/release_19.09/test/base/data/test_workflow_1.ga"
    import_response = self._post("workflows", data={"archive_source": url})
    workflow_id = import_response.json()["id"]
    workflow = self._download_workflow(workflow_id)
    assert "TestWorkflow1" in workflow["name"]
    # disappearance of source_metadata on modification is tested in test_trs_import
    source_metadata = workflow.get("source_metadata")
    assert source_metadata.get("url") == url
def test_base64_import(self):
    """Import a workflow passed as a ``base64://`` archive source."""
    encoded_workflow = base64.b64encode(workflow_str.encode("utf-8")).decode("utf-8")
    base64_url = f"base64://{encoded_workflow}"
    import_response = self._post("workflows", data={"archive_source": base64_url})
    import_response.raise_for_status()
    workflow = self._download_workflow(import_response.json()["id"])
    assert "TestWorkflow1" in workflow["name"]
def test_trs_import(self):
    """Import from the ``dockstore`` TRS server and track ``source_metadata``.

    The imported workflow must carry the TRS tool id, version id and server;
    after a refactor, and after re-uploading the downloaded workflow, the
    ``source_metadata`` entry must be gone.
    """
    trs_payload = {
        "archive_source": "trs_tool",
        "trs_server": "dockstore",
        "trs_tool_id": "#workflow/github.com/jmchilton/galaxy-workflow-dockstore-example-1/mycoolworkflow",
        "trs_version_id": "master",
    }
    workflow_id = self._post("workflows", data=trs_payload).json()["id"]
    original_workflow = self._download_workflow(workflow_id)
    assert "Test Workflow" in original_workflow["name"]
    source_metadata = original_workflow.get("source_metadata")
    assert source_metadata.get("trs_tool_id") == trs_payload["trs_tool_id"]
    assert source_metadata.get("trs_version_id") == trs_payload["trs_version_id"]
    assert source_metadata.get("trs_server") == "dockstore"

    # refactor workflow and check that the trs id is removed
    relabel_actions = [
        {"action_type": "update_step_label", "step": {"order_index": 0}, "label": "new_label"},
    ]
    self.workflow_populator.refactor_workflow(workflow_id, relabel_actions)
    refactored = self._download_workflow(workflow_id)
    assert refactored.get("source_metadata") is None

    # reupload original_workflow and check that the trs id is removed
    reuploaded_id = self.workflow_populator.create_workflow(original_workflow)
    reuploaded = self._download_workflow(reuploaded_id)
    assert reuploaded.get("source_metadata") is None
def test_trs_import_from_dockstore_trs_url(self):
    """Import through a Dockstore ``trs_url`` and track ``source_metadata``.

    The metadata must expose the tool id and version id, an empty
    ``trs_server`` and the submitted URL; a refactor and a re-upload must
    both end up without ``source_metadata``.
    """
    trs_url = (
        "https://dockstore.org/api/ga4gh/trs/v2/tools/"
        "%23workflow%2Fgithub.com%2Fjmchilton%2Fgalaxy-workflow-dockstore-example-1%2Fmycoolworkflow/"
        "versions/master"
    )
    trs_payload = {
        "archive_source": "trs_tool",
        "trs_url": trs_url,
    }
    workflow_id = self._post("workflows", data=trs_payload).json()["id"]
    original_workflow = self._download_workflow(workflow_id)
    assert "Test Workflow" in original_workflow["name"]
    source_metadata = original_workflow.get("source_metadata")
    assert (
        source_metadata.get("trs_tool_id")
        == "#workflow/github.com/jmchilton/galaxy-workflow-dockstore-example-1/mycoolworkflow"
    )
    assert source_metadata.get("trs_version_id") == "master"
    assert source_metadata.get("trs_server") == ""
    assert source_metadata.get("trs_url") == trs_url

    # refactor workflow and check that the trs id is removed
    relabel_actions = [
        {"action_type": "update_step_label", "step": {"order_index": 0}, "label": "new_label"},
    ]
    self.workflow_populator.refactor_workflow(workflow_id, relabel_actions)
    refactored = self._download_workflow(workflow_id)
    assert refactored.get("source_metadata") is None

    # reupload original_workflow and check that the trs id is removed
    reuploaded_id = self.workflow_populator.create_workflow(original_workflow)
    reuploaded = self._download_workflow(reuploaded_id)
    assert reuploaded.get("source_metadata") is None
def test_trs_import_from_workflowhub_trs_url(self):
    """Import through a WorkflowHub ``trs_url`` and track ``source_metadata``.

    The metadata must report tool id ``109``, version id ``5``, an empty
    ``trs_server`` and the submitted URL; a refactor and a re-upload must
    both end up without ``source_metadata``.
    """
    trs_url = "https://workflowhub.eu/ga4gh/trs/v2/tools/109/versions/5"
    trs_payload = {
        "archive_source": "trs_tool",
        "trs_url": trs_url,
    }
    workflow_id = self._post("workflows", data=trs_payload).json()["id"]
    original_workflow = self._download_workflow(workflow_id)
    assert "COVID-19: variation analysis reporting" in original_workflow["name"]
    source_metadata = original_workflow.get("source_metadata")
    assert source_metadata.get("trs_tool_id") == "109"
    assert source_metadata.get("trs_version_id") == "5"
    assert source_metadata.get("trs_server") == ""
    assert source_metadata.get("trs_url") == trs_url

    # refactor workflow and check that the trs id is removed
    relabel_actions = [
        {"action_type": "update_step_label", "step": {"order_index": 0}, "label": "new_label"},
    ]
    self.workflow_populator.refactor_workflow(workflow_id, relabel_actions)
    refactored = self._download_workflow(workflow_id)
    assert refactored.get("source_metadata") is None

    # reupload original_workflow and check that the trs id is removed
    reuploaded_id = self.workflow_populator.create_workflow(original_workflow)
    reuploaded = self._download_workflow(reuploaded_id)
    assert reuploaded.get("source_metadata") is None
def test_anonymous_published(self):
    """Check a published workflow appears in the index fetched with a bare ``get``.

    The index is requested both with and without ``show_published=True``.
    """

    def anonymous_published_workflows(explicit_query_parameter):
        # Plain requests.get: none of this test case's own request helpers are involved.
        index_url = "workflows?show_published=True" if explicit_query_parameter else "workflows"
        response = get(self._api_url(index_url))
        response.raise_for_status()
        return response.json()

    workflow_name = f"test published example {uuid4()}"
    names_before = [w["name"] for w in anonymous_published_workflows(True)]
    assert workflow_name not in names_before

    workflow_id = self.workflow_populator.simple_workflow(workflow_name, publish=True)

    for explicit_query_parameter in (True, False):
        index = anonymous_published_workflows(explicit_query_parameter)
        assert workflow_name in [w["name"] for w in index]
        assert workflow_id in [w["id"] for w in index]
def test_import_published(self):
    """Import a published workflow as another user with ``deprecated_route=True``."""
    published_id = self.workflow_populator.simple_workflow("test_import_published", publish=True)
    with self._different_user():
        import_response = self.__import_workflow(published_id, deprecated_route=True)
        self._assert_status_code_is(import_response, 200)
        self._assert_user_has_workflow_with_name("imported: test_import_published")
def test_import_published_api(self):
    """Import a published workflow with ``deprecated_route=False`` and check step "2"'s tool version."""
    published_id = self.workflow_populator.simple_workflow("test_import_published", publish=True)
    with self._different_user():
        import_response = self.__import_workflow(published_id, deprecated_route=False)
        self._assert_status_code_is(import_response, 200)
        imported = self._download_workflow(import_response.json()["id"])
        assert imported["steps"]["2"]["tool_version"] == "1.0.0"
def test_export(self):
    """Download a simple workflow and check its overall structure.

    Verifies the name, the step count, the first step's first input, the
    top-level keys, and the keys every step (and every tool step) carries.
    """
    workflow_id = self.workflow_populator.simple_workflow("test_for_export")
    exported = self._download_workflow(workflow_id)
    assert exported["name"] == "test_for_export"

    steps = exported["steps"]
    assert len(steps) == 3
    assert "0" in steps
    first_step = steps["0"]
    self._assert_has_keys(first_step, "inputs", "outputs")
    step_inputs = first_step["inputs"]
    assert len(step_inputs) > 0, first_step
    first_input = step_inputs[0]
    assert first_input["name"] == "WorkflowInput1"
    assert first_input["description"] == "input1 description"

    self._assert_has_keys(exported, "a_galaxy_workflow", "format-version", "annotation", "uuid", "steps")

    expected_step_keys = (
        "id",
        "type",
        "tool_id",
        "tool_version",
        "name",
        "tool_state",
        "annotation",
        "inputs",
        "workflow_outputs",
        "outputs",
    )
    for step in exported["steps"].values():
        self._assert_has_keys(step, *expected_step_keys)
        # Only steps typed "tool" are additionally required to have post job actions.
        if step["type"] == "tool":
            self._assert_has_keys(step, "post_job_actions")
def test_export_format2(self):
    """Check a ``style="format2"`` download has ``class`` set to ``GalaxyWorkflow``."""
    workflow_id = self.workflow_populator.simple_workflow("test_for_export_format2")
    exported = self._download_workflow(workflow_id, style="format2")
    assert exported["class"] == "GalaxyWorkflow"
def test_export_editor(self):
    """Check the keys present in a ``style="editor"`` download and in each of its steps."""
    workflow_id = self.workflow_populator.simple_workflow("test_for_export")
    editor_view = self._download_workflow(workflow_id, style="editor")
    self._assert_has_keys(editor_view, "name", "steps", "upgrade_messages")

    expected_step_keys = (
        "id",
        "type",
        "content_id",
        "name",
        "tool_state",
        "tooltip",
        "inputs",
        "outputs",
        "config_form",
        "annotation",
        "post_job_actions",
        "workflow_outputs",
        "uuid",
        "label",
    )
    for step in editor_view["steps"].values():
        self._assert_has_keys(step, *expected_step_keys)
@skip_without_tool("output_filter_with_input")
def test_export_editor_filtered_outputs(self):
    """Check which outputs the editor view lists for three tool-state combinations."""
    template = """
class: GalaxyWorkflow
steps:
- tool_id: output_filter_with_input
state:
produce_out_1: {produce_out_1}
filter_text_1: {filter_text_1}
produce_collection: false
produce_paired_collection: false
"""

    def editor_outputs(produce_out_1, filter_text_1):
        # Upload the template filled with the given state; return step 0's editor outputs.
        workflow_yaml = template.format(produce_out_1=produce_out_1, filter_text_1=filter_text_1)
        workflow_id = self._upload_yaml_workflow(workflow_yaml)
        editor_view = self._download_workflow(workflow_id, style="editor")
        return editor_view["steps"]["0"]["outputs"]

    outputs = editor_outputs("false", "false")
    assert len(outputs) == 1
    assert outputs[0]["name"] == "out_3"

    outputs = editor_outputs("true", "false")
    assert len(outputs) == 2
    assert outputs[0]["name"] == "out_1"
    assert outputs[1]["name"] == "out_3"

    outputs = editor_outputs("true", "foo")
    assert len(outputs) == 3
    assert outputs[0]["name"] == "out_1"
    assert outputs[1]["name"] == "out_2"
    assert outputs[2]["name"] == "out_3"
@skip_without_tool("output_filter_exception_1")
def test_export_editor_filtered_outputs_exception_handling(self):
    """Check the editor view lists ``out_1`` and ``out_2`` for ``output_filter_exception_1``."""
    workflow_id = self._upload_yaml_workflow(
        """
class: GalaxyWorkflow
steps:
- tool_id: output_filter_exception_1
"""
    )
    editor_view = self._download_workflow(workflow_id, style="editor")
    step_outputs = editor_view["steps"]["0"]["outputs"]
    assert len(step_outputs) == 2
    assert step_outputs[0]["name"] == "out_1"
    assert step_outputs[1]["name"] == "out_2"
@skip_without_tool("collection_type_source")
def test_export_editor_collection_type_source(self):
    """Check step "1"'s first output has no ``collection_type`` in the editor view."""
    workflow_id = self._upload_yaml_workflow(
        """
class: GalaxyWorkflow
inputs:
- id: text_input1
type: collection
collection_type: "list:paired"
steps:
- tool_id: collection_type_source
in:
input_collect: text_input1
"""
    )
    editor_steps = self._download_workflow(workflow_id, style="editor")["steps"]
    assert len(editor_steps) == 2
    # Non-subworkflow collection_type_source tools will be handled by the client,
    # so collection_type should be None here.
    first_output = editor_steps["1"]["outputs"][0]
    assert first_output["collection_type"] is None
@skip_without_tool("collection_type_source")
def test_export_editor_subworkflow_collection_type_source(self):
    """Check a subworkflow step's first output reports ``list:paired`` in the editor view."""
    workflow_id = self._upload_yaml_workflow(
        """
class: GalaxyWorkflow
inputs:
outer_input: data
steps:
inner_workflow:
run:
class: GalaxyWorkflow
inputs:
inner_input:
type: collection
collection_type: "list:paired"
outputs:
workflow_output:
outputSource: collection_type_source/list_output
steps:
collection_type_source:
tool_id: collection_type_source
in:
input_collect: inner_input
in:
inner_input: outer_input
"""
    )
    editor_steps = self._download_workflow(workflow_id, style="editor")["steps"]
    assert len(editor_steps) == 2
    subworkflow_step = editor_steps["1"]
    assert subworkflow_step["type"] == "subworkflow"
    assert subworkflow_step["outputs"][0]["collection_type"] == "list:paired"
def test_import_missing_tool(self):
    """Create ``test_workflow_missing_tool`` and find exactly one ``cat_missing_tool`` step."""
    workflow = self.workflow_populator.load_workflow_from_resource(name="test_workflow_missing_tool")
    workflow_id = self.workflow_populator.create_workflow(workflow)
    shown_steps = self._show_workflow(workflow_id)["steps"]
    missing_tool_steps = [step for step in shown_steps.values() if step["tool_id"] == "cat_missing_tool"]
    assert len(missing_tool_steps) == 1
def test_import_no_tool_id(self):
    """Check that uploading a workflow whose step "2" lacks ``tool_id`` gets a 400."""
    # Import works with missing tools, but not with absent content/tool id.
    workflow = self.workflow_populator.load_workflow_from_resource(name="test_workflow_missing_tool")
    step = workflow["steps"]["2"]
    del step["tool_id"]
    upload_response = self.__test_upload(workflow=workflow, assert_ok=False)
    self._assert_status_code_is(upload_response, 400)
def test_import_export_with_runtime_inputs(self):
    """Round-trip a workflow with a runtime input and inspect the exported step.

    The downloaded workflow must have two steps; step "1" must list an input
    named ``num_lines`` whose description starts with
    ``runtime parameter for tool``, and its decoded ``tool_state`` must hold
    a runtime-input value under ``num_lines``.
    """
    workflow = self.workflow_populator.load_workflow_from_resource(name="test_workflow_with_runtime_input")
    workflow_id = self.workflow_populator.create_workflow(workflow)
    downloaded_workflow = self._download_workflow(workflow_id)
    assert len(downloaded_workflow["steps"]) == 2
    runtime_step = downloaded_workflow["steps"]["1"]
    # Look the input up explicitly: a plain for/break search would silently fall
    # through to the last input (or leave the name unbound) when nothing matches.
    runtime_input = next(
        (step_input for step_input in runtime_step["inputs"] if step_input["name"] == "num_lines"),
        None,
    )
    assert runtime_input is not None, runtime_step["inputs"]
    assert runtime_input["description"].startswith("runtime parameter for tool")

    tool_state = json.loads(runtime_step["tool_state"])
    assert "num_lines" in tool_state
    self._assert_is_runtime_input(tool_state["num_lines"])
@skip_without_tool("cat1")
def test_run_workflow_by_index(self):
    """Run the cat workflow with ``inputs_by`` set to ``step_index``."""
    inputs_by = "step_index"
    self.__run_cat_workflow(inputs_by=inputs_by)
@skip_without_tool("cat1")
def test_run_workflow_by_uuid(self):
    """Run the cat workflow with ``inputs_by`` set to ``step_uuid``."""
    inputs_by = "step_uuid"
    self.__run_cat_workflow(inputs_by=inputs_by)
@skip_without_tool("cat1")
def test_run_workflow_by_uuid_implicitly(self):
    """Run the cat workflow with ``inputs_by`` set to ``uuid_implicitly``."""
    inputs_by = "uuid_implicitly"
    self.__run_cat_workflow(inputs_by=inputs_by)
@skip_without_tool("cat1")
def test_run_workflow_by_name(self):
    """Run the cat workflow with ``inputs_by`` set to ``name``."""
    inputs_by = "name"
    self.__run_cat_workflow(inputs_by=inputs_by)
@skip_without_tool("cat1")
def test_run_workflow(self):
    """Run the cat workflow with ``inputs_by`` set to ``step_id``."""
    inputs_by = "step_id"
    self.__run_cat_workflow(inputs_by=inputs_by)
@skip_without_tool("multiple_versions")
def test_run_versioned_tools(self):
    """Invoke two workflows pinning ``multiple_versions`` to 0.1 and 0.2, each in its own history."""
    with self.dataset_populator.test_history() as history_01_id:
        workflow_id_v01 = self._upload_yaml_workflow(
            """
class: GalaxyWorkflow
steps:
multiple:
tool_id: multiple_versions
tool_version: "0.1"
state:
inttest: 0
"""
        )
        self.workflow_populator.invoke_workflow_and_wait(workflow_id_v01, history_id=history_01_id)

    with self.dataset_populator.test_history() as history_02_id:
        workflow_id_v02 = self._upload_yaml_workflow(
            """
class: GalaxyWorkflow
steps:
multiple:
tool_id: multiple_versions
tool_version: "0.2"
state:
inttest: 1
"""
        )
        self.workflow_populator.invoke_workflow_and_wait(workflow_id_v02, history_id=history_02_id)
def __run_cat_workflow(self, inputs_by):
    """Run the ``test_for_run`` workflow and assert its invocation gets scheduled.

    :param inputs_by: forwarded to ``_setup_workflow_run`` as ``inputs_by``.
    """
    workflow = self.workflow_populator.load_workflow(name="test_for_run")
    # Give steps "0" and "1" fresh uuids before the run is set up.
    for step_key in ("0", "1"):
        workflow["steps"][step_key]["uuid"] = str(uuid4())
    workflow_request, _, workflow_id = self._setup_workflow_run(workflow, inputs_by=inputs_by)
    invoke_response = self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request)
    invocation_id = invoke_response.json()["id"]
    invocation = self._invocation_details(workflow_id, invocation_id)
    assert invocation["state"] == "scheduled", invocation
@skip_without_tool("collection_creates_pair")
def test_workflow_run_output_collections(self) -> None:
    """Run ``WORKFLOW_WITH_OUTPUT_COLLECTION`` and check the content fetched with ``hid=0``."""
    with self.dataset_populator.test_history() as history_id:
        self._run_workflow(WORKFLOW_WITH_OUTPUT_COLLECTION, history_id=history_id)
        content = self.dataset_populator.get_history_dataset_content(history_id, hid=0)
        assert "a\nc\nb\nd\n" == content
@skip_without_tool("job_properties")
@skip_without_tool("identifier_multiple_in_conditional")
def test_workflow_resume_from_failed_step(self):
    """Re-run a failed job with ``rerun_remap_job_id`` and check paused datasets finish.

    Dataset hid 1 must be in ``error`` and hid 5 ``paused`` after the
    invocation; after re-running ``job_properties`` with the failed job's id,
    hids 5 and 6 must both reach ``ok``.
    """
    workflow_id = self._upload_yaml_workflow(
        """
class: GalaxyWorkflow
steps:
job_props:
tool_id: job_properties
state:
thebool: true
failbool: true
identifier:
tool_id: identifier_multiple_in_conditional
state:
outer_cond:
cond_param_outer: true
inner_cond:
cond_param_inner: true
input1:
$link: 0/out_file1
thedata: null
cat:
tool_id: cat1
in:
input1: identifier/output1
queries_0|input2: identifier/output1
"""
    )
    with self.dataset_populator.test_history() as history_id:

        def dataset_details(hid):
            # Details of one history dataset, waiting on it without asserting its state.
            return self.dataset_populator.get_history_dataset_details(
                history_id, hid=hid, wait=True, assert_ok=False
            )

        invoke_response = self.workflow_populator.invoke_workflow(workflow_id, history_id=history_id)
        invocation_id = invoke_response.json()["id"]
        self.workflow_populator.wait_for_workflow(workflow_id, invocation_id, history_id, assert_ok=False)

        failed_dataset = dataset_details(1)
        assert failed_dataset["state"] == "error", failed_dataset
        paused_dataset = dataset_details(5)
        assert paused_dataset["state"] == "paused", paused_dataset

        rerun_inputs = {
            "thebool": "false",
            "failbool": "false",
            "rerun_remap_job_id": failed_dataset["creating_job"],
        }
        self.dataset_populator.run_tool(
            tool_id="job_properties",
            inputs=rerun_inputs,
            history_id=history_id,
        )

        resumed_first = dataset_details(5)
        assert resumed_first["state"] == "ok"
        self.dataset_populator.wait_for_history(history_id, assert_ok=False)
        resumed_second = dataset_details(6)
        assert resumed_second["state"] == "ok"
@skip_without_tool("job_properties")
@skip_without_tool("collection_creates_list")
def test_workflow_resume_from_failed_step_with_hdca_input(self):
    """Re-run a failed job and check a paused collection and its dependent dataset finish.

    Dataset hid 1 must be in ``error``, the first element of collection hid 7
    and dataset hid 8 ``paused``; after the re-run with
    ``rerun_remap_job_id`` both must reach ``ok``.
    """
    workflow_id = self._upload_yaml_workflow(
        """
class: GalaxyWorkflow
steps:
job_props:
tool_id: job_properties
state:
thebool: true
failbool: true
list_in_list_out:
tool_id: collection_creates_list
in:
input1: job_props/list_output
identifier:
tool_id: identifier_collection
in:
input1: list_in_list_out/list_output
"""
    )
    with self.dataset_populator.test_history() as history_id:

        def dataset_details(hid):
            return self.dataset_populator.get_history_dataset_details(
                history_id, hid=hid, wait=True, assert_ok=False
            )

        def first_element_of_collection(hid):
            # Object of the first element of the history collection with this hid.
            collection = self.dataset_populator.get_history_collection_details(
                history_id, hid=hid, wait=True, assert_ok=False
            )
            return collection["elements"][0]["object"]

        invocation_id = self.__invoke_workflow(workflow_id, history_id=history_id)
        self.workflow_populator.wait_for_invocation_and_jobs(
            history_id, workflow_id, invocation_id, assert_ok=False
        )

        failed_dataset = dataset_details(1)
        assert failed_dataset["state"] == "error", failed_dataset
        paused_element = first_element_of_collection(7)
        assert paused_element["state"] == "paused", paused_element
        dependent_dataset = dataset_details(8)
        assert dependent_dataset["state"] == "paused"

        rerun_inputs = {
            "thebool": "false",
            "failbool": "false",
            "rerun_remap_job_id": failed_dataset["creating_job"],
        }
        self.dataset_populator.run_tool(
            tool_id="job_properties",
            inputs=rerun_inputs,
            history_id=history_id,
        )

        resumed_element = first_element_of_collection(7)
        assert resumed_element["state"] == "ok"
        self.dataset_populator.wait_for_history(history_id, assert_ok=False)
        dependent_dataset = dataset_details(8)
        assert dependent_dataset["name"].startswith("identifier_collection")
        assert dependent_dataset["state"] == "ok"
@skip_without_tool("fail_identifier")
@skip_without_tool("identifier_collection")
def test_workflow_resume_with_mapped_over_input(self):
    """Re-run a failed mapped-over job against a collection element and check the resume.

    After the re-run with ``rerun_remap_job_id`` the dataset fetched without
    an explicit hid must be ``ok``, hid 7 must contain ``fail\\nsuccess\\n``,
    and the re-run's first output must not be visible.
    """
    with self.dataset_populator.test_history() as history_id:
        self._run_workflow(
            """
class: GalaxyWorkflow
inputs:
input_datasets: collection
steps:
fail_identifier_1:
tool_id: fail_identifier
state:
failbool: true
in:
input1: input_datasets
identifier:
tool_id: identifier_collection
in:
input1: fail_identifier_1/out_file1
test_data:
input_datasets:
collection_type: list
elements:
- identifier: fail
value: 1.fastq
type: File
- identifier: success
value: 1.fastq
type: File
""",
            history_id=history_id,
            assert_ok=False,
            wait=True,
        )
        contents_listing = self.dataset_populator._get_contents_request(history_id=history_id).json()
        input_collection = self.dataset_populator.get_history_collection_details(history_id, hid=1, assert_ok=False)
        first_element = input_collection["elements"][0]
        # The paused dataset is taken to be the last entry of the contents listing.
        paused_dataset = contents_listing[-1]
        failed_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=5, assert_ok=False)
        assert paused_dataset["state"] == "paused", paused_dataset
        assert failed_dataset["state"] == "error", failed_dataset

        rerun_inputs = {
            "input1": {"values": [{"src": "dce", "id": first_element["id"]}]},
            "failbool": "false",
            "rerun_remap_job_id": failed_dataset["creating_job"],
        }
        rerun_result = self.dataset_populator.run_tool(
            tool_id="fail_identifier",
            inputs=rerun_inputs,
            history_id=history_id,
        )

        resumed_dataset = self.dataset_populator.get_history_dataset_details(
            history_id, wait=True, assert_ok=False
        )
        assert resumed_dataset["state"] == "ok"
        contents = self.dataset_populator.get_history_dataset_content(history_id, hid=7, assert_ok=False)
        assert contents == "fail\nsuccess\n", contents

        replacement_id = rerun_result["outputs"][0]["id"]
        replacement_hda = self.dataset_populator.get_history_dataset_details(
            history_id, dataset_id=replacement_id, wait=True, assert_ok=False
        )
        assert not replacement_hda["visible"], replacement_hda
def test_workflow_resume_with_mapped_over_collection_input(self):
    """Re-run a failed job on an input collection element and check the replacement.

    Invocation step 1's job must be in ``error`` and step 2's ``paused``;
    after the re-run with ``rerun_remap_job_id`` step 1's job must be ``ok``
    and the innermost first element of the original output collection must
    have switched from ``error`` to ``ok``.
    """

    def innermost_first_state(hdca):
        # State of the first dataset found three element levels down.
        outer = hdca["elements"][0]["object"]
        middle = outer["elements"][0]["object"]
        return middle["elements"][0]["object"]["state"]

    # Test that replacement and resume also works if the failed job re-run works on a input DCE
    with self.dataset_populator.test_history() as history_id:
        job_summary = self._run_workflow(
            """
class: GalaxyWorkflow
inputs:
input_collection: collection
steps:
- tool_id: collection_creates_list_of_pairs
state:
failbool: true
in:
input1:
source: input_collection
- tool_id: collection_creates_list_of_pairs
state:
failbool: false
in:
input1:
source: 1/list_output
test_data:
input_collection:
collection_type: "list:list:paired"
""",
            history_id=history_id,
            assert_ok=False,
            wait=True,
        )
        invocation = self.workflow_populator.get_invocation(job_summary.invocation_id, step_details=True)
        failed_step = invocation["steps"][1]
        failed_job = failed_step["jobs"][0]
        assert failed_job["state"] == "error"
        failed_hdca_id = failed_step["output_collections"]["list_output"]["id"]
        failed_hdca = self.dataset_populator.get_history_collection_details(
            history_id=history_id, content_id=failed_hdca_id, assert_ok=False
        )
        assert innermost_first_state(failed_hdca) == "error"

        paused_step = invocation["steps"][2]
        # job not created, input in error state
        assert paused_step["jobs"][0]["state"] == "paused"
        input_hdca = self.dataset_populator.get_history_collection_details(
            history_id=history_id, content_id=job_summary.inputs["input_collection"]["id"], assert_ok=False
        )

        # now re-run errored job
        rerun_inputs = {
            "input1": {"values": [{"src": "dce", "id": input_hdca["elements"][0]["id"]}]},
            "failbool": "false",
            "rerun_remap_job_id": failed_job["id"],
        }
        rerun_result = self.dataset_populator.run_tool(
            tool_id="collection_creates_list_of_pairs",
            inputs=rerun_inputs,
            history_id=history_id,
        )
        assert not rerun_result["output_collections"][0]["visible"]
        self.dataset_populator.wait_for_job(paused_step["jobs"][0]["id"])

        invocation = self.workflow_populator.get_invocation(job_summary.invocation_id, step_details=True)
        rerun_step = invocation["steps"][1]
        assert rerun_step["jobs"][0]["state"] == "ok"
        replaced_hdca = self.dataset_populator.get_history_collection_details(
            history_id=history_id, content_id=failed_hdca_id, assert_ok=False
        )
        assert innermost_first_state(replaced_hdca) == "ok"
@skip_without_tool("multi_data_optional")
def test_workflow_list_list_multi_data_map_over(self):
    """Feed a list-of-list collection to ``multi_data_optional`` and check collection hid 6.

    The output collection must be of type ``list`` and report
    ``ImplicitCollectionJobs`` as its job source type.
    """
    # Test that a list:list is reduced to list with a multiple="true" data input
    workflow_id = self._upload_yaml_workflow(
        """
class: GalaxyWorkflow
inputs:
input_datasets: collection
steps:
multi_data_optional:
tool_id: multi_data_optional
in:
input1: input_datasets
"""
    )
    with self.dataset_populator.test_history() as history_id:
        hdca_id = self.dataset_collection_populator.create_list_of_list_in_history(history_id).json()
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        workflow_inputs = {"0": self._ds_entry(hdca_id)}
        invocation_id = self.__invoke_workflow(workflow_id, inputs=workflow_inputs, history_id=history_id)
        self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
        result_collection = self.dataset_populator.get_history_collection_details(history_id, hid=6)
        assert result_collection["collection_type"] == "list"
        assert result_collection["job_source_type"] == "ImplicitCollectionJobs"
@skip_without_tool("cat_list")
@skip_without_tool("collection_creates_pair")
def test_workflow_run_output_collection_mapping(self):
    """Run ``WORKFLOW_WITH_OUTPUT_COLLECTION_MAPPING`` on a two-element list.

    The dataset content fetched with ``hid=0`` after the run must equal the
    expected interleaving of the two input files' lines.
    """
    workflow_id = self._upload_yaml_workflow(WORKFLOW_WITH_OUTPUT_COLLECTION_MAPPING)
    with self.dataset_populator.test_history() as history_id:
        list_contents = ["a\nb\nc\nd\n", "e\nf\ng\nh\n"]
        create_response = self.dataset_collection_populator.create_list_in_history(
            history_id, contents=list_contents
        )
        fetch_response = create_response.json()
        input_list = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        workflow_inputs = {"0": self._ds_entry(input_list)}
        invocation_id = self.__invoke_workflow(workflow_id, inputs=workflow_inputs, history_id=history_id)
        self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
        produced = self.dataset_populator.get_history_dataset_content(history_id, hid=0)
        assert "a\nc\nb\nd\ne\ng\nf\nh\n" == produced
@skip_without_tool("cat_list")
@skip_without_tool("collection_split_on_column")
def test_workflow_run_dynamic_output_collections(self):
    """Run ``WORKFLOW_WITH_DYNAMIC_OUTPUT_COLLECTION`` and check the final dataset.

    The dataset fetched with ``hid=0`` must report hid 7 and contain the
    expected four values.
    """
    populator = self.dataset_populator
    with populator.test_history() as history_id:
        self._run_jobs(WORKFLOW_WITH_DYNAMIC_OUTPUT_COLLECTION, history_id=history_id, assert_ok=True, wait=True)
        last_item_hid = populator.get_history_dataset_details(history_id, hid=0)["hid"]
        assert last_item_hid == 7, f"Expected 7 history items, got {last_item_hid}"
        produced = populator.get_history_dataset_content(history_id, hid=0)
        assert "10.0\n30.0\n20.0\n40.0\n" == produced
@skip_without_tool("collection_split_on_column")
@skip_without_tool("min_repeat")
def test_workflow_run_dynamic_output_collections_2(self):
    """Feed a dynamically split collection into a repeat input of ``min_repeat``.

    Checks that the collection at hid 7 is populated and that the dataset at
    hid 11 holds the content of the first input.
    """
    # A more advanced output collection workflow, testing regression of
    # https://github.com/galaxyproject/galaxy/issues/776
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
test_input_1: data
test_input_2: data
test_input_3: data
steps:
split_up:
tool_id: collection_split_on_column
in:
input1: test_input_2
min_repeat:
tool_id: min_repeat
in:
queries_0|input: test_input_1
queries2_0|input2: split_up/split_output
"""
    dataset_contents = [
        "samp1\t10.0\nsamp2\t20.0\n",
        "samp1\t20.0\nsamp2\t40.0\n",
        "samp1\t30.0\nsamp2\t60.0\n",
    ]
    with self.dataset_populator.test_history() as history_id:
        workflow_id = self._upload_yaml_workflow(workflow_yaml)
        hdas = [self.dataset_populator.new_dataset(history_id, content=text) for text in dataset_contents]
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        # Workflow inputs are keyed by their position, as strings.
        workflow_inputs = {str(index): self._ds_entry(hda) for index, hda in enumerate(hdas)}
        invocation_id = self.__invoke_workflow(workflow_id, inputs=workflow_inputs, history_id=history_id)
        self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
        split_collection = self.dataset_populator.get_history_collection_details(history_id, hid=7)
        assert split_collection["populated_state"] == "ok"
        produced = self.dataset_populator.get_history_dataset_content(history_id, hid=11)
        assert produced.strip() == "samp1\t10.0\nsamp2\t20.0"
@skip_without_tool("cat")
@skip_without_tool("collection_split_on_column")
def test_workflow_run_dynamic_output_collections_3(self):
    """Chain two dynamic split steps and map a final ``cat`` step over the result.

    The test only waits for the invocation and its jobs; it makes no further
    assertions on the outputs.
    """
    # Test a workflow that create a list:list:list followed by a mapping step.
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
text_input1: data
text_input2: data
steps:
cat_inputs:
tool_id: cat1
in:
input1: text_input1
queries_0|input2: text_input2
split_up_1:
tool_id: collection_split_on_column
in:
input1: cat_inputs/out_file1
split_up_2:
tool_id: collection_split_on_column
in:
input1: split_up_1/split_output
cat_output:
tool_id: cat
in:
input1: split_up_2/split_output
"""
    dataset_contents = ["samp1\t10.0\nsamp2\t20.0\n", "samp1\t30.0\nsamp2\t40.0\n"]
    with self.dataset_populator.test_history() as history_id:
        workflow_id = self._upload_yaml_workflow(workflow_yaml)
        hdas = [self.dataset_populator.new_dataset(history_id, content=text) for text in dataset_contents]
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        workflow_inputs = {str(index): self._ds_entry(hda) for index, hda in enumerate(hdas)}
        invocation_id = self.__invoke_workflow(workflow_id, inputs=workflow_inputs, history_id=history_id)
        self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
@skip_without_tool("cat1")
@skip_without_tool("__FLATTEN__")
def test_workflow_input_tags(self):
    """Check that input tags survive a workflow create/download round trip.

    Loads the ``test_workflow_with_input_tags`` resource, creates it,
    downloads it again and compares each step's ``tag`` entry in its tool
    state against ``tag1`` / ``tag2`` in iteration order.
    """
    source_workflow = self.workflow_populator.load_workflow_from_resource(name="test_workflow_with_input_tags")
    workflow_id = self.workflow_populator.create_workflow(source_workflow)
    downloaded_workflow = self._download_workflow(workflow_id)
    expected_tags = ["tag1", "tag2"]
    for position, step_key in enumerate(downloaded_workflow["steps"]):
        tool_state = json.loads(downloaded_workflow["steps"][step_key]["tool_state"])
        assert tool_state["tag"] == expected_tags[position]
@skip_without_tool("column_param")
def test_empty_file_data_column_specified(self):
    """Run ``column_param`` with explicit column settings on the output of ``empty_output``."""
    # Regression test for https://github.com/galaxyproject/galaxy/pull/10981
    workflow_yaml = """class: GalaxyWorkflow
steps:
empty_output:
tool_id: empty_output
outputs:
out_file1:
change_datatype: tabular
column_param:
tool_id: column_param
in:
input1: empty_output/out_file1
state:
col: 2
col_names: 'B'
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, history_id=history_id)
@skip_without_tool("column_param_list")
def test_comma_separated_columns(self):
    """Run ``column_param_list`` with a comma-separated ``col`` value on the output of ``empty_output``."""
    # Regression test for https://github.com/galaxyproject/galaxy/pull/10981
    workflow_yaml = """class: GalaxyWorkflow
steps:
empty_output:
tool_id: empty_output
outputs:
out_file1:
change_datatype: tabular
column_param_list:
tool_id: column_param_list
in:
input1: empty_output/out_file1
state:
col: '2,3'
col_names: 'B'
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, history_id=history_id)
@skip_without_tool("column_param_list")
def test_comma_separated_columns_with_trailing_newline(self):
    """Run ``column_param_list`` with newline-terminated column values.

    The first job's command line must still contain the cleaned ``col`` and
    ``col_names`` values.
    """
    # Tests that workflows with weird tool state continue to run.
    # In this case the newline may have been added by the workflow editor
    # text field that is used for data_column parameters
    workflow_yaml = """class: GalaxyWorkflow
steps:
empty_output:
tool_id: empty_output
outputs:
out_file1:
change_datatype: tabular
column_param_list:
tool_id: column_param_list
in:
input1: empty_output/out_file1
state:
col: '2,3\n'
col_names: 'B\n'
"""
    with self.dataset_populator.test_history() as history_id:
        job_summary = self._run_workflow(workflow_yaml, history_id=history_id)
        first_job_id = job_summary.jobs[0]["id"]
        job = self.dataset_populator.get_job_details(first_job_id, full=True).json()
        command_line = job["command_line"]
        assert "col 2,3" in command_line
        assert 'echo "col_names B" >>' in job["command_line"]
@skip_without_tool("column_param")
def test_runtime_data_column_parameter(self):
    """Delegate to ``_run_workflow_with_runtime_data_column_parameter`` inside a fresh test history."""
    history_context = self.dataset_populator.test_history()
    with history_context as history_id:
        self._run_workflow_with_runtime_data_column_parameter(history_id)
@skip_without_tool("mapper")
@skip_without_tool("pileup")
def test_workflow_metadata_validation_0(self):
    """Map ``mapper`` over a fastq list and feed the result to ``pileup`` with a shared reference."""
    # Testing regression of
    # https://github.com/galaxyproject/galaxy/issues/1514
    workflow_with_test_data = """
class: GalaxyWorkflow
inputs:
input_fastqs: collection
reference: data
steps:
map_over_mapper:
tool_id: mapper
in:
input1: input_fastqs
reference: reference
pileup:
tool_id: pileup
in:
input1: map_over_mapper/out_file1
reference: reference
test_data:
input_fastqs:
collection_type: list
elements:
- identifier: samp1
value: 1.fastq
type: File
- identifier: samp2
value: 1.fastq
type: File
reference:
value: 1.fasta
type: File
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_with_test_data, history_id=history_id)
def test_run_workflow_pick_value_bam_pja(self):
    """Run ``pick_value`` on a BAM input with a ``change_datatype: bam`` output action."""
    # Makes sure that setting metadata on expression tool data outputs
    # doesn't break result evaluation.
    workflow_yaml = """class: GalaxyWorkflow
inputs:
some_file:
type: data
steps:
pick_value:
tool_id: pick_value
in:
style_cond|type_cond|pick_from_0|value:
source: some_file
out:
data_param:
change_datatype: bam
tool_state:
style_cond:
__current_case__: 2
pick_style: first_or_error
type_cond:
__current_case__: 4
param_type: data
pick_from:
- __index__: 0
value:
__class__: RuntimeValue
"""
    bam_test_data = """
some_file:
value: 1.bam
file_type: bam
type: File
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_workflow(workflow_yaml, test_data=bam_test_data, history_id=history_id)
def test_run_workflow_simple_conditional_step(self):
    """Run a ``cat1`` step guarded by ``when`` with ``should_run`` set to false.

    Any invocation step labelled ``cat1`` must report exactly one skipped job.
    """
    workflow_yaml = """class: GalaxyWorkflow
inputs:
should_run:
type: boolean
some_file:
type: data
steps:
cat1:
tool_id: cat1
in:
input1: some_file
should_run: should_run
when: $(inputs.should_run)
"""
    inputs_yaml = """
some_file:
value: 1.bed
type: File
should_run:
value: false
type: raw
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(workflow_yaml, test_data=inputs_yaml, history_id=history_id)
        invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
        for step in invocation_details["steps"]:
            if step["workflow_step_label"] != "cat1":
                continue
            skipped_jobs = [job for job in step["jobs"] if job["state"] == "skipped"]
            assert len(skipped_jobs) == 1
def test_run_workflow_simple_conditional_step_with_nested_tool_state(self):
    """Run a conditional step whose data input is linked inside nested tool state.

    The step ``nested_tool_state`` (tool ``identifier_multiple_in_conditional``)
    is guarded by ``when: $(inputs.should_run)`` and ``should_run`` is false, so
    the step must report exactly one skipped job.
    """
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            """class: GalaxyWorkflow
inputs:
should_run:
type: boolean
some_file:
type: data
steps:
nested_tool_state:
tool_id: identifier_multiple_in_conditional
state:
outer_cond:
cond_param_outer: true
inner_cond:
cond_param_inner: true
input1:
$link: some_file
in:
should_run: should_run
when: $(inputs.should_run)
""",
            test_data="""
some_file:
value: 1.bed
type: File
should_run:
value: false
type: raw
""",
            history_id=history_id,
        )
        invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
        # The step is declared under the key ``nested_tool_state``; filtering on
        # the tool id instead never matched, so the assertion below was never
        # evaluated.
        conditional_steps = [
            step for step in invocation_details["steps"] if step["workflow_step_label"] == "nested_tool_state"
        ]
        # Guard against the check silently turning vacuous again.
        assert conditional_steps, "No invocation step labelled 'nested_tool_state' found"
        for step in conditional_steps:
            assert sum(1 for j in step["jobs"] if j["state"] == "skipped") == 1
def test_run_workflow_invalid_when_expression(self):
    """Run a step whose ``when`` expression is syntactically invalid.

    The invocation must end up ``failed`` with a single message whose reason
    is ``expression_evaluation_failed``.
    """
    workflow_yaml = """class: GalaxyWorkflow
inputs:
should_run:
type: boolean
some_file:
type: data
steps:
cat1:
tool_id: cat1
in:
input1: some_file
should_run: should_run
when: $(:syntaxError:)
"""
    inputs_yaml = """
some_file:
value: 1.bed
type: File
should_run:
value: false
type: raw
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            workflow_yaml,
            test_data=inputs_yaml,
            history_id=history_id,
            wait=True,
            assert_ok=False,
        )
        invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
        assert invocation_details["state"] == "failed"
        messages = invocation_details["messages"]
        assert len(messages) == 1
        assert messages[0]["reason"] == "expression_evaluation_failed"
def test_run_workflow_fails_when_expression_not_boolean(self):
    """Run a step whose ``when`` expression evaluates to a string.

    The invocation must end up ``failed`` with a single ``when_not_boolean``
    message that names the offending type and workflow step id 2.
    """
    workflow_yaml = """class: GalaxyWorkflow
inputs:
should_run:
type: boolean
some_file:
type: data
steps:
cat1:
tool_id: cat1
in:
input1: some_file
should_run: should_run
when: $("false")
"""
    inputs_yaml = """
some_file:
value: 1.bed
type: File
should_run:
value: false
type: raw
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            workflow_yaml,
            test_data=inputs_yaml,
            history_id=history_id,
            wait=True,
            assert_ok=False,
        )
        invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
        assert invocation_details["state"] == "failed"
        messages = invocation_details["messages"]
        assert len(messages) == 1
        failure = messages[0]
        assert failure["reason"] == "when_not_boolean"
        assert failure["details"] == "Type is: str"
        assert failure["workflow_step_id"] == 2
def test_run_workflow_subworkflow_conditional_with_simple_mapping_step(self):
    """Skip a conditional subworkflow whose inner step maps over a two-element list.

    With ``should_run`` false, any step labelled ``a_tool_step`` in the
    subworkflow invocation must report two skipped jobs.
    """
    workflow_yaml = """class: GalaxyWorkflow
inputs:
should_run:
type: boolean
some_collection:
type: data_collection
steps:
subworkflow:
run:
class: GalaxyWorkflow
inputs:
some_collection:
type: data_collection
should_run:
type: boolean
steps:
a_tool_step:
tool_id: cat1
in:
input1: some_collection
in:
some_collection: some_collection
should_run: should_run
outputs:
inner_out: a_tool_step/out_file1
when: $(inputs.should_run)
outputs:
outer_output:
outputSource: subworkflow/inner_out
"""
    inputs_yaml = """
some_collection:
collection_type: list
elements:
- identifier: true
content: A
- identifier: false
content: B
type: File
should_run:
value: false
type: raw
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            workflow_yaml,
            test_data=inputs_yaml,
            history_id=history_id,
            wait=True,
            assert_ok=True,
        )
        outer_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
        subworkflow_invocation_id = outer_details["steps"][-1]["subworkflow_invocation_id"]
        self.workflow_populator.wait_for_invocation_and_jobs(
            history_id=history_id, workflow_id="whatever", invocation_id=subworkflow_invocation_id
        )
        inner_details = self.workflow_populator.get_invocation(subworkflow_invocation_id, step_details=True)
        for step in inner_details["steps"]:
            if step["workflow_step_label"] != "a_tool_step":
                continue
            skipped_jobs = [job for job in step["jobs"] if job["state"] == "skipped"]
            assert len(skipped_jobs) == 2
def test_run_workflow_subworkflow_conditional_step(self):
    """Skip a conditional subworkflow that runs ``cat1`` on a single dataset.

    With ``should_run`` false, any step labelled ``a_tool_step`` in the
    subworkflow invocation must report one skipped job.
    """
    workflow_yaml = """class: GalaxyWorkflow
inputs:
should_run:
type: boolean
some_file:
type: data
steps:
subworkflow:
run:
class: GalaxyWorkflow
inputs:
some_file:
type: data
should_run:
type: boolean
steps:
a_tool_step:
tool_id: cat1
in:
input1: some_file
in:
some_file: some_file
should_run: should_run
outputs:
inner_out: a_tool_step/out_file1
when: $(inputs.should_run)
outputs:
outer_output:
outputSource: subworkflow/inner_out
"""
    inputs_yaml = """
some_file:
value: 1.bed
type: File
should_run:
value: false
type: raw
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            workflow_yaml,
            test_data=inputs_yaml,
            history_id=history_id,
            wait=True,
            assert_ok=True,
        )
        outer_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
        subworkflow_invocation_id = outer_details["steps"][-1]["subworkflow_invocation_id"]
        self.workflow_populator.wait_for_invocation_and_jobs(
            history_id=history_id, workflow_id="whatever", invocation_id=subworkflow_invocation_id
        )
        inner_details = self.workflow_populator.get_invocation(subworkflow_invocation_id, step_details=True)
        for step in inner_details["steps"]:
            if step["workflow_step_label"] != "a_tool_step":
                continue
            skipped_jobs = [job for job in step["jobs"] if job["state"] == "skipped"]
            assert len(skipped_jobs) == 1
def test_run_nested_conditional_workflow_steps(self):
    """Skip an outer conditional subworkflow that itself wraps another subworkflow.

    Descends two subworkflow levels from the top invocation (always through
    the last step) and checks that any step labelled ``cat1`` at the innermost
    level reports one skipped job.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
dataset:
type: data
when:
type: boolean
outputs:
output:
outputSource: outer_subworkflow/output
steps:
- label: outer_subworkflow
when: $(inputs.when)
in:
dataset:
source: dataset
when:
source: when
run:
class: GalaxyWorkflow
label: subworkflow cat1
inputs:
dataset:
type: data
outputs:
output:
outputSource: cat1_workflow/output
steps:
- label: cat1_workflow
in:
dataset:
source: dataset
run:
class: GalaxyWorkflow
label: cat1
inputs:
dataset:
type: data
outputs:
output:
outputSource: cat1/out_file1
steps:
- tool_id: cat1
label: cat1
in:
input1:
source: dataset
"""
    inputs_yaml = """
dataset:
value: 1.bed
type: File
when:
value: false
type: raw
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            workflow_yaml,
            test_data=inputs_yaml,
            history_id=history_id,
            wait=True,
            assert_ok=True,
        )
        current_invocation_id = summary.invocation_id
        # Two nesting levels: outer_subworkflow -> cat1_workflow.
        for _level in range(2):
            level_details = self.workflow_populator.get_invocation(current_invocation_id, step_details=True)
            current_invocation_id = level_details["steps"][-1]["subworkflow_invocation_id"]
            self.workflow_populator.wait_for_invocation_and_jobs(
                history_id=history_id, workflow_id="whatever", invocation_id=current_invocation_id
            )
        innermost_details = self.workflow_populator.get_invocation(current_invocation_id, step_details=True)
        for step in innermost_details["steps"]:
            if step["workflow_step_label"] != "cat1":
                continue
            skipped_jobs = [job for job in step["jobs"] if job["state"] == "skipped"]
            assert len(skipped_jobs) == 1
def test_run_workflow_conditional_step_map_over_expression_tool(self):
    """Map a conditional ``cat1`` step over booleans read from a list of files.

    Any step labelled ``consume_expression_parameter`` must have exactly one
    skipped job, and that job's ``out_file1`` must keep the
    ``expression.json`` extension despite the ``change_datatype: txt`` action.
    """
    workflow_with_test_data = """
class: GalaxyWorkflow
inputs:
boolean_input_files: collection
steps:
- label: param_out
tool_id: param_value_from_file
in:
input1: boolean_input_files
state:
param_type: boolean
- label: consume_expression_parameter
tool_id: cat1
in:
input1: boolean_input_files
should_run: param_out/boolean_param
out:
out_file1:
change_datatype: txt
when: $(inputs.should_run)
test_data:
boolean_input_files:
collection_type: list
elements:
- identifier: true
content: true
- identifier: false
content: false
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(workflow_with_test_data, history_id=history_id)
        invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
        for step in invocation_details["steps"]:
            if step["workflow_step_label"] != "consume_expression_parameter":
                continue
            skipped_jobs = [job for job in step["jobs"] if job["state"] == "skipped"]
            assert len(skipped_jobs) == 1
            # also assert that change_datatype was ignored for null output
            skipped_job = self.dataset_populator.get_job_details(skipped_jobs[0]["id"], full=True).json()
            skipped_hda_id = skipped_job["outputs"]["out_file1"]["id"]
            dataset_details = self.dataset_populator.get_history_dataset_details(
                history_id=history_id, content_id=skipped_hda_id
            )
            assert dataset_details["file_ext"] == "expression.json", dataset_details
def test_run_workflow_conditional_subworkflow_step_map_over_expression_tool(self):
    """Map a conditional subworkflow over booleans read from a list of files.

    The ``outer_output`` collection must summarise two jobs: one ``ok`` and
    one ``skipped``.
    """
    workflow_with_test_data = """
class: GalaxyWorkflow
inputs:
boolean_input_files: collection
steps:
create_list_of_boolean:
tool_id: param_value_from_file
in:
input1: boolean_input_files
state:
param_type: boolean
subworkflow:
run:
class: GalaxyWorkflow
inputs:
boolean_input_file: data
should_run: boolean
steps:
consume_expression_parameter:
tool_id: cat1
in:
input1: boolean_input_file
out:
out_file1:
change_datatype: txt
outputs:
inner_output:
outputSource: consume_expression_parameter/out_file1
in:
boolean_input_file: boolean_input_files
should_run: create_list_of_boolean/boolean_param
when: $(inputs.should_run)
outputs:
outer_output:
outputSource: subworkflow/inner_output
test_data:
boolean_input_files:
collection_type: list
elements:
- identifier: true
content: true
- identifier: false
content: false
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(workflow_with_test_data, history_id=history_id)
        invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
        output_collections = invocation_details["output_collections"]
        assert "outer_output" in output_collections
        outer_output = output_collections["outer_output"]
        outer_hdca = self.dataset_populator.get_history_collection_details(
            history_id, content_id=outer_output["id"]
        )
        job_states = outer_hdca["job_state_summary"]
        assert job_states["all_jobs"] == 2
        assert job_states["ok"] == 1
        assert job_states["skipped"] == 1
def test_run_workflow_conditional_subworkflow_step_map_over_expression_tool_with_extra_nesting(self):
    """Run the nested conditional-subworkflow fixture with one true and one false element.

    Checks the job summary of ``outer_create_nested`` and, for both
    ``outer_output_1`` and ``outer_output_2``, the job summary, collection
    type and per-branch element counts.
    """
    inputs_yaml = """boolean_input_files:
collection_type: list
elements:
- identifier: true
content: true
- identifier: false
content: false
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            NESTED_WORKFLOW_WITH_CONDITIONAL_SUBWORKFLOW_AND_DISCONNECTED_MAP_OVER_SOURCE,
            test_data=inputs_yaml,
            history_id=history_id,
        )
        invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
        output_collections = invocation_details["output_collections"]
        outer_create_nested = self.dataset_populator.get_history_collection_details(
            history_id, content_id=output_collections["outer_create_nested"]["id"]
        )
        nested_job_states = outer_create_nested["job_state_summary"]
        assert nested_job_states["all_jobs"] == 2
        assert nested_job_states["ok"] == 1
        assert nested_job_states["skipped"] == 1
        for output_name in ("outer_output_1", "outer_output_2"):
            outer_output = invocation_details["output_collections"][output_name]
            outer_hdca = self.dataset_populator.get_history_collection_details(
                history_id, content_id=outer_output["id"]
            )
            # You might expect 12 total jobs, 6 ok and 6 skipped,
            # but because we're not actually running one branch of collection_creates_dynamic_nested
            # there's no input to consume_expression_parameter.
            # It's unclear if that's a problem or not ... probably not a major one,
            # since we keep producing "empty" outer collections, which seems somewhat correct.
            output_job_states = outer_hdca["job_state_summary"]
            assert output_job_states["all_jobs"] == 6
            assert output_job_states["ok"] == 6
            assert outer_hdca["collection_type"] == "list:list:list"
            true_branch, false_branch = outer_hdca["elements"][0], outer_hdca["elements"][1]
            assert true_branch["element_identifier"] == "True"
            assert true_branch["object"]["element_count"] == 3
            assert false_branch["element_identifier"] == "False"
            assert false_branch["object"]["element_count"] == 0
def test_run_workflow_conditional_subworkflow_step_map_over_expression_tool_with_extra_nesting_skip_all(self):
    """Run the nested conditional-subworkflow fixture with both elements false.

    ``outer_create_nested`` must summarise two skipped jobs; both outer
    outputs must be ``list:list:list`` collections with no jobs.
    """
    inputs_yaml = """boolean_input_files:
collection_type: list
elements:
- identifier: false
content: false
- identifier: also_false
content: false
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            NESTED_WORKFLOW_WITH_CONDITIONAL_SUBWORKFLOW_AND_DISCONNECTED_MAP_OVER_SOURCE,
            test_data=inputs_yaml,
            history_id=history_id,
        )
        invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
        output_collections = invocation_details["output_collections"]
        outer_create_nested = self.dataset_populator.get_history_collection_details(
            history_id, content_id=output_collections["outer_create_nested"]["id"]
        )
        nested_job_states = outer_create_nested["job_state_summary"]
        assert nested_job_states["all_jobs"] == 2
        assert nested_job_states["skipped"] == 2
        for output_name in ("outer_output_1", "outer_output_2"):
            outer_output = invocation_details["output_collections"][output_name]
            outer_hdca = self.dataset_populator.get_history_collection_details(
                history_id, content_id=outer_output["id"]
            )
            assert outer_hdca["job_state_summary"]["all_jobs"] == 0
            assert outer_hdca["collection_type"] == "list:list:list"
def test_run_workflow_conditional_step_map_over_expression_tool_pick_value(self):
    """Combine two partially skipped mapped steps with ``pick_value``.

    ``my_output`` must contain two elements whose content is ``True``, and
    each ``consume_expression_parameter_*`` step must report one skipped job.
    """
    workflow_with_test_data = """
class: GalaxyWorkflow
inputs:
boolean_input_files_1: collection
boolean_input_files_2: collection
outputs:
my_output:
outputSource: pick_value/data_param
steps:
- label: param_out_1
tool_id: param_value_from_file
in:
input1: boolean_input_files_1
state:
param_type: boolean
- label: param_out_2
tool_id: param_value_from_file
in:
input1: boolean_input_files_2
state:
param_type: boolean
- label: consume_expression_parameter_1
tool_id: cat1
in:
input1: boolean_input_files_1
should_run: param_out_1/boolean_param
when: $(inputs.should_run)
- label: consume_expression_parameter_2
tool_id: cat1
in:
input1: boolean_input_files_2
should_run: param_out_2/boolean_param
when: $(inputs.should_run)
- label: pick_value
tool_id: pick_value
tool_state:
style_cond:
__current_case__: 2
pick_style: first_or_error
type_cond:
__current_case__: 4
param_type: data
pick_from:
- __index__: 0
value:
__class__: RuntimeValue
- __index__: 1
value:
__class__: RuntimeValue
in:
style_cond|type_cond|pick_from_0|value:
source: consume_expression_parameter_1/out_file1
style_cond|type_cond|pick_from_1|value:
source: consume_expression_parameter_2/out_file1
test_data:
boolean_input_files_1:
collection_type: list
elements:
- identifier: true
content: true
- identifier: false
content: false
boolean_input_files_2:
collection_type: list
elements:
- identifier: false
content: false
- identifier: true
content: true
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(workflow_with_test_data, history_id=history_id)
        invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
        picked_collection_id = invocation_details["output_collections"]["my_output"]["id"]
        picked_collection = self.dataset_populator.get_history_collection_details(
            history_id=history_id, content_id=picked_collection_id
        )
        picked_elements = picked_collection["elements"]
        assert len(picked_elements) == 2
        for picked in picked_elements:
            picked_content = self.dataset_populator.get_history_dataset_content(
                history_id, content_id=picked["object"]["id"]
            )
            assert picked_content == "True"
        for step in invocation_details["steps"]:
            if not step["workflow_step_label"].startswith("consume_expression_parameter_"):
                continue
            skipped_jobs = [job for job in step["jobs"] if job["state"] == "skipped"]
            assert len(skipped_jobs) == 1
def test_run_subworkflow_simple(self) -> None:
    """Run ``WORKFLOW_NESTED_SIMPLE`` and inspect the output and invocation structure.

    Checks the resulting dataset content, that three outer steps have no
    subworkflow invocation, and the labels of the first two steps of the
    subworkflow invocation.
    """
    inputs_yaml = """
outer_input:
value: 1.bed
type: File
"""
    expected_content = "chrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n"
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(WORKFLOW_NESTED_SIMPLE, test_data=inputs_yaml, history_id=history_id)
        invocation_id = summary.invocation_id
        content = self.dataset_populator.get_history_dataset_content(history_id)
        assert content == expected_content
        steps = self.workflow_populator.get_invocation(invocation_id)["steps"]
        plain_steps = [step for step in steps if step["subworkflow_invocation_id"] is None]
        assert len(plain_steps) == 3
        subworkflow_invocation_ids = [
            step["subworkflow_invocation_id"] for step in steps if step["subworkflow_invocation_id"]
        ]
        subworkflow_invocation = self.workflow_populator.get_invocation(subworkflow_invocation_ids[0])
        inner_steps = subworkflow_invocation["steps"]
        assert inner_steps[0]["workflow_step_label"] == "inner_input"
        assert inner_steps[1]["workflow_step_label"] == "random_lines"
@skip_without_tool("random_lines1")
def test_run_subworkflow_runtime_parameters(self):
    """Run ``WORKFLOW_NESTED_RUNTIME_PARAMETER`` with ``num_lines`` set to 2 via step parameters.

    The resulting dataset must contain exactly two non-empty lines.
    """
    inputs_yaml = """
step_parameters:
'1':
'1|num_lines': 2
outer_input:
value: 1.bed
type: File
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(WORKFLOW_NESTED_RUNTIME_PARAMETER, test_data=inputs_yaml, history_id=history_id)
        content = self.dataset_populator.get_history_dataset_content(history_id)
        non_empty_lines = [line for line in content.split("\n") if line]
        assert len(non_empty_lines) == 2
@skip_without_tool("cat")
def test_run_subworkflow_replacement_parameters(self):
    """Run ``WORKFLOW_NESTED_REPLACEMENT_PARAMETER`` with ``replaceme`` set to ``moocow``.

    The resulting dataset must be named ``moocow suffix``.
    """
    inputs_yaml = """
replacement_parameters:
replaceme: moocow
outer_input:
value: 1.bed
type: File
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(WORKFLOW_NESTED_REPLACEMENT_PARAMETER, test_data=inputs_yaml, history_id=history_id)
        dataset_name = self.dataset_populator.get_history_dataset_details(history_id)["name"]
        assert dataset_name == "moocow suffix"
@skip_without_tool("create_2")
def test_placements_from_text_inputs(self):
    """Rename ``create_2`` outputs via ``${replaceme}`` from two different sources.

    The first run supplies the value through ``replacement_parameters``; the
    second supplies it through a text workflow input. After each run the
    dataset returned by ``get_history_dataset_details`` must be named
    ``moocow name 2``.
    """
    with_replacement_parameters = """
class: GalaxyWorkflow
inputs: []
steps:
create_2:
tool_id: create_2
state:
sleep_time: 0
outputs:
out_file1:
rename: "${replaceme} name"
out_file2:
rename: "${replaceme} name 2"
test_data:
replacement_parameters:
replaceme: moocow
"""
    with_text_input = """
class: GalaxyWorkflow
inputs:
replaceme: text
steps:
create_2:
tool_id: create_2
state:
sleep_time: 0
outputs:
out_file1:
rename: "${replaceme} name"
out_file2:
rename: "${replaceme} name 2"
test_data:
replaceme:
value: moocow
type: raw
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(with_replacement_parameters, history_id=history_id)
        first_details = self.dataset_populator.get_history_dataset_details(history_id)
        assert first_details["name"] == "moocow name 2"
        self._run_jobs(with_text_input, history_id=history_id)
        second_details = self.dataset_populator.get_history_dataset_details(history_id)
        assert second_details["name"] == "moocow name 2", second_details["name"]
def test_placements_from_text_inputs_nested(self):
    """Rename subworkflow outputs via a text input passed down from the outer workflow.

    The dataset returned by ``get_history_dataset_details`` must be named
    ``moocow name 2``.
    """
    nested_run_def = """
class: GalaxyWorkflow
inputs:
replacemeouter: text
steps:
nested_workflow:
run:
class: GalaxyWorkflow
inputs:
replacemeinner: text
outputs:
workflow_output_1:
outputSource: create_2/out_file1
workflow_output_2:
outputSource: create_2/out_file2
steps:
create_2:
tool_id: create_2
state:
sleep_time: 0
outputs:
out_file1:
rename: "${replacemeinner} name"
out_file2:
rename: "${replacemeinner} name 2"
in:
replacemeinner: replacemeouter
test_data:
replacemeouter:
value: moocow
type: raw
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(nested_run_def, history_id=history_id)
        dataset_details = self.dataset_populator.get_history_dataset_details(history_id)
        assert dataset_details["name"] == "moocow name 2", dataset_details["name"]
@skip_without_tool("random_lines1")
def test_run_runtime_parameters_after_pause(self):
    """Apply a step parameter to a step that runs after a reviewed pause.

    Starts ``WORKFLOW_RUNTIME_PARAMETER_AFTER_PAUSE`` without waiting,
    approves the pause at order index 1, waits for the invocation to become
    ``scheduled`` and checks the final dataset has two non-empty lines.
    """
    with self.dataset_populator.test_history() as history_id:
        workflow_run_description = f"""{WORKFLOW_RUNTIME_PARAMETER_AFTER_PAUSE}
test_data:
step_parameters:
'2':
'num_lines': 2
input1:
value: 1.bed
type: File
"""
        job_summary = self._run_workflow(workflow_run_description, history_id=history_id, wait=False)
        uploaded_workflow_id = job_summary.workflow_id
        invocation_id = job_summary.invocation_id
        # Wait for at least one scheduling step.
        self._wait_for_invocation_non_new(uploaded_workflow_id, invocation_id)
        # Make sure the history didn't enter a failed state in there.
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        # Assert the workflow hasn't finished scheduling, we can be pretty sure we
        # are at the pause step in this case then.
        self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id)
        # Review the paused steps to allow the workflow to continue.
        self.__review_paused_steps(uploaded_workflow_id, invocation_id, order_index=1, action=True)
        # Wait for the workflow to finish scheduling and ensure both the invocation
        # and the history are in valid states.
        invocation_scheduled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, "scheduled")
        assert invocation_scheduled, "Workflow state is not scheduled..."
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        content = self.dataset_populator.get_history_dataset_content(history_id)
        non_empty_lines = [line for line in content.split("\n") if line]
        assert len(non_empty_lines) == 2
def test_run_subworkflow_auto_labels(self):
    """Run ``NESTED_WORKFLOW_AUTO_LABELS_MODERN_SYNTAX`` on ``1.bed``.

    Exactly two jobs must be reported and the resulting dataset must contain
    the expected two BED lines.
    """
    workflow_text = NESTED_WORKFLOW_AUTO_LABELS_MODERN_SYNTAX
    inputs_yaml = """
outer_input:
value: 1.bed
type: File
"""
    expected_content = "chrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n"
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(workflow_text, test_data=inputs_yaml, history_id=history_id)
        num_jobs = len(summary.jobs)
        assert num_jobs == 2, f"2 jobs expected, got {num_jobs} jobs"
        content = self.dataset_populator.get_history_dataset_content(history_id)
        assert content == expected_content
@skip_without_tool("cat1")
@skip_without_tool("collection_paired_test")
def test_workflow_run_zip_collections(self):
    """Zip a ``cat1`` output with a second dataset and concatenate the pair.

    The resulting dataset must hold the lines of the first input followed by
    the lines of the second.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
test_input_1: data
test_input_2: data
steps:
first_cat:
tool_id: cat1
in:
input1: test_input_1
zip_it:
tool_id: "__ZIP_COLLECTION__"
in:
input_forward: first_cat/out_file1
input_reverse: test_input_2
concat_pair:
tool_id: collection_paired_test
in:
f1: zip_it/output
"""
    dataset_contents = ["samp1\t10.0\nsamp2\t20.0\n", "samp1\t20.0\nsamp2\t40.0\n"]
    with self.dataset_populator.test_history() as history_id:
        workflow_id = self._upload_yaml_workflow(workflow_yaml)
        hdas = [self.dataset_populator.new_dataset(history_id, content=text) for text in dataset_contents]
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        workflow_inputs = {str(index): self._ds_entry(hda) for index, hda in enumerate(hdas)}
        invocation_id = self.__invoke_workflow(workflow_id, inputs=workflow_inputs, history_id=history_id)
        self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
        concatenated = self.dataset_populator.get_history_dataset_content(history_id)
        assert concatenated.strip() == "samp1\t10.0\nsamp2\t20.0\nsamp1\t20.0\nsamp2\t40.0"
# The skip guards name the tools this workflow actually runs; the previous
# guard referenced ``collection_paired_test``, which the workflow never uses.
@skip_without_tool("collection_creates_dynamic_nested")
@skip_without_tool("__FLATTEN__")
def test_workflow_flatten(self):
    """Flatten the nested output of ``collection_creates_dynamic_nested``.

    The collection at hid 14 must be a ``list`` of six elements, one of
    which is identified as ``oe1-ie1`` (outer and inner identifiers joined
    with ``-``).
    """
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(
            """
class: GalaxyWorkflow
steps:
nested:
tool_id: collection_creates_dynamic_nested
state:
sleep_time: 0
foo: 'dummy'
flatten:
tool_id: '__FLATTEN__'
state:
input:
$link: nested/list_output
join_identifier: '-'
""",
            test_data={},
            history_id=history_id,
        )
        details = self.dataset_populator.get_history_collection_details(history_id, hid=14)
        assert details["collection_type"] == "list"
        elements = details["elements"]
        identifiers = [e["element_identifier"] for e in elements]
        assert len(identifiers) == 6
        assert "oe1-ie1" in identifiers
@skip_without_tool("collection_split_on_column")
@skip_without_tool("__FLATTEN__")
def test_workflow_flatten_with_mapped_over_execution(self):
    """Flatten the nested result of a mapped-over collection-producing step.

    The skip guards name the two tools the workflow actually runs
    (``collection_split_on_column`` and ``__FLATTEN__``).

    ``split_up`` is mapped over a one-element list, producing a ``list:list``
    (looked up at hid 3); the last history item must be the flattened ``list``
    with two elements.
    """
    # Raw string: the "\n" inside the YAML ``content`` value must reach the
    # YAML parser as an escape sequence rather than as a literal line break.
    workflow_yaml = r"""
class: GalaxyWorkflow
inputs:
  input_fastqs: collection
steps:
  split_up:
    tool_id: collection_split_on_column
    in:
      input1: input_fastqs
  flatten:
    tool_id: '__FLATTEN__'
    in:
      input: split_up/split_output
    join_identifier: '-'
test_data:
  input_fastqs:
    collection_type: list
    elements:
      - identifier: samp1
        content: "0\n1"
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(
            workflow_yaml,
            history_id=history_id,
        )
        history_contents = self._get(f"histories/{history_id}/contents").json()
        flattened_collection = history_contents[-1]
        assert flattened_collection["history_content_type"] == "dataset_collection"
        assert flattened_collection["collection_type"] == "list"
        assert flattened_collection["element_count"] == 2
        nested_collection = self.dataset_populator.get_history_collection_details(history_id, hid=3)
        assert nested_collection["collection_type"] == "list:list"
        assert nested_collection["element_count"] == 1
        inner_list = nested_collection["elements"][0]["object"]
        assert inner_list["populated"]
        assert inner_list["element_count"] == 2
@skip_without_tool("cat")
def test_workflow_invocation_report_1(self):
    """Check the invocation report of a one-step ``cat`` workflow.

    The report must be rendered as markdown, contain the inputs and outputs
    sections, and must not contain an "About This Report" section.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  input_1: data
outputs:
  output_1:
    outputSource: first_cat/out_file1
steps:
  first_cat:
    tool_id: cat
    in:
      input1: input_1
"""
    test_data = """
input_1:
  value: 1.bed
  type: File
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
        )
        report_json = self.workflow_populator.workflow_report_json(summary.workflow_id, summary.invocation_id)
        assert "markdown" in report_json
        self._assert_has_keys(report_json, "markdown", "render_format")
        assert report_json["render_format"] == "markdown"
        report_markdown = report_json["markdown"]
        for heading in ("## Workflow Outputs", "## Workflow Inputs"):
            assert heading in report_markdown
        assert "## About This Report" not in report_markdown
@skip_without_tool("cat")
def test_workflow_invocation_report_custom(self):
    """Check that a workflow-defined custom report is stored and rendered.

    The downloaded workflow must carry a ``report`` with ``markdown``, and the
    rendered invocation report must include the custom sections.
    """
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            WORKFLOW_WITH_CUSTOM_REPORT_1,
            test_data=WORKFLOW_WITH_CUSTOM_REPORT_1_TEST_DATA,
            history_id=history_id,
        )
        workflow_id, invocation_id = summary.workflow_id, summary.invocation_id

        # The report definition must survive a round trip through download.
        downloaded_workflow = self._download_workflow(workflow_id)
        assert "report" in downloaded_workflow
        assert "markdown" in downloaded_workflow["report"]

        report_json = self.workflow_populator.workflow_report_json(workflow_id, invocation_id)
        assert "markdown" in report_json, f"markdown not in report json {report_json}"
        self._assert_has_keys(report_json, "markdown", "render_format")
        assert report_json["render_format"] == "markdown"
        report_markdown = report_json["markdown"]
        expected_fragments = (
            "## Workflow Outputs",
            "\n```galaxy\nhistory_dataset_display(history_dataset_id=",
            "## Workflow Inputs",
            "## About This Report",
        )
        for fragment in expected_fragments:
            assert fragment in report_markdown
@skip_without_tool("cat1")
def test_export_invocation_bco(self):
    """Export a simple invocation as a BioCompute Object and validate it."""
    with self.dataset_populator.test_history() as history_id:
        run_summary = self._run_workflow(
            WORKFLOW_SIMPLE,
            test_data={"input1": "hello world"},
            history_id=history_id,
        )
        bco = self.workflow_populator.get_biocompute_object(run_summary.invocation_id)
        self.workflow_populator.validate_biocompute_object(bco)
        provenance = bco["provenance_domain"]
        assert provenance["name"] == "Simple Workflow"
@skip_without_tool("cat1")
def test_export_invocation_ro_crate(self):
    """Export a simple invocation as an RO-Crate and check it has a main entity."""
    with self.dataset_populator.test_history() as history_id:
        run_summary = self._run_workflow(
            WORKFLOW_SIMPLE,
            test_data={"input1": "hello world"},
            history_id=history_id,
        )
        crate = self.workflow_populator.get_ro_crate(run_summary.invocation_id, include_files=True)
        main_entity = crate.mainEntity
        assert main_entity
@skip_without_tool("__MERGE_COLLECTION__")
def test_merge_collection_scheduling(self, history_id):
    """Merge the output of a sleeping mapped step with itself.

    Both merge inputs come from the same ``sleep`` step output; the merged
    collection must end up with a single element in the ``ok`` state.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  collection:
    type: collection
    collection_type: list
outputs:
  merge_out:
    outputSource: merge/output
steps:
  sleep:
    tool_id: cat_data_and_sleep
    in:
      input1: collection
    state:
      sleep_time: 5
  merge:
    tool_id: __MERGE_COLLECTION__
    in:
      inputs_1|input: sleep/out_file1
      inputs_0|input: sleep/out_file1
test_data:
  collection:
    collection_type: list
    elements:
      - identifier: 1
        content: A
"""
    summary = self._run_workflow(
        workflow_yaml,
        history_id=history_id,
        wait=True,
        assert_ok=True,
    )
    invocation = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
    merged_id = invocation["output_collections"]["merge_out"]["id"]
    merged = self.dataset_populator.get_history_collection_details(history_id, content_id=merged_id)
    assert merged["element_count"] == 1
    only_element = merged["elements"][0]
    assert only_element["object"]["state"] == "ok"
@skip_without_tool("__MERGE_COLLECTION__")
@skip_without_tool("cat_collection")
@skip_without_tool("head")
def test_export_invocation_ro_crate_adv(self):
    """Export an invocation with collections and a parameter as an RO-Crate.

    Checks the counts of mentions, actions, workflow inputs/outputs,
    collections and property values in the crate, and that collections and
    property values are listed among the workflow action's objects.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  input collection 1:
    type: collection
    collection_type: list
    optional: false
  input collection 2:
    type: collection
    collection_type: list
    optional: false
  num_lines_param:
    type: int
    optional: false
    default: 2
outputs:
  _anonymous_output_1:
    outputSource: num_lines_param
  output_collection:
    outputSource: merge collections tool
  concatenated_collection:
    outputSource: concat collection/out_file1
  output:
    outputSource: select lines/out_file1
steps:
  merge collections tool:
    tool_id: __MERGE_COLLECTION__
    tool_version: 1.0.0
    tool_state:
      advanced:
        conflict:
          __current_case__: 0
          duplicate_options: keep_first
      inputs:
        - __index__: 0
          input:
            __class__: ConnectedValue
        - __index__: 1
          input:
            __class__: ConnectedValue
    in:
      inputs_1|input:
        source: input collection 2
      inputs_0|input:
        source: input collection 1
  concat collection:
    tool_id: cat_collection
    tool_state:
      input1:
        __class__: RuntimeValue
    in:
      input1:
        source: merge collections tool
  select lines:
    tool_id: head
    tool_state:
      input:
        __class__: RuntimeValue
      lineNum:
        __class__: ConnectedValue
    in:
      lineNum:
        source: num_lines_param
      input:
        source: concat collection/out_file1
"""
    test_data = """
num_lines_param:
  type: int
  value: 2
input collection 1:
  collection_type: list
  elements:
    - identifier: el1
      value: 1.fastq
      type: File
    - identifier: el2
      value: 1.fastq
      type: File
input collection 2:
  collection_type: list
  elements:
    - identifier: el1
      value: 1.fastq
      type: File
    - identifier: el2
      value: 1.fastq
      type: File
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
            wait=True,
        )
        crate = self.workflow_populator.get_ro_crate(summary.invocation_id, include_files=True)

        def entities_of(kind):
            # Contextual entities whose type mentions ``kind``.
            return [entity for entity in crate.contextual_entities if kind in entity.type]

        workflow = crate.mainEntity
        root = crate.root_dataset
        assert len(root["mentions"]) == 4

        actions = entities_of("CreateAction")
        assert len(actions) == 1
        wf_objects = actions[0]["object"]
        assert len(workflow["input"]) == 3
        assert len(workflow["output"]) == 3

        collections = entities_of("Collection")
        assert len(collections) == 3
        first_collection = collections[0]
        assert (
            first_collection["additionalType"]
            == "https://training.galaxyproject.org/training-material/faqs/galaxy/collections_build_list.html"
        )
        assert first_collection.type == "Collection"
        assert len(first_collection["hasPart"]) == 2
        assert first_collection in wf_objects

        # A dataset of the first collection must also be part of the third one.
        shared_dataset_id = first_collection["hasPart"][0].id
        assert shared_dataset_id in [part.id for part in collections[2]["hasPart"]]

        property_values = entities_of("PropertyValue")
        assert len(property_values) == 1
        for property_value in property_values:
            assert property_value in wf_objects
            assert property_value["exampleOfWork"] in workflow["input"]
@skip_without_tool("__APPLY_RULES__")
def test_workflow_run_apply_rules(self):
    """Run the rules workflow and validate the collection found at hid 6."""
    run_options = {
        "wait": True,
        "assert_ok": True,
        "round_trip_format_conversion": True,
    }
    with self.dataset_populator.test_history() as history_id:
        self._run_workflow(WORKFLOW_WITH_RULES_1, history_id=history_id, **run_options)
        rules_output = self.dataset_populator.get_history_collection_details(history_id, hid=6)
        rules_test_data.check_example_2(rules_output, self.dataset_populator)
def test_filter_failed_mapping(self):
    """Filter failed datasets out of a mapped collection before a mapped ``cat1``.

    ``exit_code_from_file`` runs once per input element, the filter runs once,
    and ``cat1`` runs only once.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  input_c: collection
steps:
  mixed_collection:
    tool_id: exit_code_from_file
    state:
      input:
        $link: input_c
  filtered_collection:
    tool_id: "__FILTER_FAILED_DATASETS__"
    state:
      input:
        $link: mixed_collection/out_file1
  cat:
    tool_id: cat1
    state:
      input1:
        $link: filtered_collection
"""
    test_data = """
input_c:
  collection_type: list
  elements:
    - identifier: i1
      content: "0"
    - identifier: i2
      content: "1"
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
            wait=True,
            assert_ok=False,
        )
        jobs = summary.jobs

        def count_jobs(tool_id):
            # Number of jobs in this run that executed ``tool_id``.
            return sum(1 for job in summary.jobs if job["tool_id"] == tool_id)

        assert count_jobs("exit_code_from_file") == 2, jobs
        assert count_jobs("__FILTER_FAILED_DATASETS__") == 1, jobs
        # The following proves one job was filtered out of the result of cat1.
        assert count_jobs("cat1") == 1, jobs
def test_workflow_request(self):
    """Invoke a loaded workflow through the raw API and wait for it to finish."""
    workflow = self.workflow_populator.load_workflow(name="test_for_queue")
    workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow)
    response = self.workflow_populator.invoke_workflow_raw(workflow_id, workflow_request, assert_ok=True)
    invocation = response.json()
    self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation["id"])
def test_workflow_new_autocreated_history(self):
    """Invoke a workflow without a history and check a new one is created."""
    workflow = self.workflow_populator.load_workflow(name="test_for_new_autocreated_history")
    workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow)
    # Not passing a history param means asking for a new history to be
    # automatically created.
    workflow_request.pop("history")
    response = self.workflow_populator.invoke_workflow_raw(workflow_id, workflow_request, assert_ok=True)
    invocation = response.json()
    new_history_id = invocation["history_id"]
    assert history_id != new_history_id
    self.workflow_populator.wait_for_invocation_and_jobs(new_history_id, workflow_id, invocation["id"])
def test_workflow_output_dataset(self):
    """Check that a simple workflow exposes exactly one dataset output.

    The invocation must list no output collections and a ``wf_output_1``
    dataset whose content is the input text.
    """
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            WORKFLOW_SIMPLE,
            test_data={"input1": "hello world"},
            history_id=history_id,
        )
        invocation_url = f"workflows/{summary.workflow_id}/invocations/{summary.invocation_id}"
        invocation_response = self._get(invocation_url)
        self._assert_status_code_is(invocation_response, 200)
        invocation = invocation_response.json()
        self._assert_has_keys(invocation, "id", "outputs", "output_collections")
        assert len(invocation["output_collections"]) == 0
        assert len(invocation["outputs"]) == 1
        output_dataset_id = invocation["outputs"]["wf_output_1"]["id"]
        output_content = self.dataset_populator.get_history_dataset_content(
            history_id, dataset_id=output_dataset_id
        )
        assert "hello world" == output_content.strip()
@skip_without_tool("cat")
def test_workflow_output_dataset_collection(self):
    """Check that a workflow exposes exactly one collection output.

    The ``wf_output_1`` collection must be a ``list`` with the single
    element ``el1``.
    """
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow_with_output_collections(history_id)
        invocation_url = f"workflows/{summary.workflow_id}/invocations/{summary.invocation_id}"
        invocation_response = self._get(invocation_url)
        self._assert_status_code_is(invocation_response, 200)
        invocation = invocation_response.json()
        self._assert_has_keys(invocation, "id", "outputs", "output_collections")
        assert len(invocation["output_collections"]) == 1
        assert len(invocation["outputs"]) == 0
        output_collection_id = invocation["output_collections"]["wf_output_1"]["id"]
        output_content = self.dataset_populator.get_history_collection_details(
            history_id, content_id=output_collection_id
        )
        self._assert_has_keys(output_content, "id", "elements")
        assert output_content["collection_type"] == "list"
        elements = output_content["elements"]
        assert len(elements) == 1
        assert elements[0]["element_identifier"] == "el1"
def test_workflow_input_as_output(self):
    """Check that workflow inputs can be exposed directly as outputs.

    Expects one dataset output (``wf_output_1``) and one output value
    (``wf_output_param``), and no output collections.
    """
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow_with_inputs_as_outputs(history_id)
        invocation_url = f"workflows/{summary.workflow_id}/invocations/{summary.invocation_id}"
        invocation_response = self._get(invocation_url)
        self._assert_status_code_is(invocation_response, 200)
        invocation = invocation_response.json()
        self._assert_has_keys(invocation, "id", "outputs", "output_collections")
        assert len(invocation["output_collections"]) == 0
        assert len(invocation["outputs"]) == 1
        output_values = invocation["output_values"]
        assert len(output_values) == 1
        assert "wf_output_param" in output_values
        assert output_values["wf_output_param"] == "A text variable", output_values
        output_dataset_id = invocation["outputs"]["wf_output_1"]["id"]
        output_content = self.dataset_populator.get_history_dataset_content(
            history_id, content_id=output_dataset_id
        )
        assert output_content == "hello world\n"
def test_subworkflow_output_as_output(self):
    """Expose the output of a step-less subworkflow as the outer workflow output.

    The subworkflow forwards its input straight to its output, so the outer
    ``wf_output_1`` must hold the original input text.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  input1: data
outputs:
  wf_output_1:
    outputSource: nested_workflow/inner_output
steps:
  nested_workflow:
    run:
      class: GalaxyWorkflow
      inputs:
        inner_input: data
      outputs:
        inner_output:
          outputSource: inner_input
      steps: []
    in:
      inner_input: input1
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            workflow_yaml,
            test_data={"input1": "hello world"},
            history_id=history_id,
        )
        invocation_url = f"workflows/{summary.workflow_id}/invocations/{summary.invocation_id}"
        invocation_response = self._get(invocation_url)
        self._assert_status_code_is(invocation_response, 200)
        invocation = invocation_response.json()
        self._assert_has_keys(invocation, "id", "outputs", "output_collections")
        assert len(invocation["output_collections"]) == 0
        assert len(invocation["outputs"]) == 1
        output_dataset_id = invocation["outputs"]["wf_output_1"]["id"]
        output_content = self.dataset_populator.get_history_dataset_content(
            history_id, content_id=output_dataset_id
        )
        assert output_content == "hello world\n"
@skip_without_tool("cat")
def test_workflow_input_mapping(self):
    """Feed a two-element list into a ``data`` input so the workflow maps over it.

    The workflow output must then be a collection with two elements, the
    first identified as ``el1``.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  input1: data
outputs:
  wf_output_1:
    outputSource: first_cat/out_file1
steps:
  first_cat:
    tool_id: cat
    in:
      input1: input1
"""
    test_data = """
input1:
  collection_type: list
  name: the_dataset_list
  elements:
    - identifier: el1
      value: 1.fastq
      type: File
    - identifier: el2
      value: 1.fastq
      type: File
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
        )
        invocation_url = f"workflows/{summary.workflow_id}/invocations/{summary.invocation_id}"
        invocation_response = self._get(invocation_url)
        self._assert_status_code_is(invocation_response, 200)
        invocation = invocation_response.json()
        self._assert_has_keys(invocation, "id", "outputs", "output_collections")
        assert len(invocation["output_collections"]) == 1
        assert len(invocation["outputs"]) == 0
        output_collection_id = invocation["output_collections"]["wf_output_1"]["id"]
        output_content = self.dataset_populator.get_history_collection_details(
            history_id, content_id=output_collection_id
        )
        self._assert_has_keys(output_content, "id", "elements")
        elements = output_content["elements"]
        assert len(elements) == 2
        assert elements[0]["element_identifier"] == "el1"
@skip_without_tool("collection_creates_pair")
def test_workflow_run_input_mapping_with_output_collections(self):
    """Map a pair-producing tool over a list and inspect the job summaries.

    The output must be a ``list:paired`` collection with two elements, and
    both the invocation-level and step-level job summaries must report two
    ``ok`` jobs.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  text_input: data
outputs:
  wf_output_1:
    outputSource: split_up/paired_output
steps:
  split_up:
    tool_id: collection_creates_pair
    in:
      input1: text_input
"""
    test_data = """
text_input:
  collection_type: list
  name: the_dataset_list
  elements:
    - identifier: el1
      value: 1.fastq
      type: File
    - identifier: el2
      value: 1.fastq
      type: File
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
        )
        workflow_id = summary.workflow_id
        invocation_id = summary.invocation_id
        invocation_url = f"workflows/{workflow_id}/invocations/{invocation_id}"

        invocation_response = self._get(invocation_url)
        self._assert_status_code_is(invocation_response, 200)
        invocation = invocation_response.json()
        self._assert_has_keys(invocation, "id", "outputs", "output_collections")
        assert len(invocation["output_collections"]) == 1
        assert len(invocation["outputs"]) == 0
        output_collection_id = invocation["output_collections"]["wf_output_1"]["id"]
        output_content = self.dataset_populator.get_history_collection_details(
            history_id, content_id=output_collection_id
        )
        self._assert_has_keys(output_content, "id", "elements")
        assert output_content["collection_type"] == "list:paired", output_content
        elements = output_content["elements"]
        assert len(elements) == 2
        assert elements[0]["element_identifier"] == "el1"

        self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)

        # Invocation-level summary: one entry describing the whole invocation.
        jobs_summary_response = self._get(f"{invocation_url}/jobs_summary")
        self._assert_status_code_is(jobs_summary_response, 200)
        jobs_summary = jobs_summary_response.json()
        assert "states" in jobs_summary
        invocation_states = jobs_summary["states"]
        assert invocation_states and "ok" in invocation_states, jobs_summary
        assert invocation_states["ok"] == 2, jobs_summary
        assert jobs_summary["model"] == "WorkflowInvocation", jobs_summary

        # Step-level summary: a list with one entry for the mapped-over step.
        step_summary_response = self._get(f"{invocation_url}/step_jobs_summary")
        self._assert_status_code_is(step_summary_response, 200)
        step_summaries = step_summary_response.json()
        assert len(step_summaries) == 1
        collection_summary = step_summaries[0]
        assert "states" in collection_summary
        collection_states = collection_summary["states"]
        assert collection_states and "ok" in collection_states, collection_states
        assert collection_states["ok"] == 2, collection_summary
        assert collection_summary["model"] == "ImplicitCollectionJobs", collection_summary
def test_workflow_run_input_mapping_with_subworkflows(self):
    """Map ``WORKFLOW_NESTED_SIMPLE`` over a two-element list.

    The invocation must expose a single ``outer_output`` collection of type
    ``list`` with two elements, the first identified as ``el1``. The
    invocation is fetched and status-checked once.
    """
    test_data = """
outer_input:
  collection_type: list
  name: the_dataset_list
  elements:
    - identifier: el1
      value: 1.fastq
      type: File
    - identifier: el2
      value: 1.fastq
      type: File
"""
    with self.dataset_populator.test_history() as history_id:
        summary = self._run_workflow(WORKFLOW_NESTED_SIMPLE, test_data=test_data, history_id=history_id)
        workflow_id = summary.workflow_id
        invocation_id = summary.invocation_id
        invocation_response = self._get(f"workflows/{workflow_id}/invocations/{invocation_id}")
        self._assert_status_code_is(invocation_response, 200)
        invocation = invocation_response.json()
        self._assert_has_keys(invocation, "id", "outputs", "output_collections")
        assert len(invocation["output_collections"]) == 1, invocation
        assert len(invocation["outputs"]) == 0
        output_content = self.dataset_populator.get_history_collection_details(
            history_id, content_id=invocation["output_collections"]["outer_output"]["id"]
        )
        self._assert_has_keys(output_content, "id", "elements")
        assert output_content["collection_type"] == "list", output_content
        elements = output_content["elements"]
        assert len(elements) == 2
        assert elements[0]["element_identifier"] == "el1"
@skip_without_tool("cat_list")
@skip_without_tool("random_lines1")
@skip_without_tool("split")
def test_subworkflow_recover_mapping_1(self):
    """Outer workflow keeps scheduling after a subworkflow's last step is delayed.

    This covers an outer workflow that continues scheduling and handles
    collection mapping properly after the last step of a subworkflow requires
    delayed evaluation. Rescheduling and propagating connections within a
    subworkflow is covered by the next test case.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  outer_input: data
outputs:
  outer_output:
    outputSource: second_cat/out_file1
steps:
  first_cat:
    tool_id: cat1
    in:
      input1: outer_input
  nested_workflow:
    run:
      class: GalaxyWorkflow
      inputs:
        inner_input: data
      outputs:
        workflow_output:
          outputSource: random_lines/out_file1
      steps:
        random_lines:
          tool_id: random_lines1
          state:
            num_lines: 2
            input:
              $link: inner_input
            seed_source:
              seed_source_selector: set_seed
              seed: asdf
    in:
      inner_input: first_cat/out_file1
  split:
    tool_id: split
    in:
      input1: nested_workflow/workflow_output
  second_cat:
    tool_id: cat_list
    in:
      input1: split/output
test_data:
  outer_input:
    value: 1.bed
    type: File
"""
    expected_content = (
        "chr6\t108722976\t108723115\tCCDS5067.1_cds_0_0_chr6_108722977_f\t0\t+\n"
        "chrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n"
    )
    with self.dataset_populator.test_history() as history_id:
        self._run_workflow(
            workflow_yaml,
            history_id=history_id,
            wait=True,
            round_trip_format_conversion=True,
        )
        actual_content = self.dataset_populator.get_history_dataset_content(history_id)
        assert actual_content == expected_content
        # assert self.dataset_populator.get_history_dataset_content(history_id) == "chr16\t142908\t143003\tCCDS10397.1_cds_0_0_chr16_142909_f\t0\t+\nchrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n"
@skip_without_tool("cat_list")
@skip_without_tool("random_lines1")
@skip_without_tool("split")
def test_subworkflow_recover_mapping_2(self):
    """Recover scheduling inside a subworkflow whose delayed step is not its last.

    Like the previous test case, this checks that an outer workflow continues
    to schedule and handle collection mapping properly after a subworkflow
    needs to be delayed, but it also covers recovering and handling scheduling
    within the subworkflow, since the delayed step (split) isn't the last step
    of the subworkflow.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  outer_input: data
outputs:
  outer_output:
    outputSource: second_cat/out_file1
steps:
  first_cat:
    tool_id: cat1
    in:
      input1: outer_input
  nested_workflow:
    run:
      class: GalaxyWorkflow
      inputs:
        inner_input: data
      outputs:
        workflow_output:
          outputSource: inner_cat/out_file1
      steps:
        random_lines:
          tool_id: random_lines1
          in:
            input: inner_input
            num_lines:
              default: 2
            seed_source|seed_source_selector:
              default: set_seed
            seed_source|seed:
              default: asdf
        split:
          tool_id: split
          in:
            input1: random_lines/out_file1
        inner_cat:
          tool_id: cat1
          in:
            input1: split/output
    in:
      inner_input: first_cat/out_file1
  second_cat:
    tool_id: cat_list
    in:
      input1: nested_workflow/workflow_output
"""
    test_data = """
outer_input:
  value: 1.bed
  type: File
"""
    expected_content = (
        "chr6\t108722976\t108723115\tCCDS5067.1_cds_0_0_chr6_108722977_f\t0\t+\n"
        "chrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n"
    )
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
            wait=True,
            round_trip_format_conversion=True,
        )
        actual_content = self.dataset_populator.get_history_dataset_content(history_id)
        assert actual_content == expected_content
@skip_without_tool("cat_list")
@skip_without_tool("random_lines1")
@skip_without_tool("split")
def test_recover_mapping_in_subworkflow(self):
    """Use a subworkflow's ``split`` output collection in the outer workflow.

    The subworkflow ends with ``split`` and the outer ``cat_list`` consumes
    its output; the final dataset must contain the two expected BED lines.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  outer_input: data
outputs:
  outer_output:
    outputSource: second_cat/out_file1
steps:
  first_cat:
    tool_id: cat1
    in:
      input1: outer_input
  nested_workflow:
    run:
      class: GalaxyWorkflow
      inputs:
        inner_input: data
      outputs:
        workflow_output:
          outputSource: split/output
      steps:
        random_lines:
          tool_id: random_lines1
          state:
            num_lines: 2
            input:
              $link: inner_input
            seed_source:
              seed_source_selector: set_seed
              seed: asdf
        split:
          tool_id: split
          in:
            input1: random_lines/out_file1
    in:
      inner_input: first_cat/out_file1
  second_cat:
    tool_id: cat_list
    in:
      input1: nested_workflow/workflow_output
"""
    test_data = """
outer_input:
  value: 1.bed
  type: File
"""
    expected_content = (
        "chr6\t108722976\t108723115\tCCDS5067.1_cds_0_0_chr6_108722977_f\t0\t+\n"
        "chrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n"
    )
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
            wait=True,
            round_trip_format_conversion=True,
        )
        actual_content = self.dataset_populator.get_history_dataset_content(history_id)
        assert actual_content == expected_content
@skip_without_tool("empty_list")
@skip_without_tool("count_list")
@skip_without_tool("random_lines1")
def test_empty_list_mapping(self):
    """Feed the output of ``empty_list`` through ``random_lines1`` into ``count_list``.

    The final dataset must contain ``0``.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  input1: data
outputs:
  count_list:
    outputSource: count_list/out_file1
steps:
  empty_list:
    tool_id: empty_list
    in:
      input1: input1
  random_lines:
    tool_id: random_lines1
    state:
      num_lines: 2
      input:
        $link: empty_list/output
      seed_source:
        seed_source_selector: set_seed
        seed: asdf
  count_list:
    tool_id: count_list
    in:
      input1: random_lines/out_file1
"""
    test_data = """
input1:
  value: 1.bed
  type: File
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
            wait=True,
        )
        count_output = self.dataset_populator.get_history_dataset_content(history_id)
        assert "0\n" == count_output
def test_subworkflow_map_over_data_column(self):
    """Connect one list to both a dataset and a collection input of a subworkflow.

    The subworkflow joins its two inputs with ``comp1`` and the outer workflow
    reduces the result with ``cat_list``; the run must finish successfully.
    """
    workflow_yaml = """class: GalaxyWorkflow
inputs:
  input:
    collection_type: list
outputs:
  reduced:
    outputSource: list:list reduction/out_file1
steps:
  subworkflow:
    in:
      input collection:
        source: input
      input dataset:
        source: input
    run:
      class: GalaxyWorkflow
      inputs:
        input dataset:
          type: data
        input collection:
          collection_type: list
      outputs:
        subworkflow_out:
          outputSource: join out/out_file1
      steps:
        join out:
          tool_id: comp1
          tool_state:
            field1: '1'
            field2: '1'
          in:
            input1:
              source: input dataset
            input2:
              source: input collection
  list:list reduction:
    tool_id: cat_list
    in:
      input1:
        source: subworkflow/subworkflow_out
test_data:
  input:
    collection_type: list
    elements:
      - identifier: 1
        content: A 1
        ext: tabular
      - identifier: 2
        content: B 2
        ext: tabular
"""
    run_options = {"wait": True, "assert_ok": True}
    with self.dataset_populator.test_history() as history_id:
        self._run_workflow(workflow_yaml, history_id=history_id, **run_options)
@skip_without_tool("implicit_conversion_format_input")
def test_run_with_implicit_collection_map_over(self):
    """Map ``implicit_conversion_format_input`` over a list holding ``1.fasta.gz``.

    The run must complete without errors.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  collection: collection
steps:
  map_over:
    tool_id: implicit_conversion_format_input
    in:
      input1: collection
test_data:
  collection:
    collection_type: list
    elements:
      - identifier: 1
        value: 1.fasta.gz
        type: File
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_workflow(workflow_yaml, history_id=history_id, assert_ok=True)
@skip_without_tool("random_lines1")
def test_change_datatype_collection_map_over(self):
    """Apply ``change_datatype`` to the output of a step mapped over ``list:paired``.

    Both members of the first pair in the collection at hid 4 must end up with
    the ``csv`` extension.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  text_input1: collection
steps:
  map_over:
    tool_id: random_lines1
    in:
      input: text_input1
    outputs:
      out_file1:
        change_datatype: csv
"""
    test_data = """
text_input1:
  collection_type: "list:paired"
"""
    with self.dataset_populator.test_history() as history_id:
        jobs_summary = self._run_workflow(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
        )
        hdca = self.dataset_populator.get_history_collection_details(history_id=jobs_summary.history_id, hid=4)
        assert hdca["collection_type"] == "list:paired"
        assert len(hdca["elements"][0]["object"]["elements"]) == 2
        forward, reverse = hdca["elements"][0]["object"]["elements"]
        for pair_member in (forward, reverse):
            assert pair_member["object"]["file_ext"] == "csv"
@skip_without_tool("collection_type_source_map_over")
def test_mapping_and_subcollection_mapping(self):
    """Run ``collection_type_source_map_over`` on a ``list:paired`` input.

    The collection at hid 1 must be ``list:paired`` and its first element
    must contain two members.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  text_input1: collection
steps:
  map_over:
    tool_id: collection_type_source_map_over
    in:
      input_collect: text_input1
"""
    test_data = """
text_input1:
  collection_type: "list:paired"
"""
    with self.dataset_populator.test_history() as history_id:
        jobs_summary = self._run_workflow(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
        )
        hdca = self.dataset_populator.get_history_collection_details(history_id=jobs_summary.history_id, hid=1)
        assert hdca["collection_type"] == "list:paired"
        first_pair = hdca["elements"][0]["object"]
        assert len(first_pair["elements"]) == 2
@skip_without_tool("empty_list")
@skip_without_tool("count_multi_file")
@skip_without_tool("random_lines1")
def test_empty_list_reduction(self):
    """Feed the output of ``empty_list`` through ``random_lines1`` into ``count_multi_file``.

    The final dataset must contain ``0``.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  input1: data
outputs:
  count_multi_file:
    outputSource: count_multi_file/out_file1
steps:
  empty_list:
    tool_id: empty_list
    in:
      input1: input1
  random_lines:
    tool_id: random_lines1
    state:
      num_lines: 2
      input:
        $link: empty_list/output
      seed_source:
        seed_source_selector: set_seed
        seed: asdf
  count_multi_file:
    tool_id: count_multi_file
    in:
      input1: random_lines/out_file1
"""
    test_data = """
input1:
  value: 1.bed
  type: File
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_workflow(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
            wait=True,
            round_trip_format_conversion=True,
        )
        count_output = self.dataset_populator.get_history_dataset_content(history_id)
        assert "0\n" == count_output
@skip_without_tool("cat")
def test_cancel_new_workflow_when_history_deleted(self):
    """Deleting the history of an unfinished invocation cancels the invocation.

    After the history is deleted the invocation must reach the ``cancelled``
    state and carry exactly one message with reason ``history_deleted`` that
    references the deleted history.
    """
    with self.dataset_populator.test_history() as history_id:
        # Invoke a workflow with a pause step.
        uploaded_workflow_id, invocation_id = self._invoke_paused_workflow(history_id)
        # There is no pause of anything in here, so likely the invocation is
        # still in a new state. If it isn't, that is fine: the test then
        # happens to cover the same thing as the "ready" variant of this test.
        # Make sure the workflow invocation is not complete.
        self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id)
        self._delete(f"histories/{history_id}")
        invocation_cancelled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, "cancelled")
        # Check the state first so a non-cancelled invocation is reported as
        # such rather than as a failure of the message checks below.
        assert invocation_cancelled, "Workflow state is not cancelled..."
        workflow_details = self._invocation_details(uploaded_workflow_id, invocation_id)
        assert len(workflow_details["messages"]) == 1
        message = workflow_details["messages"][0]
        assert message["history_id"] == history_id
        assert message["reason"] == "history_deleted"
@skip_without_tool("cat")
def test_cancel_ready_workflow_when_history_deleted(self):
    """Deleting the history cancels an invocation that has left the new state.

    Same as the "new workflow" variant, but waits until the invocation is no
    longer in the new state before deleting the history.
    """
    with self.dataset_populator.test_history() as history_id:
        # Start a workflow that contains a pause step.
        workflow_id, invocation_id = self._invoke_paused_workflow(history_id)
        # Let at least one scheduling iteration happen.
        self._wait_for_invocation_non_new(workflow_id, invocation_id)
        # The invocation must not have completed yet.
        self._assert_invocation_non_terminal(workflow_id, invocation_id)
        self._delete(f"histories/{history_id}")
        cancelled = self._wait_for_invocation_state(workflow_id, invocation_id, "cancelled")
        assert cancelled, "Workflow state is not cancelled..."
        messages = self._invocation_details(workflow_id, invocation_id)["messages"]
        assert len(messages) == 1
        cancel_message = messages[0]
        assert cancel_message["history_id"] == history_id
        assert cancel_message["reason"] == "history_deleted"
@skip_without_tool("cat")
def test_workflow_pause(self):
    """Approve a paused step and check the invocation finishes scheduling."""
    with self.dataset_populator.test_history() as history_id:
        # Start a workflow that contains a pause step.
        workflow_id, invocation_id = self._invoke_paused_workflow(history_id)
        # Let at least one scheduling iteration happen.
        self._wait_for_invocation_non_new(workflow_id, invocation_id)
        # The history must not have entered a failed state in the meantime.
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        # Scheduling has not finished, so we can be fairly sure the
        # invocation is sitting at the pause step.
        self._assert_invocation_non_terminal(workflow_id, invocation_id)
        # Approve the paused step so the workflow can continue.
        self.__review_paused_steps(workflow_id, invocation_id, order_index=2, action=True)
        # Scheduling must now finish, leaving both the invocation and the
        # history in valid states.
        scheduled = self._wait_for_invocation_state(workflow_id, invocation_id, "scheduled")
        assert scheduled, "Workflow state is not scheduled..."
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
@skip_without_tool("cat")
def test_workflow_pause_cancel(self):
    """Reject a paused step and check the invocation is cancelled on review.

    The invocation must reach the ``cancelled`` state and carry exactly one
    message with reason ``cancelled_on_review`` that names a workflow step.
    """
    with self.dataset_populator.test_history() as history_id:
        # Invoke a workflow with a pause step.
        uploaded_workflow_id, invocation_id = self._invoke_paused_workflow(history_id)
        # Wait for at least one scheduling step.
        self._wait_for_invocation_non_new(uploaded_workflow_id, invocation_id)
        # Make sure the history didn't enter a failed state in there.
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        # Assert the workflow hasn't finished scheduling, we can be pretty sure we
        # are at the pause step in this case then.
        self._assert_invocation_non_terminal(uploaded_workflow_id, invocation_id)
        # Review the paused workflow and cancel it at the paused step.
        self.__review_paused_steps(uploaded_workflow_id, invocation_id, order_index=2, action=False)
        # Ensure the workflow eventually becomes cancelled. Check the state
        # first so a non-cancelled invocation is reported as such rather than
        # as a failure of the message checks below.
        invocation_cancelled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, "cancelled")
        assert invocation_cancelled, "Workflow state is not cancelled..."
        workflow_details = self._invocation_details(uploaded_workflow_id, invocation_id)
        assert len(workflow_details["messages"]) == 1
        message = workflow_details["messages"][0]
        assert "workflow_step_id" in message
        assert message["reason"] == "cancelled_on_review"
@skip_without_tool("head")
def test_workflow_map_reduce_pause(self):
    """Run the map/reduce pause workflow and approve its review step.

    A plain dataset is bound to input "0" and a two-element list collection
    to input "1"; once the review at ``order_index=4`` is approved the
    invocation must be "scheduled" and the history's dataset content must be
    ``"reviewed\\n1\\nreviewed\\n4\\n"``.
    """
    datasets = self.dataset_populator
    workflows = self.workflow_populator
    collections = self.dataset_collection_populator
    with datasets.test_history() as history_id:
        definition = workflows.load_workflow_from_resource("test_workflow_map_reduce_pause")
        workflow_id = workflows.create_workflow(definition)
        review_hda = datasets.new_dataset(history_id, content="reviewed\nunreviewed")
        list_response = collections.create_list_in_history(history_id, contents=["1\n2\n3", "4\n5\n6"])
        numbers_hdca = collections.wait_for_fetched_collection(list_response.json())
        inputs_by_index = {}
        inputs_by_index["0"] = self._ds_entry(review_hda)
        inputs_by_index["1"] = self._ds_entry(numbers_hdca)
        invocation_id = self.__invoke_workflow(workflow_id, inputs=inputs_by_index, history_id=history_id)
        # Let the scheduler pass over the invocation at least once.
        self._wait_for_invocation_non_new(workflow_id, invocation_id)
        # Nothing in the history may have failed up to this point.
        datasets.wait_for_history(history_id, assert_ok=True)
        # Scheduling has not finished, so the run should be sitting at the pause step.
        self._assert_invocation_non_terminal(workflow_id, invocation_id)
        self.__review_paused_steps(workflow_id, invocation_id, order_index=4, action=True)
        workflows.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
        final_state = self._invocation_details(workflow_id, invocation_id)["state"]
        assert final_state == "scheduled"
        assert "reviewed\n1\nreviewed\n4\n" == datasets.get_history_dataset_content(history_id)
@skip_without_tool("cat")
def test_cancel_workflow_invocation(self):
    """Cancel a paused invocation through a DELETE request on its usage URL.

    The DELETE must answer with status 200; afterwards the invocation is
    expected to be "cancelled" and its first message to give the reason
    "user_request".
    """
    with self.dataset_populator.test_history() as history_id:
        # Kick off the workflow that contains a pause step.
        workflow_id, invocation_id = self._invoke_paused_workflow(history_id)
        # Let the scheduler pass over the invocation at least once.
        self._wait_for_invocation_non_new(workflow_id, invocation_id)
        # Nothing in the history may have failed up to this point.
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        # Scheduling has not finished, so the run should be sitting at the pause step.
        self._assert_invocation_non_terminal(workflow_id, invocation_id)
        usage_path = f"workflows/{workflow_id}/usage/{invocation_id}"
        cancel_response = delete(self._api_url(usage_path, use_key=True))
        self._assert_status_code_is(cancel_response, 200)
        wait_kwargs = dict(
            history_id=history_id,
            workflow_id=workflow_id,
            invocation_id=invocation_id,
            assert_ok=False,
        )
        self.workflow_populator.wait_for_invocation_and_jobs(**wait_kwargs)
        details = self._invocation_details(workflow_id, invocation_id)
        assert details["state"] == "cancelled"
        first_message = details["messages"][0]
        assert first_message["reason"] == "user_request"
[docs] @skip_without_tool("collection_creates_dynamic_nested")
def test_cancel_workflow_invocation_deletes_jobs(self):
# Cancel a scheduled invocation and assert that every job of the invocation,
# and every job of its subworkflow invocation, ends up in state "deleted".
# NOTE(review): the skip decorator names collection_creates_dynamic_nested, while
# the workflow text below references cat_data_and_sleep and identifier_multiple -
# confirm which tool requirement is intended.
with self.dataset_populator.test_history() as history_id:
# The workflow is started without waiting (wait=False); its first step is
# configured with sleep_time: 60.
summary = self._run_workflow(
"""
class: GalaxyWorkflow
inputs:
list_input:
type: collection
collection_type: list
steps:
first_step:
tool_id: cat_data_and_sleep
in:
input1: list_input
state:
sleep_time: 60
subworkflow_step:
run:
class: GalaxyWorkflow
inputs:
list_input:
type: collection
collection_type: list
steps:
intermediate_step:
tool_id: identifier_multiple
in:
input1: list_input
subworkflow:
in:
list_input: first_step/out_file1
test_data:
list_input:
collection_type: list
elements:
- identifier: 1
content: A
- identifier: 2
content: B
""",
history_id=history_id,
wait=False,
)
# wait_for_invocation just waits until scheduling complete, not jobs or subworkflow invocations
self.workflow_populator.wait_for_invocation("null", summary.invocation_id, assert_ok=True)
invocation_before_cancellation = self.workflow_populator.get_invocation(summary.invocation_id)
assert invocation_before_cancellation["state"] == "scheduled"
# Remember the subworkflow invocation id (read from steps[2]) before cancelling,
# so its jobs can be inspected afterwards.
subworkflow_invocation_id = invocation_before_cancellation["steps"][2]["subworkflow_invocation_id"]
self.workflow_populator.cancel_invocation(summary.invocation_id)
# assert_ok=False: the wait is not asked to assert success here.
self.workflow_populator.wait_for_invocation_and_jobs(
history_id=history_id,
workflow_id=summary.workflow_id,
invocation_id=summary.invocation_id,
assert_ok=False,
)
# Every job of the outer invocation must be deleted ...
invocation_jobs = self.workflow_populator.get_invocation_jobs(summary.invocation_id)
for job in invocation_jobs:
assert job["state"] == "deleted"
# ... and so must every job of the subworkflow invocation.
subworkflow_invocation_jobs = self.workflow_populator.get_invocation_jobs(subworkflow_invocation_id)
for job in subworkflow_invocation_jobs:
assert job["state"] == "deleted"
[docs] def test_workflow_failed_output_not_found(self, history_id):
# Run a workflow whose first_cat1 step takes its input from
# create_2/does_not_exist and assert that the invocation fails with a single
# "output_not_found" message.
summary = self._run_workflow(
"""
class: GalaxyWorkflow
inputs: []
steps:
create_2:
tool_id: create_2
state:
sleep_time: 0
outputs:
out_file1:
rename: "my new name"
out_file2:
rename: "my other new name"
first_cat1:
tool_id: cat
in:
input1: create_2/does_not_exist
""",
history_id=history_id,
assert_ok=False,
wait=True,
)
invocation = self.workflow_populator.get_invocation(summary.invocation_id)
assert invocation["state"] == "failed"
assert len(invocation["messages"]) == 1
message = invocation["messages"][0]
assert message["reason"] == "output_not_found"
# The message is expected to name step 1 as the failing step and step 0 as the
# step it depends on.
assert message["workflow_step_id"] == 1
assert message["dependent_workflow_step_id"] == 0
[docs] def test_workflow_warning_workflow_output_not_found(self, history_id):
# Run a workflow whose workflow-level output main_out points at
# create_2/does_not_exist. The invocation is still expected to reach
# "scheduled", carrying a single "workflow_output_not_found" message.
summary = self._run_workflow(
"""
class: GalaxyWorkflow
inputs: []
steps:
create_2:
tool_id: create_2
state:
sleep_time: 0
outputs:
out_file1:
rename: "my new name"
out_file2:
rename: "my other new name"
outputs:
main_out:
outputSource: create_2/does_not_exist
""",
history_id=history_id,
assert_ok=False,
wait=True,
)
invocation = self.workflow_populator.get_invocation(summary.invocation_id)
assert invocation["state"] == "scheduled"
assert len(invocation["messages"]) == 1
message = invocation["messages"][0]
assert message["reason"] == "workflow_output_not_found"
# Only the presence of workflow_step_id is checked, not its value.
assert "workflow_step_id" in message
assert message["output_name"] == "does_not_exist"
[docs] @skip_without_tool("__APPLY_RULES__")
@skip_without_tool("job_properties")
def test_workflow_failed_input_not_ok(self, history_id):
# Feed the list_output of a job_properties step (run with failbool: true) into
# __APPLY_RULES__ and assert that the invocation fails with a single
# "dataset_failed" message attributed to step 1.
summary = self._run_workflow(
"""
class: GalaxyWorkflow
steps:
job_props:
tool_id: job_properties
state:
thebool: true
failbool: true
apply:
tool_id: __APPLY_RULES__
in:
input: job_props/list_output
state:
rules:
rules:
- type: add_column_metadata
value: identifier0
mapping:
- type: list_identifiers
columns: [0]
""",
history_id=history_id,
assert_ok=False,
wait=True,
)
invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
assert invocation_details["state"] == "failed"
assert len(invocation_details["messages"]) == 1
message = invocation_details["messages"][0]
assert message["reason"] == "dataset_failed"
assert message["workflow_step_id"] == 1
[docs] @skip_without_tool("__RELABEL_FROM_FILE__")
def test_workflow_failed_with_message_exception(self, history_id):
# Run __RELABEL_FROM_FILE__ on a collection together with a labels file and
# assert that the invocation fails with a single "unexpected_failure" message
# whose details mention "Invalid new collection identifier".
# Note that the workflow declares input_collection as a list while the test data
# supplies collection_type "list:list".
summary = self._run_workflow(
"""
class: GalaxyWorkflow
inputs:
input_collection:
collection_type: list
type: collection
relabel_file:
type: data
steps:
relabel:
tool_id: __RELABEL_FROM_FILE__
in:
input: input_collection
how|labels: relabel_file
test_data:
input_collection:
collection_type: "list:list"
relabel_file:
value: 1.bed
type: File
""",
history_id=history_id,
assert_ok=False,
wait=True,
)
invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
assert invocation_details["state"] == "failed"
assert len(invocation_details["messages"]) == 1
message = invocation_details["messages"][0]
assert message["reason"] == "unexpected_failure"
assert message["workflow_step_id"] == 2
assert "Invalid new collection identifier" in message["details"]
[docs] @skip_without_tool("identifier_multiple")
def test_invocation_map_over(self, history_id):
# Connect a list collection to a subworkflow whose data_input is declared with
# type: data, and assert that the workflow output main_out is reported as an
# output collection of type "list" with two "hda" elements.
summary = self._run_workflow(
"""
class: GalaxyWorkflow
inputs:
input_collection:
collection_type: list
type: collection
outputs:
main_out:
outputSource: subworkflow/sub_out
steps:
subworkflow:
in:
data_input: input_collection
run:
class: GalaxyWorkflow
inputs:
data_input:
type: data
outputs:
sub_out:
outputSource: output_step/output1
steps:
intermediate_step:
tool_id: identifier_multiple
in:
input1: data_input
output_step:
tool_id: identifier_multiple
in:
input1: intermediate_step/output1
test_data:
input_collection:
collection_type: list
elements:
- identifier: 1
content: A
- identifier: 2
content: B
""",
history_id=history_id,
assert_ok=True,
wait=True,
)
invocation = self.workflow_populator.get_invocation(summary.invocation_id)
# For consistency and conditional subworkflow steps this really needs to remain
# a collection and not get reduced.
assert "main_out" in invocation["output_collections"], invocation
# No content id is passed here, so the collection inspected is whichever one the
# populator returns by default for this history.
hdca_details = self.dataset_populator.get_history_collection_details(history_id)
assert hdca_details["collection_type"] == "list"
elements = hdca_details["elements"]
assert len(elements) == 2
assert elements[0]["element_identifier"] == "1"
assert elements[0]["element_type"] == "hda"
# The content of the first element is expected to be its identifier, "1".
hda_id = elements[0]["object"]["id"]
hda_content = self.dataset_populator.get_history_dataset_content(history_id, content_id=hda_id)
assert hda_content.strip() == "1"
[docs] @skip_without_tool("identifier_multiple")
def test_invocation_map_over_inner_collection(self, history_id):
# Connect a list:list collection to a subworkflow whose list_input is declared
# as a list, and assert that main_out is a "list" collection holding a single
# "hda" element that keeps the outer identifier "test_level_1".
summary = self._run_workflow(
"""
class: GalaxyWorkflow
inputs:
input_collection:
collection_type: list:list
type: collection
outputs:
main_out:
outputSource: subworkflow/sub_out
steps:
subworkflow:
in:
list_input: input_collection
run:
class: GalaxyWorkflow
inputs:
list_input:
type: collection
collection_type: list
outputs:
sub_out:
outputSource: output_step/output1
steps:
intermediate_step:
tool_id: identifier_multiple
in:
input1: list_input
output_step:
tool_id: identifier_multiple
in:
input1: intermediate_step/output1
test_data:
input_collection:
collection_type: list:list
""",
history_id=history_id,
assert_ok=True,
wait=True,
)
invocation = self.workflow_populator.get_invocation(summary.invocation_id)
assert "main_out" in invocation["output_collections"], invocation
# First check the shape of the input collection recorded on the invocation.
input_hdca_details = self.dataset_populator.get_history_collection_details(
history_id, content_id=invocation["inputs"]["0"]["id"]
)
assert input_hdca_details["collection_type"] == "list:list"
assert len(input_hdca_details["elements"]) == 1
assert input_hdca_details["elements"][0]["element_identifier"] == "test_level_1"
# Then check the output collection registered under main_out.
hdca_details = self.dataset_populator.get_history_collection_details(
history_id, content_id=invocation["output_collections"]["main_out"]["id"]
)
assert hdca_details["collection_type"] == "list"
elements = hdca_details["elements"]
assert len(elements) == 1
assert elements[0]["element_identifier"] == "test_level_1"
assert elements[0]["element_type"] == "hda"
[docs] @skip_without_tool("identifier_multiple")
def test_invocation_map_over_inner_collection_with_tool_collection_input(self, history_id):
# Same scenario as mapping a list:list collection over a subworkflow with a list
# input, but here the subworkflow has a single step that consumes list_input
# directly. main_out is expected to be a "list" collection with one "hda"
# element identified as "test_level_1".
# NOTE(review): the skip decorator names identifier_multiple, while the workflow
# text below uses identifier_all_collection_types - confirm which tool
# requirement is intended.
summary = self._run_workflow(
"""
class: GalaxyWorkflow
inputs:
input_collection:
collection_type: list:list
type: collection
outputs:
main_out:
outputSource: subworkflow/sub_out
steps:
subworkflow:
in:
list_input: input_collection
run:
class: GalaxyWorkflow
inputs:
list_input:
type: collection
collection_type: list
outputs:
sub_out:
outputSource: output_step/output1
steps:
output_step:
tool_id: identifier_all_collection_types
in:
input1: list_input
test_data:
input_collection:
collection_type: list:list
""",
history_id=history_id,
assert_ok=True,
wait=True,
)
invocation = self.workflow_populator.get_invocation(summary.invocation_id)
assert "main_out" in invocation["output_collections"], invocation
# First check the shape of the input collection recorded on the invocation.
input_hdca_details = self.dataset_populator.get_history_collection_details(
history_id, content_id=invocation["inputs"]["0"]["id"]
)
assert input_hdca_details["collection_type"] == "list:list"
assert len(input_hdca_details["elements"]) == 1
assert input_hdca_details["elements"][0]["element_identifier"] == "test_level_1"
# Then check the output collection registered under main_out.
hdca_details = self.dataset_populator.get_history_collection_details(
history_id, content_id=invocation["output_collections"]["main_out"]["id"]
)
assert hdca_details["collection_type"] == "list"
elements = hdca_details["elements"]
assert len(elements) == 1
assert elements[0]["element_identifier"] == "test_level_1"
assert elements[0]["element_type"] == "hda"
@skip_without_tool("cat")
def test_pause_outputs_with_deleted_inputs(self):
    """Run the deleted-inputs scenario with the input deleted but not purged."""
    self._deleted_inputs_workflow(purge=False)
@skip_without_tool("cat")
def test_error_outputs_with_purged_inputs(self):
    """Run the deleted-inputs scenario with the input purged."""
    self._deleted_inputs_workflow(purge=True)
def _deleted_inputs_workflow(self, purge):
# Shared body of the deleted/purged input tests.
# purge: forwarded to delete_dataset; it also selects the state expected for the
# first downstream dataset ("error" when true, "paused" otherwise).
# We run a workflow on a collection with a deleted element.
with self.dataset_populator.test_history() as history_id:
workflow_id = self._upload_yaml_workflow(
"""
class: GalaxyWorkflow
inputs:
input1:
type: collection
collection_type: list
steps:
first_cat:
tool_id: cat
in:
input1: input1
second_cat:
tool_id: cat
in:
input1: first_cat/out_file1
"""
)
# Positions used below to index the collection's elements (DELETED) and the
# history's datasets (all three).
DELETED = 0
PAUSED_1 = 1
PAUSED_2 = 2
fetch_response = self.dataset_collection_populator.create_list_in_history(
history_id, contents=[("sample1-1", "1 2 3")], wait=True
).json()
hdca1 = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
# Delete (or purge) the only element of the collection before invoking.
deleted_id = hdca1["elements"][DELETED]["object"]["id"]
self.dataset_populator.delete_dataset(
history_id=history_id, content_id=deleted_id, purge=purge, wait_for_purge=True
)
label_map = {"input1": self._ds_entry(hdca1)}
workflow_request = dict(
history=f"hist_id={history_id}",
ds_map=self.workflow_populator.build_ds_map(workflow_id, label_map),
)
r = self.workflow_populator.invoke_workflow_raw(workflow_id, workflow_request)
self._assert_status_code_is(r, 200)
invocation_id = r.json()["id"]
# If this starts failing we may have prevented running workflows on collections with deleted members,
# in which case we can disable this test.
self.workflow_populator.wait_for_invocation_and_jobs(
history_id=history_id, workflow_id=workflow_id, invocation_id=invocation_id, assert_ok=False
)
# Only plain datasets are considered; collection entries are filtered out.
contents = self.__history_contents(history_id)
datasets = [content for content in contents if content["history_content_type"] == "dataset"]
assert datasets[DELETED]["deleted"]
state = "error" if purge else "paused"
assert datasets[PAUSED_1]["state"] == state
assert datasets[PAUSED_2]["state"] == "paused"
[docs] def test_run_with_implicit_connection(self):
# Run a workflow in which third_cat is tied to second_cat only through a $step
# entry, with a pause step ahead of second_cat. Before the pause is approved
# the history must hold exactly two jobs; after approval it must hold four.
with self.dataset_populator.test_history() as history_id:
run_summary = self._run_workflow(
"""
class: GalaxyWorkflow
inputs:
test_input: data
steps:
first_cat:
tool_id: cat1
in:
input1: test_input
the_pause:
type: pause
in:
input: first_cat/out_file1
second_cat:
tool_id: cat1
in:
input1: the_pause
third_cat:
tool_id: random_lines1
in:
$step: second_cat
state:
num_lines: 1
input:
$link: test_input
seed_source:
seed_source_selector: set_seed
seed: asdf
""",
test_data={"test_input": "hello world"},
history_id=history_id,
wait=False,
round_trip_format_conversion=True,
)
# From here on the ids reported by the run summary are used.
history_id = run_summary.history_id
workflow_id = run_summary.workflow_id
invocation_id = run_summary.invocation_id
# Wait for first two jobs to be scheduled - upload and first cat.
# The lambda yields None until two jobs exist, which keeps wait_on polling.
wait_on(lambda: len(self._history_jobs(history_id)) >= 2 or None, "history jobs")
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
invocation = self._invocation_details(workflow_id, invocation_id)
assert invocation["state"] != "scheduled", invocation
# Expect two jobs - the upload and first cat. randomlines shouldn't run
# it is implicitly dependent on second cat.
self._assert_history_job_count(history_id, 2)
# Approve the pause step (order_index=2) and let the rest of the workflow run.
self.__review_paused_steps(workflow_id, invocation_id, order_index=2, action=True)
self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
self._assert_history_job_count(history_id, 4)
[docs] def test_run_with_optional_data_specified_to_multi_data(self):
# Run WORKFLOW_OPTIONAL_TRUE_INPUT_DATA with 1.bed supplied for input1 and
# assert that the history's dataset content contains a line from that file.
with self.dataset_populator.test_history() as history_id:
self._run_workflow(
WORKFLOW_OPTIONAL_TRUE_INPUT_DATA,
test_data="""
input1:
value: 1.bed
type: File
""",
history_id=history_id,
wait=True,
assert_ok=True,
)
content = self.dataset_populator.get_history_dataset_content(history_id)
assert "CCDS989.1_cds_0_0_chr1_147962193_r" in content
def test_run_with_optional_data_unspecified_to_multi_data(self):
    """Run the optional-data workflow with no inputs and check its output text.

    With empty test data the history's dataset content is expected to contain
    "No input selected".
    """
    with self.dataset_populator.test_history() as history_id:
        run_options = dict(test_data={}, history_id=history_id, wait=True, assert_ok=True)
        self._run_jobs(WORKFLOW_OPTIONAL_TRUE_INPUT_DATA, **run_options)
        output_text = self.dataset_populator.get_history_dataset_content(history_id)
        assert "No input selected" in output_text
def test_run_with_optional_data_unspecified_survives_delayed_step(self):
    """Run the delayed-scheduling workflow without test data and expect success."""
    with self.dataset_populator.test_history() as history_id:
        run_options = dict(history_id=history_id, wait=True, assert_ok=True)
        self._run_workflow(WORKFLOW_OPTIONAL_INPUT_DELAYED_SCHEDULING, **run_options)
[docs] def test_run_subworkflow_with_optional_data_unspecified(self):
# Embed WORKFLOW_OPTIONAL_INPUT_DELAYED_SCHEDULING as the nested_workflow step of
# an outer workflow that only supplies the "required" input, and expect the run
# to finish ok.
with self.dataset_populator.test_history() as history_id:
# The outer workflow is parsed first so the nested definition can be attached
# to it as a Python object below.
subworkflow = yaml.safe_load(
"""
class: GalaxyWorkflow
inputs:
required: data
steps:
nested_workflow:
in:
required: required
test_data:
required:
value: 1.bed
type: File
"""
)
subworkflow["steps"]["nested_workflow"]["run"] = yaml.safe_load(WORKFLOW_OPTIONAL_INPUT_DELAYED_SCHEDULING)
self._run_workflow(
subworkflow,
history_id=history_id,
wait=True,
assert_ok=True,
)
def test_run_with_non_optional_data_unspecified_fails_invocation(self):
    """Omit a required data input and expect a 400 response naming ``input1``."""
    with self.dataset_populator.test_history() as history_id:
        run_options = dict(
            test_data={},
            history_id=history_id,
            wait=False,
            assert_ok=False,
            expected_response=400,
        )
        failure = self._run_jobs(WORKFLOW_OPTIONAL_FALSE_INPUT_DATA, **run_options)
        self._assert_failed_on_non_optional_input(failure, "input1")
[docs] def test_run_with_optional_collection_specified(self):
# Run WORKFLOW_OPTIONAL_TRUE_INPUT_COLLECTION with a paired collection built from
# 1.fastq (used for both forward and reverse) and assert that the history's
# dataset content contains a sequence expected from that file.
with self.dataset_populator.test_history() as history_id:
self._run_jobs(
WORKFLOW_OPTIONAL_TRUE_INPUT_COLLECTION,
test_data="""
input1:
collection_type: paired
name: the_dataset_pair
elements:
- identifier: forward
value: 1.fastq
type: File
- identifier: reverse
value: 1.fastq
type: File
""",
history_id=history_id,
wait=True,
assert_ok=True,
)
content = self.dataset_populator.get_history_dataset_content(history_id)
assert "GAATTGATCAGGACATAGGACAACTGTAGGCACCAT" in content
def test_run_with_optional_collection_unspecified(self):
    """Run the optional-collection workflow with no inputs and check its output.

    With empty test data the history's dataset content is expected to contain
    "No input specified.".
    """
    with self.dataset_populator.test_history() as history_id:
        run_options = dict(test_data={}, history_id=history_id, wait=True, assert_ok=True)
        self._run_jobs(WORKFLOW_OPTIONAL_TRUE_INPUT_COLLECTION, **run_options)
        output_text = self.dataset_populator.get_history_dataset_content(history_id)
        assert "No input specified." in output_text
def test_run_with_non_optional_collection_unspecified_fails_invocation(self):
    """Omit a required collection input and expect a 400 response naming ``input1``."""
    with self.dataset_populator.test_history() as history_id:
        run_options = dict(
            test_data={},
            history_id=history_id,
            wait=False,
            assert_ok=False,
            expected_response=400,
        )
        failure = self._run_jobs(WORKFLOW_OPTIONAL_FALSE_INPUT_COLLECTION, **run_options)
        self._assert_failed_on_non_optional_input(failure, "input1")
def _assert_failed_on_non_optional_input(self, error, input_name):
    """Check that *error* reports a missing non-optional workflow input.

    :param error: mapping expected to carry an ``err_msg`` entry.
    :param input_name: text that must occur in that message.
    :raises AssertionError: if ``err_msg`` is absent, or if the message lacks
        *input_name* or the phrase "is not optional and no input".
    """
    assert "err_msg" in error
    message = error["err_msg"]
    for expected_fragment in (input_name, "is not optional and no input"):
        assert expected_fragment in message
[docs] def test_run_with_validated_parameter_connection_optional(self):
# Link a text workflow input into the r2 repeat of validation_repeat, supply the
# value "abd", and assert that exactly one job shows up in the history.
with self.dataset_populator.test_history() as history_id:
self._run_workflow(
"""
class: GalaxyWorkflow
inputs:
text_input: text
steps:
validation:
tool_id: validation_repeat
state:
r2:
- text:
$link: text_input
""",
test_data="""
text_input:
value: "abd"
type: raw
""",
history_id=history_id,
wait=True,
round_trip_format_conversion=True,
)
jobs = self._history_jobs(history_id)
assert len(jobs) == 1
[docs] def test_run_with_int_parameter(self):
# Exercise integer workflow parameters in three runs within one history:
# 1. required parameter omitted - an AssertionError mentioning
#    "(int_input) is not optional" is expected;
# 2. required parameter given as 1 - the value must be recorded on the invocation;
# 3. optional parameter omitted - no int_input entry may be recorded.
with self.dataset_populator.test_history() as history_id:
# Run 1: the flag records that the expected AssertionError was really raised.
failed = False
try:
self._run_jobs(
WORKFLOW_PARAMETER_INPUT_INTEGER_REQUIRED,
test_data="""
data_input:
value: 1.bed
type: File
""",
history_id=history_id,
wait=True,
assert_ok=True,
)
except AssertionError as e:
assert "(int_input) is not optional" in str(e)
failed = True
assert failed
# Run 2: same workflow, now with int_input supplied.
run_response = self._run_workflow(
WORKFLOW_PARAMETER_INPUT_INTEGER_REQUIRED,
test_data="""
data_input:
value: 1.bed
type: File
int_input:
value: 1
type: raw
""",
history_id=history_id,
wait=True,
assert_ok=True,
)
# self.dataset_populator.wait_for_history(history_id, assert_ok=True)
content = self.dataset_populator.get_history_dataset_content(history_id)
assert len(content.splitlines()) == 1, content
invocation = self.workflow_populator.get_invocation(run_response.invocation_id)
assert invocation["input_step_parameters"]["int_input"]["parameter_value"] == 1
# Run 3: the optional variant of the workflow, with int_input left out.
run_response = self._run_workflow(
WORKFLOW_PARAMETER_INPUT_INTEGER_OPTIONAL,
test_data="""
data_input:
value: 1.bed
type: File
""",
history_id=history_id,
wait=True,
assert_ok=True,
)
invocation = self.workflow_populator.get_invocation(run_response.invocation_id)
# Optional step parameter without default value will not be recorded.
assert "int_input" not in invocation["input_step_parameters"]
def test_run_with_int_parameter_nested(self):
    """Invoke the integer-input subworkflow resource with inputs given by name.

    A new dataset is bound to ``input_dataset`` and ``int_parameter`` is set to
    1; the invocation is started through ``invoke_workflow_and_wait``.
    """
    workflows = self.workflow_populator
    with self.dataset_populator.test_history() as history_id:
        definition = workflows.load_workflow_from_resource("test_subworkflow_with_integer_input")
        workflow_id = workflows.create_workflow(definition)
        hda: dict = self.dataset_populator.new_dataset(history_id, content="1 2 3")
        named_inputs = {
            "input_dataset": {"src": "hda", "id": hda["id"]},
            "int_parameter": 1,
        }
        request = {"history_id": history_id, "inputs_by": "name"}
        request["inputs"] = json.dumps(named_inputs)
        workflows.invoke_workflow_and_wait(workflow_id, request=request)
[docs] def test_run_with_validated_parameter_connection_default_values(self):
# Run WORKFLOW_PARAMETER_INPUT_INTEGER_DEFAULT with only the data input supplied
# and assert that the history's dataset content has three lines.
with self.dataset_populator.test_history() as history_id:
self._run_jobs(
WORKFLOW_PARAMETER_INPUT_INTEGER_DEFAULT,
test_data="""
data_input:
value: 1.bed
type: File
""",
history_id=history_id,
wait=True,
assert_ok=True,
)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
content = self.dataset_populator.get_history_dataset_content(history_id)
assert len(content.splitlines()) == 3, content
def test_run_with_default_file_dataset_input(self):
    """Run the default-file-input workflow without supplying any test data.

    Step 0 must report an ``output`` whose ``src`` is "hda"; the dataset behind
    step 1's ``out_file1`` must have extension "txt" and a peek containing
    "chr1".
    """
    with self.dataset_populator.test_history() as history_id:
        run_options = dict(history_id=history_id, wait=True, assert_ok=True)
        run_response = self._run_workflow(WORKFLOW_WITH_DEFAULT_FILE_DATASET_INPUT, **run_options)
        steps = self.workflow_populator.get_invocation(run_response.invocation_id, step_details=True)["steps"]
        assert steps[0]["outputs"]["output"]["src"] == "hda"
        out_file_id = steps[1]["outputs"]["out_file1"]["id"]
        out_file = self.dataset_populator.get_history_dataset_details(history_id, dataset_id=out_file_id)
        assert out_file["file_ext"] == "txt"
        assert "chr1" in out_file["peek"]
[docs] def test_run_with_default_file_dataset_input_and_explicit_input(self):
# Run WORKFLOW_WITH_DEFAULT_FILE_DATASET_INPUT with 1.fasta supplied explicitly
# for default_file_input, and assert that the peek of step 1's out_file1
# contains a sequence expected from that file.
with self.dataset_populator.test_history() as history_id:
run_response = self._run_workflow(
WORKFLOW_WITH_DEFAULT_FILE_DATASET_INPUT,
test_data="""
default_file_input:
value: 1.fasta
type: File
""",
history_id=history_id,
wait=True,
assert_ok=True,
)
invocation_details = self.workflow_populator.get_invocation(run_response.invocation_id, step_details=True)
assert invocation_details["steps"][0]["outputs"]["output"]["src"] == "hda"
dataset_details = self.dataset_populator.get_history_dataset_details(
history_id, dataset_id=invocation_details["steps"][1]["outputs"]["out_file1"]["id"]
)
assert dataset_details["file_ext"] == "txt"
assert (
"gtttgccatcttttgctgctctagggaatccagcagctgtcaccatgtaaacaagcccaggctagaccaGTTACCCTCATCATCTTAGCTGATAGCCAGCCAGCCACCACAGGCA"
in dataset_details["peek"]
)
def test_run_with_default_file_in_step_inline(self):
    """Run the step-level default-file workflow and look for "chr1" in its output."""
    with self.dataset_populator.test_history() as history_id:
        run_options = dict(history_id=history_id, wait=True, assert_ok=True)
        self._run_workflow(WORKFLOW_WITH_STEP_DEFAULT_FILE_DATASET_INPUT, **run_options)
        output_text = self.dataset_populator.get_history_dataset_content(history_id)
        assert "chr1" in output_text
[docs] def test_run_with_validated_parameter_connection_invalid(self):
# Link a text workflow input into the r2 repeat of validation_repeat and supply
# an empty string for it. The run is started with assert_ok=False and no
# further assertions are made here.
with self.dataset_populator.test_history() as history_id:
self._run_jobs(
"""
class: GalaxyWorkflow
inputs:
text_input: text
steps:
validation:
tool_id: validation_repeat
state:
r2:
- text:
$link: text_input
""",
test_data="""
text_input:
value: ""
type: raw
""",
history_id=history_id,
wait=True,
assert_ok=False,
)
[docs] def test_run_with_text_input_connection(self):
# Link a text workflow input to the seed of random_lines1 (num_lines: 1) and
# assert that, with seed "asdf" and 1.bed as data, the single selected line is
# exactly the one checked below.
with self.dataset_populator.test_history() as history_id:
self._run_jobs(
"""
class: GalaxyWorkflow
inputs:
data_input: data
text_input: text
steps:
randomlines:
tool_id: random_lines1
state:
num_lines: 1
input:
$link: data_input
seed_source:
seed_source_selector: set_seed
seed:
$link: text_input
""",
test_data="""
data_input:
value: 1.bed
type: File
text_input:
value: asdf
type: raw
""",
history_id=history_id,
)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
content = self.dataset_populator.get_history_dataset_content(history_id)
assert "chrX\t152691446\t152691471\tCCDS14735.1_cds_0_0_chrX_152691447_f\t0\t+\n" == content
[docs] def test_run_with_numeric_input_connection(self, history_id):
# Link the out1 output of expression_forty_two to the inttest parameter of
# cheetah_casting (floattest is fixed at 3.14) and check the numbers written to
# the resulting dataset.
self._run_jobs(
"""
class: GalaxyWorkflow
steps:
- label: forty_two
tool_id: expression_forty_two
state: {}
- label: consume_expression_parameter
tool_id: cheetah_casting
state:
floattest: 3.14
inttest:
$link: forty_two/out1
test_data: {}
""",
history_id=history_id,
)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
content = self.dataset_populator.get_history_dataset_content(history_id)
# Four pieces after splitting on "\n" with an empty last piece means three
# newline-terminated lines.
lines = content.split("\n")
assert len(lines) == 4
# Line 0 must parse to 43 and line 2 to roughly 4.14.
# NOTE(review): presumably the tool adds 1 to each input - confirm against
# cheetah_casting.
str_43 = lines[0]
str_4point14 = lines[2]
assert lines[3] == ""
assert int(str_43) == 43
assert abs(float(str_4point14) - 4.14) < 0.0001
[docs] @skip_without_tool("param_value_from_file")
def test_expression_tool_map_over(self, history_id):
# Map param_value_from_file over a two-element list and feed its text_param
# output into validation_default, whose out_file1 is renamed to
# "replaced_param_collection". The renamed collection must hold elements A and B
# whose contents are "A" and "B".
self._run_jobs(
"""
class: GalaxyWorkflow
inputs:
text_input1: collection
steps:
- label: param_out
tool_id: param_value_from_file
in:
input1: text_input1
- label: consume_expression_parameter
tool_id: validation_default
in:
input1: param_out/text_param
outputs:
out_file1:
rename: "replaced_param_collection"
test_data:
text_input1:
collection_type: list
elements:
- identifier: A
content: A
- identifier: B
content: B
""",
history_id=history_id,
)
# Pick the first dataset collection in the history that carries the renamed name;
# the [0] raises IndexError if no such collection exists.
history_contents = self._get(f"histories/{history_id}/contents").json()
collection = [
c
for c in history_contents
if c["history_content_type"] == "dataset_collection" and c["name"] == "replaced_param_collection"
][0]
collection_details = self._get(collection["url"]).json()
assert collection_details["element_count"] == 2
elements = collection_details["elements"]
assert elements[0]["element_identifier"] == "A"
assert elements[1]["element_identifier"] == "B"
element_a_content = self.dataset_populator.get_history_dataset_content(
history_id, dataset=elements[0]["object"]
)
element_b_content = self.dataset_populator.get_history_dataset_content(
history_id, dataset=elements[1]["object"]
)
assert element_a_content.strip() == "A"
assert element_b_content.strip() == "B"
[docs] @skip_without_tool("create_input_collection")
def test_workflow_optional_input_text_parameter_reevaluation(self):
# Run a workflow that passes an optional text input (default '') into a nested
# workflow, where it is used in a rename template ("#{inner_text_input} suffix").
# The test makes no assertions of its own beyond what _run_jobs performs.
# NOTE(review): the workflow text also references __APPLY_RULES__ and cat1, which
# the skip decorator does not cover - confirm whether that is intended.
with self.dataset_populator.test_history() as history_id:
self._run_jobs(
"""
class: GalaxyWorkflow
inputs:
text_input:
type: text
optional: true
default: ''
steps:
create_collection:
tool_id: create_input_collection
nested_workflow:
in:
inner_input: create_collection/output
inner_text_input: text_input
run:
class: GalaxyWorkflow
inputs:
inner_input:
type: data_collection_input
inner_text_input:
type: text
optional: true
default: ''
steps:
apply:
tool_id: __APPLY_RULES__
in:
input: inner_input
state:
rules:
rules:
- type: add_column_metadata
value: identifier0
mapping:
- type: list_identifiers
columns: [0]
echo:
cat1:
in:
input1: apply/output
outputs:
out_file1:
rename: "#{inner_text_input} suffix"
""",
history_id=history_id,
)
@skip_without_tool("cat1")
def test_workflow_rerun_with_use_cached_job(self):
    """Re-run a workflow in a second history with ``use_cached_job`` enabled.

    The inputs of the first invocation are copied into the second history and
    the same workflow is invoked again there. The last dataset of each history
    is then compared: both must report the same ``file_name``.
    """
    workflows = self.workflow_populator
    datasets = self.dataset_populator
    workflow = workflows.load_workflow(name="test_for_run")
    with datasets.test_history() as history_id_one, datasets.test_history() as history_id_two:
        # First run, in the first history.
        workflow_request, _, workflow_id = self._setup_workflow_run(workflow, history_id=history_id_one)
        first_response = workflows.invoke_workflow_and_wait(workflow_id, request=workflow_request)
        first_invocation = workflows.get_invocation(first_response.json()["id"])
        # Copy every workflow input into the second history and point the new
        # request's dataset map at the copies.
        rerun_request = workflow_request.copy()
        rerun_ds_map = json.loads(rerun_request["ds_map"])
        for input_key, input_value in first_invocation["inputs"].items():
            copied = self._post(
                f"histories/{history_id_two}/contents",
                data={"content": input_value["id"], "source": "hda", "type": "dataset"},
                json=True,
            ).json()
            rerun_ds_map[input_key]["id"] = copied["id"]
        rerun_request["ds_map"] = json.dumps(rerun_ds_map, sort_keys=True)
        rerun_request["history"] = f"hist_id={history_id_two}"
        rerun_request["use_cached_job"] = True
        # Second run; it should not produce any new outputs.
        rerun_response = workflows.invoke_workflow_raw(workflow_id, rerun_request, assert_ok=True)
        rerun_invocation_id = rerun_response.json()["id"]
        workflows.wait_for_invocation_and_jobs(history_id_two, workflow_id, rerun_invocation_id)
        # get_history_dataset_details defaults to the last item in a history; after
        # wait_for_invocation_and_jobs that is the output of the cat1 job (the only
        # job in the loaded workflow) in both histories.
        last_hdas = [
            datasets.get_history_dataset_details(history_id=history_id)
            for history_id in (history_id_one, history_id_two)
        ]
        first_output, second_output = [self._get(f"datasets/{hda['id']}").json() for hda in last_hdas]
        assert (
            first_output["file_name"] == second_output["file_name"]
        ), f"first output:\n{first_output}\nsecond output:\n{second_output}"
[docs] @skip_without_tool("cat1")
@skip_without_tool("identifier_multiple")
def test_workflow_rerun_with_cached_job_consumes_implicit_hdca(self, history_id: str):
# Invoke the same workflow twice with use_cached_job enabled and assert that the
# final job of the second invocation was copied from the final job of the first.
# In the workflow, cat1 is connected to a collection input and
# identifier_multiple consumes cat1's out_file1.
workflow = """
class: GalaxyWorkflow
inputs:
collection_input:
type: data_collection_input
steps:
map_over:
tool_id: cat1
in:
input1: collection_input
consume_hdca:
tool_id: identifier_multiple
in:
input1: map_over/out_file1
"""
workflow_id = self.workflow_populator.upload_yaml_workflow(name="Consume HDCA", yaml_content=workflow)
# hdca1 first holds the creation response and is then rebound to the fetched
# collection.
hdca1 = self.dataset_collection_populator.create_list_in_history(
history_id, contents=[("sample1-1", "1 2 3"), ("sample2-1", "7 8 9")]
).json()
hdca1 = self.dataset_collection_populator.wait_for_fetched_collection(hdca1)
# The very same request is used for both invocations.
workflow_request = {
"inputs": json.dumps({"collection_input": self._ds_entry(hdca1)}),
"history": f"hist_id={history_id}",
"use_cached_job": True,
"inputs_by": "name",
}
first_invocation_summary = self.workflow_populator.invoke_workflow_and_wait(
workflow_id, request=workflow_request
).json()
first_invocation = self.workflow_populator.get_invocation(first_invocation_summary["id"], step_details=True)
# steps[2] is taken to be the final (consume_hdca) step; its first job is compared.
final_job_id_first_invocation = first_invocation["steps"][2]["jobs"][0]["id"]
second_invocation_summary = self.workflow_populator.invoke_workflow_and_wait(
workflow_id, request=workflow_request
).json()
second_invocation = self.workflow_populator.get_invocation(second_invocation_summary["id"], step_details=True)
final_job_id_second_invocation = second_invocation["steps"][2]["jobs"][0]["id"]
final_job = self.dataset_populator.get_job_details(final_job_id_second_invocation, full=True).json()
assert final_job["copied_from_job_id"] == final_job_id_first_invocation
@skip_without_tool("cat1")
def test_nested_workflow_rerun_with_use_cached_job(self):
    """Re-run a nested workflow in a second history with ``use_cached_job`` enabled.

    The input is copied into the second history, the request is re-targeted at
    that history, and afterwards the items of both histories are compared
    pairwise by ``dataset_id``.
    """
    test_data = """
outer_input:
value: 1.bed
type: File
"""
    with self.dataset_populator.test_history() as history_id_one, self.dataset_populator.test_history() as history_id_two:
        run_jobs_summary = self._run_workflow(WORKFLOW_NESTED_SIMPLE, test_data=test_data, history_id=history_id_one)
        workflow_id = run_jobs_summary.workflow_id
        workflow_request = run_jobs_summary.workflow_request
        # Copy the input into the second history and point the request at the copy.
        inputs = json.loads(workflow_request["inputs"])
        outer_input = inputs["outer_input"]
        source_type = outer_input["src"]
        source_id = outer_input["id"]
        copy_payload = {"content": source_id, "source": source_type, "type": "dataset"}
        copy_response = self._post(f"histories/{history_id_two}/contents", data=copy_payload, json=True)
        self._assert_status_code_is(copy_response, 200)
        outer_input["id"] = copy_response.json()["id"]
        workflow_request["use_cached_job"] = True
        workflow_request["history"] = f"hist_id={history_id_two}"
        workflow_request["inputs"] = json.dumps(inputs)
        self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=run_jobs_summary.workflow_request)
        # Both histories must now reference the same underlying datasets, item by item.
        contents_one = self.__history_contents(history_id_one)
        contents_two = self.__history_contents(history_id_two)
        assert len(contents_one) == len(contents_two)
        for position, (item_one, item_two) in enumerate(zip(contents_one, contents_two), start=1):
            id_one = item_one["dataset_id"]
            id_two = item_two["dataset_id"]
            assert (
                id_one == id_two
            ), f'Dataset ids should match, but "{id_one}" and "{id_two}" are not the same for History item {position}.'
def test_cannot_run_inaccessible_workflow(self):
    """Posting an invocation for another user's workflow must answer 403."""
    loaded_workflow = self.workflow_populator.load_workflow(name="test_for_run_cannot_access")
    workflow_request, _, workflow_id = self._setup_workflow_run(loaded_workflow)
    invocations_url = f"workflows/{workflow_id}/invocations"
    with self._different_user():
        response = self._post(invocations_url, data=workflow_request)
        self._assert_status_code_is(response, 403)
def test_400_on_invalid_workflow_id(self):
    """Posting an invocation against a random, non-workflow id must answer 400."""
    loaded_workflow = self.workflow_populator.load_workflow(name="test_for_run_does_not_exist")
    workflow_request = self._setup_workflow_run(loaded_workflow)[0]
    bogus_url = f"workflows/{self._random_key()}/invocations"
    response = self._post(bogus_url, data=workflow_request)
    self._assert_status_code_is(response, 400)
def test_cannot_run_against_other_users_history(self):
    """Targeting a history created by a different user must answer 403."""
    loaded_workflow = self.workflow_populator.load_workflow(name="test_for_run_does_not_exist")
    workflow_request, _, workflow_id = self._setup_workflow_run(loaded_workflow)
    # NOTE(review): only the history creation is assumed to run as the other
    # user; the invocation is then posted by the original user - confirm scope.
    with self._different_user():
        foreign_history_id = self.dataset_populator.new_history()
    workflow_request["history"] = f"hist_id={foreign_history_id}"
    invocations_url = f"workflows/{workflow_id}/invocations"
    response = self._post(invocations_url, data=workflow_request)
    self._assert_status_code_is(response, 403)
def test_cannot_run_bootstrap_admin_workflow(self):
    """Posting a run request to ``workflows`` with the master API key must answer 400."""
    loaded_workflow = self.workflow_populator.load_workflow(name="test_bootstrap_admin_cannot_run")
    setup_result = self._setup_workflow_run(loaded_workflow)
    workflow_request, *_ = setup_result
    response = self._post("workflows", data=workflow_request, key=self.master_api_key, json=True)
    self._assert_status_code_is(response, 400)
@skip_without_tool("cat")
@skip_without_tool("cat_list")
def test_workflow_run_with_matching_lists(self):
    """Run the matching-lists workflow on two two-element lists and check the output text."""
    workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_matching_lists")
    workflow_id = self.workflow_populator.create_workflow(workflow)
    first_contents = [("sample1-1", "1 2 3"), ("sample2-1", "7 8 9")]
    second_contents = [("sample1-2", "4 5 6"), ("sample2-2", "0 a b")]
    collections = self.dataset_collection_populator
    with self.dataset_populator.test_history() as history_id:
        # Create both lists first, then wait for each of them in turn.
        pending = [
            collections.create_list_in_history(history_id, contents=contents).json()
            for contents in (first_contents, second_contents)
        ]
        hdca1, hdca2 = [collections.wait_for_fetched_collection(created) for created in pending]
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        label_map = {"list1": self._ds_entry(hdca1), "list2": self._ds_entry(hdca2)}
        workflow_request = {"ds_map": self.workflow_populator.build_ds_map(workflow_id, label_map)}
        self.workflow_populator.invoke_workflow_and_wait(
            workflow_id, history_id=history_id, request=workflow_request
        )
        expected_content = "1 2 3\n4 5 6\n7 8 9\n0 a b\n"
        assert expected_content == self.dataset_populator.get_history_dataset_content(history_id)
def test_workflow_stability(self):
    """Upload and re-download workflows, checking the data-input step map stays the same."""
    # Run this index stability test with following command:
    # ./run_tests.sh test/api/test_workflows.py:TestWorkflowsApi.test_workflow_stability
    num_tests = 1
    resource_names = ("test_workflow_topoambigouity", "test_workflow_topoambigouity_auto_laidout")
    for resource_name in resource_names:
        workflow = self.workflow_populator.load_workflow_from_resource(resource_name)
        previous_step_map = self._step_map(workflow)
        for _ in range(num_tests):
            uploaded_id = self.workflow_populator.create_workflow(workflow)
            current_step_map = self._step_map(self._download_workflow(uploaded_id))
            assert current_step_map == previous_step_map
            previous_step_map = current_step_map
def _step_map(self, workflow):
    """Build a dict mapping step index to input name for ``data_input`` steps.

    :param workflow: workflow dict whose ``"steps"`` entry maps a step index
        to a step dict.
    :returns: ``{step_index: name}`` where ``name`` is the ``"name"`` of the
        step's first ``"inputs"`` entry; steps of any other type are left out.
    """
    return {
        step_index: step["inputs"][0]["name"]
        for step_index, step in workflow["steps"].items()
        if step["type"] == "data_input"
    }
def test_empty_create(self):
    """A bare POST to ``workflows`` must fail with 400 and the missing-parameter error code."""
    expected_error = error_codes.error_codes_by_name["USER_REQUEST_MISSING_PARAMETER"]
    create_response = self._post("workflows")
    self._assert_status_code_is(create_response, 400)
    self._assert_error_code_is(create_response, expected_error)
def test_invalid_create_multiple_types(self):
    """Supplying both ``shared_workflow_id`` and ``from_history_id`` must fail with 400."""
    expected_error = error_codes.error_codes_by_name["USER_REQUEST_INVALID_PARAMETER"]
    payload = {"shared_workflow_id": "1234567890abcdef", "from_history_id": "1234567890abcdef"}
    create_response = self._post("workflows", payload)
    self._assert_status_code_is(create_response, 400)
    self._assert_error_code_is(create_response, expected_error)
@skip_without_tool("cat1")
def test_run_with_pja(self):
    """Run a workflow loaded with ``add_pja=True`` and check the replacement parameter lands in the output name."""
    workflow = self.workflow_populator.load_workflow(name="test_for_pja_run", add_pja=True)
    workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow, inputs_by="step_index")
    workflow_request["replacement_params"] = dumps({"replaceme": "was replaced"})
    invocation_id = self.workflow_populator.invoke_workflow_raw(
        workflow_id, workflow_request, assert_ok=True
    ).json()["id"]
    self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id, assert_ok=True)
    output_details = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True)
    assert output_details["name"] == "foo was replaced"
@skip_without_tool("output_filter")
def test_optional_workflow_output(self):
    """Run ``output_filter`` with ``produce_out_1`` off and expect a single ``ok`` history item."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs: []
outputs:
wf_output_1:
outputSource: output_filter/out_1
steps:
output_filter:
tool_id: output_filter
state:
produce_out_1: False
filter_text_1: '1'
produce_collection: False
"""
    with self.dataset_populator.test_history() as history_id:
        run_object = self._run_workflow(workflow_yaml, test_data={}, history_id=history_id, wait=False)
        self.workflow_populator.wait_for_invocation_and_jobs(
            history_id, run_object.workflow_id, run_object.invocation_id
        )
        history_items = self.__history_contents(history_id)
        assert len(history_items) == 1
        (only_item,) = history_items
        assert only_item["state"] == "ok"
@skip_without_tool("output_filter_with_input_optional")
def test_workflow_optional_input_filtering(self):
    """Run ``output_filter_with_input_optional`` over a one-element list.

    The history must end up with four items; every dataset has to be in state
    ``ok`` and every other item must have ``populated_state`` ``ok``. The
    offending item is attached to the assertion message instead of being
    printed unconditionally.
    """
    test_data = """
input1:
collection_type: list
elements:
- identifier: A
content: A
"""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1:
type: collection
collection_type: list
outputs:
wf_output_1:
outputSource: output_filter/out_1
steps:
output_filter:
tool_id: output_filter_with_input_optional
in:
input_1: input1
"""
    with self.dataset_populator.test_history() as history_id:
        run_object = self._run_workflow(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
            wait=False,
        )
        self.workflow_populator.wait_for_invocation_and_jobs(
            history_id, run_object.workflow_id, run_object.invocation_id
        )
        contents = self.__history_contents(history_id)
        assert len(contents) == 4
        for content in contents:
            if content["history_content_type"] == "dataset":
                assert content["state"] == "ok", content
            else:
                assert content["populated_state"] == "ok", content
@skip_without_tool("cat")
def test_run_rename_on_mapped_over_collection(self):
    """A rename action on a mapped-over output must rename both hid 4 (dataset) and hid 3 (collection)."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1:
type: collection
collection_type: list
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
rename: "my new name"
"""
    test_data = """
input1:
collection_type: list
name: the_dataset_list
elements:
- identifier: el1
value: 1.fastq
type: File
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data=test_data, history_id=history_id)
        dataset_details = self.dataset_populator.get_history_dataset_details(
            history_id, hid=4, wait=True, assert_ok=True
        )
        dataset_name = dataset_details["name"]
        assert dataset_name == "my new name", dataset_name
        assert dataset_details["history_content_type"] == "dataset"
        collection_details = self.dataset_populator.get_history_collection_details(
            history_id, hid=3, wait=True, assert_ok=True
        )
        collection_name = collection_details["name"]
        assert collection_details["history_content_type"] == "dataset_collection", collection_details
        assert collection_name == "my new name", collection_name
@skip_without_tool("cat")
def test_run_rename_based_on_inputs_on_mapped_over_collection(self):
    """A ``#{input1} suffix`` rename on a mapped-over output must use the input collection's name."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1:
type: collection
collection_type: list
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
rename: "#{input1} suffix"
"""
    test_data = """
input1:
collection_type: list
name: the_dataset_list
elements:
- identifier: el1
value: 1.fastq
type: File
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data=test_data, history_id=history_id)
        collection_details = self.dataset_populator.get_history_collection_details(
            history_id, hid=3, wait=True, assert_ok=True
        )
        collection_name = collection_details["name"]
        assert collection_details["history_content_type"] == "dataset_collection", collection_details
        assert collection_name == "the_dataset_list suffix", collection_name
@skip_without_tool("collection_creates_pair")
def test_run_rename_collection_output(self):
    """A rename action on ``paired_output`` must rename the collection found at hid 4."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
steps:
- tool_id: collection_creates_pair
in:
input1: input1
outputs:
paired_output:
rename: "my new name"
"""
    test_data = """
input1:
value: 1.fasta
type: File
name: fasta1
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data=test_data, history_id=history_id)
        pair_details = self.dataset_populator.get_history_collection_details(
            history_id, hid=4, wait=True, assert_ok=True
        )
        first_element = pair_details["elements"][0]["object"]
        assert first_element["visible"] is False
        assert pair_details["name"] == "my new name", pair_details
        assert pair_details["history_content_type"] == "dataset_collection"
@skip_without_tool("__BUILD_LIST__")
def test_run_build_list_hide_collection_output(self):
    """A ``hide`` action on the ``__BUILD_LIST__`` output must leave the collection at hid 3 invisible."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
steps:
- tool_id: __BUILD_LIST__
in:
datasets_0|input: input1
state:
datasets:
- id_cond:
id_select: id
outputs:
output:
hide: true
"""
    test_data = """
input1:
value: 1.fasta
type: File
name: fasta1
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data=test_data, history_id=history_id)
        list_details = self.dataset_populator.get_history_collection_details(
            history_id, hid=3, wait=True, assert_ok=True
        )
        first_element = list_details["elements"][0]["object"]
        assert first_element["visible"] is False
        assert list_details["name"] == "data 1 (as list)", list_details
        assert list_details["visible"] is False
@skip_without_tool("__BUILD_LIST__")
def test_run_build_list_delete_intermediate_collection_output(self):
    """Run ``__BUILD_LIST__`` with ``delete_intermediate_datasets`` and inspect the collection at hid 3.

    The collection is currently asserted to be *not* deleted; see the FIXME below.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
steps:
- tool_id: __BUILD_LIST__
in:
datasets_0|input: input1
state:
datasets:
- id_cond:
id_select: id
outputs:
output:
delete_intermediate_datasets: true
"""
    test_data = """
input1:
value: 1.fasta
type: File
name: fasta1
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data=test_data, history_id=history_id)
        list_details = self.dataset_populator.get_history_collection_details(
            history_id, hid=3, wait=True, assert_ok=True
        )
        first_element = list_details["elements"][0]["object"]
        assert first_element["visible"] is False
        assert list_details["name"] == "data 1 (as list)", list_details
        # FIXME: this doesn't work because the workflow is still being scheduled
        # TODO: Implement a way to run PJAs that couldn't be run during/after the job
        # after the workflow has run to completion
        assert list_details["deleted"] is False
@skip_without_tool("__BUILD_LIST__")
def test_run_build_list_change_datatype_collection_output(self):
    """``change_datatype`` on one ``__BUILD_LIST__`` output must not leak into a second list.

    The first step's list (hid 3) must contain a ``txt`` element, while the
    second step's list (hid 5), built from the same input, must stay ``fasta``.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
steps:
- tool_id: __BUILD_LIST__
in:
datasets_0|input: input1
state:
datasets:
- id_cond:
id_select: idx
outputs:
output:
change_datatype: txt
- tool_id: __BUILD_LIST__
in:
datasets_0|input: input1
state:
datasets:
- id_cond:
id_select: idx
"""
    test_data = """
input1:
value: 1.fasta
type: File
file_type: fasta
name: fasta1
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data=test_data, history_id=history_id)
        converted_list = self.dataset_populator.get_history_collection_details(
            history_id, hid=3, wait=True, assert_ok=True
        )
        assert converted_list["name"] == "data 1 (as list)", converted_list
        converted_element = converted_list["elements"][0]["object"]
        assert converted_element["visible"] is False
        assert converted_element["file_ext"] == "txt"
        untouched_list = self.dataset_populator.get_history_collection_details(
            history_id, hid=5, wait=True, assert_ok=True
        )
        # Also check that we don't overwrite the original HDA's datatype
        assert untouched_list["elements"][0]["object"]["file_ext"] == "fasta"
@skip_without_tool("__EXTRACT_DATASET__")
@skip_without_tool("__BUILD_LIST__")
def test_run_build_list_change_datatype_new_metadata_file_parameter(self):
    """Regression test for changing datatype to a datatype with a MetadataFileParameter.

    Builds a list from a ``vcf_bgzip`` input, extracts a dataset from it and
    applies ``change_datatype: vcf_bgzip`` to the extracted output; the run is
    awaited with ``assert_ok=True``. Both tools the workflow uses are guarded
    by a skip decorator.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
steps:
build_list:
tool_id: __BUILD_LIST__
in:
datasets_0|input: input1
extract_dataset:
tool_id: __EXTRACT_DATASET__
in:
input: build_list/output
outputs:
output:
change_datatype: vcf_bgzip
"""
    test_data = """
input1:
value: test.vcf.gz
type: File
file_type: vcf_bgzip
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_workflow(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
            assert_ok=True,
            wait=True,
        )
@skip_without_tool("__BUILD_LIST__")
def test_run_build_list_rename_collection_output(self):
    """A rename action on the ``__BUILD_LIST__`` output must rename the collection at hid 3."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
steps:
- tool_id: __BUILD_LIST__
in:
datasets_0|input: input1
state:
datasets:
- id_cond:
id_select: idx
outputs:
output:
rename: "my new name"
"""
    test_data = """
input1:
value: 1.fasta
type: File
name: fasta1
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data=test_data, history_id=history_id)
        list_details = self.dataset_populator.get_history_collection_details(
            history_id, hid=3, wait=True, assert_ok=True
        )
        first_element = list_details["elements"][0]["object"]
        assert first_element["visible"] is False
        assert list_details["name"] == "my new name", list_details
        assert list_details["history_content_type"] == "dataset_collection"
@skip_without_tool("create_2")
def test_run_rename_multiple_outputs(self):
    """Separate rename actions on ``out_file1`` and ``out_file2`` must each be applied (hids 1 and 2)."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs: []
steps:
create_2:
tool_id: create_2
state:
sleep_time: 0
outputs:
out_file1:
rename: "my new name"
out_file2:
rename: "my other new name"
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data={}, history_id=history_id)
        first_output = self.dataset_populator.get_history_dataset_details(
            history_id, hid=1, wait=True, assert_ok=True
        )
        second_output = self.dataset_populator.get_history_dataset_details(history_id, hid=2)
        assert first_output["name"] == "my new name"
        assert second_output["name"] == "my other new name"
@skip_without_tool("cat")
def test_run_rename_based_on_input(self):
    """Run ``WORKFLOW_RENAME_ON_INPUT`` and expect the latest dataset to be named ``fasta1 suffix``."""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(WORKFLOW_RENAME_ON_INPUT, history_id=history_id)
        output_details = self.dataset_populator.get_history_dataset_details(
            history_id, wait=True, assert_ok=True
        )
        output_name = output_details["name"]
        assert output_name == "fasta1 suffix", output_name
[docs] @skip_without_tool("fail_identifier")
@skip_without_tool("cat")
def test_run_rename_when_resuming_jobs(self):
"""Check an input-based rename is applied after a failed upstream job is re-run with remapping.

The workflow's first step is run with ``failbool: true``; its output (hid 2)
is expected in the ``error`` state. The tool is then re-run with
``failbool`` off and ``rerun_remap_job_id`` set to the failed job, and the
latest dataset in the history must be ``ok`` and named after hid 2's name.
"""
with self.dataset_populator.test_history() as history_id:
self._run_jobs(
"""
class: GalaxyWorkflow
inputs:
input1: data
steps:
first_fail:
tool_id: fail_identifier
state:
failbool: true
input1:
$link: input1
outputs:
out_file1:
rename: "cat1 out"
cat:
tool_id: cat
in:
input1: first_fail/out_file1
outputs:
out_file1:
rename: "#{input1} suffix"
""",
test_data="""
input1:
value: 1.fasta
type: File
name: fail
""",
history_id=history_id,
wait=True,
assert_ok=False,
)
# hid 2 is the output asserted to be in the error state; its name is reused
# for the final name check below.
content = self.dataset_populator.get_history_dataset_details(history_id, hid=2, wait=True, assert_ok=False)
name = content["name"]
assert content["state"] == "error", content
input1 = self.dataset_populator.get_history_dataset_details(history_id, hid=1, wait=True, assert_ok=False)
job_id = content["creating_job"]
# Re-run fail_identifier on the original input with failure switched off,
# passing the job that created hid 2 as rerun_remap_job_id.
inputs = {
"input1": {"values": [{"src": "hda", "id": input1["id"]}]},
"failbool": "false",
"rerun_remap_job_id": job_id,
}
self.dataset_populator.run_tool(
tool_id="fail_identifier",
inputs=inputs,
history_id=history_id,
)
# No hid given: the populator picks the dataset itself - presumably the
# downstream cat output that was waiting on the failed job; TODO confirm.
unpaused_dataset = self.dataset_populator.get_history_dataset_details(
history_id, wait=True, assert_ok=False
)
assert unpaused_dataset["state"] == "ok"
assert unpaused_dataset["name"] == f"{name} suffix"
@skip_without_tool("cat")
def test_run_rename_based_on_input_recursive(self):
    """An input literally named ``#{input1}`` must be substituted once, not expanded recursively."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
rename: "#{input1} #{input1 | upper} suffix"
"""
    test_data = """
input1:
value: 1.fasta
type: File
name: '#{input1}'
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data=test_data, history_id=history_id)
        output_details = self.dataset_populator.get_history_dataset_details(
            history_id, wait=True, assert_ok=True
        )
        output_name = output_details["name"]
        assert output_name == "#{input1} #{INPUT1} suffix", output_name
@skip_without_tool("cat")
def test_run_rename_based_on_input_repeat(self):
    """A rename referencing ``queries_0.input2`` must resolve to the repeat input's name (``fasta2``)."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
input2: data
steps:
first_cat:
tool_id: cat
state:
input1:
$link: input1
queries:
- input2:
$link: input2
outputs:
out_file1:
rename: "#{queries_0.input2| basename} suffix"
"""
    test_data = """
input1:
value: 1.fasta
type: File
name: fasta1
input2:
value: 1.fasta
type: File
name: fasta2
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data=test_data, history_id=history_id)
        output_details = self.dataset_populator.get_history_dataset_details(
            history_id, wait=True, assert_ok=True
        )
        output_name = output_details["name"]
        assert output_name == "fasta2 suffix", output_name
@skip_without_tool("mapper2")
def test_run_rename_based_on_input_conditional(self):
    """A rename using the fully prefixed ``fastq_input.fastq_input1`` must resolve to ``fastq1``."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
fasta_input: data
fastq_input: data
steps:
mapping:
tool_id: mapper2
state:
fastq_input:
fastq_input_selector: single
fastq_input1:
$link: fastq_input
reference:
$link: fasta_input
outputs:
out_file1:
rename: "#{fastq_input.fastq_input1 | basename} suffix"
"""
    test_data = """
fasta_input:
value: 1.fasta
type: File
name: fasta1
file_type: fasta
fastq_input:
value: 1.fastqsanger
type: File
name: fastq1
file_type: fastqsanger
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data=test_data, history_id=history_id)
        output_details = self.dataset_populator.get_history_dataset_details(
            history_id, wait=True, assert_ok=True
        )
        output_name = output_details["name"]
        assert output_name == "fastq1 suffix", output_name
@skip_without_tool("mapper2")
def test_run_rename_based_on_input_conditional_legacy_pja_reference(self):
    """A rename using the legacy, unprefixed ``fastq_input1`` reference must still resolve to ``fastq1``."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
fasta_input: data
fastq_input: data
steps:
mapping:
tool_id: mapper2
state:
fastq_input:
fastq_input_selector: single
fastq_input1:
$link: fastq_input
reference:
$link: fasta_input
outputs:
out_file1:
# The fully prefixed variant test in "test_run_rename_based_on_input_conditional" should be preferred,
# but we don't want to break old workflow renaming actions
rename: "#{fastq_input1 | basename} suffix"
"""
    test_data = """
fasta_input:
value: 1.fasta
type: File
name: fasta1
file_type: fasta
fastq_input:
value: 1.fastqsanger
type: File
name: fastq1
file_type: fastqsanger
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data=test_data, history_id=history_id)
        output_details = self.dataset_populator.get_history_dataset_details(
            history_id, wait=True, assert_ok=True
        )
        output_name = output_details["name"]
        assert output_name == "fastq1 suffix", output_name
@skip_without_tool("mapper2")
def test_run_rename_based_on_input_collection(self):
    """A rename referencing a collection input must resolve to the collection's name (``the_dataset_pair``)."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
fasta_input: data
fastq_inputs: data
steps:
mapping:
tool_id: mapper2
state:
fastq_input:
fastq_input_selector: paired_collection
fastq_input1:
$link: fastq_inputs
reference:
$link: fasta_input
outputs:
out_file1:
rename: "#{fastq_input.fastq_input1 | basename} suffix"
"""
    test_data = """
fasta_input:
value: 1.fasta
type: File
name: fasta1
file_type: fasta
fastq_inputs:
collection_type: list
name: the_dataset_pair
elements:
- identifier: forward
value: 1.fastq
type: File
- identifier: reverse
value: 1.fastq
type: File
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data=test_data, history_id=history_id)
        output_details = self.dataset_populator.get_history_dataset_details(
            history_id, wait=True, assert_ok=True
        )
        output_name = output_details["name"]
        assert output_name == "the_dataset_pair suffix", output_name
@skip_without_tool("collection_creates_pair")
def test_run_hide_on_collection_output(self):
    """A ``hide`` action on ``paired_output`` must leave the collection at hid 4 invisible."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
steps:
create_pair:
tool_id: collection_creates_pair
state:
input1:
$link: input1
outputs:
paired_output:
hide: true
"""
    test_data = """
input1:
value: 1.fasta
type: File
name: fasta1
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data=test_data, history_id=history_id)
        pair_details = self.dataset_populator.get_history_collection_details(
            history_id, hid=4, wait=True, assert_ok=True
        )
        assert pair_details["history_content_type"] == "dataset_collection"
        assert not pair_details["visible"], pair_details
@skip_without_tool("cat")
def test_run_hide_on_mapped_over_collection(self):
    """A ``hide`` action on a mapped-over output must hide both hid 4 (dataset) and hid 3 (collection)."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
- id: input1
type: data_collection_input
collection_type: list
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
hide: true
"""
    test_data = """
input1:
collection_type: list
name: the_dataset_list
elements:
- identifier: el1
value: 1.fastq
type: File
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data=test_data, history_id=history_id)
        dataset_details = self.dataset_populator.get_history_dataset_details(
            history_id, hid=4, wait=True, assert_ok=True
        )
        assert dataset_details["history_content_type"] == "dataset"
        assert not dataset_details["visible"]
        collection_details = self.dataset_populator.get_history_collection_details(
            history_id, hid=3, wait=True, assert_ok=True
        )
        assert collection_details["history_content_type"] == "dataset_collection", collection_details
        assert not collection_details["visible"]
@skip_without_tool("cat")
def test_tag_auto_propagation(self):
    """Tags added to the first step's output: all four on hid 2, only the ``name:`` tag on hid 3."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
add_tags:
- "name:treated1fb"
- "group:condition:treated"
- "group:type:single-read"
- "machine:illumina"
second_cat:
tool_id: cat
in:
input1: first_cat/out_file1
"""
    test_data = """
input1:
value: 1.fasta
type: File
name: fasta1
"""
    added_tags = ("name:treated1fb", "group:condition:treated", "group:type:single-read", "machine:illumina")
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
            round_trip_format_conversion=True,
        )
        tagged_details = self.dataset_populator.get_history_dataset_details(
            history_id, hid=2, wait=True, assert_ok=True
        )
        tags = tagged_details["tags"]
        assert len(tags) == 4, tagged_details
        for expected_tag in added_tags:
            assert expected_tag in tags, tags
        downstream_details = self.dataset_populator.get_history_dataset_details(
            history_id, hid=3, wait=True, assert_ok=True
        )
        tags = downstream_details["tags"]
        assert len(tags) == 1, downstream_details
        assert "name:treated1fb" in tags, tags
@skip_without_tool("collection_creates_pair")
def test_run_add_tag_on_collection_output(self):
    """An ``add_tags`` action on ``paired_output`` must tag the collection at hid 4 with ``name:foo``."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
steps:
create_pair:
tool_id: collection_creates_pair
in:
input1: input1
outputs:
paired_output:
add_tags:
- "name:foo"
"""
    test_data = """
input1:
value: 1.fasta
type: File
name: fasta1
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
            round_trip_format_conversion=True,
        )
        pair_details = self.dataset_populator.get_history_collection_details(
            history_id, hid=4, wait=True, assert_ok=True
        )
        assert pair_details["history_content_type"] == "dataset_collection"
        assert pair_details["tags"][0] == "name:foo", pair_details
@skip_without_tool("cat")
def test_run_add_tag_on_mapped_over_collection(self):
    """An ``add_tags`` action on a mapped-over output must tag the collection at hid 3 with ``name:foo``."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1:
type: collection
collection_type: list
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
add_tags:
- "name:foo"
"""
    test_data = """
input1:
collection_type: list
name: the_dataset_list
elements:
- identifier: el1
value: 1.fastq
type: File
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
            round_trip_format_conversion=True,
        )
        collection_details = self.dataset_populator.get_history_collection_details(
            history_id, hid=3, wait=True, assert_ok=True
        )
        assert collection_details["history_content_type"] == "dataset_collection"
        assert collection_details["tags"][0] == "name:foo", collection_details
@skip_without_tool("collection_creates_pair")
@skip_without_tool("cat")
def test_run_remove_tag_on_collection_output(self):
    """``remove_tags`` on the pair output must drop a tag that was added to the upstream dataset.

    hid 2 (the ``cat`` output) keeps ``name:foo``; the collection at hid 5 must
    end up with no tags.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
steps:
first_cat:
tool_id: cat
in:
input1: input1
outputs:
out_file1:
add_tags:
- "name:foo"
create_pair:
tool_id: collection_creates_pair
in:
input1: first_cat/out_file1
outputs:
paired_output:
remove_tags:
- "name:foo"
"""
    test_data = """
input1:
value: 1.fasta
type: File
name: fasta1
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
            round_trip_format_conversion=True,
        )
        tagged_dataset = self.dataset_populator.get_history_dataset_details(
            history_id, hid=2, wait=True, assert_ok=True
        )
        assert tagged_dataset["history_content_type"] == "dataset", tagged_dataset
        assert tagged_dataset["tags"][0] == "name:foo", tagged_dataset
        untagged_collection = self.dataset_populator.get_history_collection_details(
            history_id, hid=5, wait=True, assert_ok=True
        )
        assert untagged_collection["history_content_type"] == "dataset_collection", untagged_collection
        assert len(untagged_collection["tags"]) == 0, untagged_collection
@skip_without_tool("__EXTRACT_DATASET__")
def test_run_add_tag_on_database_operation_output(self):
    """An ``add_tags`` action on the ``__EXTRACT_DATASET__`` output must tag the dataset at hid 3.

    The skip guard names the only tool this workflow uses, rather than
    ``collection_creates_pair`` and ``cat``, which it never runs.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data_collection
steps:
extrat:
tool_id: __EXTRACT_DATASET__
in:
input: input1
outputs:
output:
add_tags:
- "name:foo"
"""
    test_data = """
input1:
collection_type: list
name: the_dataset_list
elements:
- identifier: el1
value: 1.fastq
type: File
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(
            workflow_yaml,
            test_data=test_data,
            history_id=history_id,
            round_trip_format_conversion=True,
        )
        details_dataset_with_tag = self.dataset_populator.get_history_dataset_details(
            history_id, hid=3, wait=True, assert_ok=True
        )
        assert details_dataset_with_tag["history_content_type"] == "dataset", details_dataset_with_tag
        assert details_dataset_with_tag["tags"][0] == "name:foo", details_dataset_with_tag
@skip_without_tool("cat1")
def test_run_with_runtime_pja(self):
    """Supply a rename post-job action at invocation time and check it is not persisted.

    The action is keyed by the uuid assigned to step ``"2"``; the output must
    be renamed, and the stored workflow must still have no post job actions on
    that step afterwards.
    """
    workflow = self.workflow_populator.load_workflow(name="test_for_pja_runtime")
    step_uuids = [str(uuid4()) for _ in range(3)]
    for step_index, step_uuid in enumerate(step_uuids):
        workflow["steps"][str(step_index)]["uuid"] = step_uuid
    workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow, inputs_by="step_index")
    workflow_request["replacement_params"] = dumps({"replaceme": "was replaced"})
    rename_action = {
        "action_type": "RenameDatasetAction",
        "output_name": "out_file1",
        "action_arguments": {"newname": "foo ${replaceme}"},
    }
    pja_map = {"RenameDatasetActionout_file1": rename_action}
    workflow_request["parameters"] = dumps({step_uuids[2]: {"__POST_JOB_ACTIONS__": pja_map}})
    self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request)
    output_details = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True)
    assert output_details["name"] == "foo was replaced", output_details["name"]
    # Test for regression of previous behavior where runtime post job actions
    # would be added to the original workflow post job actions.
    downloaded_workflow = self._download_workflow(workflow_id)
    stored_pjas = list(downloaded_workflow["steps"]["2"]["post_job_actions"].values())
    assert len(stored_pjas) == 0, len(stored_pjas)
[docs] @skip_without_tool("cat1")
def test_run_with_delayed_runtime_pja(self):
"""Check a runtime rename post-job action is applied to a step scheduled after a pause.

A cat1 -> pause -> cat1 workflow is invoked with a ``RenameDatasetAction``
keyed by the uuid of the step at index 3 of the downloaded workflow. After
the pause step is reviewed with ``action=True`` and the workflow finishes,
the dataset returned by ``get_history_dataset_details`` must be named
``foo was replaced``.
"""
workflow_id = self._upload_yaml_workflow(
"""
class: GalaxyWorkflow
inputs:
test_input: data
steps:
first_cat:
tool_id: cat1
in:
input1: test_input
the_pause:
type: pause
in:
input: first_cat/out_file1
second_cat:
tool_id: cat1
in:
input1: the_pause
""",
round_trip_format_conversion=True,
)
downloaded_workflow = self._download_workflow(workflow_id)
# Step uuids keyed by integer step index, taken from the stored workflow.
uuid_dict = {int(index): step["uuid"] for index, step in downloaded_workflow["steps"].items()}
with self.dataset_populator.test_history() as history_id:
hda = self.dataset_populator.new_dataset(history_id, content="1 2 3")
self.dataset_populator.wait_for_history(history_id)
inputs = {
"0": self._ds_entry(hda),
}
# NOTE(review): index 3 is presumably second_cat (input, first_cat, pause,
# second_cat); the variable name "uuid2" does not reflect that - confirm.
uuid2 = uuid_dict[3]
workflow_request = {}
workflow_request["replacement_params"] = dumps(dict(replaceme="was replaced"))
pja_map = {
"RenameDatasetActionout_file1": dict(
action_type="RenameDatasetAction",
output_name="out_file1",
action_arguments=dict(newname="foo ${replaceme}"),
)
}
workflow_request["parameters"] = dumps({uuid2: {"__POST_JOB_ACTIONS__": pja_map}})
invocation_id = self.__invoke_workflow(
workflow_id, inputs=inputs, request=workflow_request, history_id=history_id
)
# Fixed sleep before waiting on the history - presumably gives scheduling
# time to reach the pause step; TODO confirm and replace with a real wait.
time.sleep(2)
self.dataset_populator.wait_for_history(history_id)
# Approve the pause step at order_index 2.
self.__review_paused_steps(workflow_id, invocation_id, order_index=2, action=True)
self.workflow_populator.wait_for_workflow(workflow_id, invocation_id, history_id)
time.sleep(1)
content = self.dataset_populator.get_history_dataset_details(history_id)
assert content["name"] == "foo was replaced", content["name"]
@skip_without_tool("cat1")
def test_delete_intermediate_datasets_pja_1(self):
    """Run three chained ``cat1`` steps with ``delete_intermediate_datasets`` on the last one.

    hid 1 and hid 4 must survive and hid 2 must be deleted; hid 3's deleted
    flag is only printed (see the comment in the body).
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
outputs:
wf_output_1:
outputSource: third_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: input1
second_cat:
tool_id: cat1
in:
input1: first_cat/out_file1
third_cat:
tool_id: cat1
in:
input1: second_cat/out_file1
outputs:
out_file1:
delete_intermediate_datasets: true
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data={"input1": "hello world"}, history_id=history_id)
        hda1, hda2, hda3, hda4 = [
            self.dataset_populator.get_history_dataset_details(history_id, hid=hid) for hid in (1, 2, 3, 4)
        ]
        assert not hda1["deleted"]
        assert hda2["deleted"]
        # I think hda3 should be deleted, but the inputs to
        # steps with workflow outputs are not deleted.
        # assert hda3["deleted"]
        print(hda3["deleted"])
        assert not hda4["deleted"]
@skip_without_tool("cat1")
def test_validated_post_job_action_validated(self):
    """A ``ValidateOutputsAction`` on a fastqsanger input leaves hid 2 with validated_state "ok"."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
outputs:
wf_output_1:
outputSource: first_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: input1
post_job_actions:
ValidateOutputsAction:
action_type: ValidateOutputsAction
"""
    fastq_input = {"type": "File", "file_type": "fastqsanger", "value": "1.fastqsanger"}
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data={"input1": fastq_input}, history_id=history_id)
        output_details = self.dataset_populator.get_history_dataset_details(history_id, hid=2)
        assert output_details["validated_state"] == "ok"
@skip_without_tool("cat1")
def test_validated_post_job_action_unvalidated_default(self):
    """Running ``WORKFLOW_SIMPLE`` leaves hid 2 with validated_state equal to ``UNKNOWN``."""
    fastq_input = {"type": "File", "file_type": "fastqsanger", "value": "1.fastqsanger"}
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(WORKFLOW_SIMPLE, test_data={"input1": fastq_input}, history_id=history_id)
        output_details = self.dataset_populator.get_history_dataset_details(history_id, hid=2)
        assert output_details["validated_state"] == UNKNOWN
@skip_without_tool("cat1")
def test_validated_post_job_action_invalid(self):
    """A ``ValidateOutputsAction`` marks hid 2 "invalid" when 1.fastqsanger is uploaded as fastqcssanger."""
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input1: data
outputs:
wf_output_1:
outputSource: first_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: input1
post_job_actions:
ValidateOutputsAction:
action_type: ValidateOutputsAction
"""
    # The declared file_type deliberately differs from the test file's name.
    mistyped_input = {"type": "File", "file_type": "fastqcssanger", "value": "1.fastqsanger"}
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(workflow_yaml, test_data={"input1": mistyped_input}, history_id=history_id)
        output_details = self.dataset_populator.get_history_dataset_details(history_id, hid=2)
        assert output_details["validated_state"] == "invalid"
def test_value_restriction_with_select_and_text_param(self):
    """A ``restrictOnConnections`` text input feeding a select and a text parameter exposes options.

    The run-style download must list five options for the first input of the
    first step, the first being ``["Ex1", "--ex1", False]``.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
select_text:
type: text
restrictOnConnections: true
steps:
select:
tool_id: multi_select
in:
select_ex: select_text
tool_with_text_input:
tool_id: param_text_option
in:
text_param: select_text
"""
    workflow_id = self.workflow_populator.upload_yaml_workflow(workflow_yaml)
    with self.dataset_populator.test_history() as history_id:
        run_form = self._download_workflow(workflow_id, style="run", history_id=history_id)
        first_input = run_form["steps"][0]["inputs"][0]
        options = first_input["options"]
        assert len(options) == 5
        assert options[0] == ["Ex1", "--ex1", False]
def test_value_restriction_with_select_from_subworkflow_input(self):
    """A ``restrictOnConnections`` input wired through a subworkflow input exposes options.

    The run-style download must list five options for the first input of the
    first step, the first being ``["Ex1", "--ex1", False]``.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
Outer input parameter:
optional: false
restrictOnConnections: true
type: string
steps:
- in:
inner input parameter:
source: Outer input parameter
run:
class: GalaxyWorkflow
label: Restriction from subworkflow param
inputs:
inner input parameter:
optional: false
restrictOnConnections: true
type: string
steps:
- tool_id: multi_select
in:
select_ex:
source: inner input parameter
"""
    workflow_id = self.workflow_populator.upload_yaml_workflow(workflow_yaml)
    with self.dataset_populator.test_history() as history_id:
        run_form = self._download_workflow(workflow_id, style="run", history_id=history_id)
        first_input = run_form["steps"][0]["inputs"][0]
        options = first_input["options"]
        assert len(options) == 5
        assert options[0] == ["Ex1", "--ex1", False]
@skip_without_tool("random_lines1")
def test_run_replace_params_by_tool(self):
    """Override ``num_lines`` for every ``random_lines1`` step by keying parameters on the tool id."""
    workflow_request, history_id, workflow_id = self._setup_random_x2_workflow("test_for_replace_tool_params")
    overrides = {"random_lines1": {"num_lines": 5}}
    workflow_request["parameters"] = dumps(overrides)
    self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request)
    # Would be 8 and 6 without modification
    for hid in (2, 3):
        self.__assert_lines_hid_line_count_is(history_id, hid, 5)
@skip_without_tool("random_lines1")
def test_run_replace_params_by_uuid(self):
    """Override ``num_lines`` per step by keying the parameters on two step UUIDs."""
    workflow_request, history_id, workflow_id = self._setup_random_x2_workflow("test_for_replace_")
    overrides_by_uuid = {
        "58dffcc9-bcb7-4117-a0e1-61513524b3b1": {"num_lines": 4},
        "58dffcc9-bcb7-4117-a0e1-61513524b3b2": {"num_lines": 3},
    }
    workflow_request["parameters"] = dumps(overrides_by_uuid)
    self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request)
    # Would be 8 and 6 without modification
    for hid, expected_lines in ((2, 4), (3, 3)):
        self.__assert_lines_hid_line_count_is(history_id, hid, expected_lines)
@skip_without_tool("cat1")
@skip_without_tool("addValue")
def test_run_batch(self):
    """Run ``test_workflow_batch`` with a batch of four datasets given as a step parameter.

    Step "0" receives the four datasets as a batch, step "1" receives a
    single non-batched dataset plus ``exp`` set to "2". The contents of
    hid 7, 10, 13 and 16 are then compared against the expected two-line
    output for each batched dataset.
    """
    workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_batch")
    workflow_id = self.workflow_populator.create_workflow(workflow)
    with self.dataset_populator.test_history() as history_id:
        contents = ("1 2 3", "4 5 6", "7 8 9", "10 11 12")
        hdas = [self.dataset_populator.new_dataset(history_id, content=content, wait=True) for content in contents]
        # Build every reference from one dataset so id and hid always belong
        # together (the hand-written list reused hda2's hid for hda3/hda4).
        references = [{"id": hda.get("id"), "hid": hda.get("hid"), "src": "hda"} for hda in hdas]
        parameters = {
            "0": {"input": {"batch": True, "values": references}},
            "1": {"input": {"batch": False, "values": [references[0]]}, "exp": "2"},
        }
        workflow_request = {
            "history_id": history_id,
            "batch": True,
            "parameters_normalized": True,
            "parameters": dumps(parameters),
        }
        invocation_response = self._post(f"workflows/{workflow_id}/usage", data=workflow_request)
        self._assert_status_code_is(invocation_response, 200)
        # NOTE(review): fixed delay kept from the original; presumably lets the
        # batch invocations create their jobs before waiting on the history.
        time.sleep(5)
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        for content, output_hid in zip(contents, (7, 10, 13, 16)):
            expected = f"{content}\t1\n1 2 3\t2\n"
            actual = self.dataset_populator.get_history_dataset_content(history_id, hid=output_hid)
            assert expected == actual
@skip_without_tool("cat1")
@skip_without_tool("addValue")
def test_run_batch_inputs(self):
    """Run ``test_workflow_batch`` with a batch of four datasets given as a named input.

    The batch is passed through ``inputs`` (with ``inputs_by`` set to
    "name") under the key "coolinput", while step "1" receives a single
    non-batched dataset plus ``exp`` set to "2". The contents of hid 7, 10,
    13 and 16 are then compared against the expected two-line output.
    """
    workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_batch")
    workflow_id = self.workflow_populator.create_workflow(workflow)
    with self.dataset_populator.test_history() as history_id:
        contents = ("1 2 3", "4 5 6", "7 8 9", "10 11 12")
        hdas = [self.dataset_populator.new_dataset(history_id, content=content) for content in contents]
        # Build every reference from one dataset so id and hid always belong
        # together (the hand-written list reused hda2's hid for hda3/hda4).
        references = [{"id": hda.get("id"), "hid": hda.get("hid"), "src": "hda"} for hda in hdas]
        inputs = {"coolinput": {"batch": True, "values": references}}
        parameters = {
            "1": {"input": {"batch": False, "values": [references[0]]}, "exp": "2"},
        }
        workflow_request = {
            "history_id": history_id,
            "batch": True,
            "inputs": dumps(inputs),
            "inputs_by": "name",
            "parameters_normalized": True,
            "parameters": dumps(parameters),
        }
        invocation_response = self._post(f"workflows/{workflow_id}/usage", data=workflow_request)
        self._assert_status_code_is(invocation_response, 200)
        # NOTE(review): fixed delay kept from the original; presumably lets the
        # batch invocations create their jobs before waiting on the history.
        time.sleep(5)
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        for content, output_hid in zip(contents, (7, 10, 13, 16)):
            expected = f"{content}\t1\n1 2 3\t2\n"
            actual = self.dataset_populator.get_history_dataset_content(history_id, hid=output_hid)
            assert expected == actual
@skip_without_tool("validation_default")
def test_parameter_substitution_sanitization(self):
    """A substituted ``input1`` containing quotes and ``;`` shows up sanitized in hid 1."""
    hostile_value = '" ; echo "moo'
    _, history_id = self._run_validation_workflow_with_substitions(dict(input1=hostile_value))
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    produced = self.dataset_populator.get_history_dataset_content(history_id, hid=1)
    assert "__dq__ X echo __dq__moo\n" == produced
@skip_without_tool("validation_repeat")
def test_parameter_substitution_validation_value_errors_0(self):
    """Replacing the ``r2_0|text`` value "abd" with an empty string is rejected with a 400."""
    workflow_yaml = """
class: GalaxyWorkflow
steps:
validation:
tool_id: validation_repeat
state:
r2:
- text: "abd"
"""
    with self.dataset_populator.test_history() as history_id:
        workflow_id = self._upload_yaml_workflow(workflow_yaml)
        emptied_text = {"validation_repeat": {"r2_0|text": ""}}
        workflow_request = {
            "history": f"hist_id={history_id}",
            "parameters": dumps(emptied_text),
        }
        invocation_response = self._post(f"workflows/{workflow_id}/invocations", data=workflow_request)
        # Take a valid state and make it invalid, assert workflow won't run.
        self._assert_status_code_is(invocation_response, 400)
@skip_without_tool("validation_default")
def test_parameter_substitution_validation_value_errors_1(self):
    """Substituting a quote/``;`` laden value for ``select_param`` is rejected with a 400."""
    bad_selection = {"select_param": '" ; echo "moo'}
    response, _ = self._run_validation_workflow_with_substitions(bad_selection)
    self._assert_status_code_is(response, 400)
@skip_without_tool("validation_repeat")
def test_workflow_import_state_validation_1(self):
    """Running a workflow whose stored ``r2`` text is empty is expected to yield a 400."""
    workflow_yaml = """
class: GalaxyWorkflow
steps:
validation:
tool_id: validation_repeat
state:
r2:
- text: ""
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(
            workflow_yaml,
            history_id=history_id,
            wait=False,
            expected_response=400,
            assert_ok=False,
        )
def _run_validation_workflow_with_substitions(self, substitions):
    """Invoke ``test_workflow_validation_1`` in a new history with ``validation_default`` parameters.

    ``substitions`` is sent as the parameter mapping for the
    ``validation_default`` tool. Returns ``(raw invocation response, history_id)``;
    the response status is not checked here.
    """
    workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_validation_1")
    uploaded_workflow_id = self.workflow_populator.create_workflow(workflow)
    history_id = self.dataset_populator.new_history()
    workflow_request = {
        "history": f"hist_id={history_id}",
        "workflow_id": uploaded_workflow_id,
        "parameters": dumps({"validation_default": substitions}),
    }
    response = self.workflow_populator.invoke_workflow_raw(uploaded_workflow_id, workflow_request)
    return response, history_id
def test_subworkflow_import_order_maintained(self, history_id):
    """Outer integer defaults 1 and 2 must come back as ``out_1`` and ``out_2`` respectively.

    The nested workflow declares its two inputs with ``left`` positions in
    the opposite order to the outer workflow, so the outputs would be
    swapped if inputs were matched by position instead of by name.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
outer_input_1:
type: int
default: 1
position:
left: 0
top: 0
outer_input_2:
type: int
default: 2
position:
left: 100
top: 0
steps:
nested_workflow:
in:
inner_input_1: outer_input_1
inner_input_2: outer_input_2
run:
class: GalaxyWorkflow
inputs:
inner_input_1:
type: int
position:
left: 100
top: 0
inner_input_2:
type: int
position:
left: 0
top: 0
steps: []
outputs:
- label: nested_out_1
outputSource: inner_input_1/output
- label: nested_out_2
outputSource: inner_input_2/output
outputs:
- label: out_1
outputSource: nested_workflow/nested_out_1
- label: out_2
outputSource: nested_workflow/nested_out_2
"""
    summary = self._run_workflow(workflow_yaml, history_id=history_id, assert_ok=False, wait=False)
    self.workflow_populator.wait_for_invocation(summary.workflow_id, summary.invocation_id)
    # Two invocations are expected in the history: the outer and the nested one.
    self.workflow_populator.wait_for_history_workflows(
        summary.history_id, assert_ok=False, expected_invocation_count=2
    )
    output_values = self.workflow_populator.get_invocation(summary.invocation_id)["output_values"]
    assert output_values["out_1"] == 1
    assert output_values["out_2"] == 2
@skip_without_tool("random_lines1")
def test_run_replace_params_by_steps(self):
    """Override ``num_lines`` for the second ``random_lines1`` step only, keyed by its step id."""
    workflow_request, history_id, workflow_id, steps = self._setup_random_x2_workflow_steps(
        "test_for_replace_step_params"
    )
    second_step_id = str(steps[1]["id"])
    workflow_request["parameters"] = dumps({second_step_id: {"num_lines": 5}})
    self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request)
    # Would be 8 and 6 without modification
    for hid, expected_lines in ((2, 8), (3, 5)):
        self.__assert_lines_hid_line_count_is(history_id, hid, expected_lines)
@skip_without_tool("random_lines1")
def test_run_replace_params_nested(self):
    """Override both ``random_lines1`` steps with a nested ``seed_source`` parameter dict.

    With ``num_lines`` 1 and the seed "moo" on both steps, the fetched
    dataset content must be "2\\n".
    """
    workflow_request, history_id, workflow_id, steps = self._setup_random_x2_workflow_steps(
        "test_for_replace_step_params_nested"
    )
    seed_source = {"seed_source_selector": "set_seed", "seed": "moo"}
    overrides = {str(step["id"]): dict(num_lines=1, seed_source=seed_source) for step in steps[:2]}
    workflow_request["parameters"] = dumps(overrides)
    self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request)
    produced = self.dataset_populator.get_history_dataset_content(history_id)
    assert "2\n" == produced
@skip_without_tool("random_lines1")
def test_run_replace_params_nested_normalized(self):
    """Override both ``random_lines1`` steps using flat, ``|``-separated parameter keys.

    The request explicitly sets ``parameters_normalized`` to ``False``. With
    ``num_lines`` 1 and the seed "moo" the fetched content must be "2\\n".
    """
    workflow_request, history_id, workflow_id, steps = self._setup_random_x2_workflow_steps(
        "test_for_replace_step_normalized_params_nested"
    )
    flat_parameters = {
        "num_lines": 1,
        "seed_source|seed_source_selector": "set_seed",
        "seed_source|seed": "moo",
    }
    overrides = {str(step["id"]): flat_parameters for step in steps[:2]}
    workflow_request.update(parameters=dumps(overrides), parameters_normalized=False)
    self.workflow_populator.invoke_workflow_and_wait(workflow_id, request=workflow_request)
    produced = self.dataset_populator.get_history_dataset_content(history_id)
    assert "2\n" == produced
@skip_without_tool("random_lines1")
def test_run_replace_params_over_default(self):
    """A ``step_parameters`` value of 4 for ``num_lines`` yields a four-line result for ``WORKFLOW_ONE_STEP_DEFAULT``."""
    test_data = """
step_parameters:
'1':
num_lines: 4
input:
value: 1.bed
type: File
"""
    with self.dataset_populator.test_history() as history_id:
        self._run_jobs(
            WORKFLOW_ONE_STEP_DEFAULT,
            test_data=test_data,
            history_id=history_id,
            wait=True,
            assert_ok=True,
            round_trip_format_conversion=True,
        )
        produced = self.dataset_populator.get_history_dataset_content(history_id)
        assert produced.count("\n") == 4
@skip_without_tool("random_lines1")
def test_defaults_editor(self):
    """The editor-style download of ``WORKFLOW_ONE_STEP_DEFAULT`` can be sent back as an update (HTTP 200)."""
    workflow_id = self._upload_yaml_workflow(WORKFLOW_ONE_STEP_DEFAULT, publish=True)
    editor_representation = self._download_workflow(workflow_id, style="editor")
    update_response = self._update_workflow(workflow_id, editor_representation)
    assert update_response.status_code == 200
def test_empty_collection_sort(self, history_id):
    """Filter a one-element list by file, sort both filter outputs and merge them; the run must finish ok."""
    workflow_with_test_data = """class: GalaxyWorkflow
inputs:
input: collection
filter_file: data
steps:
filter_collection:
tool_id: __FILTER_FROM_FILE__
in:
input: input
how|filter_source: filter_file
sort_collection_1:
tool_id: __SORTLIST__
in:
input: filter_collection/output_filtered
sort_collection_2:
tool_id: __SORTLIST__
in:
input: filter_collection/output_discarded
merge_collection:
tool_id: __MERGE_COLLECTION__
in:
inputs_0|input: sort_collection_1/output
inputs_1|input: sort_collection_2/output
test_data:
input:
collection_type: list
elements:
- identifier: i1
content: "0"
filter_file: i1
"""
    self._run_workflow(workflow_with_test_data, history_id=history_id, wait=True, assert_ok=True)
@skip_without_tool("random_lines1")
def test_run_replace_params_over_default_delayed(self):
    """A ``step_parameters`` override of a step default still applies to a step behind a pause.

    ``num_lines`` defaults to 6 in the workflow and is overridden with 4 for
    step '3'. After the pause step is reviewed the fetched result must
    contain four lines.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input: data
steps:
first_cat:
tool_id: cat1
in:
input1: input
the_pause:
type: pause
in:
input: first_cat/out_file1
randomlines:
tool_id: random_lines1
in:
input: the_pause
num_lines:
default: 6
"""
    test_data = """
step_parameters:
'3':
num_lines: 4
input:
value: 1.bed
type: File
"""
    with self.dataset_populator.test_history() as history_id:
        run_summary = self._run_workflow(workflow_yaml, test_data=test_data, history_id=history_id, wait=False)
        # Block until the history has at least two jobs before waiting on it.
        wait_on(lambda: len(self._history_jobs(history_id)) >= 2 or None, "history jobs")
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        workflow_id, invocation_id = run_summary.workflow_id, run_summary.invocation_id
        self.__review_paused_steps(workflow_id, invocation_id, order_index=2, action=True)
        self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
        produced = self.dataset_populator.get_history_dataset_content(history_id)
        assert produced.count("\n") == 4
def test_pja_import_export(self):
    """A workflow loaded with ``add_pja=True`` keeps exactly one post job action on step "2" after a round trip."""
    workflow = self.workflow_populator.load_workflow(name="test_for_pja_import", add_pja=True)
    uploaded_workflow_id = self.workflow_populator.create_workflow(workflow)
    downloaded_steps = self._download_workflow(uploaded_workflow_id)["steps"]
    self._assert_has_keys(downloaded_steps, "0", "1", "2")
    post_job_actions = list(downloaded_steps["2"]["post_job_actions"].values())
    assert len(post_job_actions) == 1, len(post_job_actions)
    (only_action,) = post_job_actions
    self._assert_has_keys(only_action, "action_type", "output_name", "action_arguments")
def test_invocation_filtering(self):
    """Exercise history, workflow-instance, limit, sort and offset filters of the invocations index.

    Runs as a freshly created user, so the index starts empty. One
    invocation is created in each of two histories and the index is then
    queried with the different filter combinations.
    """
    empty_workflow = """
class: GalaxyWorkflow
inputs:
input:
type: data
optional: true
steps: []
"""
    with self._different_user(email=f"{uuid4()}@test.com"):
        history_id = self.dataset_populator.new_history()
        # new user, start with no invocations
        assert not self._assert_invocation_for_url_is("invocations")
        self._run_jobs(empty_workflow, history_id=history_id, wait=False)
        first_invocation = self._assert_invocation_for_url_is("invocations")
        new_history_id = self.dataset_populator.new_history()
        by_new_history = f"invocations?history_id={new_history_id}"
        # new history has no invocations
        assert not self._assert_invocation_for_url_is(by_new_history)
        self._run_jobs(empty_workflow, history_id=new_history_id, wait=False)
        # new history has one invocation now
        new_invocation = self._assert_invocation_for_url_is(by_new_history)
        # filter invocation by workflow instance id
        self._assert_invocation_for_url_is(
            f"invocations?workflow_id={first_invocation['workflow_id']}&instance=true", first_invocation
        )
        # limit to 1, newest invocation first by default
        self._assert_invocation_for_url_is("invocations?limit=1", target_invocation=new_invocation)
        # limit to 1, descending sort on date
        self._assert_invocation_for_url_is(
            "invocations?limit=1&sort_by=create_time&sort_desc=true", target_invocation=new_invocation
        )
        # limit to 1, ascending sort on date
        self._assert_invocation_for_url_is(
            "invocations?limit=1&sort_by=create_time&sort_desc=false", target_invocation=first_invocation
        )
        # limit to 1, ascending sort on date, offset 1
        self._assert_invocation_for_url_is(
            "invocations?limit=1&sort_by=create_time&sort_desc=false&offset=1", target_invocation=new_invocation
        )
def _assert_invocation_for_url_is(self, route, target_invocation=None):
    """GET ``route`` and check that it lists at most one invocation.

    The response must have status 200. If ``target_invocation`` is truthy
    the listing must consist of exactly that invocation (compared by "id").
    Returns the single listed invocation, or ``None`` when the listing is
    empty; a non-empty listing with more than one entry fails an assertion.
    """
    response = self._get(route)
    self._assert_status_code_is(response, 200)
    invocations = response.json()
    if target_invocation:
        assert len(invocations) == 1
        assert invocations[0]["id"] == target_invocation["id"]
    if not invocations:
        return None
    assert len(invocations) == 1
    return invocations[0]
@skip_without_tool("cat1")
def test_only_own_invocations_indexed_and_accessible(self):
    """Another user can neither view nor list an invocation created by the current user."""
    workflow_id, usage = self._run_workflow_once_get_invocation("test_usage_accessiblity")
    invocation_id = usage["id"]
    with self._different_user():
        # Direct access is forbidden and the per-workflow index is empty.
        details_response = self._get(f"workflows/{workflow_id}/usage/{invocation_id}")
        self._assert_status_code_is(details_response, 403)
        index_response = self._get(f"workflows/{workflow_id}/invocations")
        self._assert_status_code_is(index_response, 200)
        assert len(index_response.json()) == 0
    # The owner sees the invocation in the listing of their invocation ids ...
    own_invocation_ids = self._all_user_invocation_ids()
    assert invocation_id in own_invocation_ids
    with self._different_user():
        # ... while the other user does not.
        foreign_invocation_ids = self._all_user_invocation_ids()
        assert invocation_id not in foreign_invocation_ids
@skip_without_tool("cat1")
def test_invocation_usage(self):
    """Inspect the invocation and invocation-step API responses for one workflow run.

    Checks the top-level keys of the invocation, that it is the only
    invocation listed for its history, that it appears among the user's
    invocation ids, and - once scheduled - that the step with order index 0
    has no ``job_id`` while the step with order index 2 has one that matches
    the dedicated invocation-step endpoint.
    """
    workflow_id, usage = self._run_workflow_once_get_invocation("test_usage")
    invocation_id = usage["id"]
    usage_details = self._invocation_details(workflow_id, invocation_id)
    # Assert some high-level things about the structure of data returned.
    self._assert_has_keys(usage_details, "inputs", "steps", "workflow_id", "history_id")
    # Check invocations for this workflow invocation by history and regardless of history.
    history_invocations_response = self._get("invocations", {"history_id": usage_details["history_id"]})
    self._assert_status_code_is(history_invocations_response, 200)
    history_invocations = history_invocations_response.json()
    assert len(history_invocations) == 1
    assert history_invocations[0]["id"] == invocation_id
    # Check history invocations for this workflow invocation.
    invocation_ids = self._all_user_invocation_ids()
    assert invocation_id in invocation_ids
    # Wait for the invocation to be fully scheduled, so we have details on all steps.
    # The helper only reports success via its return value, so check it here
    # instead of failing later with a less obvious missing job_id.
    scheduled = self._wait_for_invocation_state(workflow_id, invocation_id, "scheduled")
    assert scheduled, f"Invocation {invocation_id} did not reach state 'scheduled' in time"
    usage_details = self._invocation_details(workflow_id, invocation_id)
    invocation_steps = usage_details["steps"]
    invocation_input_step, invocation_tool_step = {}, {}
    for invocation_step in invocation_steps:
        self._assert_has_keys(invocation_step, "workflow_step_id", "order_index", "id")
        order_index = invocation_step["order_index"]
        assert order_index in [0, 1, 2], order_index
        if order_index == 0:
            invocation_input_step = invocation_step
        elif order_index == 2:
            invocation_tool_step = invocation_step
    # Tool steps have non-null job_ids (deprecated though they may be)
    assert invocation_input_step.get("job_id", None) is None
    job_id = invocation_tool_step.get("job_id", None)
    assert job_id is not None
    invocation_tool_step_id = invocation_tool_step["id"]
    invocation_tool_step_response = self._get(
        f"workflows/{workflow_id}/invocations/{invocation_id}/steps/{invocation_tool_step_id}"
    )
    self._assert_status_code_is(invocation_tool_step_response, 200)
    tool_step_details = invocation_tool_step_response.json()
    self._assert_has_keys(tool_step_details, "id", "order_index", "job_id")
    assert tool_step_details["job_id"] == job_id
def test_invocation_with_collection_mapping(self):
    """Inspect invocation details for a tool step that was run over a collection.

    In the default view neither the input step nor the tool step reports a
    ``job_id`` and the tool step state is "scheduled". With
    ``legacy_job_state="true"`` three step entries are returned and the
    second one has state "ok".
    """
    workflow_id, invocation_id = self._run_mapping_workflow()
    details = self._invocation_details(workflow_id, invocation_id)
    # Assert some high-level things about the structure of data returned.
    self._assert_has_keys(details, "inputs", "steps", "workflow_id")
    steps = details["steps"]
    for position, step in enumerate(steps):
        self._assert_has_keys(step, "workflow_step_id", "order_index", "id")
        assert position == step["order_index"]
    input_step, tool_step = steps[0], steps[1]
    # Neither step carries a job_id here, including the mapped-over tool step.
    assert input_step.get("job_id") is None
    assert tool_step.get("job_id") is None
    assert tool_step["state"] == "scheduled"

    legacy_details = self._invocation_details(workflow_id, invocation_id, legacy_job_state="true")
    # Assert some high-level things about the structure of data returned.
    self._assert_has_keys(legacy_details, "inputs", "steps", "workflow_id")
    legacy_steps = legacy_details["steps"]
    assert len(legacy_steps) == 3
    for step in legacy_steps:
        self._assert_has_keys(step, "workflow_step_id", "order_index", "id")
    assert legacy_steps[1]["state"] == "ok"
def _run_mapping_workflow(self):
    """Run a single ``cat1`` step over a two-element list in a new history.

    Waits for the run and asserts it finished ok. Returns
    ``(workflow_id, invocation_id)`` taken from the run summary.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
input_c: collection
steps:
cat1:
tool_id: cat1
in:
input1: input_c
"""
    test_data = """
input_c:
collection_type: list
elements:
- identifier: i1
content: "0"
- identifier: i2
content: "1"
"""
    history_id = self.dataset_populator.new_history()
    summary = self._run_workflow(
        workflow_yaml,
        test_data=test_data,
        history_id=history_id,
        wait=True,
        assert_ok=True,
    )
    return summary.workflow_id, summary.invocation_id
@skip_without_tool("cat1")
def test_invocations_accessible_imported_workflow(self):
    """A user who imported a published workflow can view an invocation they started.

    The invocation is started against the original workflow id and then
    fetched through the imported workflow's id.
    """
    workflow_id = self.workflow_populator.simple_workflow("test_usage", publish=True)
    with self._different_user():
        import_response = self.__import_workflow(workflow_id)
        self._assert_status_code_is(import_response, 200)
        imported_id = import_response.json()["id"]
        workflow_request, history_id, _ = self._setup_workflow_run(workflow_id=imported_id)
        # The freshly imported workflow has no invocations yet.
        listing_response = self._get(f"workflows/{imported_id}/usage")
        self._assert_status_code_is(listing_response, 200)
        assert len(listing_response.json()) == 0
        invoke_response = self.workflow_populator.invoke_workflow_raw(
            workflow_id, workflow_request, assert_ok=True
        )
        invocation_id = invoke_response.json()["id"]
        details_response = self._get(f"workflows/{imported_id}/usage/{invocation_id}")
        self._assert_status_code_is(details_response, 200)
@skip_without_tool("cat1")
def test_invocations_accessible_published_workflow(self):
    """A different user can invoke a published workflow and view the invocation they started."""
    workflow_id = self.workflow_populator.simple_workflow("test_usage", publish=True)
    with self._different_user():
        workflow_request, history_id, _ = self._setup_workflow_run(workflow_id=workflow_id)
        # This user has not run the workflow yet, so the listing is empty.
        listing_response = self._get(f"workflows/{workflow_id}/usage")
        self._assert_status_code_is(listing_response, 200)
        assert len(listing_response.json()) == 0
        invoke_response = self.workflow_populator.invoke_workflow_raw(
            workflow_id, workflow_request, assert_ok=True
        )
        invocation_id = invoke_response.json()["id"]
        details_response = self._get(f"workflows/{workflow_id}/usage/{invocation_id}")
        self._assert_status_code_is(details_response, 200)
@skip_without_tool("cat1")
def test_invocations_not_accessible_by_different_user_for_published_workflow(self):
    """An invocation of a published workflow is forbidden (403) to a user who did not start it."""
    workflow_id = self.workflow_populator.simple_workflow("test_usage", publish=True)
    workflow_request, history_id, _ = self._setup_workflow_run(workflow_id=workflow_id)
    listing_response = self._get(f"workflows/{workflow_id}/usage")
    self._assert_status_code_is(listing_response, 200)
    assert len(listing_response.json()) == 0
    invoke_response = self.workflow_populator.invoke_workflow_raw(workflow_id, workflow_request, assert_ok=True)
    invocation_id = invoke_response.json()["id"]
    with self._different_user():
        details_response = self._get(f"workflows/{workflow_id}/usage/{invocation_id}")
        self._assert_status_code_is(details_response, 403)
def test_workflow_publishing(self):
    """Toggle ``published`` and ``importable`` on and off through PUT and check the echoed values."""
    workflow_id = self.workflow_populator.simple_workflow("dummy")
    initial = self._show_workflow(workflow_id)
    assert not initial["published"]
    assert not initial["importable"]
    # Each update is sent on its own, in this order, and the response must
    # reflect the flag that was just set.
    toggles = (("published", True), ("importable", True), ("published", False), ("importable", False))
    for flag, value in toggles:
        updated_workflow = self._put(f"workflows/{workflow_id}", data={flag: value}, json=True).json()
        assert bool(updated_workflow[flag]) is value
def test_workflow_from_path_requires_admin(self):
    """Both ways of importing a workflow from a filesystem path are rejected with ADMIN_REQUIRED."""

    def assert_admin_required(response):
        self._assert_status_code_is(response, 403)
        self._assert_error_code_is(response, error_codes.error_codes_by_name["ADMIN_REQUIRED"])

    # There are two ways to import workflows from paths, just verify both require an admin.
    workflow_directory = mkdtemp()
    try:
        workflow_path = os.path.join(workflow_directory, "workflow.yml")
        with open(workflow_path, "w") as workflow_file:
            workflow_file.write(WORKFLOW_NESTED_REPLACEMENT_PARAMETER)
        # 1) the dedicated import-from-path helper
        assert_admin_required(self.workflow_populator.import_workflow_from_path_raw(workflow_path))
        # 2) a file:// URI passed as archive_source
        import_data = {"archive_source": f"file://{workflow_path}"}
        assert_admin_required(self._post("workflows", data=import_data))
    finally:
        shutil.rmtree(workflow_directory)
def test_cannot_run_workflow_on_immutable_history(self) -> None:
    """Running a workflow in a purged history must fail with "History is immutable"."""
    workflow_test_data = {
        "input1": "hello world",
        "text_input": {"value": "A text variable", "type": "raw"},
    }
    with self.dataset_populator.test_history() as history_id:
        # once we purge the history, it becomes immutable
        self._delete(f"histories/{history_id}", data={"purge": True}, json=True)
        with self.assertRaisesRegex(AssertionError, "History is immutable"):
            self.workflow_populator.run_workflow(
                WORKFLOW_INPUTS_AS_OUTPUTS,
                test_data=workflow_test_data,
                history_id=history_id,
            )
def _invoke_paused_workflow(self, history_id):
    """Create the ``test_workflow_pause`` workflow and invoke it on a new dataset.

    A dataset with content "1 2 3" is created in ``history_id`` and mapped
    to input "0". Returns ``(workflow_id, invocation_id)``.
    """
    workflow = self.workflow_populator.load_workflow_from_resource("test_workflow_pause")
    workflow_id = self.workflow_populator.create_workflow(workflow)
    input_hda = self.dataset_populator.new_dataset(history_id, content="1 2 3")
    inputs_by_index = {"0": self._ds_entry(input_hda)}
    invocation_id = self.__invoke_workflow(workflow_id, history_id=history_id, inputs=inputs_by_index)
    return workflow_id, invocation_id
def _wait_for_invocation_non_new(self, workflow_id, invocation_id):
    """Poll until the invocation's state is anything other than "new".

    Checks the state up to 50 times, sleeping 0.25 seconds after each
    unsuccessful check. Returns ``True`` as soon as the state differs from
    "new" and ``False`` if it never did.
    """
    for _attempt in range(50):
        state = self._invocation_details(workflow_id, invocation_id)["state"]
        if state != "new":
            return True
        time.sleep(0.25)
    return False
def _assert_invocation_non_terminal(self, workflow_id, invocation_id):
    """Assert that the invocation's state is currently "ready" or "new"."""
    details = self._invocation_details(workflow_id, invocation_id)
    non_terminal_states = ["ready", "new"]
    assert details["state"] in non_terminal_states, details
def _wait_for_invocation_state(self, workflow_id, invocation_id, target_state):
    """Poll until the invocation's state equals ``target_state``.

    Checks the state up to 25 times, sleeping 0.5 seconds after each
    unsuccessful check. Returns ``True`` once the state matches and
    ``False`` if it never did; it does not raise on timeout, so callers
    have to check the return value themselves.
    """
    for _attempt in range(25):
        state = self._invocation_details(workflow_id, invocation_id)["state"]
        if state == target_state:
            return True
        time.sleep(0.5)
    return False
def _update_workflow(self, workflow_id, workflow_object):
    """Forward to ``workflow_populator.update_workflow`` and return its result unchanged."""
    update_result = self.workflow_populator.update_workflow(workflow_id, workflow_object)
    return update_result
def _invocation_step_details(self, workflow_id, invocation_id, step_id):
    """GET one invocation step via the ``usage`` route, require a 200 and return the decoded JSON."""
    step_url = f"workflows/{workflow_id}/usage/{invocation_id}/steps/{step_id}"
    response = self._get(step_url)
    self._assert_status_code_is(response, 200)
    return response.json()
def _execute_invocation_step_action(self, workflow_id, invocation_id, step_id, action):
    """PUT ``{"action": action}`` to an invocation step and return the decoded JSON response.

    The request is sent with ``requests.put`` to the keyed API URL of the
    step's ``usage`` route and must answer with status 200.
    """
    step_route = f"workflows/{workflow_id}/usage/{invocation_id}/steps/{step_id}"
    keyed_url = self._api_url(step_route, use_key=True)
    body = dumps({"action": action})
    response = put(keyed_url, data=body)
    self._assert_status_code_is(response, 200)
    return response.json()
def _setup_random_x2_workflow_steps(self, name: str):
    """Set up the random x2 workflow and also look up its ``random_lines1`` steps.

    Returns ``(workflow_request, history_id, workflow_id, random_line_steps)``.
    """
    setup = self._setup_random_x2_workflow(name)
    workflow_request, _, workflow_id = setup
    random_line_steps = self._random_lines_steps(workflow_request, workflow_id)
    return (*setup, random_line_steps)
def _random_lines_steps(self, workflow_request: dict, workflow_id: str):
    """Return the workflow's ``random_lines1`` steps as a list sorted by step "id".

    The steps are read from ``GET workflows/{workflow_id}``, which must
    answer with status 200. ``workflow_request`` is accepted but not used.
    """
    summary_response = self._get(f"workflows/{workflow_id}")
    self._assert_status_code_is(summary_response, 200)
    all_steps = summary_response.json()["steps"]
    random_line_steps = [step for step in all_steps.values() if step["tool_id"] == "random_lines1"]
    random_line_steps.sort(key=lambda step: step["id"])
    return random_line_steps
def _setup_random_x2_workflow(self, name: str):
    """Create the "random x2" workflow plus a history and input dataset for it.

    The workflow is loaded and created through ``self.workflow_populator``.
    A new history receives one dataset whose content is the integers 0-9,
    one per line, and that dataset is mapped (via ``self._ds_entry``) to the
    first key of the workflow's inputs.

    :returns: ``(workflow_request, history_id, uploaded_workflow_id)`` where
        ``workflow_request`` holds a ``"history"`` entry of the form
        ``hist_id=<history_id>`` and a JSON-encoded ``"ds_map"``.
    """
    populator = self.workflow_populator
    uploaded_workflow_id = populator.create_workflow(populator.load_random_x2_workflow(name))
    inputs = populator.workflow_inputs(uploaded_workflow_id)
    first_input_key = next(iter(inputs.keys()))

    history_id = self.dataset_populator.new_history()
    content = "\n".join(map(str, range(10)))
    dataset = self.dataset_populator.new_dataset(history_id, content=content)

    ds_map = dumps({first_input_key: self._ds_entry(dataset)})
    workflow_request = {"history": f"hist_id={history_id}", "ds_map": ds_map}
    return workflow_request, history_id, uploaded_workflow_id
def __review_paused_steps(self, uploaded_workflow_id, invocation_id, order_index, action=True):
    """Apply ``action`` to every invocation step whose ``order_index`` matches.

    The invocation's ``"steps"`` are read from ``self._invocation_details``;
    each matching step's ``"id"`` is passed to
    ``self._execute_invocation_step_action`` together with ``action``.
    """
    details = self._invocation_details(uploaded_workflow_id, invocation_id)
    # Select all matching steps first, then act on them one by one.
    matching = [step for step in details["steps"] if step["order_index"] == order_index]
    for step in matching:
        self._execute_invocation_step_action(uploaded_workflow_id, invocation_id, step["id"], action=action)
def __assert_lines_hid_line_count_is(self, history, hid, lines):
    """Assert that the history item with the given ``hid`` reports ``lines`` data lines.

    The item is located in the listing returned by ``self.__history_contents``
    by matching its ``"hid"``; its detail record is then fetched from
    ``histories/<history>/contents/<id>`` (status asserted to be 200) and the
    ``"metadata_data_lines"`` value is compared with ``lines``.
    """
    entries = self.__history_contents(history)
    matching = next(entry for entry in entries if entry["hid"] == hid)
    response = self._get(f"histories/{history}/contents/{matching['id']}")
    self._assert_status_code_is(response, 200)
    reported_lines = response.json()["metadata_data_lines"]
    assert reported_lines == lines
def __history_contents(self, history_id):
    """Return the decoded JSON listing from ``histories/<history_id>/contents``.

    The response status code is asserted to be 200 before decoding.
    """
    response = self._get(f"histories/{history_id}/contents")
    self._assert_status_code_is(response, 200)
    return response.json()
def __invoke_workflow(self, *args, **kwds) -> str:
    """Forward all arguments to ``workflow_populator.invoke_workflow_and_assert_ok``.

    Returns whatever that call returns.
    """
    invoke = self.workflow_populator.invoke_workflow_and_assert_ok
    return invoke(*args, **kwds)
def __import_workflow(self, workflow_id, deprecated_route=False):
    """POST a workflow import request and return the raw response.

    With ``deprecated_route`` set, the request goes to ``workflows/import``
    with a ``workflow_id`` field; otherwise it goes to ``workflows`` with a
    ``shared_workflow_id`` field. The response status is not checked here.
    """
    if deprecated_route:
        return self._post("workflows/import", {"workflow_id": workflow_id})
    return self._post("workflows", {"shared_workflow_id": workflow_id})
def _show_workflow(self, workflow_id):
    """GET ``workflows/<workflow_id>``, assert a 200 status and return the decoded JSON."""
    response = self._get(f"workflows/{workflow_id}")
    self._assert_status_code_is(response, 200)
    workflow = response.json()
    return workflow
def _assert_looks_like_instance_workflow_representation(self, workflow):
    """Check that ``workflow`` and each of its steps carry the expected keys.

    Both checks are delegated to ``self._assert_has_keys``: once for the
    workflow itself, then once per value of ``workflow["steps"]``.
    """
    self._assert_has_keys(workflow, "url", "owner", "inputs", "annotation", "steps")
    expected_step_keys = (
        "id",
        "type",
        "tool_id",
        "tool_version",
        "annotation",
        "tool_inputs",
        "input_steps",
    )
    for step in workflow["steps"].values():
        self._assert_has_keys(step, *expected_step_keys)
def _all_user_invocation_ids(self):
    """Return the ``"id"`` of every entry listed by a GET on ``invocations``.

    The response status code is asserted to be 200 before the body is decoded.
    """
    response = self._get("invocations")
    self._assert_status_code_is(response, 200)
    return [invocation["id"] for invocation in response.json()]
class TestAdminWorkflowsApi(BaseWorkflowsApiTestCase):
    """Workflow API tests that run with ``require_admin_user`` enabled."""

    # NOTE(review): presumably needed because workflows that embed a tool
    # definition may only be created by an admin -- confirm.
    require_admin_user = True

    def test_import_export_dynamic_tools(self, history_id):
        """Round-trip a workflow with an embedded tool through export and re-import, then run it.

        A YAML workflow containing an inline ``GalaxyTool`` step is uploaded,
        downloaded again, and the downloaded representation is used to create
        a second workflow. That second workflow is invoked on a fresh dataset
        and the resulting history dataset content is compared with the
        expected concatenated text.
        """
        # NOTE(review): the nesting of this YAML document was reconstructed
        # (the copy this was taken from had lost all indentation and did not
        # parse) -- confirm the structure against the upstream test.
        workflow_id = self._upload_yaml_workflow(
            """
class: GalaxyWorkflow
steps:
  - type: input
    label: input1
  - tool_id: cat1
    label: first_cat
    state:
      input1:
        $link: 0
  - label: embed1
    run:
      class: GalaxyTool
      command: echo 'hello world 2' > $output1
      outputs:
        output1:
          format: txt
  - tool_id: cat1
    state:
      input1:
        $link: first_cat/out_file1
      queries:
        - input2:
            $link: embed1/output1
test_data:
  input1: "hello world"
"""
        )
        # Export the uploaded workflow and create a new one from that export.
        downloaded_workflow = self._download_workflow(workflow_id)
        response = self.workflow_populator.create_workflow_response(downloaded_workflow)
        workflow_id = response.json()["id"]

        # Run the re-imported workflow on a new dataset, addressing the input by name.
        hda1 = self.dataset_populator.new_dataset(history_id, content="Hello World Second!")
        workflow_request = dict(
            inputs_by="name",
            inputs=json.dumps({"input1": self._ds_entry(hda1)}),
        )
        self.workflow_populator.invoke_workflow_and_wait(workflow_id, history_id=history_id, request=workflow_request)
        assert self.dataset_populator.get_history_dataset_content(history_id) == "Hello World Second!\nhello world 2\n"