Warning

This document is for an old release of Galaxy. You can view this page in the latest release, if it exists there, or browse the top of the latest release's documentation instead.

Source code for galaxy_test.api.test_dataset_collections

import json
import zipfile
from io import BytesIO
from pathlib import Path
from urllib.parse import quote

from galaxy.schema.schema import SampleSheetColumnDefinitions
from galaxy.util import galaxy_root_path
from galaxy.util.unittest_utils import skip_if_github_down
from galaxy_test.base.api_asserts import (
    assert_has_key,
    assert_object_id_error,
    assert_status_code_is,
)
from galaxy_test.base.decorators import requires_new_user
from galaxy_test.base.populators import (
    DatasetCollectionPopulator,
    DatasetPopulator,
    skip_without_tool,
)
from ._framework import ApiTestCase

# copy of unit test definition in test_sample_sheet_workbook.py - maybe just serialize it as JSON?
# Sample-sheet column definitions shared by the workbook parse tests below:
# a required int column, a required string column restricted to three values,
# and a required boolean column.
TEST_COLUMN_DEFINITIONS_1: SampleSheetColumnDefinitions = [
    {
        "name": "replicate number",
        "type": "int",
        "description": "The replicate number of this sample.",
        "default_value": 0,
        "optional": False,
    },
    {
        "name": "treatment",
        "type": "string",
        "restrictions": ["treatment1", "treatment2", "none"],
        "description": "The treatment code for this sample.",
        "default_value": "none",
        "optional": False,
    },
    {
        "name": "is control?",
        "type": "boolean",
        "description": "Was this sample a control? If TRUE, please ensure treatment is set to none.",
        "default_value": True,
        "optional": False,
    },
]


class TestDatasetCollectionsApi(ApiTestCase):
    """API tests for creating, inspecting, downloading and securing dataset collections."""

    # Populator helper; assigned (along with dataset_collection_populator) in setUp().
    dataset_populator: DatasetPopulator
[docs] def setUp(self): super().setUp() self.dataset_populator = DatasetPopulator(self.galaxy_interactor) self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
[docs] def test_create_pair_from_history(self): with self.dataset_populator.test_history(require_new=False) as history_id: payload = self.dataset_collection_populator.create_pair_payload( history_id, instance_type="history", ) create_response = self.dataset_populator.fetch(payload, wait=True) dataset_collection = self._check_create_response(create_response) returned_datasets = dataset_collection["elements"] assert len(returned_datasets) == 2, dataset_collection
[docs] def test_create_list_from_history(self): with self.dataset_populator.test_history(require_new=False) as history_id: element_identifiers = self.dataset_collection_populator.list_identifiers(history_id) payload = dict( instance_type="history", history_id=history_id, element_identifiers=element_identifiers, collection_type="list", ) create_response = self._post("dataset_collections", payload, json=True) dataset_collection = self._check_create_response(create_response) returned_datasets = dataset_collection["elements"] assert len(returned_datasets) == 3, dataset_collection
[docs] def test_create_list_of_existing_pairs(self): with self.dataset_populator.test_history(require_new=False) as history_id: pair_payload = self.dataset_collection_populator.create_pair_payload( history_id, instance_type="history", ) pair_create_response = self._post("tools/fetch", pair_payload, json=True) dataset_collection = self._check_create_response(pair_create_response) hdca_id = dataset_collection["id"] element_identifiers = [dict(name="test1", src="hdca", id=hdca_id)] payload = dict( instance_type="history", history_id=history_id, element_identifiers=element_identifiers, collection_type="list", ) create_response = self._post("dataset_collections", payload, json=True) dataset_collection = self._check_create_response(create_response) returned_collections = dataset_collection["elements"] assert len(returned_collections) == 1, dataset_collection
[docs] def test_create_list_of_new_pairs(self): with self.dataset_populator.test_history(require_new=False) as history_id: identifiers = self.dataset_collection_populator.nested_collection_identifiers(history_id, "list:paired") payload = dict( collection_type="list:paired", instance_type="history", history_id=history_id, name="a nested collection", element_identifiers=identifiers, ) create_response = self._post("dataset_collections", payload, json=True) dataset_collection = self._check_create_response(create_response) assert dataset_collection["collection_type"] == "list:paired" assert dataset_collection["name"] == "a nested collection" returned_collections = dataset_collection["elements"] assert len(returned_collections) == 1, dataset_collection pair_1_element = returned_collections[0] self._assert_has_keys(pair_1_element, "element_identifier", "element_index", "object") assert pair_1_element["element_identifier"] == "test_level_1", pair_1_element assert pair_1_element["element_index"] == 0, pair_1_element pair_1_object = pair_1_element["object"] self._assert_has_keys(pair_1_object, "collection_type", "elements", "element_count") assert pair_1_object["collection_type"] == "paired" assert pair_1_object["populated"] is True pair_elements = pair_1_object["elements"] assert len(pair_elements) == 2 pair_1_element_1 = pair_elements[0] assert pair_1_element_1["element_index"] == 0
[docs] def test_create_paried_or_unpaired(self, history_id): collection_name = "a singleton in a paired_or_unpaired collection" contents = [ ("unpaired", "1\t2\t3"), ] single_identifier = self.dataset_collection_populator.list_identifiers(history_id, contents) payload = dict( name=collection_name, instance_type="history", history_id=history_id, element_identifiers=single_identifier, collection_type="paired_or_unpaired", ) create_response = self._post("dataset_collections", payload, json=True) dataset_collection = self._check_create_response(create_response) assert dataset_collection["collection_type"] == "paired_or_unpaired" returned_collections = dataset_collection["elements"] assert len(returned_collections) == 1, dataset_collection
[docs] def test_create_record(self, history_id): contents = [ ("condition", "1\t2\t3"), ("control1", "4\t5\t6"), ("control2", "7\t8\t9"), ] record_identifiers = self.dataset_collection_populator.list_identifiers(history_id, contents) fields = [ {"name": "condition", "type": "File"}, {"name": "control1", "type": "File"}, {"name": "control2", "type": "File"}, ] payload = dict( name="a record", instance_type="history", history_id=history_id, element_identifiers=record_identifiers, collection_type="record", fields=fields, ) create_response = self._post("dataset_collections", payload, json=True) dataset_collection = self._check_create_response(create_response) assert dataset_collection["collection_type"] == "record" assert dataset_collection["name"] == "a record" returned_collections = dataset_collection["elements"] assert len(returned_collections) == 3, dataset_collection record_pos_0_element = returned_collections[0] self._assert_has_keys(record_pos_0_element, "element_index") record_pos_0_object = record_pos_0_element["object"] self._assert_has_keys(record_pos_0_object, "name", "history_content_type")
[docs] def test_record_requires_fields(self, history_id): contents = [ ("condition", "1\t2\t3"), ("control1", "4\t5\t6"), ("control2", "7\t8\t9"), ] record_identifiers = self.dataset_collection_populator.list_identifiers(history_id, contents) payload = dict( name="a record", instance_type="history", history_id=history_id, element_identifiers=json.dumps(record_identifiers), collection_type="record", ) create_response = self._post("dataset_collections", payload) self._assert_status_code_is(create_response, 400)
[docs] def test_record_auto_fields(self, history_id): contents = [ ("condition", "1\t2\t3"), ("control1", "4\t5\t6"), ("control2", "7\t8\t9"), ] record_identifiers = self.dataset_collection_populator.list_identifiers(history_id, contents) payload = dict( name="a record", instance_type="history", history_id=history_id, element_identifiers=record_identifiers, collection_type="record", fields="auto", ) create_response = self._post("dataset_collections", payload, json=True) self._check_create_response(create_response)
[docs] def test_record_field_validation(self, history_id): contents = [ ("condition", "1\t2\t3"), ("control1", "4\t5\t6"), ("control2", "7\t8\t9"), ] record_identifiers = self.dataset_collection_populator.list_identifiers(history_id, contents) too_few_fields = [ {"name": "condition", "type": "File"}, {"name": "control1", "type": "File"}, ] too_many_fields = [ {"name": "condition", "type": "File"}, {"name": "control1", "type": "File"}, {"name": "control2", "type": "File"}, {"name": "control3", "type": "File"}, ] wrong_name_fields = [ {"name": "condition", "type": "File"}, {"name": "control1", "type": "File"}, {"name": "control3", "type": "File"}, ] for fields in [too_few_fields, too_many_fields, wrong_name_fields]: payload = dict( name="a record", instance_type="history", history_id=history_id, element_identifiers=json.dumps(record_identifiers), collection_type="record", fields=json.dumps(fields), ) create_response = self._post("dataset_collections", payload) self._assert_status_code_is(create_response, 400)
[docs] def test_sample_sheet_column_definition_problems(self, history_id): contents = [ ("sample1", "1\t2\t3"), ("sample2", "4\t5\t6"), ] sample_sheet_identifiers = self.dataset_collection_populator.list_identifiers(history_id, contents) payload = dict( name="my cool sample sheet", instance_type="history", history_id=history_id, element_identifiers=sample_sheet_identifiers, collection_type="sample_sheet", column_definitions=[{"type": "int", "name": "replicate", "optional": False}], rows={"sample1": [42], "sample2": [45]}, ) create_response = self._post("dataset_collections", payload, json=True) self._check_create_response(create_response) payload["column_definitions"] = [{"type": "intx"}] create_response = self._post("dataset_collections", payload, json=True) assert_status_code_is(create_response, 400) payload["column_definitions"] = [{"typex": "int"}] create_response = self._post("dataset_collections", payload, json=True) assert_status_code_is(create_response, 400) payload["column_definitions"] = [{"type": "int", "restrictions": "wrongtype", "name": "replicate"}] create_response = self._post("dataset_collections", payload, json=True) assert_status_code_is(create_response, 400) payload["column_definitions"] = [ {"type": "int", "name": "replicate", "validators": [{"type": "expression", "expression": "False"}]} ] create_response = self._post("dataset_collections", payload, json=True) assert_status_code_is(create_response, 400)
[docs] def test_sample_sheet_element_identifier_column_type(self, history_id): contents = [ ("sample1", "1\t2\t3"), ("sample2", "4\t5\t6"), ] sample_sheet_identifiers = self.dataset_collection_populator.list_identifiers(history_id, contents) payload = dict( name="my cool sample sheet", instance_type="history", history_id=history_id, element_identifiers=sample_sheet_identifiers, collection_type="sample_sheet", column_definitions=[{"type": "element_identifier", "name": "matched_element", "optional": False}], rows={"sample1": ["sample2"], "sample2": ["sample1"]}, ) create_response = self._post("dataset_collections", payload, json=True) self._check_create_response(create_response) # should not allow collection creation if element identifiers are not matching payload["rows"] = {"sample1": ["noinsamplesheet"], "sample2": ["noinsamplesheet"]} create_response = self._post("dataset_collections", payload, json=True) assert_status_code_is(create_response, 400)
[docs] def test_sample_sheet_of_pairs_creation(self, history_id): contents = [ "1\t2\t3", "4\t5\t6", ] pair_identifiers = self.dataset_collection_populator.pair_identifiers(history_id, contents) identifiers = [ { "name": "sample1", "collection_type": "paired", "src": "new_collection", "element_identifiers": pair_identifiers, } ] payload = dict( name="my cool sample sheet", instance_type="history", history_id=history_id, element_identifiers=identifiers, collection_type="sample_sheet:paired", column_definitions=[{"type": "int", "name": "replicate", "default_value": 0, "optional": False}], rows={"sample1": [42]}, ) create_response = self._post("dataset_collections", payload, json=True) print(create_response.json()) self._check_create_response(create_response) dataset_collection = create_response.json() assert dataset_collection["collection_type"] == "sample_sheet:paired" assert dataset_collection["name"] == "my cool sample sheet" returned_collections = dataset_collection["elements"] assert len(returned_collections) == 1, dataset_collection sheet_row_0_element = returned_collections[0] self._assert_has_keys(sheet_row_0_element, "element_index", "columns") columns = sheet_row_0_element["columns"] assert len(columns) == 1 assert columns[0] == 42 hdca_id = dataset_collection["id"] dataset_collection_url = f"/api/dataset_collections/{hdca_id}" dataset_collection = self._get(dataset_collection_url).json() assert dataset_collection["id"] == hdca_id assert dataset_collection["collection_type"] == "sample_sheet:paired" assert dataset_collection["column_definitions"] is not None
[docs] def test_sample_sheet_validating_against_column_definition(self, history_id): contents = [ ("sample1", "1\t2\t3"), ("sample2", "4\t5\t6"), ] sample_sheet_identifiers = self.dataset_collection_populator.list_identifiers(history_id, contents) payload = dict( name="my cool sample sheet", instance_type="history", history_id=history_id, element_identifiers=sample_sheet_identifiers, collection_type="sample_sheet", column_definitions=[{"type": "int", "name": "replicate", "default_value": 0, "optional": False}], rows={"sample1": [42], "sample2": [45]}, ) create_response = self._post("dataset_collections", payload, json=True) print(create_response.json()) self._check_create_response(create_response) # now the datatype of the row data is wrong.... payload["column_definitions"] = [ {"type": "string", "name": "replicate", "default_value": "", "optional": False} ] create_response = self._post("dataset_collections", payload, json=True) assert_status_code_is(create_response, 400) print(create_response.json()) # now the row values are too small for the supplied validator payload["column_definitions"] = [ {"type": "int", "name": "replicate", "validators": [{"type": "in_range", "min": 60}]} ] create_response = self._post("dataset_collections", payload, json=True) assert_status_code_is(create_response, 400)
    def test_sample_sheet_requires_columns(self, history_id):
        # Create a simple two-row sample sheet and verify both the column
        # definitions and the per-row column values are echoed back by the API.
        contents = [
            ("sample1", "1\t2\t3"),
            ("sample2", "4\t5\t6"),
        ]
        sample_sheet_identifiers = self.dataset_collection_populator.list_identifiers(history_id, contents)
        payload = dict(
            name="my cool sample sheet",
            instance_type="history",
            history_id=history_id,
            element_identifiers=sample_sheet_identifiers,
            collection_type="sample_sheet",
            column_definitions=[{"type": "int", "name": "replicate", "optional": False}],
            rows={"sample1": [42], "sample2": [45]},
        )
        create_response = self._post("dataset_collections", payload, json=True)
        dataset_collection = self._check_create_response(create_response)
        # The response must expose the column definitions supplied at creation.
        self._assert_has_keys(dataset_collection, "collection_type", "column_definitions")
        column_definitions = dataset_collection["column_definitions"]
        assert len(column_definitions) == 1
        self._assert_has_keys(column_definitions[0], "type")
        assert column_definitions[0]["type"] == "int"
        assert dataset_collection["collection_type"] == "sample_sheet"
        assert dataset_collection["name"] == "my cool sample sheet"
        returned_collections = dataset_collection["elements"]
        assert len(returned_collections) == 2, dataset_collection
        # Each element carries its row's column values alongside the dataset object.
        sheet_row_0_element = returned_collections[0]
        self._assert_has_keys(sheet_row_0_element, "element_index", "columns")
        record_pos_0_object = sheet_row_0_element["object"]
        self._assert_has_keys(record_pos_0_object, "name", "history_content_type")
        row_0 = sheet_row_0_element["columns"]
        assert row_0[0] == 42
        sheet_row_1_element = returned_collections[1]
        self._assert_has_keys(sheet_row_1_element, "element_index", "columns")
        row_1 = sheet_row_1_element["columns"]
        assert row_1[0] == 45

    # TODO: test case where column definition does not match supplied data
[docs] @skip_without_tool("cat1") def test_sample_sheet_map_over_preserves_columns(self, history_id): """Test that mapping cat1 over a sample sheet preserves columns metadata.""" # Create a sample sheet collection with columns metadata contents = [ ("sample1", "content1"), ("sample2", "content2"), ("sample3", "content3"), ] sample_sheet_identifiers = self.dataset_collection_populator.list_identifiers(history_id, contents) payload = dict( name="test sample sheet", instance_type="history", history_id=history_id, element_identifiers=sample_sheet_identifiers, collection_type="sample_sheet", column_definitions=[ {"type": "int", "name": "replicate", "optional": False}, {"type": "string", "name": "condition", "optional": False}, ], rows={ "sample1": [1, "control"], "sample2": [2, "treatment"], "sample3": [3, "control"], }, ) create_response = self._post("dataset_collections", payload, json=True) sample_sheet = self._check_create_response(create_response) hdca_id = sample_sheet["id"] # Verify the input sample sheet has columns metadata input_elements = sample_sheet["elements"] assert len(input_elements) == 3 assert input_elements[0]["columns"] == [1, "control"] assert input_elements[1]["columns"] == [2, "treatment"] assert input_elements[2]["columns"] == [3, "control"] # Run cat1 on the sample sheet collection in batch mode (mapping over it) inputs = { "input1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]}, } run = self.dataset_populator.run_tool("cat1", inputs=inputs, history_id=history_id) self.dataset_populator.wait_for_history_jobs(history_id) # Get the implicit output collection implicit_collections = run["implicit_collections"] assert len(implicit_collections) == 1, f"Expected 1 implicit collection, got {len(implicit_collections)}" output_collection = implicit_collections[0] # Fetch the full collection details including elements collection_details = self.dataset_populator.get_history_collection_details( history_id, content_id=output_collection["id"] ) 
assert collection_details["column_definitions"] == sample_sheet["column_definitions"] # Verify that the output collection preserved the columns metadata output_elements = collection_details["elements"] assert len(output_elements) == 3, f"Expected 3 output elements, got {len(output_elements)}" # Check that columns metadata was preserved for each element self._assert_has_keys(output_elements[0], "columns") assert output_elements[0]["columns"] == [ 1, "control", ], f"Expected [1, 'control'], got {output_elements[0]['columns']}" assert output_elements[0]["element_identifier"] == "sample1" self._assert_has_keys(output_elements[1], "columns") assert output_elements[1]["columns"] == [ 2, "treatment", ], f"Expected [2, 'treatment'], got {output_elements[1]['columns']}" assert output_elements[1]["element_identifier"] == "sample2" self._assert_has_keys(output_elements[2], "columns") assert output_elements[2]["columns"] == [ 3, "control", ], f"Expected [3, 'control'], got {output_elements[2]['columns']}" assert output_elements[2]["element_identifier"] == "sample3"
[docs] def test_copy_sample_sheet_collection(self, history_id): """Test that copying a sample sheet collection preserves columns metadata.""" # Create a sample sheet collection with columns metadata contents = [ ("sample1", "content1"), ("sample2", "content2"), ] sample_sheet_identifiers = self.dataset_collection_populator.list_identifiers(history_id, contents) payload = dict( name="original sample sheet", instance_type="history", history_id=history_id, element_identifiers=sample_sheet_identifiers, collection_type="sample_sheet", column_definitions=[ {"type": "int", "name": "replicate", "optional": False}, {"type": "string", "name": "condition", "optional": False}, ], rows={ "sample1": [1, "control"], "sample2": [2, "treatment"], }, ) create_response = self._post("dataset_collections", payload, json=True) original_collection = self._check_create_response(create_response) original_hdca_id = original_collection["id"] # Verify the original sample sheet has columns metadata original_elements = original_collection["elements"] assert len(original_elements) == 2 assert original_elements[0]["columns"] == [1, "control"] assert original_elements[1]["columns"] == [2, "treatment"] # Copy the collection using the new copy_collection method copy_response = self.dataset_collection_populator.copy_collection( history_id, original_hdca_id, copy_elements=True, wait=False ) copied_collection = copy_response.json() # Fetch the full details of the copied collection copied_collection_details = self.dataset_populator.get_history_collection_details( history_id, content_id=copied_collection["id"] ) # Verify the copied collection has the same columns metadata copied_elements = copied_collection_details["elements"] assert len(copied_elements) == 2, f"Expected 2 elements, got {len(copied_elements)}" # Check that columns metadata was preserved for each element self._assert_has_keys(copied_elements[0], "columns") assert copied_elements[0]["columns"] == [ 1, "control", ], f"Expected [1, 
'control'], got {copied_elements[0]['columns']}" assert copied_elements[0]["element_identifier"] == "sample1" self._assert_has_keys(copied_elements[1], "columns") assert copied_elements[1]["columns"] == [ 2, "treatment", ], f"Expected [2, 'treatment'], got {copied_elements[1]['columns']}" assert copied_elements[1]["element_identifier"] == "sample2" # Verify column definitions are preserved assert copied_collection_details["column_definitions"] == original_collection["column_definitions"] assert copied_collection_details["collection_type"] == "sample_sheet"
[docs] def test_workbook_download(self): xlsx_file = self.dataset_collection_populator.download_workbook( "sample_sheet", [ {"name": "condition", "type": "string", "default_value": "", "optional": False}, {"name": "replicate", "type": "int", "default_value": 0, "optional": False}, ], ) self._assert_file_looks_like_xlsx(xlsx_file)
[docs] def test_workbook_download_for_collection(self): with self.dataset_populator.test_history(require_new=False) as history_id: hdca_id = self.dataset_collection_populator.create_list_in_history( history_id, contents=[("sample1", "sample1 contents")], wait=True ).json()["outputs"][0]["id"] xlsx_file = self.dataset_collection_populator.download_workbook_for_collection( hdca_id, [ {"name": "condition", "type": "string", "default_value": "", "optional": False}, {"name": "replicate", "type": "int", "default_value": 0, "optional": False}, ], ) self._assert_file_looks_like_xlsx(xlsx_file)
def _assert_file_looks_like_xlsx(self, xlsx_file: str): # Check the file header with open(xlsx_file, "rb") as file: header = file.read(4) # The ZIP file signature is 0x50 0x4B 0x03 0x04 return header == b"\x50\x4b\x03\x04"
[docs] def test_workbook_parse(self): xlsx_path = Path(galaxy_root_path) / "lib" / "galaxy" / "model" / "unittest_utils" / "filled_in_workbook_1.xlsx" example_as_bytes = xlsx_path.read_bytes() response = self.dataset_collection_populator.parse_workbook( example_as_bytes, "sample_sheet", TEST_COLUMN_DEFINITIONS_1 ) assert_has_key(response, "rows") rows = response["rows"] assert rows[0]["url"] == "https://zenodo.org/records/3263975/files/DRR000770.fastqsanger.gz" assert rows[0]["replicate number"] == 1 assert rows[0]["treatment"] == "treatment1" assert rows[0]["is control?"] is False assert rows[1]["replicate number"] == 2 assert rows[1]["treatment"] == "treatment1"
[docs] def test_workbook_parse_for_collection(self): with self.dataset_populator.test_history(require_new=False) as history_id: hdca_id = self.dataset_collection_populator.create_list_in_history( history_id, contents=[("sample1", "sample1 contents")], wait=True ).json()["outputs"][0]["id"] xlsx_path = ( Path(galaxy_root_path) / "lib" / "galaxy" / "model" / "unittest_utils" / "filled_in_workbook_from_collection.xlsx" ) example_as_bytes = xlsx_path.read_bytes() response = self.dataset_collection_populator.parse_workflow_for_collection( hdca_id, example_as_bytes, TEST_COLUMN_DEFINITIONS_1 ) assert_has_key(response, "rows") assert_has_key(response, "elements")
[docs] def test_list_download(self): with self.dataset_populator.test_history(require_new=False) as history_id: fetch_response = self.dataset_collection_populator.create_list_in_history( history_id, direct_upload=True ).json() dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response) returned_dce = dataset_collection["elements"] assert len(returned_dce) == 3, dataset_collection create_response = self._download_dataset_collection(history_id=history_id, hdca_id=dataset_collection["id"]) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 3, f"Expected 3 elements in [{namelist}]" collection_name = dataset_collection["name"] for element, zip_path in zip(returned_dce, namelist): assert f"{collection_name}/{element['element_identifier']}.{element['object']['file_ext']}" == zip_path
[docs] def test_pair_download(self): with self.dataset_populator.test_history(require_new=False) as history_id: fetch_response = self.dataset_collection_populator.create_pair_in_history( history_id, direct_upload=True ).json() dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response) returned_dce = dataset_collection["elements"] assert len(returned_dce) == 2, dataset_collection hdca_id = dataset_collection["id"] create_response = self._download_dataset_collection(history_id=history_id, hdca_id=hdca_id) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 2, f"Expected 2 elements in [{namelist}]" collection_name = dataset_collection["name"] for element, zip_path in zip(returned_dce, namelist): assert f"{collection_name}/{element['element_identifier']}.{element['object']['file_ext']}" == zip_path
[docs] def test_list_pair_download(self): with self.dataset_populator.test_history(require_new=False) as history_id: fetch_response = self.dataset_collection_populator.create_list_of_pairs_in_history(history_id).json() dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response) returned_dce = dataset_collection["elements"] assert len(returned_dce) == 1, dataset_collection list_collection_name = dataset_collection["name"] pair = returned_dce[0] create_response = self._download_dataset_collection(history_id=history_id, hdca_id=dataset_collection["id"]) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 2, f"Expected 2 elements in [{namelist}]" pair_collection_name = pair["element_identifier"] for element, zip_path in zip(pair["object"]["elements"], namelist): assert ( f"{list_collection_name}/{pair_collection_name}/{element['element_identifier']}.{element['object']['file_ext']}" == zip_path )
[docs] def test_list_list_download(self): with self.dataset_populator.test_history(require_new=False) as history_id: dataset_collection = self.dataset_collection_populator.create_list_of_list_in_history( history_id, wait=True ).json() returned_dce = dataset_collection["elements"] assert len(returned_dce) == 1, dataset_collection create_response = self._download_dataset_collection(history_id=history_id, hdca_id=dataset_collection["id"]) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 3, f"Expected 3 elements in [{namelist}]"
[docs] def test_list_list_list_download(self): with self.dataset_populator.test_history(require_new=False) as history_id: dataset_collection = self.dataset_collection_populator.create_list_of_list_in_history( history_id, collection_type="list:list:list", wait=True, ).json() returned_dce = dataset_collection["elements"] assert len(returned_dce) == 1, dataset_collection create_response = self._download_dataset_collection(history_id=history_id, hdca_id=dataset_collection["id"]) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 3, f"Expected 3 elements in [{namelist}]"
[docs] def test_download_non_english_characters(self): with self.dataset_populator.test_history() as history_id: name = "دیتاست" payload = self.dataset_collection_populator.create_list_payload(history_id, name=name) hdca_id = self.dataset_populator.fetch(payload, wait=True).json()["outputs"][0]["id"] create_response = self._download_dataset_collection(history_id=history_id, hdca_id=hdca_id) self._assert_status_code_is(create_response, 200) assert quote(name, safe="") in create_response.headers["Content-Disposition"]
[docs] @requires_new_user def test_hda_security(self): with self.dataset_populator.test_history(require_new=False) as history_id: element_identifiers = self.dataset_collection_populator.pair_identifiers(history_id, wait=True) self.dataset_populator.make_private(history_id, element_identifiers[0]["id"]) with self._different_user(): history_id = self.dataset_populator.new_history() payload = dict( instance_type="history", history_id=history_id, element_identifiers=element_identifiers, collection_type="paired", ) create_response = self._post("dataset_collections", payload, json=True) self._assert_status_code_is(create_response, 403)
    def test_dataset_collection_element_security(self):
        # Verify the dataset_collection_element endpoint enforces dataset-level
        # access: once a leaf dataset is private, a different user gets 403 for
        # both the containing collection element and the dataset element itself,
        # while the owner can still fetch both.
        with self.dataset_populator.test_history(require_new=False) as history_id:
            dataset_collection = self.dataset_collection_populator.create_list_of_list_in_history(
                history_id,
                collection_type="list:list:list",
                wait=True,
            ).json()
            first_element = dataset_collection["elements"][0]
            assert first_element["model_class"] == "DatasetCollectionElement"
            assert first_element["element_type"] == "dataset_collection"
            first_element_url = f"/api/dataset_collection_element/{first_element['id']}"
            # Make one dataset private to check that access permissions are respected
            first_dataset_element = first_element["object"]["elements"][0]["object"]["elements"][0]
            self.dataset_populator.make_private(history_id, first_dataset_element["object"]["id"])
            # A different user is denied access to the collection element...
            with self._different_user():
                assert self._get(first_element_url).status_code == 403
            # ...but the owner can still fetch it.
            collection_dce_response = self._get(first_element_url)
            collection_dce_response.raise_for_status()
            collection_dce = collection_dce_response.json()
            assert collection_dce["model_class"] == "DatasetCollectionElement"
            assert collection_dce["element_type"] == "dataset_collection"
            first_dataset_element = first_element["object"]["elements"][0]["object"]["elements"][0]
            assert first_dataset_element["model_class"] == "DatasetCollectionElement"
            assert first_dataset_element["element_type"] == "hda"
            first_dataset_element_url = f"/api/dataset_collection_element/{first_dataset_element['id']}"
            # Same check at the leaf (hda) element level.
            with self._different_user():
                assert self._get(first_dataset_element_url).status_code == 403
            dataset_dce_response = self._get(first_dataset_element_url)
            dataset_dce_response.raise_for_status()
            dataset_dce = dataset_dce_response.json()
            assert dataset_dce["model_class"] == "DatasetCollectionElement"
            assert dataset_dce["element_type"] == "hda"
            assert dataset_dce["object"]["model_class"] == "HistoryDatasetAssociation"
[docs] def test_enforces_unique_names(self): with self.dataset_populator.test_history(require_new=False) as history_id: element_identifiers = self.dataset_collection_populator.list_identifiers(history_id) element_identifiers[2]["name"] = element_identifiers[0]["name"] payload = dict( instance_type="history", history_id=history_id, element_identifiers=element_identifiers, collection_type="list", ) create_response = self._post("dataset_collections", payload, json=True) self._assert_status_code_is(create_response, 400)
[docs] def test_upload_collection(self): with self.dataset_populator.test_history(require_new=False) as history_id: elements = [ { "src": "files", "dbkey": "hg19", "info": "my cool bed", "tags": ["name:data1", "group:condition:treated", "machine:illumina"], } ] targets = [ { "destination": {"type": "hdca"}, "elements": elements, "collection_type": "list", "name": "Test upload", "tags": ["name:collection1"], } ] payload = { "history_id": history_id, "targets": targets, "__files": {"files_0|file_data": open(self.test_data_resolver.get_filename("4.bed"))}, } self.dataset_populator.fetch(payload) hdca = assert_one_collection_created_in_history(self.dataset_populator, history_id) assert hdca["name"] == "Test upload" hdca_tags = hdca["tags"] assert len(hdca_tags) == 1 assert "name:collection1" in hdca_tags assert len(hdca["elements"]) == 1, hdca element0 = hdca["elements"][0] assert element0["element_identifier"] == "4.bed" dataset0 = element0["object"] assert dataset0["file_size"] == 61 dataset_tags = dataset0["tags"] assert len(dataset_tags) == 3, dataset0
[docs] def test_upload_nested(self): with self.dataset_populator.test_history(require_new=False) as history_id: elements = [{"name": "samp1", "elements": [{"src": "files", "dbkey": "hg19", "info": "my cool bed"}]}] targets = [ { "destination": {"type": "hdca"}, "elements": elements, "collection_type": "list:list", "name": "Test upload", } ] payload = { "history_id": history_id, "targets": targets, "__files": {"files_0|file_data": open(self.test_data_resolver.get_filename("4.bed"))}, } self.dataset_populator.fetch(payload) hdca = assert_one_collection_created_in_history(self.dataset_populator, history_id) assert hdca["name"] == "Test upload" assert len(hdca["elements"]) == 1, hdca element0 = hdca["elements"][0] assert element0["element_identifier"] == "samp1"
[docs] def test_upload_collection_from_url(self): with self.dataset_populator.test_history(require_new=False) as history_id: elements = [ { "src": "url", "url": self.dataset_populator.base64_url_for_string("hello world"), "name": "hello.txt", "info": "my cool bed", } ] targets = [ { "destination": {"type": "hdca"}, "elements": elements, "collection_type": "list", } ] payload = { "history_id": history_id, "targets": targets, } self.dataset_populator.fetch(payload) hdca = assert_one_collection_created_in_history(self.dataset_populator, history_id) assert len(hdca["elements"]) == 1, hdca element0 = hdca["elements"][0] assert element0["element_identifier"] == "hello.txt" assert element0["object"]["file_size"] == 11
[docs] def test_upload_collection_deferred(self): with self.dataset_populator.test_history(require_new=False) as history_id: elements = [ { "src": "url", "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed", "info": "my cool bed", "deferred": True, } ] targets = [ { "destination": {"type": "hdca"}, "elements": elements, "collection_type": "list", } ] payload = { "history_id": history_id, "targets": targets, } self.dataset_populator.fetch(payload) hdca = assert_one_collection_created_in_history(self.dataset_populator, history_id) assert len(hdca["elements"]) == 1, hdca element0 = hdca["elements"][0] assert element0["element_identifier"] == "4.bed" object0 = element0["object"] assert object0["state"] == "deferred"
[docs] @skip_if_github_down def test_upload_collection_failed_expansion_url(self): with self.dataset_populator.test_history(require_new=False) as history_id: targets = [ { "destination": {"type": "hdca"}, "elements_from": "bagit", "collection_type": "list", "src": "url", "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed", } ] payload = { "history_id": history_id, "targets": targets, } self.dataset_populator.fetch(payload, assert_ok=False, wait=True) hdca = assert_one_collection_created_in_history(self.dataset_populator, history_id) assert hdca["populated"] is False assert "bagit.txt" in hdca["populated_state_message"], hdca
    def test_upload_flat_sample_sheet(self):
        # Thin wrapper: the scenario lives in the module-level helper so it can
        # be reused outside this test class; see upload_flat_sample_sheet below.
        upload_flat_sample_sheet(self.dataset_populator)
[docs] def test_upload_sample_sheet_paired(self): column_definitions = [{"type": "int", "name": "replicate", "optional": False, "default_value": 0}] with self.dataset_populator.test_history(require_new=False) as history_id: elements = [ { "name": "sample1", "row": [42], "elements": [ { "src": "url", "url": self.dataset_populator.base64_url_for_string("hello world forward"), "info": "my cool hello world forward", "name": "forward", }, { "src": "url", "url": self.dataset_populator.base64_url_for_string("hello world reverse"), "info": "my cool hello world reverse", "name": "forward", }, ], } ] targets = [ { "destination": {"type": "hdca"}, "elements": elements, "collection_type": "sample_sheet:paired", "column_definitions": column_definitions, } ] payload = { "history_id": history_id, "targets": targets, } self.dataset_populator.fetch(payload) hdca = assert_one_collection_created_in_history(self.dataset_populator, history_id) assert len(hdca["elements"]) == 1, hdca element0 = hdca["elements"][0] assert element0["element_identifier"] == "sample1" assert element0["columns"][0] == 42
def _assert_one_collection_created_in_history(self, history_id: str): contents = self.dataset_populator.get_history_contents_of_type(history_id, "dataset_collections") assert len(contents) == 1 hdca = contents[0] assert hdca["history_content_type"] == "dataset_collection" hdca_id = hdca["id"] return self.dataset_populator.get_history_collection_details(history_id, content_id=hdca_id) def _check_create_response(self, create_response): self._assert_status_code_is(create_response, 200) dataset_collection = create_response.json() if "output_collections" in dataset_collection: # fetch data response, we'll have to check the final response dataset_collection = dataset_collection["output_collections"][0] dataset_collection = self._get(f"dataset_collections/{dataset_collection['id']}").json() self._assert_has_keys(dataset_collection, "elements", "url", "name", "collection_type", "element_count") return dataset_collection def _download_dataset_collection(self, history_id: str, hdca_id: str): return self._get(f"histories/{history_id}/contents/dataset_collections/{hdca_id}/download")
[docs] @requires_new_user def test_collection_contents_security(self, history_id): # request contents on an hdca that doesn't belong to user hdca, contents_url = self._create_collection_contents_pair(history_id) with self._different_user(): contents_response = self._get(contents_url) self._assert_status_code_is(contents_response, 403)
[docs] @requires_new_user def test_published_collection_contents_accessible(self, history_id): # request contents on an hdca that is in a published history hdca, contents_url = self._create_collection_contents_pair(history_id) with self._different_user(): contents_response = self._get(contents_url) self._assert_status_code_is(contents_response, 403) self.dataset_populator.make_public(history_id) with self._different_user(): contents_response = self._get(contents_url) self._assert_status_code_is(contents_response, 200)
[docs] def test_collection_contents_invalid_collection(self, history_id): # request an invalid collection from a valid hdca, should get 404 hdca, contents_url = self._create_collection_contents_pair(history_id) response = self._get(contents_url) self._assert_status_code_is(response, 200) fake_collection_id = "5d7db0757a2eb7ef" fake_contents_url = f"/api/dataset_collections/{hdca['id']}/contents/{fake_collection_id}" error_response = self._get(fake_contents_url) assert_object_id_error(error_response)
[docs] def test_show_dataset_collection(self, history_id): fetch_response = self.dataset_collection_populator.create_list_in_history(history_id, direct_upload=True).json() dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response) returned_dce = dataset_collection["elements"] assert len(returned_dce) == 3, dataset_collection hdca_id = dataset_collection["id"] dataset_collection_url = f"/api/dataset_collections/{hdca_id}" dataset_collection = self._get(dataset_collection_url).json() assert dataset_collection["id"] == hdca_id assert dataset_collection["collection_type"] == "list"
[docs] def test_show_dataset_collection_contents(self, history_id): # Get contents_url from history contents, use it to show the first level # of collection contents in the created HDCA, then use it again to drill # down into the nested collection contents hdca = self.dataset_collection_populator.create_list_of_list_in_history(history_id, wait=True).json() root_contents_url = self._get_contents_url_for_hdca(history_id, hdca) # check root contents for this collection root_contents = self._get(root_contents_url).json() assert len(root_contents) == len(hdca["elements"]) self._compare_collection_contents_elements(root_contents, hdca["elements"]) # drill down, retrieve nested collection contents assert "object" in root_contents[0] assert "contents_url" in root_contents[0]["object"] assert root_contents[0]["object"]["element_count"] == 3 assert root_contents[0]["object"]["populated"] drill_contents_url = root_contents[0]["object"]["contents_url"] drill_contents = self._get(drill_contents_url).json() assert len(drill_contents) == len(hdca["elements"][0]["object"]["elements"]) self._compare_collection_contents_elements(drill_contents, hdca["elements"][0]["object"]["elements"])
[docs] def test_collection_contents_limit_offset(self, history_id): # check limit/offset params for collection contents endpoint hdca, root_contents_url = self._create_collection_contents_pair(history_id) # check limit limited_contents = self._get(f"{root_contents_url}?limit=1").json() assert len(limited_contents) == 1 assert limited_contents[0]["element_index"] == 0 # check offset offset_contents = self._get(f"{root_contents_url}?offset=1").json() assert len(offset_contents) == 1 assert offset_contents[0]["element_index"] == 1
[docs] def test_collection_contents_empty_root(self, history_id): create_response = self.dataset_collection_populator.create_list_in_history( history_id, contents=[], wait=True ).json() hdca = create_response["output_collections"][0] assert hdca["elements"] == [] root_contents_url = hdca["contents_url"] response = self._get(root_contents_url) response.raise_for_status() assert response.json() == []
[docs] def test_get_suitable_converters_single_datatype(self, history_id): response = self.dataset_collection_populator.upload_collection( history_id, "list:paired", elements=[ { "name": "test0", "elements": [ {"src": "pasted", "paste_content": "123\n", "name": "forward", "ext": "bed"}, {"src": "pasted", "paste_content": "456\n", "name": "reverse", "ext": "bed"}, ], }, { "name": "test1", "elements": [ {"src": "pasted", "paste_content": "789\n", "name": "forward", "ext": "bed"}, {"src": "pasted", "paste_content": "0ab\n", "name": "reverse", "ext": "bed"}, ], }, ], wait=True, ) self._assert_status_code_is(response, 200) hdca_list_id = response.json()["outputs"][0]["id"] converters = self._get("dataset_collections/" + hdca_list_id + "/suitable_converters") expected = [ # This list is subject to change, but it's unlikely we'll be removing converters "CONVERTER_bed_to_fli_0", "CONVERTER_bed_gff_or_vcf_to_bigwig_0", "CONVERTER_bed_to_gff_0", "CONVERTER_interval_to_bgzip_0", "tabular_to_csv", "CONVERTER_interval_to_bed6_0", "CONVERTER_interval_to_bedstrict_0", "CONVERTER_interval_to_tabix_0", "CONVERTER_interval_to_bed12_0", ] actual = [] for converter in converters.json(): actual.append(converter["tool_id"]) missing_expected_converters = set(expected) - set(actual) assert ( not missing_expected_converters ), f"Expected converter(s) {', '.join(missing_expected_converters)} missing from response"
[docs] def test_get_suitable_converters_different_datatypes_matches(self, history_id): response = self.dataset_collection_populator.upload_collection( history_id, "list:paired", elements=[ { "name": "test0", "elements": [ {"src": "pasted", "paste_content": "123\n", "name": "forward", "ext": "bed"}, {"src": "pasted", "paste_content": "456\n", "name": "reverse", "ext": "bed"}, ], }, { "name": "test1", "elements": [ {"src": "pasted", "paste_content": "789\n", "name": "forward", "ext": "tabular"}, {"src": "pasted", "paste_content": "0ab\n", "name": "reverse", "ext": "tabular"}, ], }, ], wait=True, ) self._assert_status_code_is(response, 200) hdca_list_id = response.json()["outputs"][0]["id"] converters = self._get("dataset_collections/" + hdca_list_id + "/suitable_converters") expected = "tabular_to_csv" actual = [] for converter in converters.json(): actual.append(converter["tool_id"]) assert expected in actual
[docs] def test_get_suitable_converters_different_datatypes_no_matches(self, history_id): response = self.dataset_collection_populator.upload_collection( history_id, "list:paired", elements=[ { "name": "test0", "elements": [ {"src": "pasted", "paste_content": "123\n", "name": "forward", "ext": "bed"}, {"src": "pasted", "paste_content": "456\n", "name": "reverse", "ext": "bed"}, ], }, { "name": "test1", "elements": [ {"src": "pasted", "paste_content": "789\n", "name": "forward", "ext": "fasta"}, {"src": "pasted", "paste_content": "0ab\n", "name": "reverse", "ext": "fasta"}, ], }, ], wait=True, ) self._assert_status_code_is(response, 200) hdca_list_id = response.json()["outputs"][0]["id"] converters = self._get("dataset_collections/" + hdca_list_id + "/suitable_converters") actual: list[str] = [] for converter in converters.json(): actual.append(converter["tool_id"]) assert actual == []
[docs] def test_collection_tools_tag_propagation(self, history_id): elements = [{"src": "files", "tags": ["name:element_tag"]}] targets = [ { "destination": {"type": "hdca"}, "elements": elements, "collection_type": "list", "name": "Test collection", "tags": ["name:collection_tag"], } ] payload = { "history_id": history_id, "targets": targets, "__files": {"files_0|file_data": open(self.test_data_resolver.get_filename("4.bed"))}, } hdca_id = self.dataset_populator.fetch(payload).json()["output_collections"][0]["id"] inputs = { "input": {"batch": False, "src": "hdca", "id": hdca_id}, } payload = self.dataset_populator.run_tool_payload( tool_id="__FILTER_FAILED_DATASETS__", inputs=inputs, history_id=history_id, input_format="legacy", ) response = self._post("tools", payload).json() self.dataset_populator.wait_for_history(history_id, assert_ok=False) output_collection = response["output_collections"][0] # collection should not inherit tags from input collection elements, only parent collection assert output_collection["tags"] == ["name:collection_tag"] element = output_collection["elements"][0] # new element hda should have tags copied from old hda assert element["object"]["tags"] == ["name:element_tag"]
def _compare_collection_contents_elements(self, contents_elements, hdca_elements): # compare collection api results to existing hdca element contents fields = ["element_identifier", "element_index", "element_type", "id", "model_class"] for content_element, hdca_element in zip(contents_elements, hdca_elements): for f in fields: assert content_element[f] == hdca_element[f] def _create_collection_contents_pair(self, history_id: str): # Create a simple collection, return hdca and contents_url payload = self.dataset_collection_populator.create_pair_payload(history_id, instance_type="history") create_response = self.dataset_populator.fetch(payload=payload, wait=True) hdca = self._check_create_response(create_response) root_contents_url = self._get_contents_url_for_hdca(history_id, hdca) return hdca, root_contents_url def _get_contents_url_for_hdca(self, history_id: str, hdca): # look up the history contents using optional serialization key history_contents_url = f"histories/{history_id}/contents?v=dev&view=summary&keys=contents_url" json = self._get(history_contents_url).json() # filter out the collection we just made id = hdca.id # make sure the contents_url appears def find_hdca(c): return c["history_content_type"] == "dataset_collection" and c["id"] == hdca["id"] matches = list(filter(find_hdca, json)) assert len(matches) == 1 assert "contents_url" in matches[0] return matches[0]["contents_url"]
def upload_flat_sample_sheet(dataset_populator: DatasetPopulator):
    """Upload a single-row flat sample_sheet collection and verify its row metadata.

    Shared at module level so other test modules can reuse the scenario with
    their own populator.
    """
    column_definitions = [{"type": "int", "name": "replicate", "optional": False, "default_value": 0}]
    with dataset_populator.test_history(require_new=False) as history_id:
        sheet_target = {
            "destination": {"type": "hdca"},
            "elements": [
                {
                    "src": "url",
                    "url": dataset_populator.base64_url_for_string("hello world"),
                    "info": "my cool hello world",
                    "name": "sample1",
                    "row": [42],
                }
            ],
            "collection_type": "sample_sheet",
            "column_definitions": column_definitions,
        }
        dataset_populator.fetch({"history_id": history_id, "targets": [sheet_target]})
        hdca = assert_one_collection_created_in_history(dataset_populator, history_id)
        assert len(hdca["elements"]) == 1, hdca
        first_element = hdca["elements"][0]
        assert first_element["element_identifier"] == "sample1"
        # The replicate column value round-trips.
        assert first_element["columns"][0] == 42
        assert first_element["object"]["state"] == "ok"
        # The column definitions are stored on the collection itself.
        assert hdca["column_definitions"] is not None
def assert_one_collection_created_in_history(dataset_populator: DatasetPopulator, history_id: str):
    """Assert the history contains exactly one HDCA and return its full details."""
    collections = dataset_populator.get_history_contents_of_type(history_id, "dataset_collections")
    assert len(collections) == 1
    (hdca,) = collections
    assert hdca["history_content_type"] == "dataset_collection"
    return dataset_populator.get_history_collection_details(history_id, content_id=hdca["id"])