Source code for galaxy_test.api.test_tools
# Test tools API.
import contextlib
import json
import os
import zipfile
from io import BytesIO
from typing import (
Any,
Dict,
List,
)
import pytest
from requests import (
get,
put,
)
from galaxy.tool_util.verify.interactor import ValidToolTestDict
from galaxy.util import galaxy_root_path
from galaxy.util.unittest_utils import skip_if_github_down
from galaxy_test.base import rules_test_data
from galaxy_test.base.api_asserts import (
assert_has_keys,
assert_status_code_is,
)
from galaxy_test.base.decorators import requires_new_history
from galaxy_test.base.populators import (
BaseDatasetCollectionPopulator,
DatasetCollectionPopulator,
DatasetPopulator,
LibraryPopulator,
skip_without_tool,
stage_rules_example,
)
from ._framework import ApiTestCase
MINIMAL_TOOL = {
"id": "minimal_tool",
"name": "Minimal Tool",
"class": "GalaxyTool",
"version": "1.0.0",
"command": "echo 'Hello World' > $output1",
"inputs": [],
"outputs": dict(
output1=dict(format="txt"),
),
}
MINIMAL_TOOL_NO_ID = {
"name": "Minimal Tool",
"class": "GalaxyTool",
"version": "1.0.0",
"command": "echo 'Hello World 2' > $output1",
"inputs": [],
"outputs": dict(
output1=dict(format="txt"),
),
}
[docs]class TestsTools:
dataset_populator: DatasetPopulator
dataset_collection_populator: BaseDatasetCollectionPopulator
def _build_pair(self, history_id, contents):
create_response = self.dataset_collection_populator.create_pair_in_history(
history_id,
contents=contents,
direct_upload=True,
wait=True,
)
hdca_id = create_response.json()["outputs"][0]["id"]
return hdca_id
def _run_and_check_simple_collection_mapping(self, history_id, inputs):
create = self._run_cat(history_id, inputs=inputs, assert_ok=True)
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 2
assert len(outputs) == 2
assert len(implicit_collections) == 1
output1 = outputs[0]
output2 = outputs[1]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2)
assert output1_content.strip() == "123"
assert output2_content.strip() == "456"
def _run_cat(self, history_id, inputs, assert_ok=False, **kwargs):
return self._run("cat", history_id, inputs, assert_ok=assert_ok, **kwargs)
def _run(
self,
tool_id=None,
history_id=None,
inputs=None,
tool_uuid=None,
assert_ok=False,
tool_version=None,
use_cached_job=False,
wait_for_job=False,
input_format="legacy",
):
if inputs is None:
inputs = {}
if tool_id is None:
assert tool_uuid is not None
payload = self.dataset_populator.run_tool_payload(
tool_id=tool_id,
inputs=inputs,
history_id=history_id,
input_format=input_format,
)
if tool_uuid:
payload["tool_uuid"] = tool_uuid
if tool_version is not None:
payload["tool_version"] = tool_version
if use_cached_job:
payload["use_cached_job"] = True
create_response = self.dataset_populator._post("tools", data=payload)
if wait_for_job:
self.dataset_populator.wait_for_job(job_id=create_response.json()["jobs"][0]["id"])
if assert_ok:
assert_status_code_is(create_response, 200)
create = create_response.json()
assert_has_keys(create, "outputs")
return create
else:
return create_response
[docs]class TestToolsApi(ApiTestCase, TestsTools):
dataset_populator: DatasetPopulator
[docs] def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
[docs] @skip_without_tool("cat1")
def test_search_cat(self):
url = self._api_url("tools")
payload = dict(q="concat")
get_response = get(url, payload).json()
assert "cat1" in get_response
[docs] @skip_without_tool("trimmer")
def test_search_trimmer(self):
url = self._api_url("tools")
payload = dict(q="leading or trailing characters")
get_response = get(url, payload).json()
assert "trimmer" in get_response
[docs] @skip_without_tool("Grep1")
def test_search_grep(self):
url = self._api_url("tools")
payload = dict(q="Select lines that match an expression")
get_response = get(url, payload).json()
assert "Grep1" in get_response
[docs] def test_no_panel_index(self):
index = self._get("tools", data=dict(in_panel=False))
tools_index = index.json()
# No need to flatten out sections, with in_panel=False, only tools are
# returned.
tool_ids = [_["id"] for _ in tools_index]
assert "upload1" in tool_ids
[docs] @skip_without_tool("test_sam_to_bam_conversions")
def test_requirements(self):
requirements_response = self._get("tools/test_sam_to_bam_conversions/requirements", admin=True)
self._assert_status_code_is_ok(requirements_response)
requirements = requirements_response.json()
assert len(requirements) == 1, requirements
requirement = requirements[0]
assert requirement["name"] == "samtools", requirement
[docs] @skip_without_tool("cat1")
def test_show_repeat(self):
tool_info = self._show_valid_tool("cat1")
parameters = tool_info["inputs"]
assert len(parameters) == 2, f"Expected two inputs - got [{parameters}]"
assert parameters[0]["name"] == "input1"
assert parameters[1]["name"] == "queries"
repeat_info = parameters[1]
self._assert_has_keys(repeat_info, "min", "max", "title", "help")
repeat_params = repeat_info["inputs"]
assert len(repeat_params) == 1
assert repeat_params[0]["name"] == "input2"
[docs] @skip_without_tool("random_lines1")
def test_show_conditional(self):
tool_info = self._show_valid_tool("random_lines1")
cond_info = tool_info["inputs"][2]
self._assert_has_keys(cond_info, "cases", "test_param")
self._assert_has_keys(cond_info["test_param"], "name", "type", "label", "help")
cases = cond_info["cases"]
assert len(cases) == 2
case1 = cases[0]
self._assert_has_keys(case1, "value", "inputs")
assert case1["value"] == "no_seed"
assert len(case1["inputs"]) == 0
case2 = cases[1]
self._assert_has_keys(case2, "value", "inputs")
case2_inputs = case2["inputs"]
assert len(case2_inputs) == 1
self._assert_has_keys(case2_inputs[0], "name", "type", "label", "help", "argument")
assert case2_inputs[0]["name"] == "seed"
[docs] @skip_without_tool("multi_data_param")
def test_show_multi_data(self):
tool_info = self._show_valid_tool("multi_data_param")
f1_info, f2_info = tool_info["inputs"][0], tool_info["inputs"][1]
self._assert_has_keys(f1_info, "min", "max")
assert f1_info["min"] == 1
assert f1_info["max"] == 1235
self._assert_has_keys(f2_info, "min", "max")
assert f2_info["min"] is None
assert f2_info["max"] is None
[docs] @skip_without_tool("collection_creates_list")
def test_show_output_collection(self):
tool_info = self._show_valid_tool("collection_creates_list")
outputs = tool_info["outputs"]
assert len(outputs) == 1
output = outputs[0]
assert output["label"] == "Duplicate List"
assert output["inherit_format"] is True
[docs] @skip_without_tool("test_data_source")
def test_data_source_build_request(self):
with self.dataset_populator.test_history() as history_id:
build = self.dataset_populator.build_tool_state("test_data_source", history_id)
galaxy_url_param = build["inputs"][0]
assert galaxy_url_param["name"] == "GALAXY_URL"
galaxy_url = galaxy_url_param["value"]
assert galaxy_url.startswith("http")
assert galaxy_url.endswith("tool_runner?tool_id=ratmine")
[docs] @skip_without_tool("cheetah_problem_unbound_var_input")
def test_legacy_biotools_xref_injection(self):
url = self._api_url("tools/cheetah_problem_unbound_var_input")
get_response = get(url)
get_response.raise_for_status()
get_json = get_response.json()
assert "xrefs" in get_json
assert len(get_json["xrefs"]) == 1
xref = get_json["xrefs"][0]
assert xref["reftype"] == "bio.tools"
assert xref["value"] == "bwa"
[docs] @skip_without_tool("test_data_source")
@skip_if_github_down
def test_data_source_ok_request(self):
with self.dataset_populator.test_history() as history_id:
payload = self.dataset_populator.run_tool_payload(
tool_id="test_data_source",
inputs={
"URL": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed",
"URL_method": "get",
"data_type": "bed",
},
history_id=history_id,
)
create_response = self._post("tools", data=payload)
self._assert_status_code_is(create_response, 200)
create_object = create_response.json()
self._assert_has_keys(create_object, "outputs")
assert len(create_object["outputs"]) == 1
output = create_object["outputs"][0]
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
assert output_content.startswith("chr1\t147962192\t147962580")
output_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
assert output_details["file_ext"] == "bed"
[docs] @skip_without_tool("test_data_source")
def test_data_source_sniff_fastqsanger(self):
with self.dataset_populator.test_history() as history_id:
payload = self.dataset_populator.run_tool_payload(
tool_id="test_data_source",
inputs={
"URL": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.fastqsanger.gz",
"URL_method": "get",
},
history_id=history_id,
)
create_response = self._post("tools", data=payload)
self._assert_status_code_is(create_response, 200)
create_object = create_response.json()
self._assert_has_keys(create_object, "outputs")
assert len(create_object["outputs"]) == 1
output = create_object["outputs"][0]
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
output_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
assert output_details["file_ext"] == "fastqsanger.gz", output_details
[docs] @skip_without_tool("test_data_source")
def test_data_sources_block_file_parameters(self):
with self.dataset_populator.test_history() as history_id:
payload = self.dataset_populator.run_tool_payload(
tool_id="test_data_source",
inputs={
"URL": f"file://{os.path.join(os.getcwd(), 'README.rst')}",
"URL_method": "get",
"data_type": "bed",
},
history_id=history_id,
)
create_response = self._post("tools", data=payload)
self._assert_status_code_is(create_response, 200)
create_object = create_response.json()
self._assert_has_keys(create_object, "outputs")
assert len(create_object["outputs"]) == 1
output = create_object["outputs"][0]
self.dataset_populator.wait_for_history(history_id, assert_ok=False)
output_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output, wait=False)
assert output_details["state"] == "error", output_details
assert "has not sent back a URL parameter" in output_details["misc_info"], output_details
def _show_valid_tool(self, tool_id, tool_version=None):
data = dict(io_details=True)
if tool_version:
data["tool_version"] = tool_version
tool_show_response = self._get(f"tools/{tool_id}", data=data)
self._assert_status_code_is(tool_show_response, 200)
tool_info = tool_show_response.json()
self._assert_has_keys(tool_info, "inputs", "outputs", "panel_section_id")
return tool_info
[docs] @skip_without_tool("model_attributes")
def test_model_attributes_sanitization(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
cool_name_with_quote = 'cool name with a quo"te'
cool_name_without_quote = "cool name with a quo__dq__te"
current_user = self._get("users/current").json()
user_info_url = self._api_url(f"users/{current_user['id']}/information/inputs", use_key=True)
put_response = put(user_info_url, data=json.dumps({"address_0|desc": cool_name_with_quote}))
put_response.raise_for_status()
response = get(user_info_url).json()
assert len(response["addresses"]) == 1
assert response["addresses"][0]["desc"] == cool_name_with_quote
hda1 = self.dataset_populator.new_dataset(history_id, content="1\t2\t3", name=cool_name_with_quote)
assert hda1["name"] == cool_name_with_quote
rval = self._run(
tool_id="model_attributes",
inputs={"input1": dataset_to_param(hda1)},
history_id=history_id,
assert_ok=True,
wait_for_job=True,
)
sanitized_dataset_name = self.dataset_populator.get_history_dataset_content(
history_id, dataset=rval["outputs"][0]
)
assert sanitized_dataset_name.strip() == cool_name_without_quote
sanitized_email = self.dataset_populator.get_history_dataset_content(history_id, dataset=rval["outputs"][1])
assert '"' not in sanitized_email
sanitized_address = self.dataset_populator.get_history_dataset_content(
history_id, dataset=rval["outputs"][2]
)
assert sanitized_address.strip() == cool_name_without_quote
[docs] @skip_without_tool("composite_output")
def test_test_data_filepath_security(self):
test_data_response = self._get("tools/composite_output/test_data_path?filename=../CONTRIBUTORS.md", admin=True)
assert test_data_response.status_code == 404, test_data_response.text
[docs] @skip_without_tool("composite_output")
def test_test_data_admin_security(self):
test_data_response = self._get("tools/composite_output/test_data_path?filename=../CONTRIBUTORS.md")
assert test_data_response.status_code == 403, test_data_response.text
[docs] @skip_without_tool("dbkey_filter_multi_input")
def test_data_table_requirement_annotated(self):
test_data_response = self._get("tools/dbkey_filter_multi_input/test_data")
assert test_data_response.status_code == 200
test_case = test_data_response.json()[0]
assert test_case["required_data_tables"][0] == "test_fasta_indexes"
assert len(test_case["required_loc_files"]) == 0
[docs] @skip_without_tool("composite_output")
def test_test_data_composite_output(self):
test_data_response = self._get("tools/composite_output/test_data")
assert test_data_response.status_code == 200
test_data = test_data_response.json()
assert len(test_data) == 1
test_case = test_data[0]
self._assert_has_keys(test_case, "inputs", "outputs", "output_collections", "required_files")
assert len(test_case["inputs"]) == 1, test_case
# input0 = next(iter(test_case["inputs"].values()))
[docs] @skip_without_tool("collection_two_paired")
def test_test_data_collection_two_paired(self):
test_data_response = self._get("tools/collection_two_paired/test_data")
assert test_data_response.status_code == 200
test_data = test_data_response.json()
assert len(test_data) == 2
test_case = test_data[0]
assert len(test_case["required_data_tables"]) == 0
assert len(test_case["required_loc_files"]) == 0
self._assert_has_keys(test_case, "inputs", "outputs", "output_collections", "required_files")
assert len(test_case["inputs"]) == 3, test_case
[docs] @skip_without_tool("collection_nested_test")
def test_test_data_collection_nested(self):
test_data_response = self._get("tools/collection_nested_test/test_data")
assert test_data_response.status_code == 200
test_data = test_data_response.json()
assert len(test_data) == 2
test_case = test_data[0]
self._assert_has_keys(test_case, "inputs", "outputs", "output_collections", "required_files")
assert len(test_case["inputs"]) == 1, test_case
[docs] @skip_without_tool("expression_null_handling_boolean")
def test_test_data_null_boolean_inputs(self):
test_data_response = self._get("tools/expression_null_handling_boolean/test_data")
assert test_data_response.status_code == 200
test_data = test_data_response.json()
assert len(test_data) == 3
test_case = test_data[2]
self._assert_has_keys(test_case, "inputs", "outputs", "output_collections", "required_files")
inputs = test_case["inputs"]
assert len(inputs) == 1, test_case
assert "bool_input" in inputs, inputs
assert inputs["bool_input"] is None, inputs
[docs] @skip_without_tool("simple_constructs_y")
def test_test_data_yaml_tools(self):
test_data_response = self._get("tools/simple_constructs_y/test_data")
assert test_data_response.status_code == 200
test_data = test_data_response.json()
assert len(test_data) == 3
[docs] @skip_without_tool("cat1")
def test_test_data_download(self):
test_data_response = self._get("tools/cat1/test_data_download?filename=1.bed")
assert test_data_response.status_code == 200, test_data_response.text.startswith("chr")
[docs] @skip_without_tool("composite_output")
def test_test_data_downloads_security(self):
test_data_response = self._get("tools/composite_output/test_data_download?filename=../CONTRIBUTORS.md")
assert test_data_response.status_code == 404, test_data_response.text
[docs] @skip_without_tool("composite_output")
def test_test_data_download_composite(self):
test_data_response = self._get("tools/composite_output/test_data_download?filename=velveth_test1")
assert test_data_response.status_code == 200
with zipfile.ZipFile(BytesIO(test_data_response.content)) as contents:
namelist = contents.namelist()
assert len(namelist) == 6
expected_names = {
"velveth_test1/Roadmaps",
"velveth_test1/output.html",
"velveth_test1/Sequences",
"velveth_test1/Log",
"velveth_test1/output/",
"velveth_test1/output/1",
}
assert set(namelist) == expected_names
[docs] def test_convert_dataset_explicit_history(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
fasta1_contents = open(self.get_filename("1.fasta")).read()
hda1 = self.dataset_populator.new_dataset(history_id, content=fasta1_contents)
payload = {
"src": "hda",
"id": hda1["id"],
"source_type": "fasta",
"target_type": "tabular",
"history_id": history_id,
}
create_response = self._post("tools/CONVERTER_fasta_to_tabular/convert", data=payload)
self.dataset_populator.wait_for_job(create_response.json()["jobs"][0]["id"], assert_ok=True)
create_response.raise_for_status()
assert len(create_response.json()["implicit_collections"]) == 0
for output in create_response.json()["outputs"]:
assert output["file_ext"] == "tabular"
[docs] def test_convert_dataset_implicit_history(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
fasta1_contents = open(self.get_filename("1.fasta")).read()
hda1 = self.dataset_populator.new_dataset(history_id, content=fasta1_contents)
payload = {"src": "hda", "id": hda1["id"], "source_type": "fasta", "target_type": "tabular"}
create_response = self._post("tools/CONVERTER_fasta_to_tabular/convert", data=payload)
self.dataset_populator.wait_for_job(create_response.json()["jobs"][0]["id"], assert_ok=True)
create_response.raise_for_status()
assert len(create_response.json()["implicit_collections"]) == 0
for output in create_response.json()["outputs"]:
assert output["file_ext"] == "tabular"
[docs] def test_convert_hdca(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
data = [
{
"name": "test0",
"elements": [
{"src": "pasted", "paste_content": "123\n", "name": "forward", "ext": "fasta"},
{"src": "pasted", "paste_content": "456\n", "name": "reverse", "ext": "fasta"},
],
},
{
"name": "test1",
"elements": [
{"src": "pasted", "paste_content": "789\n", "name": "forward", "ext": "fasta"},
{"src": "pasted", "paste_content": "0ab\n", "name": "reverse", "ext": "fasta"},
],
},
]
hdca1 = self.dataset_collection_populator.upload_collection(
history_id, "list:paired", elements=data, wait=True
)
self._assert_status_code_is(hdca1, 200)
payload = {
"src": "hdca",
"id": hdca1.json()["outputs"][0]["id"],
"source_type": "fasta",
"target_type": "tabular",
"history_id": history_id,
}
create_response = self._post("tools/CONVERTER_fasta_to_tabular/convert", payload)
create_response.raise_for_status()
assert create_response.json()["implicit_collections"] != []
hid = create_response.json()["implicit_collections"][0]["hid"]
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
collection_details = self.dataset_populator.get_history_collection_details(history_id, hid=hid)
for element in collection_details["elements"][0]["object"]["elements"]:
assert element["object"]["file_ext"] == "tabular"
[docs] def test_unzip_collection(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
hdca_id = self._build_pair(history_id, ["123", "456"])
inputs = {
"input": {"src": "hdca", "id": hdca_id},
}
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
response = self._run("__UNZIP_COLLECTION__", history_id, inputs, assert_ok=True)
outputs = response["outputs"]
assert len(outputs) == 2
output_forward = outputs[0]
output_reverse = outputs[1]
output_forward_content = self.dataset_populator.get_history_dataset_content(
history_id, dataset=output_forward
)
output_reverse_content = self.dataset_populator.get_history_dataset_content(
history_id, dataset=output_reverse
)
assert output_forward_content.strip() == "123"
assert output_reverse_content.strip() == "456"
output_forward = self.dataset_populator.get_history_dataset_details(history_id, dataset=output_forward)
output_reverse = self.dataset_populator.get_history_dataset_details(history_id, dataset=output_reverse)
assert output_forward["history_id"] == history_id
assert output_reverse["history_id"] == history_id
[docs] def test_unzip_nested(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
response = self.dataset_collection_populator.upload_collection(
history_id,
"list:paired",
elements=[
{
"name": "test0",
"elements": [
{
"src": "pasted",
"paste_content": "123\n",
"name": "forward",
"ext": "txt",
"tags": ["#foo"],
},
{
"src": "pasted",
"paste_content": "456\n",
"name": "reverse",
"ext": "txt",
"tags": ["#bar"],
},
],
}
],
wait=True,
)
self._assert_status_code_is(response, 200)
hdca_id = response.json()["outputs"][0]["id"]
inputs = {
"input": {
"batch": True,
"values": [{"src": "hdca", "map_over_type": "paired", "id": hdca_id}],
}
}
response = self._run("__UNZIP_COLLECTION__", history_id, inputs, assert_ok=True)
implicit_collections = response["implicit_collections"]
assert len(implicit_collections) == 2
unzipped_hdca = self.dataset_populator.get_history_collection_details(
history_id, hid=implicit_collections[0]["hid"]
)
assert unzipped_hdca["elements"][0]["element_type"] == "hda", unzipped_hdca
[docs] def test_zip_inputs(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
hda1 = dataset_to_param(self.dataset_populator.new_dataset(history_id, content="1\t2\t3"))
hda2 = dataset_to_param(self.dataset_populator.new_dataset(history_id, content="4\t5\t6"))
inputs = {
"input_forward": hda1,
"input_reverse": hda2,
}
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
response = self._run("__ZIP_COLLECTION__", history_id, inputs, assert_ok=True)
output_collections = response["output_collections"]
assert len(output_collections) == 1
self.dataset_populator.wait_for_job(response["jobs"][0]["id"], assert_ok=True)
zipped_hdca = self.dataset_populator.get_history_collection_details(
history_id, hid=output_collections[0]["hid"]
)
assert zipped_hdca["collection_type"] == "paired"
[docs] @skip_without_tool("__ZIP_COLLECTION__")
def test_collection_operation_dataset_input_permissions(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
hda1 = dataset_to_param(self.dataset_populator.new_dataset(history_id, content="1\t2\t3"))
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
self.dataset_populator.make_private(history_id, hda1["id"])
inputs = {
"input_forward": hda1,
"input_reverse": hda1,
}
with self._different_user_and_history() as other_history_id:
response = self._run("__ZIP_COLLECTION__", other_history_id, inputs, assert_ok=False)
self._assert_dataset_permission_denied_response(response)
[docs] @skip_without_tool("__UNZIP_COLLECTION__")
def test_collection_operation_collection_input_permissions(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
create_response = self.dataset_collection_populator.create_pair_in_history(
history_id, direct_upload=True, wait=True
)
self._assert_status_code_is(create_response, 200)
collection = create_response.json()["outputs"][0]
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
collection = self.dataset_populator.get_history_collection_details(history_id, hid=collection["hid"])
element_id = collection["elements"][0]["object"]["id"]
self.dataset_populator.make_private(history_id, element_id)
inputs = {
"input": {"src": "hdca", "id": collection["id"]},
}
with self._different_user_and_history() as other_history_id:
response = self._run("__UNZIP_COLLECTION__", other_history_id, inputs, assert_ok=False)
self._assert_dataset_permission_denied_response(response)
[docs] def test_zip_list_inputs(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
hdca1_id = self.dataset_collection_populator.create_list_in_history(
history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"], wait=True
).json()["outputs"][0]["id"]
hdca2_id = self.dataset_collection_populator.create_list_in_history(
history_id, contents=["1\n2\n3\n4", "5\n6\n7\n8"], wait=True
).json()["outputs"][0]["id"]
inputs = {
"input_forward": {"batch": True, "values": [{"src": "hdca", "id": hdca1_id}]},
"input_reverse": {"batch": True, "values": [{"src": "hdca", "id": hdca2_id}]},
}
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
response = self._run("__ZIP_COLLECTION__", history_id, inputs, assert_ok=True)
implicit_collections = response["implicit_collections"]
assert len(implicit_collections) == 1
self.dataset_populator.wait_for_job(response["jobs"][0]["id"], assert_ok=True)
zipped_hdca = self.dataset_populator.get_history_collection_details(
history_id, hid=implicit_collections[0]["hid"]
)
assert zipped_hdca["collection_type"] == "list:paired"
[docs] @skip_without_tool("__EXTRACT_DATASET__")
@skip_without_tool("cat_data_and_sleep")
def test_database_operation_tool_with_pending_inputs(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
hdca1_id = self.dataset_collection_populator.create_list_in_history(
history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"], wait=True
).json()["outputs"][0]["id"]
run_response = self.dataset_populator.run_tool(
tool_id="cat_data_and_sleep",
inputs={
"sleep_time": 15,
"input1": {"batch": True, "values": [{"src": "hdca", "id": hdca1_id}]},
},
history_id=history_id,
)
output_hdca_id = run_response["implicit_collections"][0]["id"]
run_response = self.dataset_populator.run_tool(
tool_id="__EXTRACT_DATASET__",
inputs={
"data_collection": {"src": "hdca", "id": output_hdca_id},
},
history_id=history_id,
)
assert run_response["outputs"][0]["state"] != "ok"
[docs] @skip_without_tool("__EXTRACT_DATASET__")
def test_extract_dataset_invalid_element_identifier(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
hdca1_id = self.dataset_collection_populator.create_list_in_history(
history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"], wait=True
).json()["outputs"][0]["id"]
run_response = self.dataset_populator.run_tool_raw(
tool_id="__EXTRACT_DATASET__",
inputs={
"data_collection": {"src": "hdca", "id": hdca1_id},
"which": {"which_dataset": "by_index", "index": 100},
},
history_id=history_id,
input_format="21.01",
)
assert run_response.status_code == 400
assert run_response.json()["err_msg"] == "Dataset collection has no element_index with key 100."
[docs] @skip_without_tool("__FILTER_FAILED_DATASETS__")
def test_filter_failed_list(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
ok_hdca_id = self.dataset_collection_populator.create_list_in_history(
history_id, contents=["0", "1", "0", "1"], wait=True
).json()["outputs"][0]["id"]
response = self.dataset_populator.run_exit_code_from_file(history_id, ok_hdca_id)
mixed_implicit_collections = response["implicit_collections"]
assert len(mixed_implicit_collections) == 1
mixed_hdca_hid = mixed_implicit_collections[0]["hid"]
mixed_hdca = self.dataset_populator.get_history_collection_details(
history_id, hid=mixed_hdca_hid, wait=False
)
def get_state(dce):
return dce["object"]["state"]
mixed_states = [get_state(_) for _ in mixed_hdca["elements"]]
assert mixed_states == ["ok", "error", "ok", "error"], mixed_states
filtered_hdca = self._run_filter(history_id=history_id, failed_hdca_id=mixed_hdca["id"])
filtered_states = [get_state(_) for _ in filtered_hdca["elements"]]
assert filtered_states == ["ok", "ok"], filtered_states
[docs] @skip_without_tool("__FILTER_FAILED_DATASETS__")
def test_filter_failed_list_paired(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
pair1 = self.dataset_collection_populator.create_pair_in_history(
history_id, contents=["0", "0"], wait=True
).json()["outputs"][0]["id"]
pair2 = self.dataset_collection_populator.create_pair_in_history(
history_id, contents=["0", "1"], wait=True
).json()["outputs"][0]["id"]
ok_hdca_id = self.dataset_collection_populator.create_list_from_pairs(history_id, [pair1, pair2]).json()[
"id"
]
response = self.dataset_populator.run_exit_code_from_file(history_id, ok_hdca_id)
mixed_implicit_collections = response["implicit_collections"]
assert len(mixed_implicit_collections) == 1
mixed_hdca_hid = mixed_implicit_collections[0]["hid"]
mixed_hdca = self.dataset_populator.get_history_collection_details(
history_id, hid=mixed_hdca_hid, wait=False
)
def get_state(dce):
return dce["object"]["state"]
mixed_states = [get_state(element) for _ in mixed_hdca["elements"] for element in _["object"]["elements"]]
assert mixed_states == ["ok", "ok", "ok", "error"], mixed_states
filtered_hdca = self._run_filter(history_id=history_id, failed_hdca_id=mixed_hdca["id"])
filtered_states = [
get_state(element) for _ in filtered_hdca["elements"] for element in _["object"]["elements"]
]
assert filtered_states == ["ok", "ok"], filtered_states
# Also try list:list:paired
llp = self.dataset_collection_populator.create_nested_collection(
history_id=history_id, collection_type="list:list:paired", collection=[mixed_hdca["id"]]
).json()
filtered_nested_hdca = self._run_filter(history_id=history_id, failed_hdca_id=llp["id"], batch=True)
filtered_states = [
get_state(element)
for _ in filtered_nested_hdca["elements"]
for parent_element in _["object"]["elements"]
for element in parent_element["object"]["elements"]
]
assert filtered_states == ["ok", "ok"], filtered_states
def _run_filter(self, history_id, failed_hdca_id, batch=False):
if batch:
inputs = {
"input": {
"batch": batch,
"values": [{"map_over_type": "list:paired", "src": "hdca", "id": failed_hdca_id}],
},
}
else:
inputs = {
"input": {"batch": batch, "src": "hdca", "id": failed_hdca_id},
}
response = self._run("__FILTER_FAILED_DATASETS__", history_id, inputs, assert_ok=False).json()
self.dataset_populator.wait_for_history(history_id, assert_ok=False)
filter_output_collections = response["output_collections"]
if batch:
return response["implicit_collections"][0]
assert len(filter_output_collections) == 1
filtered_hid = filter_output_collections[0]["hid"]
filtered_hdca = self.dataset_populator.get_history_collection_details(history_id, hid=filtered_hid, wait=False)
return filtered_hdca
def _apply_rules_and_check(self, example: Dict[str, Any]) -> None:
with self.dataset_populator.test_history(require_new=False) as history_id:
inputs = stage_rules_example(self.galaxy_interactor, history_id, example)
hdca = inputs["input"]
inputs = {"input": {"src": "hdca", "id": hdca["id"]}, "rules": example["rules"]}
self.dataset_populator.wait_for_history(history_id)
response = self._run("__APPLY_RULES__", history_id, inputs, assert_ok=True)
output_collections = response["output_collections"]
assert len(output_collections) == 1
output_hid = output_collections[0]["hid"]
output_hdca = self.dataset_populator.get_history_collection_details(history_id, hid=output_hid, wait=False)
example["check"](output_hdca, self.dataset_populator)
[docs] @skip_without_tool("multi_select")
def test_multi_select_as_list(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
inputs = {
"select_ex": ["--ex1", "ex2"],
}
response = self._run("multi_select", history_id, inputs, assert_ok=True)
output = response["outputs"][0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
assert output1_content == "--ex1,ex2"
[docs] @skip_without_tool("multi_select")
def test_multi_select_optional(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
inputs = {
"select_ex": ["--ex1"],
"select_optional": None,
}
response = self._run("multi_select", history_id, inputs, assert_ok=True)
output = response["outputs"]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output[0])
output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output[1])
assert output1_content.strip() == "--ex1"
assert output2_content.strip() == "None", output2_content
[docs] @skip_without_tool("library_data")
def test_library_data_param(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
ld = LibraryPopulator(self.galaxy_interactor).new_library_dataset("lda_test_library")
inputs = {"library_dataset": ld["ldda_id"], "library_dataset_multiple": [ld["ldda_id"], ld["ldda_id"]]}
response = self._run("library_data", history_id, inputs, assert_ok=True)
output = response["outputs"]
output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output[0])
assert output_content == "TestData\n", output_content
output_multiple_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output[1])
assert output_multiple_content == "TestData\nTestData\n", output_multiple_content
[docs] @skip_without_tool("multi_data_param")
def test_multidata_param(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
hda1 = dataset_to_param(self.dataset_populator.new_dataset(history_id, content="1\t2\t3"))
hda2 = dataset_to_param(self.dataset_populator.new_dataset(history_id, content="4\t5\t6"))
inputs = {
"f1": {"batch": False, "values": [hda1, hda2]},
"f2": {"batch": False, "values": [hda2, hda1]},
}
response = self._run("multi_data_param", history_id, inputs, assert_ok=True)
output1 = response["outputs"][0]
output2 = response["outputs"][1]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2)
assert output1_content == "1\t2\t3\n4\t5\t6\n", output1_content
assert output2_content == "4\t5\t6\n1\t2\t3\n", output2_content
[docs] @skip_without_tool("cat1")
def test_run_cat1(self):
with self.dataset_populator.test_history(require_new=False) as history_id:
# Run simple non-upload tool with an input data parameter.
new_dataset = self.dataset_populator.new_dataset(history_id, content="Cat1Test")
inputs = dict(
input1=dataset_to_param(new_dataset),
)
outputs = self._cat1_outputs(history_id, inputs=inputs)
assert len(outputs) == 1
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "Cat1Test"
[docs] @skip_without_tool("cat1")
@requires_new_history
def test_run_cat1_use_cached_job(self):
with self.dataset_populator.test_history_for(self.test_run_cat1_use_cached_job) as history_id:
# Run simple non-upload tool with an input data parameter.
new_dataset = self.dataset_populator.new_dataset(history_id, content="Cat1Test")
inputs = dict(
input1=dataset_to_param(new_dataset),
)
outputs_one = self._run_cat1(history_id, inputs=inputs, assert_ok=True, wait_for_job=True)
outputs_two = self._run_cat1(
history_id, inputs=inputs, use_cached_job=False, assert_ok=True, wait_for_job=True
)
outputs_three = self._run_cat1(
history_id, inputs=inputs, use_cached_job=True, assert_ok=True, wait_for_job=True
)
dataset_details = []
for output in [outputs_one, outputs_two, outputs_three]:
output_id = output["outputs"][0]["id"]
dataset_details.append(self._get(f"datasets/{output_id}").json())
filenames = [dd["file_name"] for dd in dataset_details]
assert len(filenames) == 3, filenames
assert len(set(filenames)) <= 2, filenames
[docs] @skip_without_tool("cat1")
def test_run_cat1_listified_param(self):
with self.dataset_populator.test_history_for(self.test_run_cat1_listified_param) as history_id:
# Run simple non-upload tool with an input data parameter.
new_dataset = self.dataset_populator.new_dataset(history_id, content="Cat1Testlistified")
inputs = dict(
input1=[dataset_to_param(new_dataset)],
)
outputs = self._cat1_outputs(history_id, inputs=inputs)
assert len(outputs) == 1
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "Cat1Testlistified"
[docs] @skip_without_tool("multiple_versions")
def test_run_by_versions(self):
with self.dataset_populator.test_history_for(self.test_run_by_versions) as history_id:
for version in ["0.1", "0.2"]:
# Run simple non-upload tool with an input data parameter.
outputs = self._run_and_get_outputs(
tool_id="multiple_versions", history_id=history_id, tool_version=version
)
assert len(outputs) == 1
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == f"Version {version}"
[docs] @skip_without_tool("multiple_versions")
def test_test_by_versions(self):
test_data_response = self._get("tools/multiple_versions/test_data")
test_data_response.raise_for_status()
test_data_dicts = test_data_response.json()
assert len(test_data_dicts) == 1
assert test_data_dicts[0]["tool_version"] == "0.2"
test_data_response = self._get("tools/multiple_versions/test_data?tool_version=*")
test_data_response.raise_for_status()
test_data_dicts = test_data_response.json()
# this found a bug - tools that appear in the toolbox twice should not cause
# multiple copies of test data to be returned. This assertion broke when
# we placed multiple_versions in the test tool panel in multiple places. We need
# to fix this but it isn't as important as the existing bug.
# assert len(test_data_dicts) == 3
[docs] @skip_without_tool("multiple_versions")
def test_show_with_wrong_tool_version_in_tool_id(self):
tool_info = self._show_valid_tool("multiple_versions", tool_version="0.01")
# Return last version
assert tool_info["version"] == "0.2"
[docs] @skip_without_tool("cat1")
def test_run_cat1_single_meta_wrapper(self):
with self.dataset_populator.test_history_for(self.test_run_cat1_single_meta_wrapper) as history_id:
# Wrap input in a no-op meta parameter wrapper like Sam is planning to
# use for all UI API submissions.
new_dataset = self.dataset_populator.new_dataset(history_id, content="123")
inputs = dict(
input1={"batch": False, "values": [dataset_to_param(new_dataset)]},
)
outputs = self._cat1_outputs(history_id, inputs=inputs)
assert len(outputs) == 1
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "123"
[docs] @skip_without_tool("cat1")
@requires_new_history
def test_guess_derived_permissions(self):
with self.dataset_populator.test_history_for(self.test_run_cat1_single_meta_wrapper) as history_id:
def assert_inputs(inputs, can_be_used=True):
# Until we make the dataset private, _different_user() can use it:
with self._different_user_and_history() as other_history_id:
response = self._run("cat1", other_history_id, inputs)
if can_be_used:
assert response.status_code == 200
else:
self._assert_dataset_permission_denied_response(response)
new_dataset = self.dataset_populator.new_dataset(history_id, content="Cat1Test")
inputs = dict(
input1=dataset_to_param(new_dataset),
)
# Until we make the dataset private, _different_user() can use it:
assert_inputs(inputs, can_be_used=True)
self.dataset_populator.make_private(history_id, new_dataset["id"])
# _different_user can no longer use the input dataset.
assert_inputs(inputs, can_be_used=False)
outputs = self._cat1_outputs(history_id, inputs=inputs)
assert len(outputs) == 1
output1 = outputs[0]
inputs_2 = dict(
input1=dataset_to_param(output1),
)
# _different_user cannot use datasets derived from the private input.
assert_inputs(inputs_2, can_be_used=False)
[docs] @skip_without_tool("collection_creates_list")
def test_guess_derived_permissions_collections(self, history_id):
def first_element_dataset_id(hdca):
# Fetch full and updated details for HDCA
print(hdca)
full_hdca = self.dataset_populator.get_history_collection_details(history_id, hid=hdca["hid"])
elements = full_hdca["elements"]
element0 = elements[0]["object"]
return element0["id"]
response = self.dataset_collection_populator.create_list_in_history(
history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"], direct_upload=True, wait=True
)
self._assert_status_code_is(response, 200)
hdca = response.json()["output_collections"][0]
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
inputs = {
"input1": {"src": "hdca", "id": hdca["id"]},
}
public_output_response = self._run("collection_creates_list", history_id, inputs)
self._assert_status_code_is(public_output_response, 200)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
input_element_id = first_element_dataset_id(hdca)
self.dataset_populator.make_private(history_id, input_element_id)
private_output_response = self._run("collection_creates_list", history_id, inputs)
self._assert_status_code_is(private_output_response, 200)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
public_element_id = first_element_dataset_id(public_output_response.json()["output_collections"][0])
private_element_id = first_element_dataset_id(private_output_response.json()["output_collections"][0])
def _dataset_accessible(dataset_id):
contents_response = self._get(f"histories/{history_id}/contents/{dataset_id}").json()
return "name" in contents_response
with self._different_user():
assert _dataset_accessible(public_element_id)
assert not _dataset_accessible(private_element_id)
[docs] @skip_without_tool("validation_default")
def test_validation(self, history_id):
inputs = {
"select_param": '" ; echo "moo',
}
response = self._run("validation_default", history_id, inputs)
self._assert_status_code_is(response, 400)
[docs] @skip_without_tool("validation_empty_dataset")
def test_validation_empty_dataset(self, history_id):
outputs = self._run_and_get_outputs("empty_output", history_id)
empty_dataset = outputs[0]
inputs = {
"input1": dataset_to_param(empty_dataset),
}
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
response = self._run("validation_empty_dataset", history_id, inputs)
self._assert_status_code_is(response, 400)
[docs] @skip_without_tool("validation_repeat")
def test_validation_in_repeat(self, history_id):
inputs = {
"r1_0|text": "123",
"r2_0|text": "",
}
response = self._run("validation_repeat", history_id, inputs)
self._assert_status_code_is(response, 400)
[docs] @skip_without_tool("multi_select")
def test_select_legal_values(self, history_id):
inputs = {
"select_ex": "not_option",
}
response = self._run("multi_select", history_id, inputs)
self._assert_status_code_is(response, 400)
[docs] @skip_without_tool("column_param")
def test_column_legal_values(self, history_id):
new_dataset1 = self.dataset_populator.new_dataset(history_id, content="#col1\tcol2")
inputs = {
"input1": {"src": "hda", "id": new_dataset1["id"]},
"col": "' ; echo 'moo",
}
response = self._run("column_param", history_id, inputs)
# This needs to either fail at submit time or at job prepare time, but we have
# to make sure the job doesn't run.
if response.status_code == 200:
job = response.json()["jobs"][0]
final_job_state = self.dataset_populator.wait_for_job(job["id"])
assert final_job_state == "error"
[docs] @requires_new_history
@skip_without_tool("collection_paired_test")
def test_collection_parameter(self, history_id):
hdca_id = self._build_pair(history_id, ["123\n", "456\n"])
inputs = {
"f1": {"src": "hdca", "id": hdca_id},
}
output = self._run("collection_paired_test", history_id, inputs, assert_ok=True)
assert len(output["jobs"]) == 1
assert len(output["implicit_collections"]) == 0
assert len(output["outputs"]) == 1
contents = self.dataset_populator.get_history_dataset_content(history_id, hid=4)
assert contents.strip() == "123\n456", contents
[docs] @skip_without_tool("collection_creates_pair")
def test_paired_collection_output(self, history_id):
new_dataset1 = self.dataset_populator.new_dataset(history_id, content="123\n456\n789\n0ab")
inputs = {
"input1": {"src": "hda", "id": new_dataset1["id"]},
}
# TODO: shouldn't need this wait
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
create = self._run("collection_creates_pair", history_id, inputs, assert_ok=True)
output_collection = self._assert_one_job_one_collection_run(create)
element0, element1 = self._assert_elements_are(output_collection, "forward", "reverse")
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
self._verify_element(history_id, element0, contents="123\n789\n", file_ext="txt", visible=False)
self._verify_element(history_id, element1, contents="456\n0ab\n", file_ext="txt", visible=False)
[docs] @skip_without_tool("collection_creates_list")
def test_list_collection_output(self, history_id):
create_response = self.dataset_collection_populator.create_list_in_history(
history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"], wait=True
)
hdca_id = create_response.json()["outputs"][0]["id"]
create = self.dataset_populator.run_collection_creates_list(history_id, hdca_id)
output_collection = self._assert_one_job_one_collection_run(create)
element0, element1 = self._assert_elements_are(output_collection, "data0", "data1")
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
self._verify_element(history_id, element0, contents="identifier is data0\n", file_ext="txt")
self._verify_element(history_id, element1, contents="identifier is data1\n", file_ext="txt")
[docs] @skip_without_tool("collection_creates_list_2")
def test_list_collection_output_format_source(self, history_id):
# test using format_source with a tool
new_dataset1 = self.dataset_populator.new_dataset(history_id, content="#col1\tcol2")
create_response = self.dataset_collection_populator.create_list_in_history(
history_id, contents=["a\tb\nc\td", "e\tf\ng\th"], wait=True
)
hdca_id = create_response.json()["outputs"][0]["id"]
inputs = {
"header": {"src": "hda", "id": new_dataset1["id"]},
"input_collect": {"src": "hdca", "id": hdca_id},
}
# TODO: real problem here - shouldn't have to have this wait.
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
create = self._run("collection_creates_list_2", history_id, inputs, assert_ok=True)
output_collection = self._assert_one_job_one_collection_run(create)
element0, element1 = self._assert_elements_are(output_collection, "data0", "data1")
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
self._verify_element(history_id, element0, contents="#col1\tcol2\na\tb\nc\td\n", file_ext="txt")
self._verify_element(history_id, element1, contents="#col1\tcol2\ne\tf\ng\th\n", file_ext="txt")
[docs] @skip_without_tool("collection_split_on_column")
def test_dynamic_list_output(self, history_id):
new_dataset1 = self.dataset_populator.new_dataset(
history_id, content="samp1\t1\nsamp1\t3\nsamp2\t2\nsamp2\t4\n"
)
inputs = {
"input1": dataset_to_param(new_dataset1),
}
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
create = self._run("collection_split_on_column", history_id, inputs, assert_ok=True)
output_collection = self._assert_one_job_one_collection_run(create)
self._assert_has_keys(output_collection, "id", "name", "elements", "populated")
assert not output_collection["populated"]
assert len(output_collection["elements"]) == 0
assert output_collection["name"] == "Table split on first column"
self.dataset_populator.wait_for_job(create["jobs"][0]["id"], assert_ok=True)
get_collection_response = self._get(
f"dataset_collections/{output_collection['id']}", data={"instance_type": "history"}
)
self._assert_status_code_is(get_collection_response, 200)
output_collection = get_collection_response.json()
self._assert_has_keys(output_collection, "id", "name", "elements", "populated")
assert output_collection["populated"]
assert output_collection["name"] == "Table split on first column"
assert len(output_collection["elements"]) == 2
output_element_0 = output_collection["elements"][0]
assert output_element_0["element_index"] == 0
assert output_element_0["element_identifier"] == "samp1"
output_element_hda_0 = output_element_0["object"]
assert output_element_hda_0["metadata_column_types"] is not None
[docs] @skip_without_tool("collection_creates_dynamic_nested")
def test_dynamic_list_output_datasets_in_failed_state(self, history_id):
inputs = {"fail_bool": True}
create = self._run("collection_creates_dynamic_nested", history_id, inputs, assert_ok=False, wait_for_job=True)
self._assert_status_code_is(create, 200)
collection = self._get(
f"dataset_collections/{create.json()['output_collections'][0]['id']}", data={"instance_type": "history"}
).json()
assert collection["element_count"] == 3
for nested_collection in collection["elements"]:
nested_collection = nested_collection["object"]
assert nested_collection["element_count"] == 2
for element in nested_collection["elements"]:
assert element["object"]["state"] == "error"
[docs] def test_nonadmin_users_cannot_create_tools(self):
payload = dict(
representation=json.dumps(MINIMAL_TOOL),
)
create_response = self._post("dynamic_tools", data=payload, admin=False)
self._assert_status_code_is(create_response, 403)
[docs] def test_dynamic_tool_1(self):
# Create tool.
self.dataset_populator.create_tool(MINIMAL_TOOL)
# Run tool.
history_id = self.dataset_populator.new_history()
self._run("minimal_tool", history_id)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
output_content = self.dataset_populator.get_history_dataset_content(history_id)
assert output_content == "Hello World\n"
[docs] def test_dynamic_tool_from_path(self):
# Create tool.
dynamic_tool_path = os.path.join(
galaxy_root_path, "lib", "galaxy_test", "base", "data", "minimal_tool_no_id.json"
)
tool_response = self.dataset_populator.create_tool_from_path(dynamic_tool_path)
self._assert_has_keys(tool_response, "uuid")
# Run tool.
history_id = self.dataset_populator.new_history()
self._run(history_id=history_id, tool_uuid=tool_response["uuid"])
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
output_content = self.dataset_populator.get_history_dataset_content(history_id)
assert output_content == "Hello World 2\n"
[docs] def test_dynamic_tool_no_id(self):
# Create tool.
tool_response = self.dataset_populator.create_tool(MINIMAL_TOOL_NO_ID)
self._assert_has_keys(tool_response, "uuid")
# Run tool.
history_id = self.dataset_populator.new_history()
self._run(history_id=history_id, tool_uuid=tool_response["uuid"])
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
output_content = self.dataset_populator.get_history_dataset_content(history_id)
assert output_content == "Hello World 2\n"
[docs] def test_show_dynamic_tools(self):
# Create tool.
original_list = self.dataset_populator.list_dynamic_tools()
created_dynamic_tool_dict = self.dataset_populator.create_tool(MINIMAL_TOOL_NO_ID)
self._assert_has_keys(created_dynamic_tool_dict, "id", "uuid", "active")
created_id = created_dynamic_tool_dict["id"]
created_uuid = created_dynamic_tool_dict["uuid"]
new_list = self.dataset_populator.list_dynamic_tools()
for dynamic_tool_dict in original_list:
self._assert_has_keys(dynamic_tool_dict, "id", "uuid", "active")
assert dynamic_tool_dict["id"] != created_id
assert dynamic_tool_dict["uuid"] != created_uuid
found_id = False
found_uuid = False
for dynamic_tool_dict in new_list:
self._assert_has_keys(dynamic_tool_dict, "id", "uuid", "active")
found_id = found_id or dynamic_tool_dict["id"] == created_id
found_uuid = found_uuid or dynamic_tool_dict["uuid"] == created_uuid
assert found_id
assert found_uuid
[docs] def test_show_tool_source_admin(self):
response = self._get("tools/cat1/raw_tool_source", admin=True)
response.raise_for_status()
assert "Concatenate datasets" in response.text
assert response.headers["language"] == "xml"
[docs] def test_show_tool_source_denied(self):
with self._different_user(anon=True):
response = self._get("tools/cat1/raw_tool_source")
assert response.status_code == 403
[docs] def test_tool_deactivate(self):
# Create tool.
tool_response = self.dataset_populator.create_tool(MINIMAL_TOOL_NO_ID)
self._assert_has_keys(tool_response, "id", "uuid", "active")
assert tool_response["active"]
deactivate_response = self.dataset_populator.deactivate_dynamic_tool(tool_response["uuid"])
assert not deactivate_response["active"]
# Run tool.
history_id = self.dataset_populator.new_history()
response = self._run(history_id=history_id, tool_uuid=tool_response["uuid"], assert_ok=False)
# Get a 404 when trying to run a deactivated tool.
self._assert_status_code_is(response, 404)
[docs] @skip_without_tool("cat1")
def test_run_cat1_with_two_inputs(self, history_id):
# Run tool with an multiple data parameter and grouping (repeat)
new_dataset1 = self.dataset_populator.new_dataset(history_id, content="Cat1Test")
new_dataset2 = self.dataset_populator.new_dataset(history_id, content="Cat2Test")
inputs = {"input1": dataset_to_param(new_dataset1), "queries_0|input2": dataset_to_param(new_dataset2)}
outputs = self._cat1_outputs(history_id, inputs=inputs)
assert len(outputs) == 1
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "Cat1Test\nCat2Test"
[docs] @skip_without_tool("mapper_two")
def test_bam_state_regression(self, history_id):
# Test regression of https://github.com/galaxyproject/galaxy/issues/6856. With changes
# to metadata file flushing to optimize creating bam outputs and copying bam datasets
# we observed very subtle problems with HDA state changes on other files being flushed at
# the same time. This tests txt datasets finalized before and after the bam outputs as
# well as other bam files all flush properly during job completion.
new_dataset1 = self.dataset_populator.new_dataset(history_id, content="123\n456\n789")
inputs = {
"input1": dataset_to_param(new_dataset1),
"reference": dataset_to_param(new_dataset1),
}
outputs = self._run_and_get_outputs("mapper_two", history_id, inputs)
assert len(outputs) == 4
for output in outputs:
details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
assert details["state"] == "ok"
[docs] @skip_without_tool("qc_stdout")
def test_qc_messages(self, history_id):
new_dataset1 = self.dataset_populator.new_dataset(history_id, content="123\n456\n789")
inputs = {
"input1": dataset_to_param(new_dataset1),
"quality": 3,
}
create = self._run("qc_stdout", history_id, inputs, wait_for_job=True, assert_ok=True)
assert "jobs" in create, create
job_id = create["jobs"][0]["id"]
details = self.dataset_populator.get_job_details(job_id, full=True).json()
assert "job_messages" in details, details
# test autogenerated message (if regex defines no description attribute)
qc_message = details["job_messages"][0]
# assert qc_message["code_desc"] == "QC Metrics for Tool", qc_message
assert qc_message["desc"] == "QC: Matched on Quality of sample is 30%."
assert qc_message["match"] == "Quality of sample is 30%."
assert qc_message["error_level"] == 1.1
# test message generated from the description containing a reference to group defined in the regex
qc_message = details["job_messages"][1]
assert qc_message["desc"] == "QC: Sample quality 30"
assert qc_message["match"] == "Quality of sample is 30%."
assert qc_message["error_level"] == 1.1
[docs] @skip_without_tool("cat1")
def test_multirun_cat1(self, history_id):
new_dataset1 = self.dataset_populator.new_dataset(history_id, content="123")
new_dataset2 = self.dataset_populator.new_dataset(history_id, content="456")
datasets = [dataset_to_param(new_dataset1), dataset_to_param(new_dataset2)]
inputs = {
"input1": {
"batch": True,
"values": datasets,
},
}
self._check_cat1_multirun(history_id, inputs)
def _check_cat1_multirun(self, history_id, inputs):
outputs = self._cat1_outputs(history_id, inputs=inputs)
assert len(outputs) == 2
output1 = outputs[0]
output2 = outputs[1]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2)
assert output1_content.strip() == "123"
assert output2_content.strip() == "456"
[docs] @skip_without_tool("random_lines1")
def test_multirun_non_data_parameter(self, history_id):
new_dataset1 = self.dataset_populator.new_dataset(history_id, content="123\n456\n789")
inputs = {"input": dataset_to_param(new_dataset1), "num_lines": {"batch": True, "values": [1, 2, 3]}}
outputs = self._run_and_get_outputs("random_lines1", history_id, inputs)
# Assert we have three outputs with 1, 2, and 3 lines respectively.
assert len(outputs) == 3
outputs_contents = [
self.dataset_populator.get_history_dataset_content(history_id, dataset=o).strip() for o in outputs
]
assert sorted(len(c.split("\n")) for c in outputs_contents) == [1, 2, 3]
[docs] @skip_without_tool("cat1")
def test_multirun_in_repeat(self):
history_id, common_dataset, repeat_datasets = self._setup_repeat_multirun()
inputs = {
"input1": common_dataset,
"queries_0|input2": {"batch": True, "values": repeat_datasets},
}
self._check_repeat_multirun(history_id, inputs)
[docs] @skip_without_tool("cat1")
def test_multirun_in_repeat_mismatch(self):
history_id, common_dataset, repeat_datasets = self._setup_repeat_multirun()
inputs = {
"input1": {"batch": False, "values": [common_dataset]},
"queries_0|input2": {"batch": True, "values": repeat_datasets},
}
self._check_repeat_multirun(history_id, inputs)
[docs] @skip_without_tool("cat1")
def test_multirun_on_multiple_inputs(self):
history_id, first_two, second_two = self._setup_two_multiruns()
inputs = {
"input1": {"batch": True, "values": first_two},
"queries_0|input2": {"batch": True, "values": second_two},
}
outputs = self._cat1_outputs(history_id, inputs=inputs)
assert len(outputs) == 2
outputs_contents = [
self.dataset_populator.get_history_dataset_content(history_id, dataset=o).strip() for o in outputs
]
assert "123\n789" in outputs_contents
assert "456\n0ab" in outputs_contents
[docs] @skip_without_tool("cat1")
def test_multirun_on_multiple_inputs_unlinked(self):
history_id, first_two, second_two = self._setup_two_multiruns()
inputs = {
"input1": {"batch": True, "linked": False, "values": first_two},
"queries_0|input2": {"batch": True, "linked": False, "values": second_two},
}
outputs = self._cat1_outputs(history_id, inputs=inputs)
outputs_contents = [
self.dataset_populator.get_history_dataset_content(history_id, dataset=o).strip() for o in outputs
]
assert len(outputs) == 4
assert "123\n789" in outputs_contents
assert "456\n0ab" in outputs_contents
assert "123\n0ab" in outputs_contents
assert "456\n789" in outputs_contents
[docs] @skip_without_tool("dbkey_output_action")
def test_dynamic_parameter_error_handling(self):
# Run test with valid index once, then supply invalid dbkey and invalid table
# entry to ensure dynamic param errors are register.
job_data_list = []
def register_job_data(job_data):
job_data_list.append(job_data)
def tool_test_case_list(inputs, required_files) -> List[ValidToolTestDict]:
return [
{
"inputs": inputs,
"outputs": {},
"required_files": required_files,
"output_collections": [],
"test_index": 0,
"tool_version": "0.1.0",
"tool_id": "dbkey_output_action",
"error": False,
}
]
tool_test_dicts = tool_test_case_list(
{
"input": ["simple_line.txt"],
"index": ["hg18_value"],
},
[["simple_line.txt", {"value": "simple_line.txt", "dbkey": "hg18"}]],
)
dynamic_param_error = None
test_driver = self.driver_or_skip_test_if_remote()
try:
test_driver.run_tool_test("dbkey_output_action", _tool_test_dicts=tool_test_dicts)
except Exception as e:
dynamic_param_error = getattr(e, "dynamic_param_error", False)
assert dynamic_param_error is None
tool_test_dicts = tool_test_case_list(
{
"input": ["simple_line.txt"],
"index": ["hg18_value"],
},
[["simple_line.txt", {"value": "simple_line.txt", "dbkey": "hgnot18"}]],
)
dynamic_param_error = None
try:
test_driver.run_tool_test("dbkey_output_action", _tool_test_dicts=tool_test_dicts)
except Exception as e:
dynamic_param_error = getattr(e, "dynamic_param_error", False)
assert dynamic_param_error
tool_test_dicts = tool_test_case_list(
{
"input": ["simple_line.txt"],
"index": ["hgnot18"],
},
[["simple_line.txt", {"value": "simple_line.txt", "dbkey": "hg18"}]],
)
dynamic_param_error = None
try:
test_driver.run_tool_test(
"dbkey_output_action", _tool_test_dicts=tool_test_dicts, register_job_data=register_job_data
)
except Exception as e:
dynamic_param_error = getattr(e, "dynamic_param_error", False)
assert dynamic_param_error
assert len(job_data_list) == 1
job_data = job_data_list[0]
assert job_data["status"] == "error"
job_data_list.clear()
dynamic_param_error = None
try:
test_driver.run_tool_test(
"dbkey_output_action",
_tool_test_dicts=tool_test_dicts,
skip_on_dynamic_param_errors=True,
register_job_data=register_job_data,
)
except Exception as e:
dynamic_param_error = getattr(e, "dynamic_param_error", False)
assert dynamic_param_error
assert len(job_data_list) == 1
job_data = job_data_list[0]
assert job_data["status"] == "skip"
def _assert_one_job_one_collection_run(self, create):
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
collections = create["output_collections"]
assert len(jobs) == 1
assert len(implicit_collections) == 0
assert len(collections) == 1
output_collection = collections[0]
return output_collection
def _assert_elements_are(self, collection, *args):
elements = collection["elements"]
assert len(elements) == len(args)
for index, element in enumerate(elements):
arg = args[index]
assert arg == element["element_identifier"]
return elements
def _verify_element(self, history_id, element, **props):
object_id = element["object"]["id"]
if "contents" in props:
expected_contents = props["contents"]
contents = self.dataset_populator.get_history_dataset_content(history_id, dataset_id=object_id)
assert contents == expected_contents
del props["contents"]
if props:
details = self.dataset_populator.get_history_dataset_details(history_id, dataset_id=object_id)
for key, value in props.items():
assert details[key] == value
def _setup_repeat_multirun(self):
history_id = self.dataset_populator.new_history()
new_dataset1 = self.dataset_populator.new_dataset(history_id, content="123")
new_dataset2 = self.dataset_populator.new_dataset(history_id, content="456")
common_dataset = self.dataset_populator.new_dataset(history_id, content="Common")
return (
history_id,
dataset_to_param(common_dataset),
[dataset_to_param(new_dataset1), dataset_to_param(new_dataset2)],
)
def _check_repeat_multirun(self, history_id, inputs):
outputs = self._cat1_outputs(history_id, inputs=inputs)
assert len(outputs) == 2
output1 = outputs[0]
output2 = outputs[1]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2)
assert output1_content.strip() == "Common\n123"
assert output2_content.strip() == "Common\n456"
def _setup_two_multiruns(self):
history_id = self.dataset_populator.new_history()
new_dataset1 = self.dataset_populator.new_dataset(history_id, content="123")
new_dataset2 = self.dataset_populator.new_dataset(history_id, content="456")
new_dataset3 = self.dataset_populator.new_dataset(history_id, content="789")
new_dataset4 = self.dataset_populator.new_dataset(history_id, content="0ab")
return (
history_id,
[dataset_to_param(new_dataset1), dataset_to_param(new_dataset2)],
[dataset_to_param(new_dataset3), dataset_to_param(new_dataset4)],
)
[docs] @skip_without_tool("cat")
def test_map_over_collection(self, history_id):
hdca_id = self._build_pair(history_id, ["123", "456"])
inputs = {
"input1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
}
self._run_and_check_simple_collection_mapping(history_id, inputs)
[docs] @skip_without_tool("cat1")
def test_map_over_empty_collection(self, history_id):
response = self.dataset_collection_populator.create_list_in_history(history_id, contents=[], wait=True).json()
hdca_id = response["output_collections"][0]["id"]
inputs = {
"input1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
}
create = self._run_cat1(history_id, inputs=inputs, assert_ok=True)
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 0
assert len(outputs) == 0
assert len(implicit_collections) == 1
empty_output = implicit_collections[0]
assert empty_output["name"] == "Concatenate datasets on collection 1", empty_output
[docs] @skip_without_tool("output_action_change_format")
def test_map_over_with_output_format_actions(self, history_id):
for use_action in ["do", "dont"]:
hdca_id = self._build_pair(history_id, ["123", "456"])
inputs = {
"input_cond|dispatch": use_action,
"input_cond|input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
}
create = self._run("output_action_change_format", history_id, inputs).json()
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 2
assert len(outputs) == 2
assert len(implicit_collections) == 1
output1 = outputs[0]
output2 = outputs[1]
output1_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output1)
output2_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output2)
assert output1_details["file_ext"] == "txt" if (use_action == "do") else "data"
assert output2_details["file_ext"] == "txt" if (use_action == "do") else "data"
[docs] @skip_without_tool("output_action_change_format_paired")
def test_map_over_with_nested_paired_output_format_actions(self, history_id):
hdca_id = self.__build_nested_list(history_id)
inputs = {"input": {"batch": True, "values": [dict(map_over_type="paired", src="hdca", id=hdca_id)]}}
create = self._run("output_action_change_format_paired", history_id, inputs).json()
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 2
assert len(outputs) == 2
assert len(implicit_collections) == 1
for output in outputs:
assert output["file_ext"] == "txt", output
[docs] @skip_without_tool("output_filter_with_input")
def test_map_over_with_output_filter_no_filtering(self, history_id):
hdca_id = self.dataset_collection_populator.create_list_in_history(history_id, wait=True).json()["outputs"][0][
"id"
]
inputs = {
"input_1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
"produce_out_1": "true",
"filter_text_1": "foo",
}
create = self._run("output_filter_with_input", history_id, inputs).json()
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 3
assert len(implicit_collections) == 3
self._check_implicit_collection_populated(create)
[docs] @skip_without_tool("output_filter_with_input_optional")
def test_map_over_with_output_filter_on_optional_input(self, history_id):
hdca_id = self.dataset_collection_populator.create_list_in_history(
history_id, contents=["myinputs"], wait=True
).json()["outputs"][0]["id"]
inputs = {
"input_1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
}
create = self._run("output_filter_with_input_optional", history_id, inputs).json()
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 1
self.dataset_populator.wait_for_job(jobs[0]["id"], assert_ok=True)
assert len(implicit_collections) == 1
self._check_implicit_collection_populated(create)
[docs] @skip_without_tool("output_filter_with_input")
def test_map_over_with_output_filter_one_filtered(self, history_id):
hdca_id = self.dataset_collection_populator.create_list_in_history(history_id, wait=True).json()["outputs"][0][
"id"
]
inputs = {
"input_1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
"produce_out_1": "true",
"filter_text_1": "bar",
}
create = self._run("output_filter_with_input", history_id, inputs).json()
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 3
assert len(implicit_collections) == 2
self._check_implicit_collection_populated(create)
[docs] @skip_without_tool("Cut1")
def test_map_over_with_complex_output_actions(self, history_id):
hdca_id = self._bed_list(history_id)
inputs = {
"columnList": "c1,c2,c3,c4,c5",
"delimiter": "T",
"input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
}
create = self._run("Cut1", history_id, inputs).json()
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 2
assert len(outputs) == 2
assert len(implicit_collections) == 1
output1 = outputs[0]
output2 = outputs[1]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2)
assert output1_content.startswith("chr1")
assert output2_content.startswith("chr1")
[docs] @skip_without_tool("collection_creates_dynamic_list_of_pairs")
def test_map_over_with_discovered_output_collection_elements(self, history_id):
hdca_id = self.dataset_collection_populator.create_list_in_history(history_id, wait=True).json()["outputs"][0][
"id"
]
inputs = {"input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]}}
create = self._run("collection_creates_dynamic_list_of_pairs", history_id, inputs).json()
implicit_collections = create["implicit_collections"]
assert len(implicit_collections) == 1
assert implicit_collections[0]["collection_type"] == "list:list:paired"
assert implicit_collections[0]["elements"][0]["object"]["element_count"] is None
self.dataset_populator.wait_for_job(create["jobs"][0]["id"], assert_ok=True)
hdca = self._get(f"histories/{history_id}/contents/dataset_collections/{implicit_collections[0]['id']}").json()
assert hdca["elements"][0]["object"]["elements"][0]["object"]["elements"][0]["element_identifier"] == "forward"
def _bed_list(self, history_id):
bed1_contents = open(self.get_filename("1.bed")).read()
bed2_contents = open(self.get_filename("2.bed")).read()
contents = [bed1_contents, bed2_contents]
hdca = self.dataset_collection_populator.create_list_in_history(history_id, contents=contents, wait=True).json()
return hdca["outputs"][0]["id"]
[docs] @skip_without_tool("identifier_single")
def test_identifier_in_map(self, history_id):
hdca_id = self._build_pair(history_id, ["123", "456"])
inputs = {
"input1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
}
create_response = self._run("identifier_single", history_id, inputs)
self._assert_status_code_is(create_response, 200)
create = create_response.json()
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 2
assert len(outputs) == 2
assert len(implicit_collections) == 1
output1 = outputs[0]
output2 = outputs[1]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2)
assert output1_content.strip() == "forward"
assert output2_content.strip() == "reverse"
[docs] @skip_without_tool("identifier_single")
def test_identifier_outside_map(self, history_id):
new_dataset1 = self.dataset_populator.new_dataset(history_id, content="123", name="Plain HDA")
inputs = {
"input1": {"src": "hda", "id": new_dataset1["id"]},
}
create_response = self._run("identifier_single", history_id, inputs)
self._assert_status_code_is(create_response, 200)
create = create_response.json()
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 1
assert len(outputs) == 1
assert len(implicit_collections) == 0
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "Plain HDA"
[docs] @skip_without_tool("identifier_multiple")
def test_list_selectable_in_multidata_input(self, history_id):
self.dataset_collection_populator.create_list_in_history(history_id, contents=["123", "456"], wait=True)
build = self.dataset_populator.build_tool_state("identifier_multiple", history_id)
assert len(build["inputs"][0]["options"]["hdca"]) == 1
[docs] @skip_without_tool("identifier_multiple")
def test_identifier_in_multiple_reduce(self, history_id):
hdca_id = self._build_pair(history_id, ["123", "456"])
inputs = {
"input1": {"src": "hdca", "id": hdca_id},
}
create_response = self._run("identifier_multiple", history_id, inputs)
self._assert_status_code_is(create_response, 200)
create = create_response.json()
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 1
assert len(outputs) == 1
assert len(implicit_collections) == 0
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "forward\nreverse"
[docs] @skip_without_tool("identifier_in_conditional")
def test_identifier_map_over_multiple_input_in_conditional(self, history_id):
hdca_id = self._build_pair(history_id, ["123", "456"])
inputs = {
"outer_cond|input1": {"src": "hdca", "id": hdca_id},
}
create_response = self._run("identifier_in_conditional", history_id, inputs)
self._assert_status_code_is(create_response, 200)
create = create_response.json()
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 1
assert len(outputs) == 1
assert len(implicit_collections) == 0
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "forward\nreverse"
[docs] @skip_without_tool("identifier_in_conditional")
def test_identifier_map_over_multiple_input_in_conditional_new_payload_form(self, history_id):
hdca_id = self._build_pair(history_id, ["123", "456"])
inputs = {
"outer_cond": {
"multi_input": True,
"input1": {"id": hdca_id, "src": "hdca"},
},
}
create_response = self._run("identifier_in_conditional", history_id, inputs, input_format="21.01")
self._assert_status_code_is(create_response, 200)
create = create_response.json()
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 1
assert len(outputs) == 1
assert len(implicit_collections) == 0
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "forward\nreverse"
[docs] @skip_without_tool("identifier_multiple_in_repeat")
def test_identifier_multiple_reduce_in_repeat_new_payload_form(self, history_id):
hdca_id = self._build_pair(history_id, ["123", "456"])
inputs = {
"the_repeat": [{"the_data": {"input1": {"src": "hdca", "id": hdca_id}}}],
}
create_response = self._run("identifier_multiple_in_repeat", history_id, inputs, input_format="21.01")
self._assert_status_code_is(create_response, 200)
create = create_response.json()
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 1
assert len(outputs) == 1
assert len(implicit_collections) == 0
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "forward\nreverse"
[docs] @skip_without_tool("identifier_in_conditional")
def test_identifier_map_over_input_in_conditional(self, history_id):
# Run cat tool, so HDA names are different from element identifiers
hdca_id = self._build_pair(history_id, ["123", "456"], run_cat=True)
inputs = {
"outer_cond|input1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
"outer_cond|multi_input": False,
}
create_response = self._run("identifier_in_conditional", history_id, inputs)
self._assert_status_code_is(create_response, 200)
create = create_response.json()
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 2
assert len(outputs) == 2
assert len(implicit_collections) == 1
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "forward"
output2 = outputs[1]
output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2)
assert output2_content.strip() == "reverse"
[docs] @skip_without_tool("identifier_multiple_in_conditional")
def test_identifier_multiple_reduce_in_conditional(self, history_id):
hdca_id = self._build_pair(history_id, ["123", "456"])
inputs = {
"outer_cond|inner_cond|input1": {"src": "hdca", "id": hdca_id},
}
create_response = self._run("identifier_multiple_in_conditional", history_id, inputs)
self._assert_status_code_is(create_response, 200)
create = create_response.json()
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 1
assert len(outputs) == 1
assert len(implicit_collections) == 0
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "forward\nreverse"
[docs] @skip_without_tool("identifier_multiple_in_repeat")
def test_identifier_multiple_reduce_in_repeat(self, history_id):
hdca_id = self._build_pair(history_id, ["123", "456"])
inputs = {
"the_repeat_0|the_data|input1": {"src": "hdca", "id": hdca_id},
}
create_response = self._run("identifier_multiple_in_repeat", history_id, inputs)
self._assert_status_code_is(create_response, 200)
create = create_response.json()
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 1
assert len(outputs) == 1
assert len(implicit_collections) == 0
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "forward\nreverse"
[docs] @skip_without_tool("identifier_single_in_repeat")
def test_identifier_single_in_repeat(self, history_id):
hdca_id = self._build_pair(history_id, ["123", "456"])
inputs = {"the_repeat_0|the_data|input1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]}}
create_response = self._run("identifier_single_in_repeat", history_id, inputs)
self._assert_status_code_is(create_response, 200)
create = create_response.json()
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 2
assert len(implicit_collections) == 1
output_collection = implicit_collections[0]
elements = output_collection["elements"]
assert len(elements) == 2
forward_output = elements[0]["object"]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=forward_output)
assert output1_content.strip() == "forward", output1_content
[docs] @skip_without_tool("identifier_multiple_in_conditional")
def test_identifier_multiple_in_conditional(self, history_id):
new_dataset1 = self.dataset_populator.new_dataset(history_id, content="123", name="Normal HDA1")
inputs = {
"outer_cond|inner_cond|input1": {"src": "hda", "id": new_dataset1["id"]},
}
create_response = self._run("identifier_multiple_in_conditional", history_id, inputs)
self._assert_status_code_is(create_response, 200)
create = create_response.json()
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 1
assert len(outputs) == 1
assert len(implicit_collections) == 0
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "Normal HDA1"
[docs] @skip_without_tool("identifier_multiple")
def test_identifier_with_multiple_normal_datasets(self, history_id):
new_dataset1 = self.dataset_populator.new_dataset(history_id, content="123", name="Normal HDA1")
new_dataset2 = self.dataset_populator.new_dataset(history_id, content="456", name="Normal HDA2")
inputs = {"input1": [{"src": "hda", "id": new_dataset1["id"]}, {"src": "hda", "id": new_dataset2["id"]}]}
create_response = self._run("identifier_multiple", history_id, inputs)
self._assert_status_code_is(create_response, 200)
create = create_response.json()
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 1
assert len(outputs) == 1
assert len(implicit_collections) == 0
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "Normal HDA1\nNormal HDA2"
[docs] @skip_without_tool("identifier_collection")
def test_identifier_with_data_collection(self, history_id):
element_identifiers = self.dataset_collection_populator.list_identifiers(history_id)
payload = dict(
instance_type="history",
history_id=history_id,
element_identifiers=element_identifiers,
collection_type="list",
)
create_response = self._post("dataset_collections", payload, json=True)
dataset_collection = create_response.json()
inputs = {
"input1": {"src": "hdca", "id": dataset_collection["id"]},
}
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
create_response = self._run("identifier_collection", history_id, inputs)
self._assert_status_code_is(create_response, 200)
create = create_response.json()
outputs = create["outputs"]
jobs = create["jobs"]
assert len(jobs) == 1
assert len(outputs) == 1
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "\n".join(d["name"] for d in element_identifiers)
[docs] @skip_without_tool("identifier_in_actions")
def test_identifier_in_actions(self, history_id):
element_identifiers = self.dataset_collection_populator.list_identifiers(history_id, contents=["1\t2"])
payload = dict(
instance_type="history",
history_id=history_id,
element_identifiers=element_identifiers,
collection_type="list",
)
create_response = self._post("dataset_collections", payload, json=True)
dataset_collection = create_response.json()
inputs = {
"input": {"batch": True, "values": [{"src": "hdca", "id": dataset_collection["id"]}]},
}
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
create_response = self._run("identifier_in_actions", history_id, inputs)
self._assert_status_code_is(create_response, 200)
create = create_response.json()
outputs = create["outputs"]
output1 = outputs[0]
output_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output1)
assert output_details["metadata_column_names"][1] == "data1", output_details
[docs] @skip_without_tool("cat1")
def test_map_over_nested_collections(self, history_id):
hdca_id = self.__build_nested_list(history_id)
inputs = {
"input1": {"batch": True, "values": [dict(src="hdca", id=hdca_id)]},
}
self._check_simple_cat1_over_nested_collections(history_id, inputs)
[docs] @skip_without_tool("collection_paired_structured_like")
def test_paired_input_map_over_nested_collections(self, history_id):
hdca_id = self.__build_nested_list(history_id)
inputs = {
"input1": {"batch": True, "values": [dict(map_over_type="paired", src="hdca", id=hdca_id)]},
}
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
create = self._run("collection_paired_structured_like", history_id, inputs, assert_ok=True)
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 2
assert len(implicit_collections) == 1
implicit_collection = implicit_collections[0]
assert implicit_collection["collection_type"] == "list:paired", implicit_collection["collection_type"]
outer_elements = implicit_collection["elements"]
assert len(outer_elements) == 2
[docs] @skip_without_tool("collection_paired_conditional_structured_like")
def test_paired_input_conditional_map_over_nested_collections(self, history_id):
hdca_id = self.__build_nested_list(history_id)
inputs = {
"cond|cond_param": "paired",
"cond|input1": {"batch": True, "values": [dict(map_over_type="paired", src="hdca", id=hdca_id)]},
}
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
create = self._run("collection_paired_conditional_structured_like", history_id, inputs, assert_ok=True)
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 2
assert len(implicit_collections) == 1
implicit_collection = implicit_collections[0]
assert implicit_collection["collection_type"] == "list:paired", implicit_collection["collection_type"]
outer_elements = implicit_collection["elements"]
assert len(outer_elements) == 2
def _check_simple_cat1_over_nested_collections(self, history_id, inputs):
create = self._run_cat1(history_id, inputs=inputs, assert_ok=True)
outputs = create["outputs"]
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 4
assert len(outputs) == 4
assert len(implicit_collections) == 1
implicit_collection = implicit_collections[0]
self._assert_has_keys(implicit_collection, "collection_type", "elements")
assert implicit_collection["collection_type"] == "list:paired"
assert len(implicit_collection["elements"]) == 2
first_element, second_element = implicit_collection["elements"]
assert first_element["element_identifier"] == "test0", first_element
assert second_element["element_identifier"] == "test1", second_element
first_object = first_element["object"]
assert first_object["collection_type"] == "paired"
assert len(first_object["elements"]) == 2
first_object_forward_element = first_object["elements"][0]
assert outputs[0]["id"] == first_object_forward_element["object"]["id"]
[docs] @skip_without_tool("cat1")
def test_map_over_two_collections(self, history_id):
hdca1_id = self._build_pair(history_id, ["123\n", "456\n"])
hdca2_id = self._build_pair(history_id, ["789\n", "0ab\n"])
inputs = {
"input1": {"batch": True, "values": [{"src": "hdca", "id": hdca1_id}]},
"queries_0|input2": {"batch": True, "values": [{"src": "hdca", "id": hdca2_id}]},
}
self._check_map_cat1_over_two_collections(history_id, inputs)
def _check_map_cat1_over_two_collections(self, history_id, inputs):
response = self._run_cat1(history_id, inputs)
self._assert_status_code_is(response, 200)
response_object = response.json()
outputs = response_object["outputs"]
assert len(outputs) == 2
output1 = outputs[0]
output2 = outputs[1]
self.dataset_populator.wait_for_history(history_id)
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2)
assert output1_content.strip() == "123\n789"
assert output2_content.strip() == "456\n0ab"
assert len(response_object["jobs"]) == 2
assert len(response_object["implicit_collections"]) == 1
[docs] @skip_without_tool("cat1")
def test_map_over_two_collections_unlinked(self, history_id):
hdca1_id = self._build_pair(history_id, ["123\n", "456\n"])
hdca2_id = self._build_pair(history_id, ["789\n", "0ab\n"])
inputs = {
"input1": {"batch": True, "linked": False, "values": [{"src": "hdca", "id": hdca1_id}]},
"queries_0|input2": {"batch": True, "linked": False, "values": [{"src": "hdca", "id": hdca2_id}]},
}
response = self._run_cat1(history_id, inputs)
self._assert_status_code_is(response, 200)
response_object = response.json()
outputs = response_object["outputs"]
assert len(outputs) == 4
assert len(response_object["jobs"]) == 4
implicit_collections = response_object["implicit_collections"]
assert len(implicit_collections) == 1
implicit_collection = implicit_collections[0]
assert implicit_collection["collection_type"] == "paired:paired"
outer_elements = implicit_collection["elements"]
assert len(outer_elements) == 2
element0, element1 = outer_elements
assert element0["element_identifier"] == "forward"
assert element1["element_identifier"] == "reverse"
elements0 = element0["object"]["elements"]
elements1 = element1["object"]["elements"]
assert len(elements0) == 2
assert len(elements1) == 2
element00, element01 = elements0
assert element00["element_identifier"] == "forward"
assert element01["element_identifier"] == "reverse"
element10, element11 = elements1
assert element10["element_identifier"] == "forward"
assert element11["element_identifier"] == "reverse"
expected_contents_list = [
(element00, "123\n789\n"),
(element01, "123\n0ab\n"),
(element10, "456\n789\n"),
(element11, "456\n0ab\n"),
]
for element, expected_contents in expected_contents_list:
dataset_id = element["object"]["id"]
contents = self.dataset_populator.get_history_dataset_content(history_id, dataset_id=dataset_id)
assert expected_contents == contents
[docs] @skip_without_tool("cat1")
def test_map_over_collected_and_individual_datasets(self, history_id):
hdca1_id = self._build_pair(history_id, ["123\n", "456\n"])
new_dataset1 = self.dataset_populator.new_dataset(history_id, content="789")
new_dataset2 = self.dataset_populator.new_dataset(history_id, content="0ab")
inputs = {
"input1": {"batch": True, "values": [{"src": "hdca", "id": hdca1_id}]},
"queries_0|input2": {
"batch": True,
"values": [dataset_to_param(new_dataset1), dataset_to_param(new_dataset2)],
},
}
response = self._run_cat1(history_id, inputs)
self._assert_status_code_is(response, 200)
response_object = response.json()
outputs = response_object["outputs"]
assert len(outputs) == 2
assert len(response_object["jobs"]) == 2
assert len(response_object["implicit_collections"]) == 1
[docs] @skip_without_tool("identifier_source")
def test_default_identifier_source_map_over(self):
with self.dataset_populator.test_history() as history_id:
input_a_hdca_id = self.dataset_collection_populator.create_list_in_history(
history_id, contents=[("A", "A content")], wait=True
).json()["outputs"][0]["id"]
input_b_hdca_id = self.dataset_collection_populator.create_list_in_history(
history_id, contents=[("B", "B content")], wait=True
).json()["outputs"][0]["id"]
inputs = {
"inputA": {"batch": True, "values": [dict(src="hdca", id=input_a_hdca_id)]},
"inputB": {"batch": True, "values": [dict(src="hdca", id=input_b_hdca_id)]},
}
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
create = self._run("identifier_source", history_id, inputs, assert_ok=True)
for implicit_collection in create["implicit_collections"]:
if implicit_collection["output_name"] == "outputA":
assert implicit_collection["elements"][0]["element_identifier"] == "A"
else:
assert implicit_collection["elements"][0]["element_identifier"] == "B"
[docs] @skip_without_tool("collection_creates_pair")
def test_map_over_collection_output(self):
with self.dataset_populator.test_history() as history_id:
create_response = self.dataset_collection_populator.create_list_in_history(
history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"], wait=True
)
hdca_id = create_response.json()["outputs"][0]["id"]
inputs = {
"input1": {"batch": True, "values": [dict(src="hdca", id=hdca_id)]},
}
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
create = self._run("collection_creates_pair", history_id, inputs, assert_ok=True)
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 2
assert len(implicit_collections) == 1
implicit_collection = implicit_collections[0]
assert implicit_collection["collection_type"] == "list:paired", implicit_collection
outer_elements = implicit_collection["elements"]
assert len(outer_elements) == 2
element0, element1 = outer_elements
assert element0["element_identifier"] == "data0"
assert element1["element_identifier"] == "data1"
pair0, pair1 = element0["object"], element1["object"]
pair00, pair01 = pair0["elements"]
pair10, pair11 = pair1["elements"]
for pair in pair0, pair1:
assert "collection_type" in pair, pair
assert pair["collection_type"] == "paired", pair
pair_ids = []
for pair_element in pair00, pair01, pair10, pair11:
assert "object" in pair_element
pair_ids.append(pair_element["object"]["id"])
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
expected_contents = [
"a\nc\n",
"b\nd\n",
"e\ng\n",
"f\nh\n",
]
for i in range(4):
contents = self.dataset_populator.get_history_dataset_content(history_id, dataset_id=pair_ids[i])
assert expected_contents[i] == contents
[docs] @skip_without_tool("cat1")
def test_cannot_map_over_incompatible_collections(self):
with self.dataset_populator.test_history() as history_id:
hdca1_id = self._build_pair(history_id, ["123\n", "456\n"])
hdca2_id = self.dataset_collection_populator.create_list_in_history(history_id).json()["outputs"][0]["id"]
inputs = {
"input1": {
"batch": True,
"values": [{"src": "hdca", "id": hdca1_id}],
},
"queries_0|input2": {
"batch": True,
"values": [{"src": "hdca", "id": hdca2_id}],
},
}
run_response = self._run_cat1(history_id, inputs)
# TODO: Fix this error checking once switch over to new API decorator
# on server.
assert run_response.status_code >= 400
[docs] @skip_without_tool("__FILTER_FROM_FILE__")
def test_map_over_collection_structured_like(self):
with self.dataset_populator.test_history() as history_id:
hdca_id = self.dataset_collection_populator.create_list_in_history(
history_id, contents=[("A", "A"), ("B", "B")], wait=True
).json()["outputs"][0]["id"]
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
inputs = {
"input": {"values": [dict(src="hdca", id=hdca_id)]},
"how|filter_source": {"batch": True, "values": [dict(src="hdca", id=hdca_id)]},
}
implicit_collections = self._run("__FILTER_FROM_FILE__", history_id, inputs, assert_ok=True)[
"implicit_collections"
]
discarded_collection, filtered_collection = implicit_collections
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
history_contents = self.dataset_populator._get_contents_request(history_id).json()
# We should have a final collection count of 3 (2 nested collections, plus the input collection)
new_collections = (
len([c for c in history_contents if c["history_content_type"] == "dataset_collection"]) - 1
)
assert (
new_collections == 2
), f"Expected to generate 4 new, filtered collections, but got {new_collections} collections"
assert (
filtered_collection["collection_type"] == discarded_collection["collection_type"] == "list:list"
), filtered_collection
collection_details = self.dataset_populator.get_history_collection_details(
history_id, hid=filtered_collection["hid"]
)
assert collection_details["element_count"] == 2
first_collection_level = collection_details["elements"][0]
assert first_collection_level["element_type"] == "dataset_collection"
second_collection_level = first_collection_level["object"]
assert second_collection_level["collection_type"] == "list"
assert second_collection_level["elements"][0]["element_type"] == "hda"
[docs] @skip_without_tool("collection_type_source")
def test_map_over_collection_type_source(self):
with self.dataset_populator.test_history() as history_id:
hdca_id = self.dataset_collection_populator.create_list_in_history(
history_id, contents=[("A", "A"), ("B", "B")], wait=True
).json()["outputs"][0]["id"]
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
inputs = {
"input_collect": {"values": [dict(src="hdca", id=hdca_id)]},
"header": {"batch": True, "values": [dict(src="hdca", id=hdca_id)]},
}
self._run("collection_type_source", history_id, inputs, assert_ok=True, wait_for_job=True)
collection_details = self.dataset_populator.get_history_collection_details(history_id, hid=4)
assert collection_details["elements"][0]["object"]["elements"][0]["element_type"] == "hda"
[docs] @skip_without_tool("multi_data_param")
def test_reduce_collections_legacy(self):
with self.dataset_populator.test_history() as history_id:
hdca1_id = self._build_pair(history_id, ["123\n", "456\n"])
hdca2_id = self.dataset_collection_populator.create_list_in_history(history_id, wait=True).json()[
"outputs"
][0]["id"]
inputs = {
"f1": f"__collection_reduce__|{hdca1_id}",
"f2": f"__collection_reduce__|{hdca2_id}",
}
self._check_simple_reduce_job(history_id, inputs)
[docs] @skip_without_tool("multi_data_param")
def test_reduce_collections(self):
with self.dataset_populator.test_history() as history_id:
hdca1_id = self._build_pair(history_id, ["123\n", "456\n"])
hdca2_id = self.dataset_collection_populator.create_list_in_history(history_id, wait=True).json()[
"outputs"
][0]["id"]
inputs = {
"f1": {"src": "hdca", "id": hdca1_id},
"f2": {"src": "hdca", "id": hdca2_id},
}
self._check_simple_reduce_job(history_id, inputs)
[docs] @skip_without_tool("multi_data_param")
def test_implicit_reduce_with_mapping(self):
with self.dataset_populator.test_history() as history_id:
hdca1_id = self._build_pair(history_id, ["123\n", "456\n"])
hdca2_id = self.dataset_collection_populator.create_list_of_list_in_history(history_id, wait=True).json()[
"id"
]
inputs = {
"f1": {"src": "hdca", "id": hdca1_id},
"f2": {
"batch": True,
"values": [{"src": "hdca", "map_over_type": "list", "id": hdca2_id}],
},
}
create = self._run("multi_data_param", history_id, inputs, assert_ok=True)
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 1
assert len(implicit_collections) == 2
output_hdca = self.dataset_populator.get_history_collection_details(
history_id, hid=implicit_collections[0]["hid"]
)
assert output_hdca["collection_type"] == "list"
[docs] @skip_without_tool("column_multi_param")
def test_implicit_conversion_and_reduce(self):
with self.dataset_populator.test_history() as history_id:
self._run_implicit_collection_and_reduce(history_id=history_id, param="1")
[docs] @skip_without_tool("column_multi_param")
def test_implicit_conversion_and_reduce_invalid_param(self):
with self.dataset_populator.test_history() as history_id:
with pytest.raises(AssertionError):
self._run_implicit_collection_and_reduce(history_id=history_id, param="X")
details = self.dataset_populator.get_history_dataset_details(history_id=history_id, hid=3, assert_ok=False)
assert details["state"] == "error"
assert "parameter 'col': an invalid option" in details["misc_info"]
def _run_implicit_collection_and_reduce(self, history_id, param):
fasta_path = self.test_data_resolver.get_filename("1.fasta")
with open(fasta_path) as fasta_fh:
fasta_content = fasta_fh.read()
response = self.dataset_collection_populator.upload_collection(
history_id,
"list",
elements=[
{
"name": "test0",
"src": "pasted",
"paste_content": fasta_content,
"ext": "fasta",
}
],
wait=True,
)
self._assert_status_code_is(response, 200)
hdca_id = response.json()["outputs"][0]["id"]
inputs = {
"input1": {"src": "hdca", "id": hdca_id},
"col": param,
}
create = self._run("column_multi_param", history_id, inputs, assert_ok=True)
jobs = create["jobs"]
assert len(jobs) == 1
content = self.dataset_populator.get_history_dataset_content(history_id, hid=3)
assert content.strip() == "hg17", content
[docs] @skip_without_tool("multi_data_repeat")
def test_reduce_collections_in_repeat(self):
with self.dataset_populator.test_history() as history_id:
hdca1_id = self._build_pair(history_id, ["123\n", "456\n"])
inputs = {
"outer_repeat_0|f1": {"src": "hdca", "id": hdca1_id},
}
create = self._run("multi_data_repeat", history_id, inputs, assert_ok=True)
outputs = create["outputs"]
jobs = create["jobs"]
assert len(jobs) == 1
assert len(outputs) == 1
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "123\n456", output1_content
[docs] @skip_without_tool("multi_data_repeat")
def test_reduce_collections_in_repeat_legacy(self):
with self.dataset_populator.test_history() as history_id:
hdca1_id = self._build_pair(history_id, ["123\n", "456\n"])
inputs = {
"outer_repeat_0|f1": f"__collection_reduce__|{hdca1_id}",
}
create = self._run("multi_data_repeat", history_id, inputs, assert_ok=True)
outputs = create["outputs"]
jobs = create["jobs"]
assert len(jobs) == 1
assert len(outputs) == 1
output1 = outputs[0]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
assert output1_content.strip() == "123\n456", output1_content
[docs] @skip_without_tool("multi_data_param")
def test_reduce_multiple_lists_on_multi_data(self):
with self.dataset_populator.test_history() as history_id:
hdca1_id = self._build_pair(history_id, ["123\n", "456\n"])
hdca2_id = self.dataset_collection_populator.create_list_in_history(history_id, wait=True).json()[
"outputs"
][0]["id"]
inputs = {
"f1": [{"src": "hdca", "id": hdca1_id}, {"src": "hdca", "id": hdca2_id}],
"f2": [{"src": "hdca", "id": hdca1_id}],
}
create = self._run("multi_data_param", history_id, inputs, assert_ok=True)
outputs = create["outputs"]
jobs = create["jobs"]
assert len(jobs) == 1
assert len(outputs) == 2
output1, output2 = outputs
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2)
assert output1_content.strip() == "123\n456\nTestData123\nTestData123\nTestData123"
assert output2_content.strip() == "123\n456"
def _check_simple_reduce_job(self, history_id, inputs):
create = self._run("multi_data_param", history_id, inputs, assert_ok=True)
outputs = create["outputs"]
jobs = create["jobs"]
assert len(jobs) == 1
assert len(outputs) == 2
output1, output2 = outputs
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2)
assert output1_content.strip() == "123\n456", output1_content
assert len(output2_content.strip().split("\n")) == 3, output2_content
[docs] @skip_without_tool("collection_paired_test")
def test_subcollection_mapping(self):
with self.dataset_populator.test_history() as history_id:
hdca_list_id = self.__build_nested_list(history_id)
inputs = {
"f1": {
"batch": True,
"values": [{"src": "hdca", "map_over_type": "paired", "id": hdca_list_id}],
}
}
self._check_simple_subcollection_mapping(history_id, inputs)
def _check_simple_subcollection_mapping(self, history_id, inputs):
# Following wait not really needed - just getting so many database
# locked errors with sqlite.
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
outputs = self._run_and_get_outputs("collection_paired_test", history_id, inputs)
assert len(outputs), 2
output1 = outputs[0]
output2 = outputs[1]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2)
assert output1_content.strip() == "123\n456", output1_content
assert output2_content.strip() == "789\n0ab", output2_content
[docs] @skip_without_tool("collection_mixed_param")
def test_combined_mapping_and_subcollection_mapping(self):
with self.dataset_populator.test_history() as history_id:
nested_list_id = self.__build_nested_list(history_id)
create_response = self.dataset_collection_populator.create_list_in_history(
history_id, contents=["xxx\n", "yyy\n"], wait=True
)
list_id = create_response.json()["outputs"][0]["id"]
inputs = {
"f1": {
"batch": True,
"values": [{"src": "hdca", "map_over_type": "paired", "id": nested_list_id}],
},
"f2": {
"batch": True,
"values": [{"src": "hdca", "id": list_id}],
},
}
self._check_combined_mapping_and_subcollection_mapping(history_id, inputs)
def _check_combined_mapping_and_subcollection_mapping(self, history_id, inputs):
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
outputs = self._run_and_get_outputs("collection_mixed_param", history_id, inputs)
assert len(outputs), 2
output1 = outputs[0]
output2 = outputs[1]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2)
assert output1_content.strip() == "123\n456\nxxx", output1_content
assert output2_content.strip() == "789\n0ab\nyyy", output2_content
def _check_implicit_collection_populated(self, run_response):
implicit_collections = run_response["implicit_collections"]
assert implicit_collections
for implicit_collection in implicit_collections:
assert implicit_collection["populated_state"] == "ok"
def _cat1_outputs(self, history_id, inputs):
return self._run_outputs(self._run_cat1(history_id, inputs))
def _run_and_get_outputs(self, tool_id, history_id, inputs=None, tool_version=None):
if inputs is None:
inputs = {}
return self._run_outputs(self._run(tool_id, history_id, inputs, tool_version=tool_version))
def _run_outputs(self, create_response):
self._assert_status_code_is(create_response, 200)
return create_response.json()["outputs"]
def _run_cat1(self, history_id, inputs, assert_ok=False, **kwargs):
return self._run("cat1", history_id, inputs, assert_ok=assert_ok, **kwargs)
def __tool_ids(self):
index = self._get("tool_panels/default")
tools_index = index.json()
# In panels by default, so flatten out sections...
tool_ids = []
for id, tool_or_section in tools_index.items():
if "tools" in tool_or_section:
tool_ids.extend([t for t in tool_or_section["tools"] if isinstance(t, str)])
else:
tool_ids.append(id)
return tool_ids
[docs] @skip_without_tool("collection_cat_group_tag_multiple")
def test_group_tag_selection(self, history_id):
input_hdca_id = self.__build_group_list(history_id)
inputs = {
"input1": {"src": "hdca", "id": input_hdca_id},
"group": "condition:treated",
}
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
response = self._run("collection_cat_group_tag", history_id, inputs, assert_ok=True)
outputs = response["outputs"]
assert len(outputs) == 1
output = outputs[0]
output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
assert output_content.strip() == "123\n456"
[docs] @skip_without_tool("collection_cat_group_tag_multiple")
def test_group_tag_selection_multiple(self, history_id):
input_hdca_id = self.__build_group_list(history_id)
inputs = {
"input1": {"src": "hdca", "id": input_hdca_id},
"groups": "condition:treated,type:single",
}
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
response = self._run("collection_cat_group_tag_multiple", history_id, inputs, assert_ok=True)
outputs = response["outputs"]
assert len(outputs) == 1
output = outputs[0]
output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
assert output_content.strip() == "123\n456\n456\n0ab"
[docs] @skip_without_tool("expression_forty_two")
def test_galaxy_expression_tool_simplest(self):
history_id = self.dataset_populator.new_history()
run_response = self._run("expression_forty_two", history_id)
self._assert_status_code_is(run_response, 200)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
output_content = self.dataset_populator.get_history_dataset_content(history_id)
assert output_content == "42"
[docs] @skip_without_tool("expression_parse_int")
def test_galaxy_expression_tool_simple(self):
history_id = self.dataset_populator.new_history()
inputs = {
"input1": "7",
}
run_response = self._run("expression_parse_int", history_id, inputs)
self._assert_status_code_is(run_response, 200)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
output_content = self.dataset_populator.get_history_dataset_content(history_id)
assert output_content == "7"
[docs] @skip_without_tool("expression_log_line_count")
def test_galaxy_expression_metadata(self):
history_id = self.dataset_populator.new_history()
new_dataset1 = self.dataset_populator.new_dataset(
history_id, content="1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n11\n12\n13\n14"
)
inputs = {
"input1": dataset_to_param(new_dataset1),
}
run_response = self._run("expression_log_line_count", history_id, inputs)
self._assert_status_code_is(run_response, 200)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
output_content = self.dataset_populator.get_history_dataset_content(history_id)
assert output_content == "3"
[docs] @skip_without_tool("cat1")
def test_run_deferred_dataset(self, history_id):
details = self.dataset_populator.create_deferred_hda(
history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed", ext="bed"
)
inputs = {
"input1": dataset_to_param(details),
}
outputs = self._cat1_outputs(history_id, inputs=inputs)
output = outputs[0]
details = self.dataset_populator.get_history_dataset_details(
history_id, dataset=output, wait=True, assert_ok=True
)
assert details["state"] == "ok"
output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
assert output_content.startswith("chr1 147962192 147962580 CCDS989.1_cds_0_0_chr1_147962193_r 0 -")
[docs] @skip_without_tool("metadata_bam")
def test_run_deferred_dataset_with_metadata_options_filter(self, history_id):
details = self.dataset_populator.create_deferred_hda(
history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bam", ext="bam"
)
inputs = {"input_bam": dataset_to_param(details), "ref_names": "chrM"}
run_response = self.dataset_populator.run_tool(tool_id="metadata_bam", inputs=inputs, history_id=history_id)
output = run_response["outputs"][0]
output_details = self.dataset_populator.get_history_dataset_details(
history_id, dataset=output, wait=True, assert_ok=True
)
assert output_details["state"] == "ok"
output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
assert output_content.startswith("chrM")
[docs] @skip_without_tool("pileup")
def test_metadata_validator_on_deferred_input(self, history_id):
deferred_bam_details = self.dataset_populator.create_deferred_hda(
history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bam", ext="bam"
)
fasta1_contents = open(self.get_filename("1.fasta")).read()
fasta = self.dataset_populator.new_dataset(history_id, content=fasta1_contents)
inputs = {"input1": dataset_to_param(deferred_bam_details), "reference": dataset_to_param(fasta)}
run_response = self.dataset_populator.run_tool(tool_id="pileup", inputs=inputs, history_id=history_id)
self.dataset_populator.wait_for_job(run_response["jobs"][0]["id"], assert_ok=True)
[docs] @pytest.mark.xfail
@skip_without_tool("pileup")
def test_metadata_validator_can_fail_on_deferred_input(self, history_id):
# This test fails because we just skip the validator
# Fixing this is a TODO
deferred_bam_details = self.dataset_populator.create_deferred_hda(
history_id,
"https://github.com/galaxyproject/galaxy/blob/dev/test-data/3unsorted.bam?raw=true",
ext="unsorted.bam",
)
fasta1_contents = open(self.get_filename("1.fasta")).read()
fasta = self.dataset_populator.new_dataset(history_id, content=fasta1_contents)
inputs = {"input1": dataset_to_param(deferred_bam_details), "reference": dataset_to_param(fasta)}
run_response = self.dataset_populator.run_tool(tool_id="pileup", inputs=inputs, history_id=history_id)
self.dataset_populator.wait_for_job(run_response["jobs"][0]["id"], assert_ok=False)
job_id = run_response["jobs"][0]["id"]
job_details = self.dataset_populator.get_job_details(job_id=job_id).json()
assert job_details["state"] == "failed"
[docs] @skip_without_tool("cat1")
def test_run_deferred_mapping(self, history_id: str):
elements = [
{
"src": "url",
"url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed",
"info": "my cool bed",
"deferred": True,
"ext": "bed",
}
]
targets = [
{
"destination": {"type": "hdca"},
"elements": elements,
"collection_type": "list",
"name": "here is a name",
}
]
payload = {
"history_id": history_id,
"targets": json.dumps(targets),
}
fetch_response = self.dataset_populator.fetch(payload, wait=True)
dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
hdca_id = dataset_collection["id"]
inputs = {
"input1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
}
run_response = self._run_cat1(history_id, inputs).json()
hdca_id = run_response["implicit_collections"][0]["id"]
dataset_collection = self.dataset_populator.get_history_collection_details(history_id, id=hdca_id)
elements = dataset_collection["elements"]
assert len(elements) == 1
object_0 = elements[0]["object"]
assert isinstance(object_0, dict)
assert object_0["state"] == "ok"
output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=object_0)
assert output_content.startswith("chr22 30128507 31828507 uc003bnx.1_cds_2_0_chr22_29227_f 0 +")
[docs] @skip_without_tool("cat_list")
def test_run_deferred_list_multi_data_reduction(self, history_id: str):
elements = [
{
"src": "url",
"url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed",
"info": "my cool bed",
"deferred": True,
"ext": "bed",
}
]
targets = [
{
"destination": {"type": "hdca"},
"elements": elements,
"collection_type": "list",
"name": "here is a name",
}
]
payload = {
"history_id": history_id,
"targets": json.dumps(targets),
}
fetch_response = self.dataset_populator.fetch(payload, wait=True)
dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
hdca_id = dataset_collection["id"]
inputs = {
"input1": {"src": "hdca", "id": hdca_id},
}
run_response = self._run("cat_list", history_id, inputs, assert_ok=True)
output_dataset = run_response["outputs"][0]
output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output_dataset)
assert output_content.startswith("chr22 30128507 31828507 uc003bnx.1_cds_2_0_chr22_29227_f 0 +")
[docs] @skip_without_tool("cat_list")
def test_run_deferred_nested_list_input(self, history_id: str):
elements = [
{
"name": "outer",
"elements": [
{
"src": "url",
"name": "forward",
"url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed",
"info": "my cool bed 4",
"deferred": True,
"ext": "bed",
},
{
"src": "url",
"name": "reverse",
"url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed",
"info": "my cool bed 1",
"deferred": True,
"ext": "bed",
},
],
},
]
targets = [
{
"destination": {"type": "hdca"},
"elements": elements,
"collection_type": "list:paired",
"name": "here is a name",
}
]
payload = {
"history_id": history_id,
"targets": json.dumps(targets),
}
fetch_response = self.dataset_populator.fetch(payload, wait=True)
dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
assert dataset_collection["collection_type"] == "list:paired"
result_elements = dataset_collection["elements"]
assert len(result_elements) == 1
outer_element = result_elements[0]
assert isinstance(outer_element, dict)
inner_dataset_coll = outer_element["object"]
assert isinstance(inner_dataset_coll, dict)
inner_elements = inner_dataset_coll["elements"]
assert isinstance(inner_elements, list)
assert len(inner_elements) == 2
hdca_id = dataset_collection["id"]
inputs = {
"input1": {"src": "hdca", "id": hdca_id},
}
run_response = self._run("collection_nested_test", history_id, inputs, assert_ok=True, wait_for_job=True)
output_dataset = run_response["outputs"][1]
output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output_dataset)
assert output_content.startswith("chr22 30128507 31828507 uc003bnx.1_cds_2_0_chr22_29227_f 0 +")
[docs] @skip_without_tool("collection_paired_structured_like")
def test_deferred_map_over_nested_collections(self, history_id):
elements = [
{
"name": "outer1",
"elements": [
{
"src": "url",
"name": "forward",
"url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed",
"info": "my cool bed 4",
"deferred": True,
"ext": "bed",
},
{
"src": "url",
"name": "reverse",
"url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed",
"info": "my cool bed 1",
"deferred": True,
"ext": "bed",
},
],
},
{
"name": "outer2",
"elements": [
{
"src": "url",
"name": "forward",
"url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed",
"info": "my cool bed 4",
"deferred": True,
"ext": "bed",
},
{
"src": "url",
"name": "reverse",
"url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed",
"info": "my cool bed 1",
"deferred": True,
"ext": "bed",
},
],
},
]
targets = [
{
"destination": {"type": "hdca"},
"elements": elements,
"collection_type": "list:paired",
"name": "here is a name",
}
]
payload = {
"history_id": history_id,
"targets": json.dumps(targets),
}
fetch_response = self.dataset_populator.fetch(payload, wait=True)
dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
hdca_id = dataset_collection["id"]
inputs = {
"input1": {"batch": True, "values": [dict(map_over_type="paired", src="hdca", id=hdca_id)]},
}
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
create = self._run("collection_paired_structured_like", history_id, inputs, assert_ok=True)
jobs = create["jobs"]
implicit_collections = create["implicit_collections"]
assert len(jobs) == 2
self.dataset_populator.wait_for_jobs(jobs, assert_ok=True)
assert len(implicit_collections) == 1
implicit_collection = implicit_collections[0]
assert implicit_collection["collection_type"] == "list:paired", implicit_collection["collection_type"]
outer_elements = implicit_collection["elements"]
assert len(outer_elements) == 2
element0 = outer_elements[0]
pair1 = element0["object"]
hda = pair1["elements"][0]["object"]
output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=hda, wait=False)
assert output1_content.startswith("chr22\t30128507\t31828507\tuc003bnx.1_cds_2_0_chr22_29227_f\t0\t+")
def __build_group_list(self, history_id):
response = self.dataset_collection_populator.upload_collection(
history_id,
"list",
elements=[
{
"name": "test0",
"src": "pasted",
"paste_content": "123\n",
"ext": "txt",
"tags": ["group:type:paired-end", "group:condition:treated"],
},
{
"name": "test1",
"src": "pasted",
"paste_content": "456\n",
"ext": "txt",
"tags": ["group:type:single", "group:condition:treated"],
},
{
"name": "test2",
"src": "pasted",
"paste_content": "789\n",
"ext": "txt",
"tags": ["group:type:paired-end", "group:condition:untreated"],
},
{
"name": "test3",
"src": "pasted",
"paste_content": "0ab\n",
"ext": "txt",
"tags": ["group:type:single", "group:condition:untreated"],
},
],
wait=True,
)
self._assert_status_code_is(response, 200)
hdca_list_id = response.json()["outputs"][0]["id"]
return hdca_list_id
def __build_nested_list(self, history_id):
response = self.dataset_collection_populator.upload_collection(
history_id,
"list:paired",
elements=[
{
"name": "test0",
"elements": [
{"src": "pasted", "paste_content": "123\n", "name": "forward", "ext": "txt"},
{"src": "pasted", "paste_content": "456\n", "name": "reverse", "ext": "txt"},
],
},
{
"name": "test1",
"elements": [
{"src": "pasted", "paste_content": "789\n", "name": "forward", "ext": "txt"},
{"src": "pasted", "paste_content": "0ab\n", "name": "reverse", "ext": "txt"},
],
},
],
wait=True,
)
self._assert_status_code_is(response, 200)
hdca_list_id = response.json()["outputs"][0]["id"]
return hdca_list_id
def _build_pair(self, history_id, contents, run_cat=False):
create_response = self.dataset_collection_populator.create_pair_in_history(
history_id, contents=contents, direct_upload=True, wait=True
)
hdca_id = create_response.json()["output_collections"][0]["id"]
inputs = {
"input1": {"batch": True, "values": [dict(src="hdca", id=hdca_id)]},
}
if run_cat:
outputs = self._run_cat(history_id, inputs=inputs, assert_ok=True)
hdca_id = outputs["implicit_collections"][0]["id"]
return hdca_id
def _assert_dataset_permission_denied_response(self, response):
# TODO: This should be 403, should just need to throw more specific exception in the
# Galaxy code.
assert response.status_code != 200
# err_message = response.json()["err_msg"]
# assert "User does not have permission to use a dataset" in err_message, err_message
@contextlib.contextmanager
def _different_user_and_history(self):
with self._different_user():
with self.dataset_populator.test_history() as other_history_id:
yield other_history_id