Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy_test.api.test_tools

# Test tools API.
import contextlib
import json
import os
import zipfile
from io import BytesIO
from typing import (
    Any,
    Dict,
    List,
)

import pytest
from requests import (
    get,
    put,
)

from galaxy.tool_util.verify.interactor import ValidToolTestDict
from galaxy.util import galaxy_root_path
from galaxy.util.unittest_utils import skip_if_github_down
from galaxy_test.base import rules_test_data
from galaxy_test.base.api_asserts import (
    assert_has_keys,
    assert_status_code_is,
)
from galaxy_test.base.decorators import requires_new_history
from galaxy_test.base.populators import (
    BaseDatasetCollectionPopulator,
    DatasetCollectionPopulator,
    DatasetPopulator,
    LibraryPopulator,
    skip_without_tool,
    stage_rules_example,
)
from ._framework import ApiTestCase

# Tool definition with an explicit id: no inputs, and a command that writes a
# fixed greeting into its single "txt" output.
MINIMAL_TOOL = {
    "id": "minimal_tool",
    "name": "Minimal Tool",
    "class": "GalaxyTool",
    "version": "1.0.0",
    "command": "echo 'Hello World' > $output1",
    "inputs": [],
    "outputs": {
        "output1": {"format": "txt"},
    },
}
# Same shape as MINIMAL_TOOL but without an "id" key and with a different
# greeting in the command.
MINIMAL_TOOL_NO_ID = {
    "name": "Minimal Tool",
    "class": "GalaxyTool",
    "version": "1.0.0",
    "command": "echo 'Hello World 2' > $output1",
    "inputs": [],
    "outputs": {
        "output1": {"format": "txt"},
    },
}


class TestsTools:
    """Mixin of helpers for building collections and running tools via the API.

    Both populator attributes are only declared here; a concrete test class
    must assign them before any helper is called.
    """

    dataset_populator: DatasetPopulator
    dataset_collection_populator: BaseDatasetCollectionPopulator

    def _build_pair(self, history_id, contents):
        """Create a pair collection from ``contents`` and return its id."""
        response = self.dataset_collection_populator.create_pair_in_history(
            history_id,
            contents=contents,
            direct_upload=True,
            wait=True,
        )
        return response.json()["outputs"][0]["id"]

    def _run_and_check_simple_collection_mapping(self, history_id, inputs):
        """Run ``cat`` on ``inputs`` and check the two-job mapped result.

        Expects exactly two jobs, two outputs and one implicit collection, with
        the outputs containing "123" and "456" respectively.
        """
        result = self._run_cat(history_id, inputs=inputs, assert_ok=True)
        produced = result["outputs"]
        submitted_jobs = result["jobs"]
        implicit = result["implicit_collections"]
        assert len(submitted_jobs) == 2
        assert len(produced) == 2
        assert len(implicit) == 1
        first_dataset, second_dataset = produced[0], produced[1]
        first_text = self.dataset_populator.get_history_dataset_content(history_id, dataset=first_dataset)
        second_text = self.dataset_populator.get_history_dataset_content(history_id, dataset=second_dataset)
        assert first_text.strip() == "123"
        assert second_text.strip() == "456"

    def _run_cat(self, history_id, inputs, assert_ok=False, **kwargs):
        """Shortcut for :meth:`_run` with the ``cat`` tool."""
        return self._run("cat", history_id, inputs, assert_ok=assert_ok, **kwargs)

    def _run(
        self,
        tool_id=None,
        history_id=None,
        inputs=None,
        tool_uuid=None,
        assert_ok=False,
        tool_version=None,
        use_cached_job=False,
        wait_for_job=False,
        input_format="legacy",
    ):
        """POST a tool execution request to ``tools``.

        Returns the decoded JSON body when ``assert_ok`` is true (after
        asserting a 200 status and the presence of an "outputs" key);
        otherwise returns the raw response object. When ``tool_id`` is omitted
        a ``tool_uuid`` must be supplied.
        """
        inputs = {} if inputs is None else inputs
        if tool_id is None:
            assert tool_uuid is not None
        payload = self.dataset_populator.run_tool_payload(
            tool_id=tool_id,
            inputs=inputs,
            history_id=history_id,
            input_format=input_format,
        )
        # Optional request fields are only sent when the caller asked for them.
        if tool_uuid:
            payload["tool_uuid"] = tool_uuid
        if tool_version is not None:
            payload["tool_version"] = tool_version
        if use_cached_job:
            payload["use_cached_job"] = True
        response = self.dataset_populator._post("tools", data=payload)
        if wait_for_job:
            first_job_id = response.json()["jobs"][0]["id"]
            self.dataset_populator.wait_for_job(job_id=first_job_id)
        if not assert_ok:
            return response
        assert_status_code_is(response, 200)
        body = response.json()
        assert_has_keys(body, "outputs")
        return body
[docs]class TestToolsApi(ApiTestCase, TestsTools): dataset_populator: DatasetPopulator
def setUp(self):
    """Run the base set-up, then build the dataset and collection populators."""
    super().setUp()
    interactor = self.galaxy_interactor
    self.dataset_populator = DatasetPopulator(interactor)
    self.dataset_collection_populator = DatasetCollectionPopulator(interactor)
def test_index(self):
    """The collected tool ids include ``upload1``."""
    assert "upload1" in self.__tool_ids()
@skip_without_tool("cat1")
def test_search_cat(self):
    """Querying the tools endpoint with "concat" returns ``cat1``."""
    query = {"q": "concat"}
    hits = get(self._api_url("tools"), query).json()
    assert "cat1" in hits
@skip_without_tool("trimmer")
def test_search_trimmer(self):
    """Querying with the phrase below returns ``trimmer``."""
    query = {"q": "leading or trailing characters"}
    hits = get(self._api_url("tools"), query).json()
    assert "trimmer" in hits
@skip_without_tool("Grep1")
def test_search_grep(self):
    """Querying with the phrase below returns ``Grep1``."""
    query = {"q": "Select lines that match an expression"}
    hits = get(self._api_url("tools"), query).json()
    assert "Grep1" in hits
def test_no_panel_index(self):
    """With ``in_panel=False`` the index is a flat tool list containing ``upload1``."""
    response = self._get("tools", data={"in_panel": False})
    # With in_panel=False only tools are returned, so there are no sections
    # to flatten before collecting ids.
    tool_ids = [tool["id"] for tool in response.json()]
    assert "upload1" in tool_ids
@skip_without_tool("test_sam_to_bam_conversions")
def test_requirements(self):
    """The admin requirements endpoint lists exactly one requirement, ``samtools``."""
    response = self._get("tools/test_sam_to_bam_conversions/requirements", admin=True)
    self._assert_status_code_is_ok(response)
    found = response.json()
    assert len(found) == 1, found
    only_requirement = found[0]
    assert only_requirement["name"] == "samtools", only_requirement
@skip_without_tool("cat1")
def test_show_repeat(self):
    """``cat1`` exposes ``input1`` plus a ``queries`` repeat wrapping ``input2``."""
    parameters = self._show_valid_tool("cat1")["inputs"]
    assert len(parameters) == 2, f"Expected two inputs - got [{parameters}]"
    first_param = parameters[0]
    repeat_param = parameters[1]
    assert first_param["name"] == "input1"
    assert repeat_param["name"] == "queries"
    self._assert_has_keys(repeat_param, "min", "max", "title", "help")
    nested = repeat_param["inputs"]
    assert len(nested) == 1
    assert nested[0]["name"] == "input2"
@skip_without_tool("random_lines1")
def test_show_conditional(self):
    """The third input of ``random_lines1`` is a two-case conditional.

    The ``no_seed`` case carries no inputs; the other case carries a single
    ``seed`` input.
    """
    conditional = self._show_valid_tool("random_lines1")["inputs"][2]
    self._assert_has_keys(conditional, "cases", "test_param")
    self._assert_has_keys(conditional["test_param"], "name", "type", "label", "help")
    cases = conditional["cases"]
    assert len(cases) == 2

    no_seed_case = cases[0]
    self._assert_has_keys(no_seed_case, "value", "inputs")
    assert no_seed_case["value"] == "no_seed"
    assert len(no_seed_case["inputs"]) == 0

    seed_case = cases[1]
    self._assert_has_keys(seed_case, "value", "inputs")
    seed_inputs = seed_case["inputs"]
    assert len(seed_inputs) == 1
    seed_param = seed_inputs[0]
    self._assert_has_keys(seed_param, "name", "type", "label", "help", "argument")
    assert seed_param["name"] == "seed"
@skip_without_tool("multi_data_param")
def test_show_multi_data(self):
    """The first input reports min/max of 1/1235; the second reports ``None`` for both."""
    shown_inputs = self._show_valid_tool("multi_data_param")["inputs"]
    bounded = shown_inputs[0]
    unbounded = shown_inputs[1]
    self._assert_has_keys(bounded, "min", "max")
    assert bounded["min"] == 1
    assert bounded["max"] == 1235
    self._assert_has_keys(unbounded, "min", "max")
    assert unbounded["min"] is None
    assert unbounded["max"] is None
@skip_without_tool("collection_creates_list")
def test_show_output_collection(self):
    """The tool has one output, labelled "Duplicate List", with ``inherit_format`` set."""
    shown_outputs = self._show_valid_tool("collection_creates_list")["outputs"]
    assert len(shown_outputs) == 1
    collection_output = shown_outputs[0]
    assert collection_output["label"] == "Duplicate List"
    assert collection_output["inherit_format"] is True
@skip_without_tool("test_data_source")
def test_data_source_build_request(self):
    """The built tool state starts with a ``GALAXY_URL`` input pointing at ``tool_runner``."""
    with self.dataset_populator.test_history() as history_id:
        state = self.dataset_populator.build_tool_state("test_data_source", history_id)
        url_param = state["inputs"][0]
        assert url_param["name"] == "GALAXY_URL"
        callback_url = url_param["value"]
        assert callback_url.startswith("http")
        assert callback_url.endswith("tool_runner?tool_id=ratmine")
@skip_without_tool("cheetah_problem_unbound_var_input")
def test_legacy_biotools_xref_injection(self):
    """The tool's description carries exactly one xref: ``bio.tools`` / ``bwa``."""
    response = get(self._api_url("tools/cheetah_problem_unbound_var_input"))
    response.raise_for_status()
    tool_json = response.json()
    assert "xrefs" in tool_json
    assert len(tool_json["xrefs"]) == 1
    only_xref = tool_json["xrefs"][0]
    assert only_xref["reftype"] == "bio.tools"
    assert only_xref["value"] == "bwa"
@skip_without_tool("test_data_source")
@skip_if_github_down
def test_data_source_ok_request(self):
    """Fetching ``1.bed`` through the data source tool yields a ``bed`` dataset.

    The single output must start with the expected first BED record and be
    reported with extension ``bed``.
    """
    with self.dataset_populator.test_history() as history_id:
        tool_inputs = {
            "URL": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed",
            "URL_method": "get",
            "data_type": "bed",
        }
        payload = self.dataset_populator.run_tool_payload(
            tool_id="test_data_source",
            inputs=tool_inputs,
            history_id=history_id,
        )
        response = self._post("tools", data=payload)
        self._assert_status_code_is(response, 200)
        created = response.json()
        self._assert_has_keys(created, "outputs")
        assert len(created["outputs"]) == 1
        fetched = created["outputs"][0]
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        text = self.dataset_populator.get_history_dataset_content(history_id, dataset=fetched)
        assert text.startswith("chr1\t147962192\t147962580")
        details = self.dataset_populator.get_history_dataset_details(history_id, dataset=fetched)
        assert details["file_ext"] == "bed"
@skip_without_tool("test_data_source")
@skip_if_github_down
def test_data_source_sniff_fastqsanger(self):
    """A fetched gzipped FASTQ without ``data_type`` is reported as ``fastqsanger.gz``.

    The input file is downloaded from raw.githubusercontent.com, so the test
    is skipped when GitHub is unreachable, matching
    ``test_data_source_ok_request``.
    """
    with self.dataset_populator.test_history() as history_id:
        payload = self.dataset_populator.run_tool_payload(
            tool_id="test_data_source",
            inputs={
                "URL": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.fastqsanger.gz",
                "URL_method": "get",
            },
            history_id=history_id,
        )
        create_response = self._post("tools", data=payload)
        self._assert_status_code_is(create_response, 200)
        create_object = create_response.json()
        self._assert_has_keys(create_object, "outputs")
        assert len(create_object["outputs"]) == 1
        output = create_object["outputs"][0]
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        output_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
        assert output_details["file_ext"] == "fastqsanger.gz", output_details
@skip_without_tool("test_data_source")
def test_data_sources_block_file_parameters(self):
    """A ``file://`` URL is accepted by the API but the output ends in ``error``."""
    with self.dataset_populator.test_history() as history_id:
        local_readme = os.path.join(os.getcwd(), "README.rst")
        tool_inputs = {
            "URL": f"file://{local_readme}",
            "URL_method": "get",
            "data_type": "bed",
        }
        payload = self.dataset_populator.run_tool_payload(
            tool_id="test_data_source",
            inputs=tool_inputs,
            history_id=history_id,
        )
        response = self._post("tools", data=payload)
        self._assert_status_code_is(response, 200)
        created = response.json()
        self._assert_has_keys(created, "outputs")
        assert len(created["outputs"]) == 1
        blocked = created["outputs"][0]
        # The job is expected to fail, so neither wait may assert success.
        self.dataset_populator.wait_for_history(history_id, assert_ok=False)
        details = self.dataset_populator.get_history_dataset_details(history_id, dataset=blocked, wait=False)
        assert details["state"] == "error", details
        assert "has not sent back a URL parameter" in details["misc_info"], details
def _show_valid_tool(self, tool_id, tool_version=None):
    """Fetch ``tools/{tool_id}`` with IO details and return the decoded JSON.

    Asserts a 200 status and the presence of the "inputs", "outputs" and
    "panel_section_id" keys. ``tool_version`` is only sent when truthy.
    """
    query = {"io_details": True}
    if tool_version:
        query["tool_version"] = tool_version
    response = self._get(f"tools/{tool_id}", data=query)
    self._assert_status_code_is(response, 200)
    description = response.json()
    self._assert_has_keys(description, "inputs", "outputs", "panel_section_id")
    return description
@skip_without_tool("model_attributes")
def test_model_attributes_sanitization(self):
    """Double quotes in model attributes reach tool outputs as ``__dq__``.

    The quoted string is stored verbatim as a user address description and as
    a dataset name; the tool's outputs must contain the escaped form (and no
    raw quote in the e-mail output).
    """
    with self.dataset_populator.test_history(require_new=False) as history_id:
        raw_name = 'cool name with a quo"te'
        escaped_name = "cool name with a quo__dq__te"

        user = self._get("users/current").json()
        info_url = self._api_url(f"users/{user['id']}/information/inputs", use_key=True)
        update = put(info_url, data=json.dumps({"address_0|desc": raw_name}))
        update.raise_for_status()
        stored_info = get(info_url).json()
        assert len(stored_info["addresses"]) == 1
        assert stored_info["addresses"][0]["desc"] == raw_name

        source_hda = self.dataset_populator.new_dataset(history_id, content="1\t2\t3", name=raw_name)
        assert source_hda["name"] == raw_name
        run = self._run(
            tool_id="model_attributes",
            inputs={"input1": dataset_to_param(source_hda)},
            history_id=history_id,
            assert_ok=True,
            wait_for_job=True,
        )

        def output_text(position):
            # Read the content of the tool output at the given position.
            return self.dataset_populator.get_history_dataset_content(
                history_id, dataset=run["outputs"][position]
            )

        dataset_name_seen = output_text(0)
        assert dataset_name_seen.strip() == escaped_name
        email_seen = output_text(1)
        assert '"' not in email_seen
        address_seen = output_text(2)
        assert address_seen.strip() == escaped_name
@skip_without_tool("composite_output")
def test_test_data_filepath_security(self):
    """An admin asking for a ``../`` test-data path gets a 404."""
    endpoint = "tools/composite_output/test_data_path?filename=../CONTRIBUTORS.md"
    response = self._get(endpoint, admin=True)
    assert response.status_code == 404, response.text
@skip_without_tool("composite_output")
def test_test_data_admin_security(self):
    """The same ``test_data_path`` request without ``admin=True`` gets a 403."""
    endpoint = "tools/composite_output/test_data_path?filename=../CONTRIBUTORS.md"
    response = self._get(endpoint)
    assert response.status_code == 403, response.text
@skip_without_tool("dbkey_filter_multi_input")
def test_data_table_requirement_annotated(self):
    """The first test case requires ``test_fasta_indexes`` and no loc files."""
    response = self._get("tools/dbkey_filter_multi_input/test_data")
    assert response.status_code == 200
    first_case = response.json()[0]
    assert first_case["required_data_tables"][0] == "test_fasta_indexes"
    assert len(first_case["required_loc_files"]) == 0
@skip_without_tool("composite_output")
def test_test_data_composite_output(self):
    """``composite_output`` exposes one test case with a single input."""
    response = self._get("tools/composite_output/test_data")
    assert response.status_code == 200
    cases = response.json()
    assert len(cases) == 1
    only_case = cases[0]
    self._assert_has_keys(only_case, "inputs", "outputs", "output_collections", "required_files")
    assert len(only_case["inputs"]) == 1, only_case
    # input0 = next(iter(test_case["inputs"].values()))
@skip_without_tool("collection_two_paired")
def test_test_data_collection_two_paired(self):
    """Two test cases; the first has three inputs and no data-table or loc-file needs."""
    response = self._get("tools/collection_two_paired/test_data")
    assert response.status_code == 200
    cases = response.json()
    assert len(cases) == 2
    first_case = cases[0]
    assert len(first_case["required_data_tables"]) == 0
    assert len(first_case["required_loc_files"]) == 0
    self._assert_has_keys(first_case, "inputs", "outputs", "output_collections", "required_files")
    assert len(first_case["inputs"]) == 3, first_case
@skip_without_tool("collection_nested_test")
def test_test_data_collection_nested(self):
    """Two test cases; the first has a single input."""
    response = self._get("tools/collection_nested_test/test_data")
    assert response.status_code == 200
    cases = response.json()
    assert len(cases) == 2
    first_case = cases[0]
    self._assert_has_keys(first_case, "inputs", "outputs", "output_collections", "required_files")
    assert len(first_case["inputs"]) == 1, first_case
@skip_without_tool("expression_null_handling_boolean")
def test_test_data_null_boolean_inputs(self):
    """The third test case has a single ``bool_input`` whose value is ``None``."""
    response = self._get("tools/expression_null_handling_boolean/test_data")
    assert response.status_code == 200
    cases = response.json()
    assert len(cases) == 3
    third_case = cases[2]
    self._assert_has_keys(third_case, "inputs", "outputs", "output_collections", "required_files")
    case_inputs = third_case["inputs"]
    assert len(case_inputs) == 1, third_case
    assert "bool_input" in case_inputs, case_inputs
    assert case_inputs["bool_input"] is None, case_inputs
@skip_without_tool("simple_constructs_y")
def test_test_data_yaml_tools(self):
    """``simple_constructs_y`` exposes three test cases."""
    response = self._get("tools/simple_constructs_y/test_data")
    assert response.status_code == 200
    cases = response.json()
    assert len(cases) == 3
@skip_without_tool("cat1")
def test_test_data_download(self):
    """Downloading ``1.bed`` succeeds and returns content starting with ``chr``.

    The content check is its own assertion: as the second operand of the
    status assertion it would only be a failure message and never be tested.
    """
    test_data_response = self._get("tools/cat1/test_data_download?filename=1.bed")
    assert test_data_response.status_code == 200, test_data_response.text
    assert test_data_response.text.startswith("chr"), test_data_response.text[:100]
@skip_without_tool("composite_output")
def test_test_data_downloads_security(self):
    """A ``../`` filename on the download endpoint gets a 404."""
    endpoint = "tools/composite_output/test_data_download?filename=../CONTRIBUTORS.md"
    response = self._get(endpoint)
    assert response.status_code == 404, response.text
@skip_without_tool("composite_output")
def test_test_data_download_composite(self):
    """Downloading ``velveth_test1`` returns a zip holding the six expected members."""
    response = self._get("tools/composite_output/test_data_download?filename=velveth_test1")
    assert response.status_code == 200
    with zipfile.ZipFile(BytesIO(response.content)) as archive:
        members = archive.namelist()
        assert len(members) == 6
        wanted = {
            "velveth_test1/Roadmaps",
            "velveth_test1/output.html",
            "velveth_test1/Sequences",
            "velveth_test1/Log",
            "velveth_test1/output/",
            "velveth_test1/output/1",
        }
        assert set(members) == wanted
def test_convert_dataset_explicit_history(self):
    """Convert a FASTA dataset to tabular, naming the target history explicitly.

    Every output must have extension ``tabular`` and no implicit collection
    may be created.
    """
    with self.dataset_populator.test_history(require_new=False) as history_id:
        # Use a context manager so the fixture file is closed deterministically.
        with open(self.get_filename("1.fasta")) as fasta_file:
            fasta1_contents = fasta_file.read()
        hda1 = self.dataset_populator.new_dataset(history_id, content=fasta1_contents)
        payload = {
            "src": "hda",
            "id": hda1["id"],
            "source_type": "fasta",
            "target_type": "tabular",
            "history_id": history_id,
        }
        create_response = self._post("tools/CONVERTER_fasta_to_tabular/convert", data=payload)
        # Surface HTTP errors before indexing into the body; otherwise a failed
        # request shows up as an unrelated KeyError on "jobs".
        create_response.raise_for_status()
        create = create_response.json()
        self.dataset_populator.wait_for_job(create["jobs"][0]["id"], assert_ok=True)
        assert len(create["implicit_collections"]) == 0
        for output in create["outputs"]:
            assert output["file_ext"] == "tabular"
def test_convert_dataset_implicit_history(self):
    """Convert a FASTA dataset to tabular without sending ``history_id``.

    Every output must have extension ``tabular`` and no implicit collection
    may be created.
    """
    with self.dataset_populator.test_history(require_new=False) as history_id:
        # Use a context manager so the fixture file is closed deterministically.
        with open(self.get_filename("1.fasta")) as fasta_file:
            fasta1_contents = fasta_file.read()
        hda1 = self.dataset_populator.new_dataset(history_id, content=fasta1_contents)
        payload = {"src": "hda", "id": hda1["id"], "source_type": "fasta", "target_type": "tabular"}
        create_response = self._post("tools/CONVERTER_fasta_to_tabular/convert", data=payload)
        # Surface HTTP errors before indexing into the body; otherwise a failed
        # request shows up as an unrelated KeyError on "jobs".
        create_response.raise_for_status()
        create = create_response.json()
        self.dataset_populator.wait_for_job(create["jobs"][0]["id"], assert_ok=True)
        assert len(create["implicit_collections"]) == 0
        for output in create["outputs"]:
            assert output["file_ext"] == "tabular"
def test_convert_hdca(self):
    """Converting a ``list:paired`` FASTA collection yields tabular elements.

    The conversion must create an implicit collection; the datasets inside
    its first element are checked for extension ``tabular``.
    """

    def fasta_pair(name, forward, reverse):
        # One named pair of pasted FASTA elements for the upload request.
        return {
            "name": name,
            "elements": [
                {"src": "pasted", "paste_content": forward, "name": "forward", "ext": "fasta"},
                {"src": "pasted", "paste_content": reverse, "name": "reverse", "ext": "fasta"},
            ],
        }

    with self.dataset_populator.test_history(require_new=False) as history_id:
        elements = [
            fasta_pair("test0", "123\n", "456\n"),
            fasta_pair("test1", "789\n", "0ab\n"),
        ]
        upload = self.dataset_collection_populator.upload_collection(
            history_id, "list:paired", elements=elements, wait=True
        )
        self._assert_status_code_is(upload, 200)
        payload = {
            "src": "hdca",
            "id": upload.json()["outputs"][0]["id"],
            "source_type": "fasta",
            "target_type": "tabular",
            "history_id": history_id,
        }
        convert = self._post("tools/CONVERTER_fasta_to_tabular/convert", payload)
        convert.raise_for_status()
        assert convert.json()["implicit_collections"] != []
        converted_hid = convert.json()["implicit_collections"][0]["hid"]
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        details = self.dataset_populator.get_history_collection_details(history_id, hid=converted_hid)
        first_pair = details["elements"][0]["object"]
        for member in first_pair["elements"]:
            assert member["object"]["file_ext"] == "tabular"
def test_unzip_collection(self):
    """Unzipping a pair yields its forward and reverse datasets in the same history."""
    with self.dataset_populator.test_history(require_new=False) as history_id:
        pair_id = self._build_pair(history_id, ["123", "456"])
        tool_inputs = {
            "input": {"src": "hdca", "id": pair_id},
        }
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        result = self._run("__UNZIP_COLLECTION__", history_id, tool_inputs, assert_ok=True)
        unzipped = result["outputs"]
        assert len(unzipped) == 2
        forward, reverse = unzipped[0], unzipped[1]
        forward_text = self.dataset_populator.get_history_dataset_content(history_id, dataset=forward)
        reverse_text = self.dataset_populator.get_history_dataset_content(history_id, dataset=reverse)
        assert forward_text.strip() == "123"
        assert reverse_text.strip() == "456"

        forward_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=forward)
        reverse_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=reverse)
        assert forward_details["history_id"] == history_id
        assert reverse_details["history_id"] == history_id
def test_unzip_nested(self):
    """Mapping unzip over a ``list:paired`` yields two implicit collections of HDAs."""
    with self.dataset_populator.test_history(require_new=False) as history_id:
        forward_element = {
            "src": "pasted",
            "paste_content": "123\n",
            "name": "forward",
            "ext": "txt",
            "tags": ["#foo"],
        }
        reverse_element = {
            "src": "pasted",
            "paste_content": "456\n",
            "name": "reverse",
            "ext": "txt",
            "tags": ["#bar"],
        }
        upload = self.dataset_collection_populator.upload_collection(
            history_id,
            "list:paired",
            elements=[{"name": "test0", "elements": [forward_element, reverse_element]}],
            wait=True,
        )
        self._assert_status_code_is(upload, 200)
        nested_id = upload.json()["outputs"][0]["id"]
        tool_inputs = {
            "input": {
                "batch": True,
                "values": [{"src": "hdca", "map_over_type": "paired", "id": nested_id}],
            }
        }
        result = self._run("__UNZIP_COLLECTION__", history_id, tool_inputs, assert_ok=True)
        implicit = result["implicit_collections"]
        assert len(implicit) == 2
        first_unzipped = self.dataset_populator.get_history_collection_details(
            history_id, hid=implicit[0]["hid"]
        )
        assert first_unzipped["elements"][0]["element_type"] == "hda", first_unzipped
def test_zip_inputs(self):
    """Zipping two datasets produces one output collection of type ``paired``."""
    with self.dataset_populator.test_history(require_new=False) as history_id:
        forward = dataset_to_param(self.dataset_populator.new_dataset(history_id, content="1\t2\t3"))
        reverse = dataset_to_param(self.dataset_populator.new_dataset(history_id, content="4\t5\t6"))
        tool_inputs = {"input_forward": forward, "input_reverse": reverse}
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        result = self._run("__ZIP_COLLECTION__", history_id, tool_inputs, assert_ok=True)
        zipped = result["output_collections"]
        assert len(zipped) == 1
        self.dataset_populator.wait_for_job(result["jobs"][0]["id"], assert_ok=True)
        pair_details = self.dataset_populator.get_history_collection_details(history_id, hid=zipped[0]["hid"])
        assert pair_details["collection_type"] == "paired"
@skip_without_tool("__ZIP_COLLECTION__")
def test_collection_operation_dataset_input_permissions(self):
    """Zipping another user's private dataset is rejected with a permission error."""
    with self.dataset_populator.test_history(require_new=False) as history_id:
        private_hda = dataset_to_param(self.dataset_populator.new_dataset(history_id, content="1\t2\t3"))
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        self.dataset_populator.make_private(history_id, private_hda["id"])
        tool_inputs = {"input_forward": private_hda, "input_reverse": private_hda}
        with self._different_user_and_history() as other_history_id:
            denied = self._run("__ZIP_COLLECTION__", other_history_id, tool_inputs, assert_ok=False)
            self._assert_dataset_permission_denied_response(denied)
@skip_without_tool("__UNZIP_COLLECTION__")
def test_collection_operation_collection_input_permissions(self):
    """Unzipping a pair with a private element as another user is rejected."""
    with self.dataset_populator.test_history(require_new=False) as history_id:
        created = self.dataset_collection_populator.create_pair_in_history(
            history_id, direct_upload=True, wait=True
        )
        self._assert_status_code_is(created, 200)
        pair_summary = created.json()["outputs"][0]
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        pair = self.dataset_populator.get_history_collection_details(history_id, hid=pair_summary["hid"])
        # Making the first element private is enough to restrict the collection.
        first_element_id = pair["elements"][0]["object"]["id"]
        self.dataset_populator.make_private(history_id, first_element_id)
        tool_inputs = {
            "input": {"src": "hdca", "id": pair["id"]},
        }
        with self._different_user_and_history() as other_history_id:
            denied = self._run("__UNZIP_COLLECTION__", other_history_id, tool_inputs, assert_ok=False)
            self._assert_dataset_permission_denied_response(denied)
def test_zip_list_inputs(self):
    """Zipping two lists in batch mode produces one implicit ``list:paired`` collection."""

    def new_list_id(history_id, contents):
        # Create a list collection and return the id of the resulting HDCA.
        created = self.dataset_collection_populator.create_list_in_history(
            history_id, contents=contents, wait=True
        )
        return created.json()["outputs"][0]["id"]

    def batched(hdca_id):
        # Batch-style tool input referencing a single collection.
        return {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]}

    with self.dataset_populator.test_history(require_new=False) as history_id:
        forward_list_id = new_list_id(history_id, ["a\nb\nc\nd", "e\nf\ng\nh"])
        reverse_list_id = new_list_id(history_id, ["1\n2\n3\n4", "5\n6\n7\n8"])
        tool_inputs = {
            "input_forward": batched(forward_list_id),
            "input_reverse": batched(reverse_list_id),
        }
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        result = self._run("__ZIP_COLLECTION__", history_id, tool_inputs, assert_ok=True)
        implicit = result["implicit_collections"]
        assert len(implicit) == 1
        self.dataset_populator.wait_for_job(result["jobs"][0]["id"], assert_ok=True)
        zipped = self.dataset_populator.get_history_collection_details(history_id, hid=implicit[0]["hid"])
        assert zipped["collection_type"] == "list:paired"
@skip_without_tool("__EXTRACT_DATASET__")
@skip_without_tool("cat_data_and_sleep")
def test_database_operation_tool_with_pending_inputs(self):
    """The extract output is not ``ok`` in the response to the run request.

    A 15-second sleeping job is first mapped over the same list, then
    ``__EXTRACT_DATASET__`` is run on it.
    """
    with self.dataset_populator.test_history(require_new=False) as history_id:
        created = self.dataset_collection_populator.create_list_in_history(
            history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"], wait=True
        )
        list_id = created.json()["outputs"][0]["id"]
        sleep_inputs = {
            "sleep_time": 15,
            "input1": {"batch": True, "values": [{"src": "hdca", "id": list_id}]},
        }
        self.dataset_populator.run_tool(
            tool_id="cat_data_and_sleep",
            inputs=sleep_inputs,
            history_id=history_id,
        )
        extract_inputs = {
            "data_collection": {"src": "hdca", "id": list_id},
        }
        extract_run = self.dataset_populator.run_tool(
            tool_id="__EXTRACT_DATASET__",
            inputs=extract_inputs,
            history_id=history_id,
        )
        assert extract_run["outputs"][0]["state"] != "ok"
@skip_without_tool("__FILTER_FAILED_DATASETS__")
def test_filter_failed_list(self):
    """Filtering an ok/error/ok/error list leaves only the two ``ok`` elements."""

    def element_states(hdca):
        # State of the object behind each element of a flat collection.
        return [element["object"]["state"] for element in hdca["elements"]]

    with self.dataset_populator.test_history(require_new=False) as history_id:
        created = self.dataset_collection_populator.create_list_in_history(
            history_id, contents=["0", "1", "0", "1"], wait=True
        )
        source_list_id = created.json()["outputs"][0]["id"]
        run = self.dataset_populator.run_exit_code_from_file(history_id, source_list_id)
        implicit = run["implicit_collections"]
        assert len(implicit) == 1
        mixed = self.dataset_populator.get_history_collection_details(
            history_id, hid=implicit[0]["hid"], wait=False
        )
        states_before = element_states(mixed)
        assert states_before == ["ok", "error", "ok", "error"], states_before
        filtered = self._run_filter(history_id=history_id, failed_hdca_id=mixed["id"])
        states_after = element_states(filtered)
        assert states_after == ["ok", "ok"], states_after
@skip_without_tool("__FILTER_FAILED_DATASETS__")
def test_filter_failed_list_paired(self):
    """Filtering a ``list:paired`` with one failed member keeps only two ``ok`` datasets.

    The same expectation is then checked for a ``list:list:paired`` wrapper
    filtered in batch mode.
    """

    def leaf_states(hdca, depth):
        # Walk ``depth`` levels of nested elements and collect the leaf states.
        level = hdca["elements"]
        for _unused in range(depth - 1):
            level = [child for parent in level for child in parent["object"]["elements"]]
        return [leaf["object"]["state"] for leaf in level]

    def new_pair_id(history_id, contents):
        # Create a pair collection and return the id of the resulting HDCA.
        created = self.dataset_collection_populator.create_pair_in_history(
            history_id, contents=contents, wait=True
        )
        return created.json()["outputs"][0]["id"]

    with self.dataset_populator.test_history(require_new=False) as history_id:
        good_pair = new_pair_id(history_id, ["0", "0"])
        half_bad_pair = new_pair_id(history_id, ["0", "1"])
        listed = self.dataset_collection_populator.create_list_from_pairs(history_id, [good_pair, half_bad_pair])
        source_id = listed.json()["id"]
        run = self.dataset_populator.run_exit_code_from_file(history_id, source_id)
        implicit = run["implicit_collections"]
        assert len(implicit) == 1
        mixed = self.dataset_populator.get_history_collection_details(
            history_id, hid=implicit[0]["hid"], wait=False
        )
        states_before = leaf_states(mixed, depth=2)
        assert states_before == ["ok", "ok", "ok", "error"], states_before
        filtered = self._run_filter(history_id=history_id, failed_hdca_id=mixed["id"])
        states_after = leaf_states(filtered, depth=2)
        assert states_after == ["ok", "ok"], states_after

        # Also try list:list:paired
        wrapper = self.dataset_collection_populator.create_nested_collection(
            history_id=history_id, collection_type="list:list:paired", collection=[mixed["id"]]
        ).json()
        filtered_wrapper = self._run_filter(history_id=history_id, failed_hdca_id=wrapper["id"], batch=True)
        states_after = leaf_states(filtered_wrapper, depth=3)
        assert states_after == ["ok", "ok"], states_after
def _run_filter(self, history_id, failed_hdca_id, batch=False):
    """Run ``__FILTER_FAILED_DATASETS__`` on a collection and return the result.

    With ``batch`` set the collection is submitted as a batch value mapped
    over as ``list:paired`` and the first implicit collection of the tool
    response is returned.  Otherwise the single output collection is looked
    up by hid and its details are returned.
    """
    hdca_reference = {"src": "hdca", "id": failed_hdca_id}
    if batch:
        input_param = {"batch": batch, "values": [{"map_over_type": "list:paired", **hdca_reference}]}
    else:
        input_param = {"batch": batch, **hdca_reference}

    run_response = self._run("__FILTER_FAILED_DATASETS__", history_id, {"input": input_param}, assert_ok=False)
    response = run_response.json()
    self.dataset_populator.wait_for_history(history_id, assert_ok=False)
    output_collections = response["output_collections"]
    if batch:
        return response["implicit_collections"][0]
    assert len(output_collections) == 1
    return self.dataset_populator.get_history_collection_details(
        history_id, hid=output_collections[0]["hid"], wait=False
    )


def _apply_rules_and_check(self, example: Dict[str, Any]) -> None:
    """Stage ``example``, run ``__APPLY_RULES__`` on it and invoke ``example["check"]``.

    The check callable receives the details of the single output collection
    and the dataset populator.
    """
    with self.dataset_populator.test_history(require_new=False) as history_id:
        staged = stage_rules_example(self.galaxy_interactor, history_id, example)
        source_hdca = staged["input"]
        tool_inputs = {
            "input": {"src": "hdca", "id": source_hdca["id"]},
            "rules": example["rules"],
        }
        self.dataset_populator.wait_for_history(history_id)
        run_result = self._run("__APPLY_RULES__", history_id, tool_inputs, assert_ok=True)
        produced = run_result["output_collections"]
        assert len(produced) == 1
        produced_hdca = self.dataset_populator.get_history_collection_details(
            history_id, hid=produced[0]["hid"], wait=False
        )
        example["check"](produced_hdca, self.dataset_populator)
def test_apply_rules_1(self):
    """Apply and check ``rules_test_data.EXAMPLE_1``."""
    example = rules_test_data.EXAMPLE_1
    self._apply_rules_and_check(example)
def test_apply_rules_2(self):
    """Apply and check ``rules_test_data.EXAMPLE_2``."""
    example = rules_test_data.EXAMPLE_2
    self._apply_rules_and_check(example)
def test_apply_rules_3(self):
    """Apply and check ``rules_test_data.EXAMPLE_3``."""
    example = rules_test_data.EXAMPLE_3
    self._apply_rules_and_check(example)
def test_apply_rules_4(self):
    """Apply and check ``rules_test_data.EXAMPLE_4``."""
    example = rules_test_data.EXAMPLE_4
    self._apply_rules_and_check(example)
def test_apply_rules_5(self):
    """Apply and check ``rules_test_data.EXAMPLE_5``."""
    example = rules_test_data.EXAMPLE_5
    self._apply_rules_and_check(example)
def test_apply_rules_6(self):
    """Apply and check ``rules_test_data.EXAMPLE_6``."""
    example = rules_test_data.EXAMPLE_6
    self._apply_rules_and_check(example)
@skip_without_tool("galaxy_json_sleep")
def test_dataset_hidden_after_job_finish(self):
    """Hide an output right after submitting its job and check it stays hidden.

    Visibility is checked once without waiting and once more with
    ``wait=True``.
    """
    with self.dataset_populator.test_history() as history_id:
        run_response = self._run("galaxy_json_sleep", history_id, {"sleep_time": 5}, assert_ok=True)
        output = run_response["outputs"][0]
        hide_response = self._put(
            f"histories/{history_id}/contents/datasets/{output['id']}", data={"visible": False}, json=True
        )
        hide_response.raise_for_status()
        for wait in (False, True):
            details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output, wait=wait)
            assert not details["visible"]
@skip_without_tool("multi_select")
def test_multi_select_as_list(self):
    """A multi-select value given as a list is written out comma separated."""
    with self.dataset_populator.test_history(require_new=False) as history_id:
        tool_inputs = {"select_ex": ["--ex1", "ex2"]}
        run_result = self._run("multi_select", history_id, tool_inputs, assert_ok=True)
        first_output = run_result["outputs"][0]
        content = self.dataset_populator.get_history_dataset_content(history_id, dataset=first_output)
        assert content == "--ex1,ex2"
@skip_without_tool("multi_select")
def test_multi_select_optional(self):
    """An optional multi-select submitted as ``None`` shows up as ``None`` in the second output."""
    with self.dataset_populator.test_history(require_new=False) as history_id:
        tool_inputs = {"select_ex": ["--ex1"], "select_optional": None}
        run_result = self._run("multi_select", history_id, tool_inputs, assert_ok=True)
        outputs = run_result["outputs"]
        populator = self.dataset_populator
        required_content = populator.get_history_dataset_content(history_id, dataset=outputs[0])
        optional_content = populator.get_history_dataset_content(history_id, dataset=outputs[1])
        assert required_content.strip() == "--ex1"
        assert optional_content.strip() == "None", optional_content
@skip_without_tool("library_data")
def test_library_data_param(self):
    """Feed a library dataset into the single and the multiple library data parameters."""
    with self.dataset_populator.test_history(require_new=False) as history_id:
        library_dataset = LibraryPopulator(self.galaxy_interactor).new_library_dataset("lda_test_library")
        ldda_id = library_dataset["ldda_id"]
        tool_inputs = {
            "library_dataset": ldda_id,
            "library_dataset_multiple": [ldda_id, ldda_id],
        }
        run_result = self._run("library_data", history_id, tool_inputs, assert_ok=True)
        outputs = run_result["outputs"]
        single_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
        assert single_content == "TestData\n", single_content
        multiple_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[1])
        assert multiple_content == "TestData\nTestData\n", multiple_content
@skip_without_tool("multi_data_param")
def test_multidata_param(self):
    """Two multi-data parameters receive the same datasets in opposite order."""
    with self.dataset_populator.test_history(require_new=False) as history_id:
        populator = self.dataset_populator
        first = dataset_to_param(populator.new_dataset(history_id, content="1\t2\t3"))
        second = dataset_to_param(populator.new_dataset(history_id, content="4\t5\t6"))
        tool_inputs = {
            "f1": {"batch": False, "values": [first, second]},
            "f2": {"batch": False, "values": [second, first]},
        }
        run_result = self._run("multi_data_param", history_id, tool_inputs, assert_ok=True)
        forward_output, reverse_output = run_result["outputs"][0], run_result["outputs"][1]
        forward_content = populator.get_history_dataset_content(history_id, dataset=forward_output)
        reverse_content = populator.get_history_dataset_content(history_id, dataset=reverse_output)
        assert forward_content == "1\t2\t3\n4\t5\t6\n", forward_content
        assert reverse_content == "4\t5\t6\n1\t2\t3\n", reverse_content
@skip_without_tool("cat1")
def test_run_cat1(self):
    """Run ``cat1`` on one dataset and expect its content back."""
    with self.dataset_populator.test_history(require_new=False) as history_id:
        # Simple non-upload tool taking a single input data parameter.
        source = self.dataset_populator.new_dataset(history_id, content="Cat1Test")
        tool_inputs = {"input1": dataset_to_param(source)}
        outputs = self._cat1_outputs(history_id, inputs=tool_inputs)
        assert len(outputs) == 1
        content = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
        assert content.strip() == "Cat1Test"
@skip_without_tool("cat1")
@requires_new_history
def test_run_cat1_use_cached_job(self):
    """Run ``cat1`` three times on the same input, the last time with ``use_cached_job``.

    The three outputs must resolve to at most two distinct ``file_name``
    values.
    """
    with self.dataset_populator.test_history_for(self.test_run_cat1_use_cached_job) as history_id:
        # Simple non-upload tool taking a single input data parameter.
        source = self.dataset_populator.new_dataset(history_id, content="Cat1Test")
        tool_inputs = {"input1": dataset_to_param(source)}
        default_run = self._run_cat1(history_id, inputs=tool_inputs, assert_ok=True, wait_for_job=True)
        uncached_run = self._run_cat1(
            history_id, inputs=tool_inputs, use_cached_job=False, assert_ok=True, wait_for_job=True
        )
        cached_run = self._run_cat1(
            history_id, inputs=tool_inputs, use_cached_job=True, assert_ok=True, wait_for_job=True
        )
        details = [
            self._get(f"datasets/{run['outputs'][0]['id']}").json()
            for run in (default_run, uncached_run, cached_run)
        ]
        filenames = [detail["file_name"] for detail in details]
        assert len(filenames) == 3, filenames
        assert len(set(filenames)) <= 2, filenames
@skip_without_tool("cat1")
def test_run_cat1_listified_param(self):
    """Run ``cat1`` with the data parameter wrapped in a one element list."""
    with self.dataset_populator.test_history_for(self.test_run_cat1_listified_param) as history_id:
        # Simple non-upload tool taking a single input data parameter.
        source = self.dataset_populator.new_dataset(history_id, content="Cat1Testlistified")
        tool_inputs = {"input1": [dataset_to_param(source)]}
        outputs = self._cat1_outputs(history_id, inputs=tool_inputs)
        assert len(outputs) == 1
        content = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
        assert content.strip() == "Cat1Testlistified"
@skip_without_tool("multiple_versions")
def test_run_by_versions(self):
    """Run ``multiple_versions`` at 0.1 and 0.2 and expect the version echoed in the output."""
    with self.dataset_populator.test_history_for(self.test_run_by_versions) as history_id:
        for requested_version in ("0.1", "0.2"):
            outputs = self._run_and_get_outputs(
                tool_id="multiple_versions", history_id=history_id, tool_version=requested_version
            )
            assert len(outputs) == 1
            content = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
            assert content.strip() == f"Version {requested_version}"
@skip_without_tool("multiple_versions")
def test_test_by_versions(self):
    """Fetch tool test data with and without a ``tool_version`` wildcard.

    Without a version exactly one test dict, for version 0.2, is expected.
    The wildcard request is only checked for success; see the note at the
    end for why its length is not asserted.
    """
    default_response = self._get("tools/multiple_versions/test_data")
    default_response.raise_for_status()
    default_dicts = default_response.json()
    assert len(default_dicts) == 1
    assert default_dicts[0]["tool_version"] == "0.2"

    wildcard_response = self._get("tools/multiple_versions/test_data?tool_version=*")
    wildcard_response.raise_for_status()
    test_data_dicts = wildcard_response.json()
    # this found a bug - tools that appear in the toolbox twice should not cause
    # multiple copies of test data to be returned. This assertion broke when
    # we placed multiple_versions in the test tool panel in multiple places. We need
    # to fix this but it isn't as important as the existing bug.
    # assert len(test_data_dicts) == 3
@skip_without_tool("multiple_versions")
def test_show_with_wrong_tool_version_in_tool_id(self):
    """Showing ``multiple_versions`` with version 0.01 must report version 0.2."""
    shown_tool = self._show_valid_tool("multiple_versions", tool_version="0.01")
    # Return last version
    reported_version = shown_tool["version"]
    assert reported_version == "0.2"
@skip_without_tool("cat1")
def test_run_cat1_single_meta_wrapper(self):
    """Run ``cat1`` with its input wrapped in a non-batch ``values`` structure."""
    with self.dataset_populator.test_history_for(self.test_run_cat1_single_meta_wrapper) as history_id:
        # Wrap input in a no-op meta parameter wrapper like Sam is planning to
        # use for all UI API submissions.
        source = self.dataset_populator.new_dataset(history_id, content="123")
        wrapped_input = {"batch": False, "values": [dataset_to_param(source)]}
        outputs = self._cat1_outputs(history_id, inputs={"input1": wrapped_input})
        assert len(outputs) == 1
        content = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
        assert content.strip() == "123"
@skip_without_tool("cat1")
@requires_new_history
def test_guess_derived_permissions(self):
    """Datasets derived from a private input must not be usable by another user.

    A dataset is created, shown to be usable by a different user, then made
    private.  Afterwards neither the dataset itself nor the ``cat1`` output
    derived from it may be used as tool input by the other user.
    """
    # Hand this test's own method to test_history_for, as the sibling tests do.
    # NOTE(review): presumably test_history_for inspects the method for the
    # requires_new_history marker - confirm against the populator.
    with self.dataset_populator.test_history_for(self.test_guess_derived_permissions) as history_id:

        def assert_inputs(inputs, can_be_used=True):
            # Submit cat1 as a different user, in that user's history, and
            # check whether the given inputs were accepted or denied.
            with self._different_user_and_history() as other_history_id:
                response = self._run("cat1", other_history_id, inputs)
                if can_be_used:
                    assert response.status_code == 200
                else:
                    self._assert_dataset_permission_denied_response(response)

        new_dataset = self.dataset_populator.new_dataset(history_id, content="Cat1Test")
        inputs = dict(
            input1=dataset_to_param(new_dataset),
        )
        # Until we make the dataset private, _different_user() can use it:
        assert_inputs(inputs, can_be_used=True)
        self.dataset_populator.make_private(history_id, new_dataset["id"])
        # _different_user can no longer use the input dataset.
        assert_inputs(inputs, can_be_used=False)

        outputs = self._cat1_outputs(history_id, inputs=inputs)
        assert len(outputs) == 1
        output1 = outputs[0]
        inputs_2 = dict(
            input1=dataset_to_param(output1),
        )
        # _different_user cannot use datasets derived from the private input.
        assert_inputs(inputs_2, can_be_used=False)
@skip_without_tool("collection_creates_list")
def test_guess_derived_permissions_collections(self, history_id):
    """Collection outputs derived from a private element are hidden from other users.

    ``collection_creates_list`` is run once before and once after the first
    input element is made private.  A different user must be able to read
    the first element of the earlier output but not that of the later one.
    """
    tool_id = "collection_creates_list"

    def first_element_dataset_id(hdca):
        # Fetch full and updated details for HDCA
        print(hdca)
        details = self.dataset_populator.get_history_collection_details(history_id, hid=hdca["hid"])
        return details["elements"][0]["object"]["id"]

    def _dataset_accessible(dataset_id):
        # Accessible here means the contents response carries a "name" key.
        contents = self._get(f"histories/{history_id}/contents/{dataset_id}").json()
        return "name" in contents

    create_response = self.dataset_collection_populator.create_list_in_history(
        history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"], direct_upload=True, wait=True
    )
    self._assert_status_code_is(create_response, 200)
    input_hdca = create_response.json()["output_collections"][0]
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    tool_inputs = {"input1": {"src": "hdca", "id": input_hdca["id"]}}

    public_run = self._run(tool_id, history_id, tool_inputs)
    self._assert_status_code_is(public_run, 200)
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)

    self.dataset_populator.make_private(history_id, first_element_dataset_id(input_hdca))

    private_run = self._run(tool_id, history_id, tool_inputs)
    self._assert_status_code_is(private_run, 200)
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)

    public_element_id = first_element_dataset_id(public_run.json()["output_collections"][0])
    private_element_id = first_element_dataset_id(private_run.json()["output_collections"][0])

    with self._different_user():
        assert _dataset_accessible(public_element_id)
        assert not _dataset_accessible(private_element_id)
@skip_without_tool("validation_default")
def test_validation(self, history_id):
    """A select value containing quotes and a shell command is rejected with a 400."""
    hostile_value = '" ; echo "moo'
    run_response = self._run("validation_default", history_id, {"select_param": hostile_value})
    self._assert_status_code_is(run_response, 400)
@skip_without_tool("validation_empty_dataset")
def test_validation_empty_dataset(self, history_id):
    """Feeding the output of ``empty_output`` to ``validation_empty_dataset`` yields a 400."""
    produced = self._run_and_get_outputs("empty_output", history_id)
    tool_inputs = {"input1": dataset_to_param(produced[0])}
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    run_response = self._run("validation_empty_dataset", history_id, tool_inputs)
    self._assert_status_code_is(run_response, 400)
@skip_without_tool("validation_repeat")
def test_validation_in_repeat(self, history_id):
    """An empty text value inside the second repeat is rejected with a 400."""
    tool_inputs = {"r1_0|text": "123", "r2_0|text": ""}
    run_response = self._run("validation_repeat", history_id, tool_inputs)
    self._assert_status_code_is(run_response, 400)
@requires_new_history
@skip_without_tool("collection_paired_test")
def test_collection_parameter(self, history_id):
    """Pass a pair to a collection parameter: one job, one output, no implicit collection."""
    pair_id = self._build_pair(history_id, ["123\n", "456\n"])
    tool_inputs = {"f1": {"src": "hdca", "id": pair_id}}
    run_result = self._run("collection_paired_test", history_id, tool_inputs, assert_ok=True)
    assert len(run_result["jobs"]) == 1
    assert len(run_result["implicit_collections"]) == 0
    assert len(run_result["outputs"]) == 1
    # The output is read back by its position in the history (hid 4).
    merged = self.dataset_populator.get_history_dataset_content(history_id, hid=4)
    assert merged.strip() == "123\n456", merged
@skip_without_tool("collection_creates_pair")
def test_paired_collection_output(self, history_id):
    """``collection_creates_pair`` yields hidden ``forward``/``reverse`` txt elements."""
    populator = self.dataset_populator
    source = populator.new_dataset(history_id, content="123\n456\n789\n0ab")
    tool_inputs = {"input1": {"src": "hda", "id": source["id"]}}
    # TODO: shouldn't need this wait
    populator.wait_for_history(history_id, assert_ok=True)
    run_result = self._run("collection_creates_pair", history_id, tool_inputs, assert_ok=True)
    pair = self._assert_one_job_one_collection_run(run_result)
    forward, reverse = self._assert_elements_are(pair, "forward", "reverse")
    populator.wait_for_history(history_id, assert_ok=True)
    self._verify_element(history_id, forward, contents="123\n789\n", file_ext="txt", visible=False)
    self._verify_element(history_id, reverse, contents="456\n0ab\n", file_ext="txt", visible=False)
@skip_without_tool("collection_creates_list")
def test_list_collection_output(self, history_id):
    """The list output has ``data0``/``data1`` elements whose contents name their identifier."""
    list_response = self.dataset_collection_populator.create_list_in_history(
        history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"], wait=True
    )
    input_hdca_id = list_response.json()["outputs"][0]["id"]
    run_result = self.dataset_populator.run_collection_creates_list(history_id, input_hdca_id)
    produced = self._assert_one_job_one_collection_run(run_result)
    first, second = self._assert_elements_are(produced, "data0", "data1")
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    self._verify_element(history_id, first, contents="identifier is data0\n", file_ext="txt")
    self._verify_element(history_id, second, contents="identifier is data1\n", file_ext="txt")
@skip_without_tool("collection_creates_list_2")
def test_list_collection_output_format_source(self, history_id):
    """Exercise ``format_source`` on a list output: each element is header plus input rows."""
    # test using format_source with a tool
    header = self.dataset_populator.new_dataset(history_id, content="#col1\tcol2")
    list_response = self.dataset_collection_populator.create_list_in_history(
        history_id, contents=["a\tb\nc\td", "e\tf\ng\th"], wait=True
    )
    input_hdca_id = list_response.json()["outputs"][0]["id"]
    tool_inputs = {
        "header": {"src": "hda", "id": header["id"]},
        "input_collect": {"src": "hdca", "id": input_hdca_id},
    }
    # TODO: real problem here - shouldn't have to have this wait.
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    run_result = self._run("collection_creates_list_2", history_id, tool_inputs, assert_ok=True)
    produced = self._assert_one_job_one_collection_run(run_result)
    first, second = self._assert_elements_are(produced, "data0", "data1")
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    self._verify_element(history_id, first, contents="#col1\tcol2\na\tb\nc\td\n", file_ext="txt")
    self._verify_element(history_id, second, contents="#col1\tcol2\ne\tf\ng\th\n", file_ext="txt")
@skip_without_tool("collection_split_on_column")
def test_dynamic_list_output(self, history_id):
    """A dynamically discovered list is unpopulated at submit time and populated after the job.

    After the job finishes the collection is re-fetched and must hold two
    elements, the first being ``samp1`` with column type metadata set.
    """
    expected_name = "Table split on first column"
    expected_keys = ("id", "name", "elements", "populated")

    source = self.dataset_populator.new_dataset(
        history_id, content="samp1\t1\nsamp1\t3\nsamp2\t2\nsamp2\t4\n"
    )
    tool_inputs = {"input1": dataset_to_param(source)}
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    run_result = self._run("collection_split_on_column", history_id, tool_inputs, assert_ok=True)

    # Right after submission the collection exists but has no elements yet.
    pending = self._assert_one_job_one_collection_run(run_result)
    self._assert_has_keys(pending, *expected_keys)
    assert not pending["populated"]
    assert len(pending["elements"]) == 0
    assert pending["name"] == expected_name

    self.dataset_populator.wait_for_job(run_result["jobs"][0]["id"], assert_ok=True)

    collection_response = self._get(
        f"dataset_collections/{pending['id']}", data={"instance_type": "history"}
    )
    self._assert_status_code_is(collection_response, 200)

    populated = collection_response.json()
    self._assert_has_keys(populated, *expected_keys)
    assert populated["populated"]
    assert populated["name"] == expected_name
    assert len(populated["elements"]) == 2

    first_element = populated["elements"][0]
    assert first_element["element_index"] == 0
    assert first_element["element_identifier"] == "samp1"
    first_hda = first_element["object"]
    assert first_hda["metadata_column_types"] is not None
@skip_without_tool("collection_creates_dynamic_nested")
def test_dynamic_list_output_datasets_in_failed_state(self, history_id):
    """With ``fail_bool`` set, every leaf of the 3 x 2 nested output is in error state."""
    run_response = self._run(
        "collection_creates_dynamic_nested",
        history_id,
        {"fail_bool": True},
        assert_ok=False,
        wait_for_job=True,
    )
    self._assert_status_code_is(run_response, 200)
    collection_id = run_response.json()["output_collections"][0]["id"]
    collection = self._get(f"dataset_collections/{collection_id}", data={"instance_type": "history"}).json()
    assert collection["element_count"] == 3
    for outer_element in collection["elements"]:
        inner_collection = outer_element["object"]
        assert inner_collection["element_count"] == 2
        for leaf in inner_collection["elements"]:
            assert leaf["object"]["state"] == "error"
def test_nonadmin_users_cannot_create_tools(self):
    """Posting a dynamic tool without admin rights is answered with a 403."""
    payload = {"representation": json.dumps(MINIMAL_TOOL)}
    response = self._post("dynamic_tools", data=payload, admin=False)
    self._assert_status_code_is(response, 403)
def test_dynamic_tool_1(self):
    """Create ``MINIMAL_TOOL``, run it by id in a new history and check its output."""
    populator = self.dataset_populator
    # Create tool.
    populator.create_tool(MINIMAL_TOOL)

    # Run tool.
    history_id = populator.new_history()
    self._run("minimal_tool", history_id)

    populator.wait_for_history(history_id, assert_ok=True)
    content = populator.get_history_dataset_content(history_id)
    assert content == "Hello World\n"
def test_dynamic_tool_from_path(self):
    """Create a dynamic tool from a JSON file under the Galaxy root and run it by uuid."""
    populator = self.dataset_populator
    # Create tool.
    path_parts = ("lib", "galaxy_test", "base", "data", "minimal_tool_no_id.json")
    tool_path = os.path.join(galaxy_root_path, *path_parts)
    created_tool = populator.create_tool_from_path(tool_path)
    self._assert_has_keys(created_tool, "uuid")

    # Run tool.
    history_id = populator.new_history()
    self._run(history_id=history_id, tool_uuid=created_tool["uuid"])

    populator.wait_for_history(history_id, assert_ok=True)
    content = populator.get_history_dataset_content(history_id)
    assert content == "Hello World 2\n"
def test_dynamic_tool_no_id(self):
    """Create ``MINIMAL_TOOL_NO_ID`` and run it through the uuid it was assigned."""
    populator = self.dataset_populator
    # Create tool.
    created_tool = populator.create_tool(MINIMAL_TOOL_NO_ID)
    self._assert_has_keys(created_tool, "uuid")

    # Run tool.
    history_id = populator.new_history()
    self._run(history_id=history_id, tool_uuid=created_tool["uuid"])

    populator.wait_for_history(history_id, assert_ok=True)
    content = populator.get_history_dataset_content(history_id)
    assert content == "Hello World 2\n"
def test_show_dynamic_tools(self):
    """A newly created dynamic tool appears in the listing, and only after creation.

    The listing taken before creation must not contain the new id or uuid;
    the listing taken afterwards must contain both.
    """
    entry_keys = ("id", "uuid", "active")
    # Create tool.
    listing_before = self.dataset_populator.list_dynamic_tools()
    created = self.dataset_populator.create_tool(MINIMAL_TOOL_NO_ID)
    self._assert_has_keys(created, *entry_keys)
    created_id, created_uuid = created["id"], created["uuid"]
    listing_after = self.dataset_populator.list_dynamic_tools()

    for entry in listing_before:
        self._assert_has_keys(entry, *entry_keys)
        assert entry["id"] != created_id
        assert entry["uuid"] != created_uuid

    for entry in listing_after:
        self._assert_has_keys(entry, *entry_keys)

    assert any(entry["id"] == created_id for entry in listing_after)
    assert any(entry["uuid"] == created_uuid for entry in listing_after)
def test_show_tool_source_admin(self):
    """An admin can fetch the raw ``cat1`` source; the ``language`` header says xml."""
    source_response = self._get("tools/cat1/raw_tool_source", admin=True)
    source_response.raise_for_status()
    body = source_response.text
    assert "Concatenate datasets" in body
    assert source_response.headers["language"] == "xml"
def test_show_tool_source_denied(self):
    """An anonymous user asking for the raw ``cat1`` source gets a 403."""
    with self._different_user(anon=True):
        source_response = self._get("tools/cat1/raw_tool_source")
        status = source_response.status_code
        assert status == 403
def test_tool_deactivate(self):
    """Running a deactivated dynamic tool by uuid is answered with a 404."""
    populator = self.dataset_populator
    # Create tool.
    created = populator.create_tool(MINIMAL_TOOL_NO_ID)
    self._assert_has_keys(created, "id", "uuid", "active")
    assert created["active"]
    tool_uuid = created["uuid"]

    deactivated = populator.deactivate_dynamic_tool(tool_uuid)
    assert not deactivated["active"]

    # Run tool.
    history_id = populator.new_history()
    run_response = self._run(history_id=history_id, tool_uuid=tool_uuid, assert_ok=False)
    # Get a 404 when trying to run a deactivated tool.
    self._assert_status_code_is(run_response, 404)
@skip_without_tool("cat1")
def test_run_cat1_with_two_inputs(self, history_id):
    """Run ``cat1`` with a second dataset supplied through the ``queries`` repeat."""
    # Run tool with an multiple data parameter and grouping (repeat)
    populator = self.dataset_populator
    primary = populator.new_dataset(history_id, content="Cat1Test")
    secondary = populator.new_dataset(history_id, content="Cat2Test")
    tool_inputs = {
        "input1": dataset_to_param(primary),
        "queries_0|input2": dataset_to_param(secondary),
    }
    outputs = self._cat1_outputs(history_id, inputs=tool_inputs)
    assert len(outputs) == 1
    content = populator.get_history_dataset_content(history_id, dataset=outputs[0])
    assert content.strip() == "Cat1Test\nCat2Test"
@skip_without_tool("mapper_two")
def test_bam_state_regression(self, history_id):
    """All four ``mapper_two`` outputs must end up in the ``ok`` state."""
    # Test regression of https://github.com/galaxyproject/galaxy/issues/6856. With changes
    # to metadata file flushing to optimize creating bam outputs and copying bam datasets
    # we observed very subtle problems with HDA state changes on other files being flushed at
    # the same time. This tests txt datasets finalized before and after the bam outputs as
    # well as other bam files all flush properly during job completion.
    source = self.dataset_populator.new_dataset(history_id, content="123\n456\n789")
    tool_inputs = {
        "input1": dataset_to_param(source),
        "reference": dataset_to_param(source),
    }
    outputs = self._run_and_get_outputs("mapper_two", history_id, tool_inputs)
    assert len(outputs) == 4
    for produced in outputs:
        produced_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=produced)
        assert produced_details["state"] == "ok"
@skip_without_tool("qc_stdout")
def test_qc_messages(self, history_id):
    """The first job message of a ``qc_stdout`` run reports the matched quality line."""
    source = self.dataset_populator.new_dataset(history_id, content="123\n456\n789")
    tool_inputs = {"input1": dataset_to_param(source), "quality": 3}
    run_result = self._run("qc_stdout", history_id, tool_inputs, wait_for_job=True, assert_ok=True)
    assert "jobs" in run_result, run_result
    job_details = self.dataset_populator.get_job_details(run_result["jobs"][0]["id"], full=True).json()
    assert "job_messages" in job_details, job_details
    first_message = job_details["job_messages"][0]
    # assert qc_message["code_desc"] == "QC Metrics for Tool", qc_message
    assert first_message["desc"] == "QC: Matched on Quality of sample is 30%."
    assert first_message["match"] == "Quality of sample is 30%."
    assert first_message["error_level"] == 1.1
@skip_without_tool("cat1")
def test_multirun_cat1(self, history_id):
    """Batch ``cat1`` over two datasets and check both outputs."""
    populator = self.dataset_populator
    first = populator.new_dataset(history_id, content="123")
    second = populator.new_dataset(history_id, content="456")
    batch_values = [dataset_to_param(first), dataset_to_param(second)]
    tool_inputs = {"input1": {"batch": True, "values": batch_values}}
    self._check_cat1_multirun(history_id, tool_inputs)
def _check_cat1_multirun(self, history_id, inputs):
    """Run ``cat1`` with ``inputs`` and expect two outputs containing 123 and 456, in order."""
    outputs = self._cat1_outputs(history_id, inputs=inputs)
    assert len(outputs) == 2
    first_output, second_output = outputs[0], outputs[1]
    fetch_content = self.dataset_populator.get_history_dataset_content
    first_content = fetch_content(history_id, dataset=first_output)
    second_content = fetch_content(history_id, dataset=second_output)
    assert first_content.strip() == "123"
    assert second_content.strip() == "456"
@skip_without_tool("random_lines1")
def test_multirun_non_data_parameter(self, history_id):
    """Batch over the non-data ``num_lines`` parameter of ``random_lines1``."""
    source = self.dataset_populator.new_dataset(history_id, content="123\n456\n789")
    tool_inputs = {
        "input": dataset_to_param(source),
        "num_lines": {"batch": True, "values": [1, 2, 3]},
    }
    outputs = self._run_and_get_outputs("random_lines1", history_id, tool_inputs)
    # Assert we have three outputs with 1, 2, and 3 lines respectively.
    assert len(outputs) == 3
    contents = [
        self.dataset_populator.get_history_dataset_content(history_id, dataset=produced).strip()
        for produced in outputs
    ]
    line_counts = sorted(len(text.split("\n")) for text in contents)
    assert line_counts == [1, 2, 3]
@skip_without_tool("cat1")
def test_multirun_in_repeat(self):
    """Batch over the repeat input of ``cat1`` while ``input1`` is a plain dataset."""
    history_id, common_dataset, repeat_datasets = self._setup_repeat_multirun()
    batched_repeat = {"batch": True, "values": repeat_datasets}
    tool_inputs = {"input1": common_dataset, "queries_0|input2": batched_repeat}
    self._check_repeat_multirun(history_id, tool_inputs)
@skip_without_tool("cat1")
def test_multirun_in_repeat_mismatch(self):
    """Batch over the repeat input while ``input1`` uses the non-batch ``values`` wrapper."""
    history_id, common_dataset, repeat_datasets = self._setup_repeat_multirun()
    wrapped_common = {"batch": False, "values": [common_dataset]}
    batched_repeat = {"batch": True, "values": repeat_datasets}
    tool_inputs = {"input1": wrapped_common, "queries_0|input2": batched_repeat}
    self._check_repeat_multirun(history_id, tool_inputs)
@skip_without_tool("cat1")
def test_multirun_on_multiple_inputs(self):
    """Batching two inputs together produces one output per matched pair of values."""
    history_id, first_two, second_two = self._setup_two_multiruns()
    tool_inputs = {
        "input1": {"batch": True, "values": first_two},
        "queries_0|input2": {"batch": True, "values": second_two},
    }
    outputs = self._cat1_outputs(history_id, inputs=tool_inputs)
    assert len(outputs) == 2
    contents = [
        self.dataset_populator.get_history_dataset_content(history_id, dataset=produced).strip()
        for produced in outputs
    ]
    assert "123\n789" in contents
    assert "456\n0ab" in contents
@skip_without_tool("cat1")
def test_multirun_on_multiple_inputs_unlinked(self):
    """With ``linked`` set to False, two batched inputs produce all four combinations."""
    history_id, first_two, second_two = self._setup_two_multiruns()
    tool_inputs = {
        "input1": {"batch": True, "linked": False, "values": first_two},
        "queries_0|input2": {"batch": True, "linked": False, "values": second_two},
    }
    outputs = self._cat1_outputs(history_id, inputs=tool_inputs)
    contents = [
        self.dataset_populator.get_history_dataset_content(history_id, dataset=produced).strip()
        for produced in outputs
    ]
    assert len(outputs) == 4
    for expected in ("123\n789", "456\n0ab", "123\n0ab", "456\n789"):
        assert expected in contents
@skip_without_tool("dbkey_output_action")
def test_dynamic_parameter_error_handling(self):
    """Dynamic parameter errors are flagged on the exception and in the job data.

    The tool test runs once with a valid dbkey and index, then with an
    invalid dbkey, then with an invalid index (job data status ``error``),
    and finally with ``skip_on_dynamic_param_errors`` (status ``skip``).
    """
    # Run test with valid index once, then supply invalid dbkey and invalid table
    # entry to ensure dynamic param errors are register.
    tool_id = "dbkey_output_action"
    data_file = "simple_line.txt"
    recorded_job_data = []

    def register_job_data(job_data):
        recorded_job_data.append(job_data)

    def tool_test_case_list(inputs, required_files) -> List[ValidToolTestDict]:
        return [
            {
                "inputs": inputs,
                "outputs": {},
                "required_files": required_files,
                "output_collections": [],
                "test_index": 0,
                "tool_version": "0.1.0",
                "tool_id": "dbkey_output_action",
                "error": False,
            }
        ]

    def build_case(index_value, dbkey):
        # One test case that differs only in the index value and the file's dbkey.
        return tool_test_case_list(
            {"input": [data_file], "index": [index_value]},
            [[data_file, {"value": data_file, "dbkey": dbkey}]],
        )

    def run_and_capture(test_dicts, **run_kwargs):
        # None when the run raised nothing; otherwise the exception's
        # ``dynamic_param_error`` attribute, or False when it has none.
        try:
            test_driver.run_tool_test(tool_id, _tool_test_dicts=test_dicts, **run_kwargs)
        except Exception as e:
            return getattr(e, "dynamic_param_error", False)
        return None

    valid_case = build_case("hg18_value", "hg18")
    test_driver = self.driver_or_skip_test_if_remote()
    assert run_and_capture(valid_case) is None

    bad_dbkey_case = build_case("hg18_value", "hgnot18")
    assert run_and_capture(bad_dbkey_case)

    bad_index_case = build_case("hgnot18", "hg18")
    assert run_and_capture(bad_index_case, register_job_data=register_job_data)
    assert len(recorded_job_data) == 1
    assert recorded_job_data[0]["status"] == "error"

    recorded_job_data.clear()
    assert run_and_capture(
        bad_index_case,
        skip_on_dynamic_param_errors=True,
        register_job_data=register_job_data,
    )
    assert len(recorded_job_data) == 1
    assert recorded_job_data[0]["status"] == "skip"
def _assert_one_job_one_collection_run(self, create): jobs = create["jobs"] implicit_collections = create["implicit_collections"] collections = create["output_collections"] assert len(jobs) == 1 assert len(implicit_collections) == 0 assert len(collections) == 1 output_collection = collections[0] return output_collection def _assert_elements_are(self, collection, *args): elements = collection["elements"] assert len(elements) == len(args) for index, element in enumerate(elements): arg = args[index] assert arg == element["element_identifier"] return elements def _verify_element(self, history_id, element, **props): object_id = element["object"]["id"] if "contents" in props: expected_contents = props["contents"] contents = self.dataset_populator.get_history_dataset_content(history_id, dataset_id=object_id) assert contents == expected_contents del props["contents"] if props: details = self.dataset_populator.get_history_dataset_details(history_id, dataset_id=object_id) for key, value in props.items(): assert details[key] == value def _setup_repeat_multirun(self): history_id = self.dataset_populator.new_history() new_dataset1 = self.dataset_populator.new_dataset(history_id, content="123") new_dataset2 = self.dataset_populator.new_dataset(history_id, content="456") common_dataset = self.dataset_populator.new_dataset(history_id, content="Common") return ( history_id, dataset_to_param(common_dataset), [dataset_to_param(new_dataset1), dataset_to_param(new_dataset2)], ) def _check_repeat_multirun(self, history_id, inputs): outputs = self._cat1_outputs(history_id, inputs=inputs) assert len(outputs) == 2 output1 = outputs[0] output2 = outputs[1] output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1) output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2) assert output1_content.strip() == "Common\n123" assert output2_content.strip() == "Common\n456" def _setup_two_multiruns(self): history_id = 
self.dataset_populator.new_history() new_dataset1 = self.dataset_populator.new_dataset(history_id, content="123") new_dataset2 = self.dataset_populator.new_dataset(history_id, content="456") new_dataset3 = self.dataset_populator.new_dataset(history_id, content="789") new_dataset4 = self.dataset_populator.new_dataset(history_id, content="0ab") return ( history_id, [dataset_to_param(new_dataset1), dataset_to_param(new_dataset2)], [dataset_to_param(new_dataset3), dataset_to_param(new_dataset4)], )
@skip_without_tool("cat")
def test_map_over_collection(self, history_id):
    """Map the cat tool over a pair collection and run the shared mapping checks."""
    pair_id = self._build_pair(history_id, ["123", "456"])
    batch_input = {"batch": True, "values": [{"src": "hdca", "id": pair_id}]}
    self._run_and_check_simple_collection_mapping(history_id, {"input1": batch_input})
@skip_without_tool("cat1")
def test_map_over_empty_collection(self, history_id):
    """Mapping cat1 over an empty list yields no jobs or outputs but one named implicit collection."""
    creation = self.dataset_collection_populator.create_list_in_history(history_id, contents=[])
    empty_list_id = creation.json()["outputs"][0]["id"]
    batch_input = {"batch": True, "values": [{"src": "hdca", "id": empty_list_id}]}
    create = self._run_cat1(history_id, inputs={"input1": batch_input}, assert_ok=True)
    outputs, jobs, implicit_collections = (create[key] for key in ("outputs", "jobs", "implicit_collections"))
    assert len(jobs) == 0
    assert len(outputs) == 0
    assert len(implicit_collections) == 1
    empty_output = implicit_collections[0]
    assert empty_output["name"] == "Concatenate datasets on collection 1", empty_output
@skip_without_tool("output_action_change_format")
def test_map_over_with_output_format_actions(self, history_id):
    """Map ``output_action_change_format`` over a pair with the format action on and off.

    For each ``input_cond|dispatch`` value the run must produce two jobs, two
    outputs and one implicit collection. Both outputs are expected to have
    extension ``txt`` when the dispatch value is ``"do"`` and ``data``
    otherwise.
    """
    for use_action in ["do", "dont"]:
        hdca_id = self._build_pair(history_id, ["123", "456"])
        inputs = {
            "input_cond|dispatch": use_action,
            "input_cond|input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
        }
        create = self._run("output_action_change_format", history_id, inputs).json()
        outputs = create["outputs"]
        jobs = create["jobs"]
        implicit_collections = create["implicit_collections"]
        assert len(jobs) == 2
        assert len(outputs) == 2
        assert len(implicit_collections) == 1
        output1 = outputs[0]
        output2 = outputs[1]
        output1_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output1)
        output2_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output2)
        # Resolve the expectation before comparing. Written inline as
        # ``assert ext == "txt" if cond else "data"``, the conditional expression
        # binds looser than ``==``, so the "dont" iteration merely asserted the
        # truthy string "data" and never inspected the extension.
        expected_ext = "txt" if use_action == "do" else "data"
        assert output1_details["file_ext"] == expected_ext, output1_details
        assert output2_details["file_ext"] == expected_ext, output2_details
@skip_without_tool("output_action_change_format_paired")
def test_map_over_with_nested_paired_output_format_actions(self, history_id):
    """Map the paired format-action tool over a nested list; every output must be ``txt``."""
    nested_id = self.__build_nested_list(history_id)
    paired_batch = {"batch": True, "values": [{"map_over_type": "paired", "src": "hdca", "id": nested_id}]}
    inputs = {"input": paired_batch}
    create = self._run("output_action_change_format_paired", history_id, inputs).json()
    outputs, jobs, implicit_collections = (create[key] for key in ("outputs", "jobs", "implicit_collections"))
    assert len(jobs) == 2
    assert len(outputs) == 2
    assert len(implicit_collections) == 1
    for output in outputs:
        assert output["file_ext"] == "txt", output
@skip_without_tool("output_filter_with_input")
def test_map_over_with_output_filter_no_filtering(self, history_id):
    """With ``filter_text_1`` set to "foo", expect three jobs and three implicit collections."""
    list_response = self.dataset_collection_populator.create_list_in_history(history_id, wait=True)
    list_id = list_response.json()["outputs"][0]["id"]
    inputs = {
        "input_1": {"batch": True, "values": [{"src": "hdca", "id": list_id}]},
        "produce_out_1": "true",
        "filter_text_1": "foo",
    }
    create = self._run("output_filter_with_input", history_id, inputs).json()
    jobs, implicit_collections = create["jobs"], create["implicit_collections"]
    assert len(jobs) == 3
    assert len(implicit_collections) == 3
    self._check_implicit_collection_populated(create)
@skip_without_tool("output_filter_with_input_optional")
def test_map_over_with_output_filter_on_optional_input(self, history_id):
    """Map the optional-input filter tool over a one-element list; expect one job and one implicit collection."""
    list_response = self.dataset_collection_populator.create_list_in_history(
        history_id, contents=["myinputs"], wait=True
    )
    list_id = list_response.json()["outputs"][0]["id"]
    inputs = {"input_1": {"batch": True, "values": [{"src": "hdca", "id": list_id}]}}
    create = self._run("output_filter_with_input_optional", history_id, inputs).json()
    jobs, implicit_collections = create["jobs"], create["implicit_collections"]
    assert len(jobs) == 1
    # The job has to finish successfully before the implicit collection is inspected.
    self.dataset_populator.wait_for_job(jobs[0]["id"], assert_ok=True)
    assert len(implicit_collections) == 1
    self._check_implicit_collection_populated(create)
@skip_without_tool("output_filter_with_input")
def test_map_over_with_output_filter_one_filtered(self, history_id):
    """With ``filter_text_1`` set to "bar", expect three jobs but only two implicit collections."""
    list_response = self.dataset_collection_populator.create_list_in_history(history_id, wait=True)
    list_id = list_response.json()["outputs"][0]["id"]
    inputs = {
        "input_1": {"batch": True, "values": [{"src": "hdca", "id": list_id}]},
        "produce_out_1": "true",
        "filter_text_1": "bar",
    }
    create = self._run("output_filter_with_input", history_id, inputs).json()
    jobs, implicit_collections = create["jobs"], create["implicit_collections"]
    assert len(jobs) == 3
    assert len(implicit_collections) == 2
    self._check_implicit_collection_populated(create)
@skip_without_tool("Cut1")
def test_map_over_with_complex_output_actions(self, history_id):
    """Map Cut1 over a list of two BED files; both outputs must start with "chr1"."""
    bed_list_id = self._bed_list(history_id)
    inputs = {
        "columnList": "c1,c2,c3,c4,c5",
        "delimiter": "T",
        "input": {"batch": True, "values": [{"src": "hdca", "id": bed_list_id}]},
    }
    create = self._run("Cut1", history_id, inputs).json()
    outputs, jobs, implicit_collections = (create[key] for key in ("outputs", "jobs", "implicit_collections"))
    assert len(jobs) == 2
    assert len(outputs) == 2
    assert len(implicit_collections) == 1
    first_text, second_text = (
        self.dataset_populator.get_history_dataset_content(history_id, dataset=output) for output in outputs
    )
    assert first_text.startswith("chr1")
    assert second_text.startswith("chr1")
@skip_without_tool("collection_creates_dynamic_list_of_pairs")
def test_map_over_with_discovered_output_collection_elements(self, history_id):
    """Map a tool with a dynamically discovered list-of-pairs output over a list.

    The implicit collection is ``list:list:paired`` and its first element has
    no ``element_count`` right after submission; once the first job is done,
    the innermost first element is identified as "forward".
    """
    list_response = self.dataset_collection_populator.create_list_in_history(history_id, wait=True)
    list_id = list_response.json()["outputs"][0]["id"]
    inputs = {"input": {"batch": True, "values": [{"src": "hdca", "id": list_id}]}}
    create = self._run("collection_creates_dynamic_list_of_pairs", history_id, inputs).json()
    implicit_collections = create["implicit_collections"]
    assert len(implicit_collections) == 1
    mapped = implicit_collections[0]
    assert mapped["collection_type"] == "list:list:paired"
    assert mapped["elements"][0]["object"]["element_count"] is None
    self.dataset_populator.wait_for_job(create["jobs"][0]["id"], assert_ok=True)
    hdca = self._get(f"histories/{history_id}/contents/dataset_collections/{mapped['id']}").json()
    innermost = hdca["elements"][0]["object"]["elements"][0]["object"]["elements"][0]
    assert innermost["element_identifier"] == "forward"
def _bed_list(self, history_id): bed1_contents = open(self.get_filename("1.bed")).read() bed2_contents = open(self.get_filename("2.bed")).read() contents = [bed1_contents, bed2_contents] hdca = self.dataset_collection_populator.create_list_in_history(history_id, contents=contents, wait=True).json() return hdca["outputs"][0]["id"]
@skip_without_tool("identifier_single")
def test_identifier_in_map(self, history_id):
    """Map identifier_single over a pair; the two outputs must read "forward" and "reverse"."""
    pair_id = self._build_pair(history_id, ["123", "456"])
    inputs = {"input1": {"batch": True, "values": [{"src": "hdca", "id": pair_id}]}}
    response = self._run("identifier_single", history_id, inputs)
    self._assert_status_code_is(response, 200)
    result = response.json()
    outputs, jobs, implicit_collections = (result[key] for key in ("outputs", "jobs", "implicit_collections"))
    assert len(jobs) == 2
    assert len(outputs) == 2
    assert len(implicit_collections) == 1
    first_text, second_text = (
        self.dataset_populator.get_history_dataset_content(history_id, dataset=output) for output in outputs
    )
    assert first_text.strip() == "forward"
    assert second_text.strip() == "reverse"
@skip_without_tool("identifier_single")
def test_identifier_outside_map(self, history_id):
    """Run identifier_single on a plain dataset; the output must read the dataset's name."""
    plain_hda = self.dataset_populator.new_dataset(history_id, content="123", name="Plain HDA")
    inputs = {"input1": {"src": "hda", "id": plain_hda["id"]}}
    response = self._run("identifier_single", history_id, inputs)
    self._assert_status_code_is(response, 200)
    result = response.json()
    outputs, jobs, implicit_collections = (result[key] for key in ("outputs", "jobs", "implicit_collections"))
    assert len(jobs) == 1
    assert len(outputs) == 1
    assert len(implicit_collections) == 0
    text = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
    assert text.strip() == "Plain HDA"
@skip_without_tool("identifier_multiple")
def test_list_selectable_in_multidata_input(self, history_id):
    """After creating one list, the tool's first input must offer exactly one ``hdca`` option."""
    self.dataset_collection_populator.create_list_in_history(history_id, contents=["123", "456"], wait=True)
    tool_state = self.dataset_populator.build_tool_state("identifier_multiple", history_id)
    hdca_options = tool_state["inputs"][0]["options"]["hdca"]
    assert len(hdca_options) == 1
@skip_without_tool("identifier_multiple")
def test_identifier_in_multiple_reduce(self, history_id):
    """Pass a pair to a multi-data input; one job must output "forward\\nreverse"."""
    pair_id = self._build_pair(history_id, ["123", "456"])
    inputs = {"input1": {"src": "hdca", "id": pair_id}}
    response = self._run("identifier_multiple", history_id, inputs)
    self._assert_status_code_is(response, 200)
    result = response.json()
    outputs, jobs, implicit_collections = (result[key] for key in ("outputs", "jobs", "implicit_collections"))
    assert len(jobs) == 1
    assert len(outputs) == 1
    assert len(implicit_collections) == 0
    text = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
    assert text.strip() == "forward\nreverse"
@skip_without_tool("identifier_in_conditional")
def test_identifier_map_over_multiple_input_in_conditional(self, history_id):
    """Pass a pair to ``outer_cond|input1`` (legacy payload); one job must output "forward\\nreverse"."""
    pair_id = self._build_pair(history_id, ["123", "456"])
    inputs = {"outer_cond|input1": {"src": "hdca", "id": pair_id}}
    response = self._run("identifier_in_conditional", history_id, inputs)
    self._assert_status_code_is(response, 200)
    result = response.json()
    outputs, jobs, implicit_collections = (result[key] for key in ("outputs", "jobs", "implicit_collections"))
    assert len(jobs) == 1
    assert len(outputs) == 1
    assert len(implicit_collections) == 0
    text = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
    assert text.strip() == "forward\nreverse"
@skip_without_tool("identifier_in_conditional")
def test_identifier_map_over_multiple_input_in_conditional_new_payload_form(self, history_id):
    """Same check as the legacy conditional test, using the nested "21.01" input format."""
    pair_id = self._build_pair(history_id, ["123", "456"])
    conditional_state = {
        "multi_input": True,
        "input1": {"id": pair_id, "src": "hdca"},
    }
    inputs = {"outer_cond": conditional_state}
    response = self._run("identifier_in_conditional", history_id, inputs, input_format="21.01")
    self._assert_status_code_is(response, 200)
    result = response.json()
    outputs, jobs, implicit_collections = (result[key] for key in ("outputs", "jobs", "implicit_collections"))
    assert len(jobs) == 1
    assert len(outputs) == 1
    assert len(implicit_collections) == 0
    text = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
    assert text.strip() == "forward\nreverse"
@skip_without_tool("identifier_multiple_in_repeat")
def test_identifier_multiple_reduce_in_repeat_new_payload_form(self, history_id):
    """Pass a pair inside a repeat using the "21.01" input format; expect "forward\\nreverse"."""
    pair_id = self._build_pair(history_id, ["123", "456"])
    repeat_instance = {"the_data": {"input1": {"src": "hdca", "id": pair_id}}}
    inputs = {"the_repeat": [repeat_instance]}
    response = self._run("identifier_multiple_in_repeat", history_id, inputs, input_format="21.01")
    self._assert_status_code_is(response, 200)
    result = response.json()
    outputs, jobs, implicit_collections = (result[key] for key in ("outputs", "jobs", "implicit_collections"))
    assert len(jobs) == 1
    assert len(outputs) == 1
    assert len(implicit_collections) == 0
    text = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
    assert text.strip() == "forward\nreverse"
@skip_without_tool("identifier_in_conditional")
def test_identifier_map_over_input_in_conditional(self, history_id):
    """Map ``outer_cond|input1`` over a pair; outputs must read "forward" then "reverse"."""
    # Run cat tool, so HDA names are different from element identifiers
    pair_id = self._build_pair(history_id, ["123", "456"], run_cat=True)
    inputs = {
        "outer_cond|input1": {"batch": True, "values": [{"src": "hdca", "id": pair_id}]},
        "outer_cond|multi_input": False,
    }
    response = self._run("identifier_in_conditional", history_id, inputs)
    self._assert_status_code_is(response, 200)
    result = response.json()
    outputs, jobs, implicit_collections = (result[key] for key in ("outputs", "jobs", "implicit_collections"))
    assert len(jobs) == 2
    assert len(outputs) == 2
    assert len(implicit_collections) == 1
    # Fetch and check one output at a time, in output order.
    for output, expected_identifier in zip(outputs, ("forward", "reverse")):
        text = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
        assert text.strip() == expected_identifier
@skip_without_tool("identifier_multiple_in_conditional")
def test_identifier_multiple_reduce_in_conditional(self, history_id):
    """Pass a pair to a multi-data input in nested conditionals; expect "forward\\nreverse"."""
    pair_id = self._build_pair(history_id, ["123", "456"])
    inputs = {"outer_cond|inner_cond|input1": {"src": "hdca", "id": pair_id}}
    response = self._run("identifier_multiple_in_conditional", history_id, inputs)
    self._assert_status_code_is(response, 200)
    result = response.json()
    outputs, jobs, implicit_collections = (result[key] for key in ("outputs", "jobs", "implicit_collections"))
    assert len(jobs) == 1
    assert len(outputs) == 1
    assert len(implicit_collections) == 0
    text = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
    assert text.strip() == "forward\nreverse"
@skip_without_tool("identifier_multiple_in_repeat")
def test_identifier_multiple_reduce_in_repeat(self, history_id):
    """Pass a pair to a multi-data input inside a repeat (legacy payload); expect "forward\\nreverse"."""
    pair_id = self._build_pair(history_id, ["123", "456"])
    inputs = {"the_repeat_0|the_data|input1": {"src": "hdca", "id": pair_id}}
    response = self._run("identifier_multiple_in_repeat", history_id, inputs)
    self._assert_status_code_is(response, 200)
    result = response.json()
    outputs, jobs, implicit_collections = (result[key] for key in ("outputs", "jobs", "implicit_collections"))
    assert len(jobs) == 1
    assert len(outputs) == 1
    assert len(implicit_collections) == 0
    text = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
    assert text.strip() == "forward\nreverse"
@skip_without_tool("identifier_single_in_repeat")
def test_identifier_single_in_repeat(self, history_id):
    """Map a single-data input inside a repeat over a pair; the first element must read "forward"."""
    pair_id = self._build_pair(history_id, ["123", "456"])
    batch_input = {"batch": True, "values": [{"src": "hdca", "id": pair_id}]}
    inputs = {"the_repeat_0|the_data|input1": batch_input}
    response = self._run("identifier_single_in_repeat", history_id, inputs)
    self._assert_status_code_is(response, 200)
    result = response.json()
    jobs, implicit_collections = result["jobs"], result["implicit_collections"]
    assert len(jobs) == 2
    assert len(implicit_collections) == 1
    elements = implicit_collections[0]["elements"]
    assert len(elements) == 2
    forward_text = self.dataset_populator.get_history_dataset_content(history_id, dataset=elements[0]["object"])
    assert forward_text.strip() == "forward", forward_text
@skip_without_tool("identifier_multiple_in_conditional")
def test_identifier_multiple_in_conditional(self, history_id):
    """Pass a plain dataset to a multi-data input in nested conditionals; expect its name as output."""
    named_hda = self.dataset_populator.new_dataset(history_id, content="123", name="Normal HDA1")
    inputs = {"outer_cond|inner_cond|input1": {"src": "hda", "id": named_hda["id"]}}
    response = self._run("identifier_multiple_in_conditional", history_id, inputs)
    self._assert_status_code_is(response, 200)
    result = response.json()
    outputs, jobs, implicit_collections = (result[key] for key in ("outputs", "jobs", "implicit_collections"))
    assert len(jobs) == 1
    assert len(outputs) == 1
    assert len(implicit_collections) == 0
    text = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
    assert text.strip() == "Normal HDA1"
@skip_without_tool("identifier_multiple")
def test_identifier_with_multiple_normal_datasets(self, history_id):
    """Pass two plain datasets to a multi-data input; the output must list both names in order."""
    first_hda = self.dataset_populator.new_dataset(history_id, content="123", name="Normal HDA1")
    second_hda = self.dataset_populator.new_dataset(history_id, content="456", name="Normal HDA2")
    inputs = {"input1": [{"src": "hda", "id": hda["id"]} for hda in (first_hda, second_hda)]}
    response = self._run("identifier_multiple", history_id, inputs)
    self._assert_status_code_is(response, 200)
    result = response.json()
    outputs, jobs, implicit_collections = (result[key] for key in ("outputs", "jobs", "implicit_collections"))
    assert len(jobs) == 1
    assert len(outputs) == 1
    assert len(implicit_collections) == 0
    text = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
    assert text.strip() == "Normal HDA1\nNormal HDA2"
@skip_without_tool("identifier_collection")
def test_identifier_with_data_collection(self, history_id):
    """Feed a list collection to identifier_collection; the output must list the element names."""
    element_identifiers = self.dataset_collection_populator.list_identifiers(history_id)
    payload = {
        "instance_type": "history",
        "history_id": history_id,
        "element_identifiers": element_identifiers,
        "collection_type": "list",
    }
    dataset_collection = self._post("dataset_collections", payload, json=True).json()
    inputs = {"input1": {"src": "hdca", "id": dataset_collection["id"]}}
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    response = self._run("identifier_collection", history_id, inputs)
    self._assert_status_code_is(response, 200)
    result = response.json()
    outputs, jobs = result["outputs"], result["jobs"]
    assert len(jobs) == 1
    assert len(outputs) == 1
    text = self.dataset_populator.get_history_dataset_content(history_id, dataset=outputs[0])
    expected_names = [identifier["name"] for identifier in element_identifiers]
    assert text.strip() == "\n".join(expected_names)
@skip_without_tool("identifier_in_actions")
def test_identifier_in_actions(self, history_id):
    """Map identifier_in_actions over a one-element list; the second column name must be "data1"."""
    element_identifiers = self.dataset_collection_populator.list_identifiers(history_id, contents=["1\t2"])
    payload = {
        "instance_type": "history",
        "history_id": history_id,
        "element_identifiers": element_identifiers,
        "collection_type": "list",
    }
    dataset_collection = self._post("dataset_collections", payload, json=True).json()
    inputs = {"input": {"batch": True, "values": [{"src": "hdca", "id": dataset_collection["id"]}]}}
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    response = self._run("identifier_in_actions", history_id, inputs)
    self._assert_status_code_is(response, 200)
    first_output = response.json()["outputs"][0]
    output_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=first_output)
    assert output_details["metadata_column_names"][1] == "data1", output_details
@skip_without_tool("cat1")
def test_map_over_nested_collections(self, history_id):
    """Map cat1 over a nested list and run the shared nested-collection checks."""
    nested_id = self.__build_nested_list(history_id)
    batch_input = {"batch": True, "values": [{"src": "hdca", "id": nested_id}]}
    self._check_simple_cat1_over_nested_collections(history_id, {"input1": batch_input})
@skip_without_tool("collection_paired_structured_like")
def test_paired_input_map_over_nested_collections(self, history_id):
    """Map a paired input over a nested list; expect two jobs and one ``list:paired`` collection of two elements."""
    nested_id = self.__build_nested_list(history_id)
    paired_batch = {"batch": True, "values": [{"map_over_type": "paired", "src": "hdca", "id": nested_id}]}
    inputs = {"input1": paired_batch}
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    create = self._run("collection_paired_structured_like", history_id, inputs, assert_ok=True)
    jobs, implicit_collections = create["jobs"], create["implicit_collections"]
    assert len(jobs) == 2
    assert len(implicit_collections) == 1
    mapped = implicit_collections[0]
    assert mapped["collection_type"] == "list:paired", mapped["collection_type"]
    assert len(mapped["elements"]) == 2
@skip_without_tool("collection_paired_conditional_structured_like")
def test_paired_input_conditional_map_over_nested_collections(self, history_id):
    """Map a paired input inside a conditional over a nested list; expect a ``list:paired`` result of two elements."""
    nested_id = self.__build_nested_list(history_id)
    paired_batch = {"batch": True, "values": [{"map_over_type": "paired", "src": "hdca", "id": nested_id}]}
    inputs = {
        "cond|cond_param": "paired",
        "cond|input1": paired_batch,
    }
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    create = self._run("collection_paired_conditional_structured_like", history_id, inputs, assert_ok=True)
    jobs, implicit_collections = create["jobs"], create["implicit_collections"]
    assert len(jobs) == 2
    assert len(implicit_collections) == 1
    mapped = implicit_collections[0]
    assert mapped["collection_type"] == "list:paired", mapped["collection_type"]
    assert len(mapped["elements"]) == 2
def _check_simple_cat1_over_nested_collections(self, history_id, inputs): create = self._run_cat1(history_id, inputs=inputs, assert_ok=True) outputs = create["outputs"] jobs = create["jobs"] implicit_collections = create["implicit_collections"] assert len(jobs) == 4 assert len(outputs) == 4 assert len(implicit_collections) == 1 implicit_collection = implicit_collections[0] self._assert_has_keys(implicit_collection, "collection_type", "elements") assert implicit_collection["collection_type"] == "list:paired" assert len(implicit_collection["elements"]) == 2 first_element, second_element = implicit_collection["elements"] assert first_element["element_identifier"] == "test0", first_element assert second_element["element_identifier"] == "test1", second_element first_object = first_element["object"] assert first_object["collection_type"] == "paired" assert len(first_object["elements"]) == 2 first_object_forward_element = first_object["elements"][0] assert outputs[0]["id"] == first_object_forward_element["object"]["id"]
@skip_without_tool("cat1")
def test_map_over_two_collections(self, history_id):
    """Map cat1 over two pairs at once and run the shared two-collection checks."""
    first_pair_id = self._build_pair(history_id, ["123\n", "456\n"])
    second_pair_id = self._build_pair(history_id, ["789\n", "0ab\n"])

    def batch_of(hdca_id):
        return {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]}

    inputs = {
        "input1": batch_of(first_pair_id),
        "queries_0|input2": batch_of(second_pair_id),
    }
    self._check_map_cat1_over_two_collections(history_id, inputs)
def _check_map_cat1_over_two_collections(self, history_id, inputs):
    """Run cat1 with ``inputs`` and check the two concatenated outputs.

    Asserts a 200 response, two outputs with the expected contents, two jobs
    and one implicit collection.
    """
    response = self._run_cat1(history_id, inputs)
    self._assert_status_code_is(response, 200)
    body = response.json()
    outputs = body["outputs"]
    assert len(outputs) == 2
    first_output, second_output = outputs[0], outputs[1]
    self.dataset_populator.wait_for_history(history_id)
    first_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=first_output)
    second_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=second_output)
    assert first_content.strip() == "123\n789"
    assert second_content.strip() == "456\n0ab"

    assert len(body["jobs"]) == 2
    assert len(body["implicit_collections"]) == 1
@skip_without_tool("cat1")
def test_map_over_two_collections_unlinked(self, history_id):
    """Batch cat1 over two unlinked pairs and check the ``paired:paired`` result.

    Expects four jobs and outputs, one implicit collection, and one dataset
    for every combination of the two input pairs.
    """
    first_pair_id = self._build_pair(history_id, ["123\n", "456\n"])
    second_pair_id = self._build_pair(history_id, ["789\n", "0ab\n"])
    inputs = {
        "input1": {"batch": True, "linked": False, "values": [{"src": "hdca", "id": first_pair_id}]},
        "queries_0|input2": {"batch": True, "linked": False, "values": [{"src": "hdca", "id": second_pair_id}]},
    }
    response = self._run_cat1(history_id, inputs)
    self._assert_status_code_is(response, 200)
    body = response.json()
    assert len(body["outputs"]) == 4
    assert len(body["jobs"]) == 4
    implicit_collections = body["implicit_collections"]
    assert len(implicit_collections) == 1
    implicit_collection = implicit_collections[0]
    assert implicit_collection["collection_type"] == "paired:paired"

    pair_identifiers = ("forward", "reverse")
    outer_elements = implicit_collection["elements"]
    assert len(outer_elements) == 2
    for outer_element, identifier in zip(outer_elements, pair_identifiers):
        assert outer_element["element_identifier"] == identifier

    inner_groups = [outer_element["object"]["elements"] for outer_element in outer_elements]
    for inner_group in inner_groups:
        assert len(inner_group) == 2
    for inner_group in inner_groups:
        for inner_element, identifier in zip(inner_group, pair_identifiers):
            assert inner_element["element_identifier"] == identifier

    # Listed in traversal order: outer forward/reverse, then inner forward/reverse.
    expected_contents_list = ["123\n789\n", "123\n0ab\n", "456\n789\n", "456\n0ab\n"]
    flattened = [inner_element for inner_group in inner_groups for inner_element in inner_group]
    for inner_element, expected_contents in zip(flattened, expected_contents_list):
        dataset_id = inner_element["object"]["id"]
        contents = self.dataset_populator.get_history_dataset_content(history_id, dataset_id=dataset_id)
        assert expected_contents == contents
@skip_without_tool("cat1")
def test_map_over_collected_and_individual_datasets(self, history_id):
    """Batch cat1 over a pair together with two individually supplied datasets."""
    pair_id = self._build_pair(history_id, ["123\n", "456\n"])
    first_dataset = self.dataset_populator.new_dataset(history_id, content="789")
    second_dataset = self.dataset_populator.new_dataset(history_id, content="0ab")
    collection_batch = {"batch": True, "values": [{"src": "hdca", "id": pair_id}]}
    dataset_batch = {
        "batch": True,
        "values": [dataset_to_param(first_dataset), dataset_to_param(second_dataset)],
    }
    inputs = {
        "input1": collection_batch,
        "queries_0|input2": dataset_batch,
    }
    response = self._run_cat1(history_id, inputs)
    self._assert_status_code_is(response, 200)
    body = response.json()
    assert len(body["outputs"]) == 2
    assert len(body["jobs"]) == 2
    assert len(body["implicit_collections"]) == 1
@skip_without_tool("identifier_source")
def test_default_identifier_source_map_over(self):
    """Check element identifiers of the implicit collections made by ``identifier_source``.

    The ``outputA`` collection must carry identifier ``A``; every other
    implicit collection must carry ``B``.
    """
    with self.dataset_populator.test_history() as history_id:
        list_a_response = self.dataset_collection_populator.create_list_in_history(
            history_id, contents=[("A", "A content")], wait=True
        )
        input_a_hdca_id = list_a_response.json()["outputs"][0]["id"]
        list_b_response = self.dataset_collection_populator.create_list_in_history(
            history_id, contents=[("B", "B content")], wait=True
        )
        input_b_hdca_id = list_b_response.json()["outputs"][0]["id"]
        inputs = {
            "inputA": {"batch": True, "values": [dict(src="hdca", id=input_a_hdca_id)]},
            "inputB": {"batch": True, "values": [dict(src="hdca", id=input_b_hdca_id)]},
        }
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        run = self._run("identifier_source", history_id, inputs, assert_ok=True)
        for implicit_collection in run["implicit_collections"]:
            expected_identifier = "A" if implicit_collection["output_name"] == "outputA" else "B"
            assert implicit_collection["elements"][0]["element_identifier"] == expected_identifier
@skip_without_tool("collection_creates_pair")
def test_map_over_collection_output(self):
    """Map ``collection_creates_pair`` over a list and check the ``list:paired`` result.

    Verifies the job count, the outer identifiers, the type of each inner
    pair and the contents of the four produced datasets.
    """
    with self.dataset_populator.test_history() as history_id:
        list_response = self.dataset_collection_populator.create_list_in_history(
            history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"], wait=True
        )
        hdca_id = list_response.json()["outputs"][0]["id"]
        inputs = {
            "input1": {"batch": True, "values": [dict(src="hdca", id=hdca_id)]},
        }
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        run = self._run("collection_creates_pair", history_id, inputs, assert_ok=True)
        jobs = run["jobs"]
        implicit_collections = run["implicit_collections"]
        assert len(jobs) == 2
        assert len(implicit_collections) == 1
        implicit_collection = implicit_collections[0]
        assert implicit_collection["collection_type"] == "list:paired", implicit_collection

        outer_elements = implicit_collection["elements"]
        assert len(outer_elements) == 2
        first_outer, second_outer = outer_elements
        assert first_outer["element_identifier"] == "data0"
        assert second_outer["element_identifier"] == "data1"

        first_pair, second_pair = first_outer["object"], second_outer["object"]
        # Unpacking also enforces that each pair holds exactly two elements.
        first_forward, first_reverse = first_pair["elements"]
        second_forward, second_reverse = second_pair["elements"]
        for pair in (first_pair, second_pair):
            assert "collection_type" in pair, pair
            assert pair["collection_type"] == "paired", pair

        pair_ids = []
        for pair_element in (first_forward, first_reverse, second_forward, second_reverse):
            assert "object" in pair_element
            pair_ids.append(pair_element["object"]["id"])

        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        expected_contents = [
            "a\nc\n",
            "b\nd\n",
            "e\ng\n",
            "f\nh\n",
        ]
        for pair_id, expected in zip(pair_ids, expected_contents):
            contents = self.dataset_populator.get_history_dataset_content(history_id, dataset_id=pair_id)
            assert expected == contents
@skip_without_tool("cat1")
def test_cannot_map_over_incompatible_collections(self):
    """Batching cat1 over a pair and a list together must return an error status."""
    with self.dataset_populator.test_history() as history_id:
        pair_id = self._build_pair(history_id, ["123\n", "456\n"])
        list_response = self.dataset_collection_populator.create_list_in_history(history_id)
        list_id = list_response.json()["outputs"][0]["id"]
        inputs = {
            "input1": {
                "batch": True,
                "values": [{"src": "hdca", "id": pair_id}],
            },
            "queries_0|input2": {
                "batch": True,
                "values": [{"src": "hdca", "id": list_id}],
            },
        }
        run_response = self._run_cat1(history_id, inputs)
        # TODO: Fix this error checking once switch over to new API decorator
        # on server.
        assert run_response.status_code >= 400
@skip_without_tool("__FILTER_FROM_FILE__")
def test_map_over_collection_structured_like(self):
    """Map ``__FILTER_FROM_FILE__`` over a list used both as input and as filter source.

    Expects two new ``list:list`` implicit collections (discarded and
    filtered), and checks that the filtered one nests a ``list`` of HDAs.
    """
    with self.dataset_populator.test_history() as history_id:
        hdca_id = self.dataset_collection_populator.create_list_in_history(
            history_id, contents=[("A", "A"), ("B", "B")], wait=True
        ).json()["outputs"][0]["id"]
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        inputs = {
            "input": {"values": [dict(src="hdca", id=hdca_id)]},
            "how|filter_source": {"batch": True, "values": [dict(src="hdca", id=hdca_id)]},
        }
        implicit_collections = self._run("__FILTER_FROM_FILE__", history_id, inputs, assert_ok=True)[
            "implicit_collections"
        ]
        discarded_collection, filtered_collection = implicit_collections
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        history_contents = self.dataset_populator._get_contents_request(history_id).json()
        # We should have a final collection count of 3 (2 nested collections, plus the input collection)
        collection_count = sum(1 for c in history_contents if c["history_content_type"] == "dataset_collection")
        new_collections = collection_count - 1
        # The message now states the same expectation (2) that the assertion enforces.
        assert (
            new_collections == 2
        ), f"Expected to generate 2 new, filtered collections, but got {new_collections} collections"
        assert (
            filtered_collection["collection_type"] == discarded_collection["collection_type"] == "list:list"
        ), filtered_collection
        collection_details = self.dataset_populator.get_history_collection_details(
            history_id, hid=filtered_collection["hid"]
        )
        assert collection_details["element_count"] == 2
        first_collection_level = collection_details["elements"][0]
        assert first_collection_level["element_type"] == "dataset_collection"
        second_collection_level = first_collection_level["object"]
        assert second_collection_level["collection_type"] == "list"
        assert second_collection_level["elements"][0]["element_type"] == "hda"
@skip_without_tool("collection_type_source")
def test_map_over_collection_type_source(self):
    """Map ``collection_type_source`` over a list and check the nesting of the collection at hid 4."""
    with self.dataset_populator.test_history() as history_id:
        list_response = self.dataset_collection_populator.create_list_in_history(
            history_id, contents=[("A", "A"), ("B", "B")], wait=True
        )
        hdca_id = list_response.json()["outputs"][0]["id"]
        self.dataset_populator.wait_for_history(history_id, assert_ok=True)
        inputs = {
            "input_collect": {"values": [dict(src="hdca", id=hdca_id)]},
            "header": {"batch": True, "values": [dict(src="hdca", id=hdca_id)]},
        }
        self._run("collection_type_source", history_id, inputs, assert_ok=True, wait_for_job=True)
        collection_details = self.dataset_populator.get_history_collection_details(history_id, hid=4)
        first_outer = collection_details["elements"][0]
        first_inner = first_outer["object"]["elements"][0]
        assert first_inner["element_type"] == "hda"
@skip_without_tool("multi_data_param")
def test_reduce_collections_legacy(self):
    """Reduce a pair and a list using the legacy ``__collection_reduce__|<id>`` input strings."""
    with self.dataset_populator.test_history() as history_id:
        pair_id = self._build_pair(history_id, ["123\n", "456\n"])
        list_response = self.dataset_collection_populator.create_list_in_history(history_id, wait=True)
        list_id = list_response.json()["outputs"][0]["id"]
        inputs = {
            "f1": f"__collection_reduce__|{pair_id}",
            "f2": f"__collection_reduce__|{list_id}",
        }
        self._check_simple_reduce_job(history_id, inputs)
@skip_without_tool("multi_data_param")
def test_reduce_collections(self):
    """Reduce a pair and a list passed as ``{"src": "hdca", ...}`` references."""
    with self.dataset_populator.test_history() as history_id:
        pair_id = self._build_pair(history_id, ["123\n", "456\n"])
        list_response = self.dataset_collection_populator.create_list_in_history(history_id, wait=True)
        list_id = list_response.json()["outputs"][0]["id"]
        inputs = {
            "f1": {"src": "hdca", "id": pair_id},
            "f2": {"src": "hdca", "id": list_id},
        }
        self._check_simple_reduce_job(history_id, inputs)
@skip_without_tool("multi_data_param")
def test_implicit_reduce_with_mapping(self):
    """Reduce a pair on ``f1`` while batching ``f2`` over the inner lists of a list of lists."""
    with self.dataset_populator.test_history() as history_id:
        pair_id = self._build_pair(history_id, ["123\n", "456\n"])
        nested_response = self.dataset_collection_populator.create_list_of_list_in_history(history_id, wait=True)
        nested_list_id = nested_response.json()["id"]
        inputs = {
            "f1": {"src": "hdca", "id": pair_id},
            "f2": {
                "batch": True,
                "values": [{"src": "hdca", "map_over_type": "list", "id": nested_list_id}],
            },
        }
        run = self._run("multi_data_param", history_id, inputs, assert_ok=True)
        jobs = run["jobs"]
        implicit_collections = run["implicit_collections"]
        assert len(jobs) == 1
        assert len(implicit_collections) == 2
        first_hid = implicit_collections[0]["hid"]
        output_hdca = self.dataset_populator.get_history_collection_details(history_id, hid=first_hid)
        assert output_hdca["collection_type"] == "list"
@skip_without_tool("column_multi_param")
def test_implicit_conversion_and_reduce(self):
    """Run the shared collection-reduce scenario with the column parameter set to ``"1"``."""
    with self.dataset_populator.test_history() as scratch_history_id:
        self._run_implicit_collection_and_reduce(history_id=scratch_history_id, param="1")
@skip_without_tool("column_multi_param")
def test_implicit_conversion_and_reduce_invalid_param(self):
    """An invalid column value must fail the shared scenario and leave hid 3 in error state."""
    with self.dataset_populator.test_history() as history_id:
        # The helper's own assertions are expected to trip for param="X".
        with pytest.raises(AssertionError):
            self._run_implicit_collection_and_reduce(history_id=history_id, param="X")
        failed_dataset = self.dataset_populator.get_history_dataset_details(
            history_id=history_id, hid=3, assert_ok=False
        )
        assert failed_dataset["state"] == "error"
        assert "parameter 'col': an invalid option" in failed_dataset["misc_info"]
def _run_implicit_collection_and_reduce(self, history_id, param):
    """Upload ``1.fasta`` as a one-element list, run ``column_multi_param`` on it and check hid 3.

    ``param`` is passed through as the tool's ``col`` input. Asserts a single
    job and that the dataset at hid 3 contains ``hg17``.
    """
    fasta_path = self.test_data_resolver.get_filename("1.fasta")
    with open(fasta_path) as fasta_fh:
        fasta_content = fasta_fh.read()
    fasta_element = {
        "name": "test0",
        "src": "pasted",
        "paste_content": fasta_content,
        "ext": "fasta",
    }
    response = self.dataset_collection_populator.upload_collection(
        history_id,
        "list",
        elements=[fasta_element],
        wait=True,
    )
    self._assert_status_code_is(response, 200)
    hdca_id = response.json()["outputs"][0]["id"]
    inputs = {
        "input1": {"src": "hdca", "id": hdca_id},
        "col": param,
    }
    run = self._run("column_multi_param", history_id, inputs, assert_ok=True)
    assert len(run["jobs"]) == 1
    content = self.dataset_populator.get_history_dataset_content(history_id, hid=3)
    assert content.strip() == "hg17", content
@skip_without_tool("multi_data_repeat")
def test_reduce_collections_in_repeat(self):
    """Reduce a pair supplied to a multi-data input that sits inside a repeat."""
    with self.dataset_populator.test_history() as history_id:
        pair_id = self._build_pair(history_id, ["123\n", "456\n"])
        inputs = {
            "outer_repeat_0|f1": {"src": "hdca", "id": pair_id},
        }
        run = self._run("multi_data_repeat", history_id, inputs, assert_ok=True)
        outputs = run["outputs"]
        jobs = run["jobs"]
        assert len(jobs) == 1
        assert len(outputs) == 1
        only_output = outputs[0]
        only_output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=only_output)
        assert only_output_content.strip() == "123\n456", only_output_content
@skip_without_tool("multi_data_repeat")
def test_reduce_collections_in_repeat_legacy(self):
    """Same as the non-legacy repeat test, using the ``__collection_reduce__|<id>`` string form."""
    with self.dataset_populator.test_history() as history_id:
        pair_id = self._build_pair(history_id, ["123\n", "456\n"])
        inputs = {
            "outer_repeat_0|f1": f"__collection_reduce__|{pair_id}",
        }
        run = self._run("multi_data_repeat", history_id, inputs, assert_ok=True)
        outputs = run["outputs"]
        jobs = run["jobs"]
        assert len(jobs) == 1
        assert len(outputs) == 1
        only_output = outputs[0]
        only_output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=only_output)
        assert only_output_content.strip() == "123\n456", only_output_content
@skip_without_tool("multi_data_param")
def test_reduce_multiple_lists_on_multi_data(self):
    """Pass two collections to ``f1`` and one to ``f2`` and check both reduced outputs."""
    with self.dataset_populator.test_history() as history_id:
        pair_id = self._build_pair(history_id, ["123\n", "456\n"])
        list_response = self.dataset_collection_populator.create_list_in_history(history_id, wait=True)
        list_id = list_response.json()["outputs"][0]["id"]
        inputs = {
            "f1": [{"src": "hdca", "id": pair_id}, {"src": "hdca", "id": list_id}],
            "f2": [{"src": "hdca", "id": pair_id}],
        }
        run = self._run("multi_data_param", history_id, inputs, assert_ok=True)
        outputs = run["outputs"]
        jobs = run["jobs"]
        assert len(jobs) == 1
        assert len(outputs) == 2
        first_output, second_output = outputs
        first_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=first_output)
        second_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=second_output)
        assert first_content.strip() == "123\n456\nTestData123\nTestData123\nTestData123"
        assert second_content.strip() == "123\n456"
def _check_simple_reduce_job(self, history_id, inputs):
    """Run ``multi_data_param`` with ``inputs`` and check its two outputs.

    Asserts one job and two outputs: the first must contain ``123\\n456`` and
    the second must have three lines.
    """
    run = self._run("multi_data_param", history_id, inputs, assert_ok=True)
    outputs = run["outputs"]
    jobs = run["jobs"]
    assert len(jobs) == 1
    assert len(outputs) == 2
    first_output, second_output = outputs
    first_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=first_output)
    second_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=second_output)
    assert first_content.strip() == "123\n456", first_content
    second_lines = second_content.strip().split("\n")
    assert len(second_lines) == 3, second_content
@skip_without_tool("collection_paired_test")
def test_subcollection_mapping(self):
    """Batch a paired input over the pairs of a nested list and delegate the checks."""
    with self.dataset_populator.test_history() as history_id:
        nested_list_id = self.__build_nested_list(history_id)
        paired_batch = {
            "batch": True,
            "values": [{"src": "hdca", "map_over_type": "paired", "id": nested_list_id}],
        }
        inputs = {"f1": paired_batch}
        self._check_simple_subcollection_mapping(history_id, inputs)
def _check_simple_subcollection_mapping(self, history_id, inputs):
    """Run ``collection_paired_test`` with ``inputs`` and check its two outputs.

    Asserts exactly two outputs whose contents are ``123\\n456`` and
    ``789\\n0ab`` respectively.
    """
    # Following wait not really needed - just getting so many database
    # locked errors with sqlite.
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    outputs = self._run_and_get_outputs("collection_paired_test", history_id, inputs)
    # Was ``assert len(outputs), 2``: that only tested truthiness and used 2
    # as the failure message, so the count was never actually checked.
    assert len(outputs) == 2, outputs
    output1, output2 = outputs
    output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
    output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2)
    assert output1_content.strip() == "123\n456", output1_content
    assert output2_content.strip() == "789\n0ab", output2_content
@skip_without_tool("collection_mixed_param")
def test_combined_mapping_and_subcollection_mapping(self):
    """Batch ``f1`` over the pairs of a nested list and ``f2`` over a flat list in one run."""
    with self.dataset_populator.test_history() as history_id:
        nested_list_id = self.__build_nested_list(history_id)
        list_response = self.dataset_collection_populator.create_list_in_history(
            history_id, contents=["xxx\n", "yyy\n"], wait=True
        )
        flat_list_id = list_response.json()["outputs"][0]["id"]
        paired_batch = {
            "batch": True,
            "values": [{"src": "hdca", "map_over_type": "paired", "id": nested_list_id}],
        }
        flat_batch = {
            "batch": True,
            "values": [{"src": "hdca", "id": flat_list_id}],
        }
        inputs = {"f1": paired_batch, "f2": flat_batch}
        self._check_combined_mapping_and_subcollection_mapping(history_id, inputs)
def _check_combined_mapping_and_subcollection_mapping(self, history_id, inputs):
    """Run ``collection_mixed_param`` with ``inputs`` and check its two outputs.

    Asserts exactly two outputs whose contents are ``123\\n456\\nxxx`` and
    ``789\\n0ab\\nyyy`` respectively.
    """
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    outputs = self._run_and_get_outputs("collection_mixed_param", history_id, inputs)
    # Was ``assert len(outputs), 2``: that only tested truthiness and used 2
    # as the failure message, so the count was never actually checked.
    assert len(outputs) == 2, outputs
    output1, output2 = outputs
    output1_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output1)
    output2_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output2)
    assert output1_content.strip() == "123\n456\nxxx", output1_content
    assert output2_content.strip() == "789\n0ab\nyyy", output2_content

def _check_implicit_collection_populated(self, run_response):
    """Assert ``run_response`` lists implicit collections and each has ``populated_state`` ``ok``."""
    implicit_collections = run_response["implicit_collections"]
    assert implicit_collections
    for implicit_collection in implicit_collections:
        assert implicit_collection["populated_state"] == "ok"

def _cat1_outputs(self, history_id, inputs):
    """Run cat1 and return the ``outputs`` list of its (status 200) response."""
    return self._run_outputs(self._run_cat1(history_id, inputs))

def _run_and_get_outputs(self, tool_id, history_id, inputs=None, tool_version=None):
    """Run ``tool_id`` and return the ``outputs`` list of its (status 200) response.

    ``inputs`` defaults to an empty dict; ``tool_version`` is forwarded to ``_run``.
    """
    if inputs is None:
        inputs = {}
    return self._run_outputs(self._run(tool_id, history_id, inputs, tool_version=tool_version))

def _run_outputs(self, create_response):
    """Assert ``create_response`` has status 200 and return its JSON ``outputs`` entry."""
    self._assert_status_code_is(create_response, 200)
    return create_response.json()["outputs"]

def _run_cat1(self, history_id, inputs, assert_ok=False, **kwargs):
    """Run the ``cat1`` tool through ``_run``, forwarding ``assert_ok`` and extra keyword arguments."""
    return self._run("cat1", history_id, inputs, assert_ok=assert_ok, **kwargs)

def __tool_ids(self):
    """Return the tool ids found in the ``tool_panels/default`` index, flattening sections."""
    index = self._get("tool_panels/default")
    tools_index = index.json()
    # In panels by default, so flatten out sections...
    tool_ids = []
    # Loop variable renamed from ``id`` so the builtin is not shadowed.
    for entry_id, tool_or_section in tools_index.items():
        if "tools" in tool_or_section:
            # Only string entries are collected from a section's "tools" list.
            tool_ids.extend(t for t in tool_or_section["tools"] if isinstance(t, str))
        else:
            tool_ids.append(entry_id)
    return tool_ids
# Guard on the tool this test actually runs; it previously named the
# "_multiple" variant, which belongs to the sibling test.
@skip_without_tool("collection_cat_group_tag")
def test_group_tag_selection(self, history_id):
    """Run ``collection_cat_group_tag`` with ``group`` set to ``condition:treated``.

    Expects a single output whose content is ``123\\n456``.
    """
    input_hdca_id = self.__build_group_list(history_id)
    inputs = {
        "input1": {"src": "hdca", "id": input_hdca_id},
        "group": "condition:treated",
    }
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    response = self._run("collection_cat_group_tag", history_id, inputs, assert_ok=True)
    outputs = response["outputs"]
    assert len(outputs) == 1
    output = outputs[0]
    output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
    assert output_content.strip() == "123\n456"
@skip_without_tool("collection_cat_group_tag_multiple")
def test_group_tag_selection_multiple(self, history_id):
    """Run ``collection_cat_group_tag_multiple`` with two group tags and check the single output."""
    tagged_list_id = self.__build_group_list(history_id)
    inputs = {
        "input1": {"src": "hdca", "id": tagged_list_id},
        "groups": "condition:treated,type:single",
    }
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    run = self._run("collection_cat_group_tag_multiple", history_id, inputs, assert_ok=True)
    outputs = run["outputs"]
    assert len(outputs) == 1
    (only_output,) = outputs
    content = self.dataset_populator.get_history_dataset_content(history_id, dataset=only_output)
    assert content.strip() == "123\n456\n456\n0ab"
@skip_without_tool("expression_forty_two")
def test_galaxy_expression_tool_simplest(self):
    """Run ``expression_forty_two`` without inputs and expect the content ``42``."""
    new_history_id = self.dataset_populator.new_history()
    response = self._run("expression_forty_two", new_history_id)
    self._assert_status_code_is(response, 200)
    self.dataset_populator.wait_for_history(new_history_id, assert_ok=True)
    content = self.dataset_populator.get_history_dataset_content(new_history_id)
    assert content == "42"
@skip_without_tool("expression_parse_int")
def test_galaxy_expression_tool_simple(self):
    """Run ``expression_parse_int`` with ``input1`` set to ``"7"`` and expect the content ``7``."""
    new_history_id = self.dataset_populator.new_history()
    inputs = {"input1": "7"}
    response = self._run("expression_parse_int", new_history_id, inputs)
    self._assert_status_code_is(response, 200)
    self.dataset_populator.wait_for_history(new_history_id, assert_ok=True)
    content = self.dataset_populator.get_history_dataset_content(new_history_id)
    assert content == "7"
@skip_without_tool("expression_log_line_count")
def test_galaxy_expression_metadata(self):
    """Run ``expression_log_line_count`` on a 14-line dataset and expect the content ``3``."""
    new_history_id = self.dataset_populator.new_history()
    fourteen_lines = "1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n11\n12\n13\n14"
    input_dataset = self.dataset_populator.new_dataset(new_history_id, content=fourteen_lines)
    inputs = {"input1": dataset_to_param(input_dataset)}
    response = self._run("expression_log_line_count", new_history_id, inputs)
    self._assert_status_code_is(response, 200)
    self.dataset_populator.wait_for_history(new_history_id, assert_ok=True)
    content = self.dataset_populator.get_history_dataset_content(new_history_id)
    assert content == "3"
@skip_without_tool("cat1")
def test_run_deferred_dataset(self, history_id):
    """Run cat1 on a deferred ``1.bed`` dataset and check the first output's content."""
    deferred_hda = self.dataset_populator.create_deferred_hda(
        history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed", ext="bed"
    )
    inputs = {"input1": dataset_to_param(deferred_hda)}
    first_output = self._cat1_outputs(history_id, inputs=inputs)[0]
    output_details = self.dataset_populator.get_history_dataset_details(
        history_id, dataset=first_output, wait=True, assert_ok=True
    )
    assert output_details["state"] == "ok"
    content = self.dataset_populator.get_history_dataset_content(history_id, dataset=first_output)
    # NOTE(review): BED columns are normally tab-separated - confirm the
    # separators inside this literal match the real test data.
    assert content.startswith("chr1 147962192 147962580 CCDS989.1_cds_0_0_chr1_147962193_r 0 -")
@skip_without_tool("metadata_bam")
def test_run_deferred_dataset_with_metadata_options_filter(self, history_id):
    """Run ``metadata_bam`` on a deferred ``1.bam`` with ``ref_names`` set to ``chrM``."""
    deferred_bam = self.dataset_populator.create_deferred_hda(
        history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bam", ext="bam"
    )
    inputs = {"input_bam": dataset_to_param(deferred_bam), "ref_names": "chrM"}
    run = self.dataset_populator.run_tool(tool_id="metadata_bam", inputs=inputs, history_id=history_id)
    first_output = run["outputs"][0]
    first_output_details = self.dataset_populator.get_history_dataset_details(
        history_id, dataset=first_output, wait=True, assert_ok=True
    )
    assert first_output_details["state"] == "ok"
    content = self.dataset_populator.get_history_dataset_content(history_id, dataset=first_output)
    assert content.startswith("chrM")
@skip_without_tool("pileup")
def test_metadata_validator_on_deferred_input(self, history_id):
    """Run ``pileup`` on a deferred ``1.bam`` with a ``1.fasta`` reference and expect the job to succeed."""
    deferred_bam_details = self.dataset_populator.create_deferred_hda(
        history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bam", ext="bam"
    )
    # Read through a context manager so the file handle is closed
    # deterministically instead of being left to garbage collection.
    with open(self.get_filename("1.fasta")) as fasta_fh:
        fasta1_contents = fasta_fh.read()
    fasta = self.dataset_populator.new_dataset(history_id, content=fasta1_contents)
    inputs = {"input1": dataset_to_param(deferred_bam_details), "reference": dataset_to_param(fasta)}
    run_response = self.dataset_populator.run_tool(tool_id="pileup", inputs=inputs, history_id=history_id)
    self.dataset_populator.wait_for_job(run_response["jobs"][0]["id"], assert_ok=True)
@pytest.mark.xfail
@skip_without_tool("pileup")
def test_metadata_validator_can_fail_on_deferred_input(self, history_id):
    """Run ``pileup`` on a deferred unsorted BAM and expect the job to end up ``failed``.

    Marked ``xfail``: see the TODO below.
    """
    # This test fails because we just skip the validator
    # Fixing this is a TODO
    deferred_bam_details = self.dataset_populator.create_deferred_hda(
        history_id,
        "https://github.com/galaxyproject/galaxy/blob/dev/test-data/3unsorted.bam?raw=true",
        ext="unsorted.bam",
    )
    # Read through a context manager so the file handle is closed
    # deterministically instead of being left to garbage collection.
    with open(self.get_filename("1.fasta")) as fasta_fh:
        fasta1_contents = fasta_fh.read()
    fasta = self.dataset_populator.new_dataset(history_id, content=fasta1_contents)
    inputs = {"input1": dataset_to_param(deferred_bam_details), "reference": dataset_to_param(fasta)}
    run_response = self.dataset_populator.run_tool(tool_id="pileup", inputs=inputs, history_id=history_id)
    # Look the job id up once and reuse it for both the wait and the details call.
    job_id = run_response["jobs"][0]["id"]
    self.dataset_populator.wait_for_job(job_id, assert_ok=False)
    job_details = self.dataset_populator.get_job_details(job_id=job_id).json()
    assert job_details["state"] == "failed"
@skip_without_tool("cat1")
def test_run_deferred_mapping(self, history_id: str):
    """Fetch a one-element list of a deferred ``4.bed`` and batch cat1 over it.

    Checks that the implicit collection has one element in state ``ok`` with
    the expected leading content.
    """
    deferred_bed = {
        "src": "url",
        "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed",
        "info": "my cool bed",
        "deferred": True,
        "ext": "bed",
    }
    target = {
        "destination": {"type": "hdca"},
        "elements": [deferred_bed],
        "collection_type": "list",
        "name": "here is a name",
    }
    payload = {
        "history_id": history_id,
        "targets": json.dumps([target]),
    }
    fetch_response = self.dataset_populator.fetch(payload, wait=True)
    fetched_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
    inputs = {
        "input1": {"batch": True, "values": [{"src": "hdca", "id": fetched_collection["id"]}]},
    }
    run = self._run_cat1(history_id, inputs).json()
    implicit_hdca_id = run["implicit_collections"][0]["id"]
    implicit_collection = self.dataset_populator.get_history_collection_details(history_id, id=implicit_hdca_id)
    result_elements = implicit_collection["elements"]
    assert len(result_elements) == 1
    object_0 = result_elements[0]["object"]
    assert isinstance(object_0, dict)
    assert object_0["state"] == "ok"
    content = self.dataset_populator.get_history_dataset_content(history_id, dataset=object_0)
    # NOTE(review): BED columns are normally tab-separated - confirm the
    # separators inside this literal match the real test data.
    assert content.startswith("chr22 30128507 31828507 uc003bnx.1_cds_2_0_chr22_29227_f 0 +")
@skip_without_tool("cat_list")
def test_run_deferred_list_multi_data_reduction(self, history_id: str):
    """Fetch a one-element list of a deferred ``4.bed`` and pass it whole to ``cat_list``."""
    deferred_bed = {
        "src": "url",
        "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed",
        "info": "my cool bed",
        "deferred": True,
        "ext": "bed",
    }
    target = {
        "destination": {"type": "hdca"},
        "elements": [deferred_bed],
        "collection_type": "list",
        "name": "here is a name",
    }
    payload = {
        "history_id": history_id,
        "targets": json.dumps([target]),
    }
    fetch_response = self.dataset_populator.fetch(payload, wait=True)
    fetched_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
    inputs = {
        "input1": {"src": "hdca", "id": fetched_collection["id"]},
    }
    run = self._run("cat_list", history_id, inputs, assert_ok=True)
    first_output = run["outputs"][0]
    content = self.dataset_populator.get_history_dataset_content(history_id, dataset=first_output)
    # NOTE(review): BED columns are normally tab-separated - confirm the
    # separators inside this literal match the real test data.
    assert content.startswith("chr22 30128507 31828507 uc003bnx.1_cds_2_0_chr22_29227_f 0 +")
# Guard on the tool this test actually runs; it previously named "cat_list",
# which is not used anywhere in the body.
@skip_without_tool("collection_nested_test")
def test_run_deferred_nested_list_input(self, history_id: str):
    """Fetch a deferred ``list:paired`` collection and pass it to ``collection_nested_test``.

    Validates the structure of the fetched collection first, then checks the
    leading content of the tool's second output.
    """
    elements = [
        {
            "name": "outer",
            "elements": [
                {
                    "src": "url",
                    "name": "forward",
                    "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed",
                    "info": "my cool bed 4",
                    "deferred": True,
                    "ext": "bed",
                },
                {
                    "src": "url",
                    "name": "reverse",
                    "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed",
                    "info": "my cool bed 1",
                    "deferred": True,
                    "ext": "bed",
                },
            ],
        },
    ]
    targets = [
        {
            "destination": {"type": "hdca"},
            "elements": elements,
            "collection_type": "list:paired",
            "name": "here is a name",
        }
    ]
    payload = {
        "history_id": history_id,
        "targets": json.dumps(targets),
    }
    fetch_response = self.dataset_populator.fetch(payload, wait=True)
    dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
    assert dataset_collection["collection_type"] == "list:paired"
    result_elements = dataset_collection["elements"]
    assert len(result_elements) == 1
    outer_element = result_elements[0]
    assert isinstance(outer_element, dict)
    inner_dataset_coll = outer_element["object"]
    assert isinstance(inner_dataset_coll, dict)
    inner_elements = inner_dataset_coll["elements"]
    assert isinstance(inner_elements, list)
    assert len(inner_elements) == 2
    hdca_id = dataset_collection["id"]
    inputs = {
        "input1": {"src": "hdca", "id": hdca_id},
    }
    run_response = self._run("collection_nested_test", history_id, inputs, assert_ok=True, wait_for_job=True)
    output_dataset = run_response["outputs"][1]
    output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output_dataset)
    # NOTE(review): BED columns are normally tab-separated - confirm the
    # separators inside this literal match the real test data.
    assert output_content.startswith("chr22 30128507 31828507 uc003bnx.1_cds_2_0_chr22_29227_f 0 +")
@skip_without_tool("collection_paired_structured_like")
def test_deferred_map_over_nested_collections(self, history_id):
    """Batch ``collection_paired_structured_like`` over the pairs of a deferred ``list:paired``.

    Expects two successful jobs and one ``list:paired`` implicit collection
    with two outer elements, then checks the first dataset of the first pair.
    """

    def deferred_bed(name, url, info):
        # One deferred URL element; key order mirrors the fetch payload.
        return {
            "src": "url",
            "name": name,
            "url": url,
            "info": info,
            "deferred": True,
            "ext": "bed",
        }

    def deferred_pair():
        # Forward is 4.bed and reverse is 1.bed for every outer element.
        return [
            deferred_bed(
                "forward",
                "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed",
                "my cool bed 4",
            ),
            deferred_bed(
                "reverse",
                "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed",
                "my cool bed 1",
            ),
        ]

    elements = [{"name": outer_name, "elements": deferred_pair()} for outer_name in ("outer1", "outer2")]
    target = {
        "destination": {"type": "hdca"},
        "elements": elements,
        "collection_type": "list:paired",
        "name": "here is a name",
    }
    payload = {
        "history_id": history_id,
        "targets": json.dumps([target]),
    }
    fetch_response = self.dataset_populator.fetch(payload, wait=True)
    fetched_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
    inputs = {
        "input1": {
            "batch": True,
            "values": [dict(map_over_type="paired", src="hdca", id=fetched_collection["id"])],
        },
    }
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    run = self._run("collection_paired_structured_like", history_id, inputs, assert_ok=True)
    jobs = run["jobs"]
    implicit_collections = run["implicit_collections"]
    assert len(jobs) == 2
    self.dataset_populator.wait_for_jobs(jobs, assert_ok=True)
    assert len(implicit_collections) == 1
    implicit_collection = implicit_collections[0]
    collection_type = implicit_collection["collection_type"]
    assert collection_type == "list:paired", collection_type
    outer_elements = implicit_collection["elements"]
    assert len(outer_elements) == 2
    first_pair = outer_elements[0]["object"]
    first_hda = first_pair["elements"][0]["object"]
    content = self.dataset_populator.get_history_dataset_content(history_id, dataset=first_hda, wait=False)
    assert content.startswith("chr22\t30128507\t31828507\tuc003bnx.1_cds_2_0_chr22_29227_f\t0\t+")
def __build_group_list(self, history_id):
    """Upload a four-element ``list`` collection whose elements carry group tags.

    Returns the ``id`` of the first entry under ``outputs`` in the upload
    response, after asserting the response status is 200.
    """
    # (element name, pasted content, tags)
    element_specs = (
        ("test0", "123\n", ["group:type:paired-end", "group:condition:treated"]),
        ("test1", "456\n", ["group:type:single", "group:condition:treated"]),
        ("test2", "789\n", ["group:type:paired-end", "group:condition:untreated"]),
        ("test3", "0ab\n", ["group:type:single", "group:condition:untreated"]),
    )
    elements = [
        {"name": name, "src": "pasted", "paste_content": content, "ext": "txt", "tags": tags}
        for name, content, tags in element_specs
    ]
    response = self.dataset_collection_populator.upload_collection(
        history_id,
        "list",
        elements=elements,
        wait=True,
    )
    self._assert_status_code_is(response, 200)
    return response.json()["outputs"][0]["id"]


def __build_nested_list(self, history_id):
    """Upload a two-element ``list:paired`` collection of pasted text datasets.

    Returns the ``id`` of the first entry under ``outputs`` in the upload
    response, after asserting the response status is 200.
    """

    def pasted(identifier, content):
        # One leaf dataset of a pair.
        return {"src": "pasted", "paste_content": content, "name": identifier, "ext": "txt"}

    elements = [
        {"name": "test0", "elements": [pasted("forward", "123\n"), pasted("reverse", "456\n")]},
        {"name": "test1", "elements": [pasted("forward", "789\n"), pasted("reverse", "0ab\n")]},
    ]
    response = self.dataset_collection_populator.upload_collection(
        history_id,
        "list:paired",
        elements=elements,
        wait=True,
    )
    self._assert_status_code_is(response, 200)
    return response.json()["outputs"][0]["id"]


def _build_pair(self, history_id, contents, run_cat=False):
    """Create a pair collection in the history and return a collection id.

    With ``run_cat`` false, the id of the created pair is returned. With
    ``run_cat`` true, the pair is additionally fed in batch mode to
    ``_run_cat`` and the id of the first resulting implicit collection is
    returned instead.
    """
    # NOTE(review): another ``_build_pair`` (without ``run_cat``) appears earlier in this
    # file - confirm which definition callers are expected to resolve to.
    pair_response = self.dataset_collection_populator.create_pair_in_history(
        history_id, contents=contents, direct_upload=True, wait=True
    )
    pair_id = pair_response.json()["output_collections"][0]["id"]
    if not run_cat:
        return pair_id

    batch_inputs = {
        "input1": {"batch": True, "values": [{"src": "hdca", "id": pair_id}]},
    }
    cat_result = self._run_cat(history_id, inputs=batch_inputs, assert_ok=True)
    return cat_result["implicit_collections"][0]["id"]


def _assert_dataset_permission_denied_response(self, response):
    """Assert that ``response`` did not succeed, i.e. its status code is not 200."""
    # TODO: This should be 403, should just need to throw more specific exception in the
    # Galaxy code. A stricter check on the error message is kept disabled until then:
    # err_message = response.json()["err_msg"]
    # assert "User does not have permission to use a dataset" in err_message, err_message
    status = response.status_code
    assert status != 200


@contextlib.contextmanager
def _different_user_and_history(self):
    """Context manager yielding a test history id while ``_different_user`` is active."""
    with self._different_user(), self.dataset_populator.test_history() as other_history_id:
        yield other_history_id
def dataset_to_param(dataset):
    """Build an ``hda`` source reference dict from a dataset mapping.

    Only ``dataset["id"]`` is read; the result has the keys ``src`` (always
    ``"hda"``) and ``id``.
    """
    return {"src": "hda", "id": dataset["id"]}