Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy_test.selenium.test_workflow_run
import json
from uuid import uuid4
import yaml
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from typing_extensions import Literal
from galaxy_test.base import rules_test_data
from galaxy_test.base.workflow_fixtures import (
WORKFLOW_LIST_PAIRED_MAPPED_OVER_PAIRED,
WORKFLOW_LIST_PAIRED_OR_UNPAIRED_INPUT,
WORKFLOW_NESTED_REPLACEMENT_PARAMETER,
WORKFLOW_NESTED_RUNTIME_PARAMETER,
WORKFLOW_NESTED_SIMPLE,
WORKFLOW_RENAME_ON_REPLACEMENT_PARAM,
WORKFLOW_RUNTIME_PARAMETER_SIMPLE,
WORKFLOW_SELECT_FROM_OPTIONAL_DATASET,
WORKFLOW_SIMPLE_CAT_TWICE,
WORKFLOW_WITH_CUSTOM_REPORT_1,
WORKFLOW_WITH_CUSTOM_REPORT_1_TEST_DATA,
WORKFLOW_WITH_DATA_TAG_FILTER,
WORKFLOW_WITH_DYNAMIC_OUTPUT_COLLECTION,
WORKFLOW_WITH_MAPPED_OUTPUT_COLLECTION,
WORKFLOW_WITH_OLD_TOOL_VERSION,
WORKFLOW_WITH_RULES_1,
)
from .framework import (
managed_history,
RunsWorkflows,
selenium_test,
SeleniumTestCase,
UsesHistoryItemAssertions,
)
from .test_workflow_editor import CHIPSEQ_COLUMNS
[docs]
class TestWorkflowRun(SeleniumTestCase, UsesHistoryItemAssertions, RunsWorkflows):
    """Selenium tests covering the workflow run form, sample sheet inputs and invocation export."""

    # NOTE(review): presumably asks the framework to register a user before each test — confirm.
    ensure_registered = True
[docs]
@selenium_test
@managed_history
def test_workflow_export_file_rocrate(self):
    """Walk the invocation export wizard with the "ro-crate" format until a download link appears."""
    self._setup_simple_invocation_for_export_testing()
    export_ui = self.components.invocations
    self.workflow_run_wait_for_ok(hid=2)

    # Wait until the disabled export tab is gone before opening the export tab.
    export_ui.export_tab_disabled.wait_for_absent()
    export_ui.export_tab.wait_for_and_click()
    self.screenshot("invocation_export_formats")

    # Wizard page 1: output format.
    export_ui.export_output_format(type="ro-crate").wait_for_and_click()
    export_ui.wizard_next_button.wait_for_and_click()

    # Wizard page 2: destination.
    to_download = export_ui.export_destination(destination="download")
    to_download.wait_for_present()
    self.screenshot("invocation_export_rocrate_destinations")
    to_download.wait_for_and_click()
    export_ui.wizard_next_button.wait_for_and_click()

    # Wizard page 3: trigger the export.
    start_export = export_ui.wizard_export_button
    start_export.wait_for_present()
    self.screenshot("invocation_export_rocrate_download_options")
    start_export.wait_for_and_click()

    # Capture the intermediate state, then the finished state.
    self.sleep_for(self.wait_types.UX_TRANSITION)
    self.screenshot("invocation_export_crate_preparing_download")
    export_ui.export_download_link.wait_for_present()
    self.screenshot("invocation_export_crate_download_ready")
[docs]
@selenium_test
@managed_history
def test_workflow_export_file_native(self):
    """Walk the invocation export wizard with the "default-file" format until a download link appears."""
    self._setup_simple_invocation_for_export_testing()
    export_ui = self.components.invocations
    self.workflow_run_wait_for_ok(hid=2)

    # Wait until the disabled export tab is gone before opening the export tab.
    export_ui.export_tab_disabled.wait_for_absent()
    export_ui.export_tab.wait_for_and_click()
    self.screenshot("invocation_export_formats")

    # Wizard page 1: output format.
    export_ui.export_output_format(type="default-file").wait_for_and_click()
    export_ui.wizard_next_button.wait_for_and_click()

    # Wizard page 2: destination.
    to_download = export_ui.export_destination(destination="download")
    to_download.wait_for_present()
    self.screenshot("invocation_export_native_destinations")
    to_download.wait_for_and_click()
    export_ui.wizard_next_button.wait_for_and_click()

    # Wizard page 3: trigger the export.
    start_export = export_ui.wizard_export_button
    start_export.wait_for_present()
    self.screenshot("invocation_export_native_download_options")
    start_export.wait_for_and_click()

    # Capture the intermediate state, then the finished state.
    self.sleep_for(self.wait_types.UX_TRANSITION)
    self.screenshot("invocation_export_native_preparing_download")
    export_ui.export_download_link.wait_for_present()
    self.screenshot("invocation_export_native_download_ready")
[docs]
@selenium_test
@managed_history
def test_simple_execution(self):
    """Run WORKFLOW_SIMPLE_CAT_TWICE on an uploaded FASTA and check the output summary."""
    fasta_path = self.get_filename("1.fasta")
    self.perform_upload(fasta_path)
    self.wait_for_history()

    self.workflow_run_open_workflow(WORKFLOW_SIMPLE_CAT_TWICE)
    self.screenshot("workflow_run_simple_ready")

    self.workflow_run_submit()
    self.sleep_for(self.wait_types.UX_TRANSITION)
    self.screenshot("workflow_run_simple_submitted")

    # History item 2 is expected to report two sequences once it is ok.
    output_hid = 2
    self.workflow_run_wait_for_ok(hid=output_hid, expand=True)
    self.assert_item_summary_includes(output_hid, "2 sequences")
    self.screenshot("workflow_run_simple_complete")
[docs]
@selenium_test
@managed_history
def test_expanded_execution_of_simple_workflow(self):
    """Run WORKFLOW_SIMPLE_CAT_TWICE from the expanded run form and check the output summary."""
    fasta_path = self.get_filename("1.fasta")
    self.perform_upload(fasta_path)
    self.wait_for_history()

    self.workflow_run_open_workflow(WORKFLOW_SIMPLE_CAT_TWICE)
    self.workflow_run_ensure_expanded()
    self.screenshot("workflow_run_expanded_ready")

    self.workflow_run_submit()
    self.sleep_for(self.wait_types.UX_TRANSITION)
    self.screenshot("workflow_run_expanded_submitted")

    # History item 2 is expected to report two sequences once it is ok.
    output_hid = 2
    self.workflow_run_wait_for_ok(hid=output_hid, expand=True)
    self.assert_item_summary_includes(output_hid, "2 sequences")
    # NOTE(review): same screenshot name as test_simple_execution uses — confirm the reuse is intended.
    self.screenshot("workflow_run_simple_complete")
def _setup_chipseq_input_workflow(self):
    """Build and save a two-step workflow in the editor and return its name.

    The workflow has a ``sample_sheet:paired`` collection input labelled
    "input1" (with the CHIPSEQ_COLUMNS column definitions) connected to a
    ``__SAMPLE_SHEET_TO_TABULAR__`` step labelled "as_table".
    """
    editor = self.components.workflow_editor
    workflow_name = self.workflow_create_new()

    # Input step: label, annotation, collection type and column definitions.
    self.workflow_editor_add_input(item_name="data_collection_input")
    editor.label_input.wait_for_and_send_keys("input1")
    editor.annotation_input.wait_for_and_send_keys("chipseq example input")
    self.sleep_for(self.wait_types.UX_RENDER)
    editor.collection_type_input.wait_for_and_clear_and_send_keys("sample_sheet:paired")
    self.workflow_editor_enter_column_definitions(CHIPSEQ_COLUMNS)

    # Tool step that consumes the sample sheet.
    self.tool_open("__SAMPLE_SHEET_TO_TABULAR__")
    self.sleep_for(self.wait_types.UX_RENDER)
    editor.label_input.wait_for_and_send_keys("as_table")

    # Lay the steps out before connecting them, then save.
    editor.tool_bar.auto_layout.wait_for_and_click()
    self.sleep_for(self.wait_types.UX_RENDER)
    self.workflow_editor_connect("input1#output", "as_table#input")
    self.workflow_editor_click_save()
    return workflow_name
def _chipseq_data_entry(self, element_identifier_mutable: bool = False) -> None:
    """Fill the ChIP-seq sample sheet grid through keyboard input and the control picker.

    Conditions and replicates are typed with a single queued action chain
    that starts in the "Condition" cell of row 0 and moves on with TAB. The
    "Control" column is filled afterwards through the select picker.

    :param element_identifier_mutable: when true, one extra TAB is sent at
        the start of every row after the first (presumably to step over an
        editable element identifier cell — confirm against the grid layout).
    """
    sheet = self.components.workflow_run.input.sample_sheet

    # Double-clicking the first "Condition" cell must bring up its cell input.
    sheet.grid_cell_input(row_index=0, column_name="Condition").assert_absent()
    sheet.grid_cell(row_index=0, column_name="Condition").wait_for_and_double_click()
    sheet.grid_cell_input(row_index=0, column_name="Condition").wait_for_visible()

    # (condition, replicate, control) for grid rows 1 through 6, in order.
    middle_rows = (
        ("H3K4me3", "1", "SRR5680995"),  # 1: row for SRR5680996
        ("H3K27me3", "1", "SRR5680995"),  # 2: row for SRR5680997
        ("H3K27me3", "2", "SRR5681005"),  # 3: row for SRR5681007
        ("H3K4me3", "2", "SRR5681005"),  # 4: row for SRR5681006
        ("CTCF", "1", "SRR5680995"),  # 5: row for SRR5680998
        ("CTCF", "2", "SRR5681005"),  # 6: row for SRR5681008
    )

    keys = self.action_chains()

    def skip_identifier_cell():
        if element_identifier_mutable:
            keys.send_keys(Keys.TAB)

    # 0: row for SRR5680995 - condition only; replicate and control are tabbed past.
    keys.send_keys("input")
    keys.send_keys(Keys.TAB)
    keys.send_keys(Keys.TAB)
    keys.send_keys(Keys.TAB)

    # Rows 1-6: condition, replicate, then tab past the control cell (set via the picker below).
    for condition, replicate, _control in middle_rows:
        skip_identifier_cell()
        keys.send_keys(condition)
        keys.send_keys(Keys.TAB)
        keys.send_keys(replicate)
        keys.send_keys(Keys.TAB)
        keys.send_keys(Keys.TAB)

    # 7: row for SRR5681005 - condition only, then click and run the queued actions.
    skip_identifier_cell()
    keys.send_keys("input")
    keys.send_keys(Keys.TAB)
    keys.click()
    keys.perform()

    # Choose each row's control sample from the picker.
    for row_index, (_condition, _replicate, control) in enumerate(middle_rows, start=1):
        sheet.grid_cell(row_index=row_index, column_name="Control").wait_for_and_double_click()
        sheet.select_picker.wait_for_and_click()
        sheet.select_item(item=control).wait_for_and_click()
[docs]
@selenium_test
@managed_history
def test_collection_input_sample_sheet_chipseq_example_from_uris(self):
    """Create the ChIP-seq sample sheet from pasted URIs in the run form and run the workflow."""
    history_id = self.current_history_id()
    workflow_name = self._setup_chipseq_input_workflow()
    self.workflow_run_with_name(workflow_name)

    run_form = self.components.workflow_run
    input_component = run_form.input._(label="input1")
    input_component.upload.wait_for_and_click()

    sheet = run_form.input.sample_sheet
    sheet._.wait_for_present()
    self.screenshot("workflow_run_sample_sheet_chipseq_source")
    sheet.data_import_source_from(source="pasted_table").wait_for_and_click()
    sheet.wizard_next_button.wait_for_and_click()

    # One R1/R2 URL pair per accession, in sample sheet row order.
    base_url = self.dataset_populator.base64_url_for_bytes(b"hello world")
    accessions = (
        "SRR5680995",
        "SRR5680996",
        "SRR5680997",
        "SRR5681007",
        "SRR5681006",
        "SRR5680998",
        "SRR5681008",
        "SRR5681005",
    )
    urls = [f"{base_url}/{accession}_R{read}.fastq.gz" for accession in accessions for read in (1, 2)]
    sheet.paste_table_textarea.wait_for_and_send_keys("\n".join(urls))
    self.screenshot("workflow_run_sample_sheet_chipseq_pasted_data")
    sheet.wizard_next_button.wait_for_and_click()

    self.screenshot("workflow_run_sample_sheet_chipseq_table_empty")
    self._chipseq_data_entry(element_identifier_mutable=True)
    self.screenshot("workflow_run_sample_sheet_chipseq_table_full")
    self.sleep_for(self.wait_types.UX_RENDER)
    sheet.wizard_next_button.wait_for_and_click()

    self.history_panel_wait_for_hid_ok(1)
    self.screenshot("workflow_run_sample_sheet_chipseq_sheet_created")
    sheet.collection_created_message.wait_for_present()

    self.workflow_run_submit()
    # The tabular output is expected at hid 18 in this history.
    self._expect_chipseq_table(history_id, 18)
def _expect_chipseq_table(self, history_id: str, hid: int):
    """Wait for dataset ``hid`` and assert its content is the expected ChIP-seq table.

    :param history_id: history holding the dataset.
    :param hid: history item id of the tabular output to check.
    """
    # Tab-separated rows; the first and last rows have two empty trailing columns.
    expected = """SRR5680995\tinput\t\t
SRR5680996\tH3K4me3\t1\tSRR5680995
SRR5680997\tH3K27me3\t1\tSRR5680995
SRR5681007\tH3K27me3\t2\tSRR5681005
SRR5681006\tH3K4me3\t2\tSRR5681005
SRR5680998\tCTCF\t1\tSRR5680995
SRR5681008\tCTCF\t2\tSRR5681005
SRR5681005\tinput\t\t
"""
    self.history_panel_wait_for_hid_ok(hid)
    actual = self.dataset_populator.get_history_dataset_content(history_id, hid=hid)
    assert actual == expected, f"Expected chipseq sample sheet table:\n{expected}\nGot:\n{actual}"
[docs]
@selenium_test
@managed_history
def test_collection_input_sample_sheet_chipseq_example_from_list_pairs(self):
    """Create the ChIP-seq sample sheet from an existing list of pairs and run the workflow."""
    history_id = self.current_history_id()

    # Upload one R1/R2 URL pair per accession, in sample sheet row order.
    base_url = self.dataset_populator.base64_url_for_bytes(b"hello world")
    accessions = (
        "SRR5680995",
        "SRR5680996",
        "SRR5680997",
        "SRR5681007",
        "SRR5681006",
        "SRR5680998",
        "SRR5681008",
        "SRR5681005",
    )
    urls = [f"{base_url}/{accession}_R{read}.fastq.gz" for accession in accessions for read in (1, 2)]
    self.perform_upload_of_pasted_content("\n".join(urls))

    # Build a list of pairs from the 16 uploaded datasets.
    self.history_panel_wait_for_and_select(list(range(1, 17)))
    self.history_panel_build_list_of_pairs()
    self.collection_builder_set_name("inputaslist")
    self.collection_builder_create()
    collection_hid = 33
    self.history_panel_wait_for_hid_visible(collection_hid)

    workflow_name = self._setup_chipseq_input_workflow()
    self.workflow_run_with_name(workflow_name)

    run_form = self.components.workflow_run
    input_component = run_form.input._(label="input1")
    input_component.upload.wait_for_and_click()

    sheet = run_form.input.sample_sheet
    sheet._.wait_for_present()
    sheet.data_import_source_from(source="collection").wait_for_and_click()
    sheet.wizard_next_button.wait_for_and_click()
    self.screenshot("workflow_run_sample_sheet_from_collection")

    # No explicit "select collection" click: the collection dialog pops up
    # on its own when the wizard navigates to this form.
    collection_id = self.hid_to_history_item(collection_hid)["id"]
    sheet.collection_selection(id=collection_id).wait_for_present()
    self.screenshot("workflow_run_sample_sheet_from_collection_select_collection")
    sheet.collection_selection(id=collection_id).wait_for_and_click()

    self.screenshot("workflow_run_sample_sheet_from_collection_grid")
    self._chipseq_data_entry(element_identifier_mutable=False)
    self.screenshot("workflow_run_sample_sheet_from_collection_grid_full")
    self.sleep_for(self.wait_types.UX_RENDER)
    sheet.wizard_next_button.wait_for_and_click()

    self.history_panel_wait_for_hid_ok(50)
    sheet.collection_created_message.wait_for_present()

    self.workflow_run_submit()
    # The tabular output is expected at hid 51 in this history.
    self._expect_chipseq_table(history_id, 51)
[docs]
@selenium_test
@managed_history
def test_runtime_parameters_simple(self):
    """Set the ``num_lines`` runtime parameter to 3 in the run form and check the output has 3 lines."""
    parameter = "num_lines"

    self.perform_upload(self.get_filename("1.txt"))
    self.wait_for_history()
    self.workflow_run_open_workflow(WORKFLOW_RUNTIME_PARAMETER_SIMPLE)

    # Look the parameter up before the first screenshot; the result is not used here.
    self.tool_parameter_div(parameter)
    self.screenshot("workflow_run_runtime_parameters_initial")

    self._set_num_lines_to_3(parameter)
    self.screenshot("workflow_run_runtime_parameters_modified")

    self.workflow_run_submit()
    self._assert_has_3_lines_after_run(hid=2)
[docs]
@selenium_test
@managed_history
def test_runtime_parameters_simple_optional(self):
    """Set an optional integer runtime parameter to 3 and check the tool output decodes to 3.

    The workflow is a single ``expression_null_handling_integer`` step whose
    ``int_input`` is declared as a runtime input.
    """
    # The step definition has to be nested under ``steps`` (and the tool
    # settings under the step); with the nesting flattened, ``steps`` parses
    # as null and the workflow has no step to run.
    self.workflow_run_open_workflow(
        """
class: GalaxyWorkflow
inputs: {}
steps:
  int_step:
    tool_id: expression_null_handling_integer
    runtime_inputs:
      - int_input
"""
    )
    # Look the parameter up first (result unused), then type 3 into it.
    self.tool_parameter_div("int_input")
    self._set_num_lines_to_3("int_input")
    self.screenshot("workflow_run_optional_runtime_parameters_modified")
    self.workflow_run_submit()

    output_hid = 1
    self.workflow_run_wait_for_ok(hid=output_hid)
    history_id = self.current_history_id()
    content = self.dataset_populator.get_history_dataset_content(history_id, hid=output_hid)
    # The dataset content is JSON; it must decode to the integer typed above.
    assert json.loads(content) == 3
[docs]
@selenium_test
@managed_history
def test_subworkflows_expanded(self):
    """Open WORKFLOW_NESTED_SIMPLE in the expanded run form and click the subworkflow step icon."""
    self.perform_upload(self.get_filename("1.txt"))
    self.wait_for_history()
    self.workflow_run_open_workflow(WORKFLOW_NESTED_SIMPLE)
    self.workflow_run_ensure_expanded()

    run_form = self.components.workflow_run
    run_form.subworkflow_step_icon.wait_for_visible()
    self.screenshot("workflow_run_nested_collapsed")

    run_form.subworkflow_step_icon.wait_for_and_click()
    self.screenshot("workflow_run_nested_open")
[docs]
@selenium_test
@managed_history
def test_subworkflow_runtime_parameters(self):
    """Set a runtime parameter inside a subworkflow step and check the output has 3 lines."""
    self.perform_upload(self.get_filename("1.txt"))
    self.wait_for_history()
    self.workflow_run_open_workflow(WORKFLOW_NESTED_RUNTIME_PARAMETER)
    self.workflow_run_ensure_expanded()

    run_form = self.components.workflow_run
    run_form.subworkflow_step_icon.wait_for_visible()
    self.screenshot("workflow_run_nested_parameters_collapsed")

    run_form.subworkflow_step_icon.wait_for_and_click()
    self.screenshot("workflow_run_nested_parameters_open")

    # The parameter is addressed as "1|num_lines" in the run form.
    self._set_num_lines_to_3("1|num_lines")
    self.workflow_run_submit()
    self._assert_has_3_lines_after_run(hid=2)
[docs]
@selenium_test
@managed_history
def test_replacement_parameters(self):
    """Fill the ``replaceme`` replacement parameter and check the output dataset is renamed."""
    self.perform_upload(self.get_filename("1.txt"))
    self.wait_for_history()
    self.workflow_run_open_workflow(WORKFLOW_RENAME_ON_REPLACEMENT_PARAM)
    self.workflow_run_ensure_expanded()

    self.screenshot("workflow_run_rename_simple_empty")
    self._set_replacement_parameter("replaceme", "moocow")
    self.screenshot("workflow_run_rename_simple_input")
    self.workflow_run_submit()

    renamed_hid = 2
    self.workflow_run_wait_for_ok(hid=renamed_hid)
    dataset_details = self.dataset_populator.get_history_dataset_details(
        self.current_history_id(), hid=renamed_hid
    )
    # The typed value must show up in the output dataset name.
    assert dataset_details["name"] == "moocow suffix", dataset_details
[docs]
@selenium_test
@managed_history
def test_step_parameter_inputs(self):
    """Feed an integer workflow input and a dataset into ``simple_constructs`` and check the output.

    The integer typed into the ``input_int`` field must appear in the tool
    output, alongside ``chr6_hla_hap2`` (presumably from the uploaded
    ``1.txt`` — confirm against the test data).
    """
    self.perform_upload(self.get_filename("1.txt"))
    self.wait_for_history()
    # Inputs, the step and its ``in`` connections must be nested; with the
    # nesting flattened, ``inputs``, ``steps`` and ``in`` all parse as null
    # and nothing is connected.
    self.workflow_run_open_workflow(
        """
class: GalaxyWorkflow
inputs:
  input_int: integer
  input_data: data
steps:
  simple_constructs:
    tool_id: simple_constructs
    label: tool_exec
    in:
      inttest: input_int
      files_0|file: input_data
"""
    )
    self.workflow_run_ensure_expanded()
    workflow_run = self.components.workflow_run

    # Replace whatever is in the integer input with 12345.
    input_div_element = workflow_run.input_div(label="input_int").wait_for_visible()
    input_element = input_div_element.find_element(By.CSS_SELECTOR, "input")
    input_element.clear()
    input_element.send_keys("12345")
    self.screenshot("workflow_run_step_parameter_input")
    self.workflow_run_submit()

    output_hid = 2
    self.workflow_run_wait_for_ok(hid=output_hid)
    history_id = self.current_history_id()
    content = self.dataset_populator.get_history_dataset_content(history_id, hid=output_hid)
    assert "12345" in content, content
    assert "chr6_hla_hap2" in content
[docs]
@selenium_test
@managed_history
def test_replacement_parameters_on_subworkflows(self):
    """Fill the ``replaceme`` replacement parameter for a nested workflow and check the rename."""
    self.perform_upload(self.get_filename("1.txt"))
    self.wait_for_history()
    self.workflow_run_open_workflow(WORKFLOW_NESTED_REPLACEMENT_PARAMETER)
    self.workflow_run_ensure_expanded()

    self.screenshot("workflow_run_rename_subworkflow_empty")
    self._set_replacement_parameter("replaceme", "moocow")
    self.screenshot("workflow_run_rename_subworkflow_input")
    self.workflow_run_submit()

    renamed_hid = 2
    self.workflow_run_wait_for_ok(hid=renamed_hid)
    dataset_details = self.dataset_populator.get_history_dataset_details(
        self.current_history_id(), hid=renamed_hid
    )
    # The typed value must show up in the output dataset name.
    assert dataset_details["name"] == "moocow suffix", dataset_details
[docs]
@selenium_test
def test_execution_with_tool_upgrade(self):
    """Open a workflow pinned to an old tool version and expect the changed-tools warning."""
    workflow_name = self.workflow_upload_yaml_with_random_name(WORKFLOW_WITH_OLD_TOOL_VERSION, exact_tools=True)
    self.workflow_run_with_name(workflow_name)
    self.sleep_for(self.wait_types.UX_TRANSITION)

    # The run form must warn that tool versions differ from the stored workflow.
    warning = self.components.workflow_run.warning
    self.assert_message(warning, contains="tools which have changed")
    self.screenshot("workflow_run_tool_upgrade")
[docs]
@selenium_test
def test_run_form_safe_upgrade_handling(self):
    """Check the changed-tools warning appears for version 0.9.0 of the "apply" step but not for 1.0.0."""
    workflow = yaml.safe_load(WORKFLOW_WITH_RULES_1)

    def open_run_form_with_tool_version(tool_version):
        # Pin the "apply" step to the given version, upload under a random name and open the run form.
        workflow["steps"]["apply"]["tool_version"] = tool_version
        uploaded_name = self.workflow_upload_yaml_with_random_name(json.dumps(workflow), exact_tools=True)
        self.workflow_run_with_name(uploaded_name)
        self.sleep_for(self.wait_types.UX_TRANSITION)

    # 0.9.0 is a version that does not exist in WORKFLOW_SAFE_TOOL_VERSION_UPDATES
    open_run_form_with_tool_version("0.9.0")
    self.assert_message(self.components.workflow_run.warning, contains="tools which have changed")

    # 1.0.0 is a version that exists in WORKFLOW_SAFE_TOOL_VERSION_UPDATES
    open_run_form_with_tool_version("1.0.0")
    # NOTE(review): relies on ``is_absent`` being a property; a bound method would always be truthy — confirm.
    assert self.components.workflow_run.warning.is_absent
[docs]
@selenium_test
@managed_history
def test_execution_with_multiple_inputs(self):
    """Run WORKFLOW_WITH_DYNAMIC_OUTPUT_COLLECTION from the expanded form and check dataset 7."""
    # The same fixture is passed as both the workflow and its second argument
    # (presumably the test data is embedded in the fixture — confirm).
    fixture = WORKFLOW_WITH_DYNAMIC_OUTPUT_COLLECTION
    history_id = self.workflow_run_and_submit(
        fixture,
        fixture,
        inputs_specified_screenshot_name="workflow_run_two_inputs",
        ensure_expanded=True,
    )

    output_hid = 7
    self.workflow_run_wait_for_ok(hid=output_hid)
    actual = self.dataset_populator.get_history_dataset_content(history_id, hid=output_hid)
    assert actual == "10.0\n30.0\n20.0\n40.0\n"
[docs]
@selenium_test
@managed_history
def test_execution_with_text_default_value_connected_to_restricted_select(self):
    """Check a text input with a default, restricted by its connection, renders as a select and runs.

    The select field must display "Ex2" for the default value ``ex2``, and
    running the workflow with that default must produce ``ex2`` in dataset 1.
    """
    # The input options and the step connection must be nested under their
    # parents; with the nesting flattened, ``inputs`` and ``steps`` parse as
    # null and the text parameter is never declared or connected.
    self.workflow_run_open_workflow(
        """
class: GalaxyWorkflow
inputs:
  text_param:
    optional: true
    default: ex2
    restrictOnConnections: true
    type: text
steps:
  multi_select:
    tool_id: multi_select
    in:
      select_ex:
        source: text_param
"""
    )
    element = self.components.workflow_run.input_select_field(label="text_param").wait_for_present()
    assert element.text == "Ex2"
    self.workflow_run_submit()

    history_id = self.current_history_id()
    self.workflow_populator.wait_for_history_workflows(history_id, expected_invocation_count=1)
    content = self.dataset_populator.get_history_dataset_content(history_id, hid=1)
    assert content == "ex2"
[docs]
@selenium_test
@managed_history
def test_execution_with_rules(self):
    """Run WORKFLOW_WITH_RULES_1 from the expanded form and validate the collection at hid 6."""
    # The same fixture is passed as both the workflow and its second argument
    # (presumably the test data is embedded in the fixture — confirm).
    fixture = WORKFLOW_WITH_RULES_1
    history_id = self.workflow_run_and_submit(
        fixture,
        fixture,
        landing_screenshot_name="workflow_run_rules_landing",
        inputs_specified_screenshot_name="workflow_run_rules",
        ensure_expanded=True,
    )

    collection_hid = 6
    self.workflow_run_wait_for_ok(hid=collection_hid)
    collection_details = self.dataset_populator.get_history_collection_details(history_id, hid=collection_hid)
    rules_test_data.check_example_2(collection_details, self.dataset_populator)
[docs]
@selenium_test
@managed_history
def test_execution_with_custom_invocation_report(self):
    """Run a workflow with a custom report and open the invocation report page."""
    history_id = self.workflow_run_and_submit(
        WORKFLOW_WITH_CUSTOM_REPORT_1,
        WORKFLOW_WITH_CUSTOM_REPORT_1_TEST_DATA,
        ensure_expanded=True,
    )
    self.screenshot("workflow_run_invocation_report")

    populator = self.workflow_populator
    populator.wait_for_history_workflows(history_id, expected_invocation_count=1)
    first_invocation = populator.history_invocations(history_id)[0]

    # Navigate to the report for that invocation and wait for rendered markdown.
    self.get(f"workflows/invocations/report?id={first_invocation['id']}")
    self.wait_for_selector_visible(".markdown-component")
    self.screenshot("workflow_report_custom_1")
[docs]
@selenium_test
@managed_history
def test_execution_with_null_optional_select_from_data(self):
    """Submit WORKFLOW_SELECT_FROM_OPTIONAL_DATASET without extra arguments and wait for one invocation."""
    history_id = self.workflow_run_and_submit(WORKFLOW_SELECT_FROM_OPTIONAL_DATASET)
    populator = self.workflow_populator
    populator.wait_for_history_workflows(history_id, expected_invocation_count=1)
[docs]
@selenium_test
@managed_history
def test_workflow_run_button_disabled_when_required_input_missing(self):
    """Check the run button is disabled until every required input has a value.

    The workflow declares a required text parameter, a dataset input and a
    collection input and has no steps. The disabled state must go away once
    all three can be satisfied and come back when the text is cleared.
    """
    # Each input's options must be nested under that input; with the nesting
    # flattened, ``inputs`` parses as null and the three ``type`` keys
    # collide at the top level.
    self.workflow_run_open_workflow(
        """
class: GalaxyWorkflow
inputs:
  text_param:
    type: text
    optional: false
  data_param:
    type: data
  collection_param:
    type: data_collection
steps: {}
"""
    )
    workflow_run = self.components.workflow_run
    # None of the required parameters are present
    workflow_run.run_workflow_disabled.wait_for_present()

    # upload datasets and collections
    history_id = self.current_history_id()
    self.dataset_populator.new_dataset(history_id, wait=True)
    self.dataset_collection_populator.create_list_in_history(history_id, wait=True)
    input_element = workflow_run.simplified_input(label="text_param").wait_for_and_click()
    input_element.send_keys("3")

    # Everything is set, workflow is runnable, disabled class should be removed
    workflow_run.run_workflow_disabled.wait_for_absent()

    # Clearing the required text value must disable the run button again.
    input_element.clear()
    workflow_run.run_workflow_disabled.wait_for_present()
[docs]
@selenium_test
@managed_history
def test_workflow_run_tag_filter(self):
    """Check the run form picks the dataset tagged ``genomescope_model`` for the workflow input."""
    history_id = self.current_history_id()
    populator = self.dataset_populator

    tagged_dataset = populator.new_dataset(history_id, wait=True)
    populator.tag_dataset(history_id, tagged_dataset["id"], tags=["genomescope_model"])
    # Add another possible input that should not be selected
    populator.new_dataset(history_id, wait=True)

    workflow_id, workflow_name = self._create_workflow_with_unique_name(WORKFLOW_WITH_DATA_TAG_FILTER, "ga")
    self.workflow_run_with_name(workflow_name)
    self.workflow_run_submit()
    self.sleep_for(self.wait_types.HISTORY_POLL)

    # Inspect the most recent invocation's first input.
    all_invocations = self.workflow_populator.workflow_invocations(workflow_id=workflow_id)
    latest = self.workflow_populator.get_invocation(all_invocations[-1]["id"])
    assert latest["inputs"]["0"]["id"] == tagged_dataset["id"]
[docs]
@selenium_test
@managed_history
def test_workflow_run_list_paired_or_unpaired_with_paired_list(self):
    """Run WORKFLOW_LIST_PAIRED_OR_UNPAIRED_INPUT on a list of pairs built from two uploads."""
    history_id = self.current_history_id()
    uploads = {
        "foo_1.fasta": "forward content",
        "foo_2.fasta": "reverse content",
    }
    self.perform_upload_of_pasted_content(uploads)

    # Build the list of pairs from the two uploaded datasets.
    self.history_panel_wait_for_and_select([1, 2])
    self.history_panel_build_list_of_pairs()
    self.collection_builder_set_name("my awesome paired list")
    self.collection_builder_create()
    self.history_panel_wait_for_hid_ok(5)

    self._create_and_run_workflow_with_unique_name(WORKFLOW_LIST_PAIRED_OR_UNPAIRED_INPUT)
    self.workflow_run_submit()

    output_hid = 6
    self.history_panel_wait_for_hid_ok(output_hid)
    output_text = self.dataset_populator.get_history_dataset_content(history_id, hid=output_hid)
    assert output_text.strip() == "forward content\nreverse content"
[docs]
@selenium_test
@managed_history
def test_workflow_run_list_paired_or_unpaired_with_flat_list(self):
    """Run WORKFLOW_LIST_PAIRED_OR_UNPAIRED_INPUT on a flat list built from two uploads."""
    history_id = self.current_history_id()
    uploads = {
        "foo_1.fasta": "forward content",
        "foo_2.fasta": "reverse content",
    }
    self.perform_upload_of_pasted_content(uploads)

    # Build a plain list from the two uploaded datasets via the advanced builder.
    self.history_panel_wait_for_and_select([1, 2])
    self.history_panel_build_list_advanced_and_select_builder("list")
    self.collection_builder_set_name("my awesome flat list")
    self.collection_builder_create()
    self.history_panel_wait_for_hid_ok(5)

    self._create_and_run_workflow_with_unique_name(WORKFLOW_LIST_PAIRED_OR_UNPAIRED_INPUT)
    self.workflow_run_submit()

    output_hid = 6
    self.history_panel_wait_for_hid_ok(output_hid)
    output_text = self.dataset_populator.get_history_dataset_content(history_id, hid=output_hid)
    # For the flat list the expected output lists the reverse content first.
    assert output_text.strip() == "reverse content\nforward content"
[docs]
@selenium_test
@managed_history
def test_workflow_run_list_paired_or_unpaired_with_mixed_list(self):
    """Run WORKFLOW_LIST_PAIRED_OR_UNPAIRED_INPUT on a list mixing a pair and an unpaired dataset."""
    history_id = self.current_history_id()
    uploads = {
        "foo_1.fasta": "forward content",
        "foo_2.fasta": "reverse content",
        "other.fasta": "unpaired content",
    }
    self.perform_upload_of_pasted_content(uploads)

    # Build a paired-or-unpaired list from the three uploaded datasets.
    self.history_panel_wait_for_and_select([1, 2, 3])
    self.history_panel_build_list_of_paired_or_unpaireds()
    self.collection_builder_set_name("my awesome flat list")
    self.collection_builder_create()
    self.history_panel_wait_for_hid_ok(7)

    self._create_and_run_workflow_with_unique_name(WORKFLOW_LIST_PAIRED_OR_UNPAIRED_INPUT)
    self.workflow_run_submit()

    output_hid = 8
    self.history_panel_wait_for_hid_ok(output_hid)
    output_text = self.dataset_populator.get_history_dataset_content(history_id, hid=output_hid)
    assert output_text.strip() == "forward content\nreverse content\nunpaired content"
[docs]
@selenium_test
@managed_history
def test_upload_dataset_from_workflow_simple(self):
    """Upload a dataset from inside the run form and run WORKFLOW_SIMPLE_CAT_TWICE on it."""
    history_id = self.current_history_id()
    self._create_and_run_workflow_with_unique_name(WORKFLOW_SIMPLE_CAT_TWICE)

    # Open the upload option of the "input1" field and upload "hello world" through it.
    input_component = self.components.workflow_run.input._(label="input1")
    input_component.upload.wait_for_and_click()
    self._upload_hello_world_for_input(input_component)
    self.workflow_run_submit()

    output_hid = 2
    self.history_panel_wait_for_hid_ok(output_hid)
    output_text = self.dataset_populator.get_history_dataset_content(history_id, hid=output_hid)
    assert output_text.strip() == "hello world\nhello world"
[docs]
@selenium_test
@managed_history
def test_modal_upload_updates_form(self):
    """Upload a dataset while the run form is open and use it to build the input collection."""
    history_id = self.current_history_id()
    self.perform_upload_of_pasted_content("goodbye land")
    self._create_and_run_workflow_with_unique_name(WORKFLOW_WITH_MAPPED_OUTPUT_COLLECTION)

    run_form = self.components.workflow_run
    input_component = run_form.input._(label="input1")
    input_component.upload.wait_for_and_click()

    # Second upload happens without leaving the run form page.
    self.perform_upload_of_pasted_content("hello world", on_current_page=True)
    self.history_panel_wait_for_hid_ok(2)

    collection_builder = run_form.input.collection_builder._(label="input1")
    # it is a div so I don't think it works to click directly but we can go to it and click
    # on that part of the screen.
    new_dataset_row = collection_builder.element_by_hid(hid=2).wait_for_present()
    pointer = self.action_chains()
    pointer.move_to_element(new_dataset_row)
    pointer.click()
    pointer.perform()

    input_component.collection_tab_build_link.wait_for_and_click()
    collection_builder.create.wait_for_and_click()
    self.workflow_run_submit()

    # NOTE(review): waits on hid 5 but reads hid 6 — confirm both hids are intended.
    self.history_panel_wait_for_hid_ok(5)
    output_text = self.dataset_populator.get_history_dataset_content(history_id, hid=6)
    assert output_text.strip() == "hello world"
[docs]
@selenium_test
@managed_history
def test_upload_list_from_workflow_simple(self):
    """Upload two datasets through the run form's collection builder and run the workflow."""
    self._create_and_run_workflow_with_unique_name(WORKFLOW_WITH_MAPPED_OUTPUT_COLLECTION)

    run_form = self.components.workflow_run
    input_component = run_form.input._(label="input1")
    input_component.upload.wait_for_and_click()
    input_component.collection_tab_upload_link.wait_for_and_click()

    # Upload two "hello world" datasets in the builder, then create the collection.
    collection_builder = run_form.input.collection_builder._(label="input1")
    self._upload_hello_world_for_input(collection_builder, count=2)
    collection_builder.create.wait_for_and_click()

    self.workflow_run_submit()
    self.history_panel_wait_for_hid_ok(6)
[docs]
@selenium_test
@managed_history
def test_upload_list_paired_from_workflow(self):
    """Upload two datasets in the run form to build the "input_list" collection and run the workflow."""
    history_id = self.current_history_id()
    self._create_and_run_workflow_with_unique_name(WORKFLOW_LIST_PAIRED_MAPPED_OVER_PAIRED)

    run_form = self.components.workflow_run
    input_component = run_form.input._(label="input_list")
    input_component.upload.wait_for_and_click()
    input_component.collection_tab_upload_link.wait_for_and_click()

    # Upload two "hello world" datasets in the builder, then create the collection.
    collection_builder = run_form.input.collection_builder._(label="input_list")
    self._upload_hello_world_for_input(collection_builder, count=2)
    collection_builder.create.wait_for_and_click()
    self.workflow_run_submit()

    # Waits on hid 6, then reads the content of hid 7.
    self.history_panel_wait_for_hid_ok(6)
    output_text = self.dataset_populator.get_history_dataset_content(history_id, hid=7)
    assert output_text.strip() == "hello world\nhello world"
[docs]
@selenium_test
@managed_history
def test_upload_list_paired_or_unpaired_from_workflow(self):
    """Build the "input_list" collection in the run form from three existing uploads and run."""
    history_id = self.current_history_id()
    uploads = {
        "foo_1.fasta": "forward content",
        "foo_2.fasta": "reverse content",
        "other.fasta": "unpaired content",
    }
    self.perform_upload_of_pasted_content(uploads)
    self.history_panel_wait_for_hid_ok(3)

    self._create_and_run_workflow_with_unique_name(WORKFLOW_LIST_PAIRED_OR_UNPAIRED_INPUT)
    run_form = self.components.workflow_run
    input_component = run_form.input._(label="input_list")
    input_component.upload.wait_for_and_click()

    # Wait for the third upload to be listed before selecting everything.
    collection_builder = run_form.input.collection_builder._(label="input_list")
    collection_builder.element_by_hid(hid=3).wait_for_present()
    collection_builder.select_all.wait_for_and_click()
    input_component.collection_tab_build_link.wait_for_and_click()
    collection_builder.create.wait_for_and_click()
    self.workflow_run_submit()

    output_hid = 8
    self.history_panel_wait_for_hid_ok(output_hid)
    output_text = self.dataset_populator.get_history_dataset_content(history_id, hid=output_hid)
    assert output_text.strip() == "unpaired content\nreverse content\nforward content"
def _upload_hello_world_for_input(self, workflow_input, count=1, from_hid=1):
    """Add ``count`` pasted "hello world" uploads through ``workflow_input`` and use them.

    :param workflow_input: run form component exposing the embedded upload controls.
    :param count: number of upload rows to create and fill in.
    :param from_hid: accepted but not read anywhere in this method.
    """
    # assumes fresh history...
    for index in range(count):
        workflow_input.create_button.wait_for_and_click()
        hello_url = self.dataset_populator.base64_url_for_string("hello world")
        workflow_input.paste_content(n=index).wait_for_and_send_keys(hello_url)
        # Titles are numbered from 1 while the row selectors are numbered from 0.
        row_title = f"hello world.{index + 1}.fastq"
        workflow_input.title(n=index).wait_for_and_clear_and_send_keys(row_title)

    workflow_input.embedded_start_button.wait_for_and_click()
    # Click "use" only after its disabled variant has gone away.
    workflow_input.use_button_disabled.wait_for_absent()
    workflow_input.use_button.wait_for_and_click()
def _create_and_run_workflow_with_unique_name(
    self, workflow_contents: str, format: Literal["ga", "gxformat2"] = "gxformat2"
):
    """Create a workflow under a unique name and open its run form.

    :param workflow_contents: workflow definition text in the given ``format``.
    :param format: "gxformat2" (default) or "ga".
    :returns: ``(workflow_id, workflow_name)`` of the created workflow.
    """
    created_id, unique_name = self._create_workflow_with_unique_name(workflow_contents, format)
    self.workflow_run_with_name(unique_name)
    return (created_id, unique_name)
def _create_workflow_with_unique_name(
    self, workflow_contents: str, format: Literal["ga", "gxformat2"] = "gxformat2"
):
    """Create a workflow named with a fresh UUID and return its id and name.

    :param workflow_contents: YAML text when ``format`` is "gxformat2",
        otherwise JSON text.
    :param format: "gxformat2" (default) or "ga".
    :returns: ``(workflow_id, workflow_name)``.
    """
    unique_name = str(uuid4())
    is_gxformat2 = format == "gxformat2"

    # Parse according to the format, then overwrite the name.
    definition = yaml.safe_load(workflow_contents) if is_gxformat2 else json.loads(workflow_contents)
    definition["name"] = unique_name

    # Each format has its own populator entry point.
    if is_gxformat2:
        created_id = self.workflow_populator.upload_yaml_workflow(definition)
    else:
        created_id = self.workflow_populator.create_workflow(definition)
    return (created_id, unique_name)
def _assert_has_3_lines_after_run(self, hid):
    """Wait for dataset ``hid`` to be ok and assert it contains exactly three non-empty lines."""
    self.workflow_run_wait_for_ok(hid=hid)
    content = self.dataset_populator.get_history_dataset_content(self.current_history_id(), hid=hid)
    # Empty strings produced by the split (e.g. after a trailing newline) are not counted.
    non_empty_line_count = sum(1 for line in content.split("\n") if line)
    assert non_empty_line_count == 3, content
def _set_num_lines_to_3(self, element_id):
    """Replace the value of the run form parameter ``element_id`` with "3".

    Written for the random_lines ``num_lines`` parameter used as a runtime
    parameter in the workflow form.
    """
    # The initial value is deliberately not asserted: runtime parameters are
    # not being set to the tool default value, see
    # https://github.com/galaxyproject/galaxy/pull/7157
    number_field = self.tool_parameter_div(element_id).find_element(By.CSS_SELECTOR, "input")
    number_field.clear()
    number_field.send_keys("3")
def _set_replacement_parameter(self, element_id, value):
    """Type ``value`` into the run form parameter ``element_id``, which must start out empty.

    :param element_id: identifier passed to ``tool_parameter_div``.
    :param value: text to type into the parameter's input.
    """
    text_field = self.tool_parameter_div(element_id).find_element(By.CSS_SELECTOR, "input")

    # The field is expected to have no value before the test types into it.
    starting_value = text_field.get_attribute("value")
    assert starting_value == "", starting_value

    text_field.clear()
    text_field.send_keys(value)
def _setup_simple_invocation_for_export_testing(self):
    """Run WORKFLOW_SIMPLE_CAT_TWICE on an uploaded FASTA and return the history's first invocation.

    Precondition: a fresh history.
    """
    self.perform_upload(self.get_filename("1.fasta"))
    self.wait_for_history()

    self.workflow_run_open_workflow(WORKFLOW_SIMPLE_CAT_TWICE)
    self.workflow_run_submit()

    # Wait until exactly one invocation is registered for the current history.
    history_id = self.current_history_id()
    populator = self.workflow_populator
    populator.wait_for_history_workflows(history_id, expected_invocation_count=1)
    return populator.history_invocations(history_id)[0]