Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy_test.api.test_dataset_collections

import zipfile
from io import BytesIO
from typing import List

from galaxy.util.unittest_utils import skip_if_github_down
from galaxy_test.base.api_asserts import assert_object_id_error
from galaxy_test.base.decorators import requires_new_user
from galaxy_test.base.populators import (
    DatasetCollectionPopulator,
    DatasetPopulator,
)
from ._framework import ApiTestCase


[docs]class TestDatasetCollectionsApi(ApiTestCase): dataset_populator: DatasetPopulator
[docs] def setUp(self): super().setUp() self.dataset_populator = DatasetPopulator(self.galaxy_interactor) self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
[docs] def test_create_pair_from_history(self): with self.dataset_populator.test_history(require_new=False) as history_id: payload = self.dataset_collection_populator.create_pair_payload( history_id, instance_type="history", ) create_response = self.dataset_populator.fetch(payload, wait=True) dataset_collection = self._check_create_response(create_response) returned_datasets = dataset_collection["elements"] assert len(returned_datasets) == 2, dataset_collection
[docs] def test_create_list_from_history(self): with self.dataset_populator.test_history(require_new=False) as history_id: element_identifiers = self.dataset_collection_populator.list_identifiers(history_id) payload = dict( instance_type="history", history_id=history_id, element_identifiers=element_identifiers, collection_type="list", ) create_response = self._post("dataset_collections", payload, json=True) dataset_collection = self._check_create_response(create_response) returned_datasets = dataset_collection["elements"] assert len(returned_datasets) == 3, dataset_collection
[docs] def test_create_list_of_existing_pairs(self): with self.dataset_populator.test_history(require_new=False) as history_id: pair_payload = self.dataset_collection_populator.create_pair_payload( history_id, instance_type="history", ) pair_create_response = self._post("tools/fetch", pair_payload, json=True) dataset_collection = self._check_create_response(pair_create_response) hdca_id = dataset_collection["id"] element_identifiers = [dict(name="test1", src="hdca", id=hdca_id)] payload = dict( instance_type="history", history_id=history_id, element_identifiers=element_identifiers, collection_type="list", ) create_response = self._post("dataset_collections", payload, json=True) dataset_collection = self._check_create_response(create_response) returned_collections = dataset_collection["elements"] assert len(returned_collections) == 1, dataset_collection
[docs] def test_create_list_of_new_pairs(self): with self.dataset_populator.test_history(require_new=False) as history_id: identifiers = self.dataset_collection_populator.nested_collection_identifiers(history_id, "list:paired") payload = dict( collection_type="list:paired", instance_type="history", history_id=history_id, name="a nested collection", element_identifiers=identifiers, ) create_response = self._post("dataset_collections", payload, json=True) dataset_collection = self._check_create_response(create_response) assert dataset_collection["collection_type"] == "list:paired" assert dataset_collection["name"] == "a nested collection" returned_collections = dataset_collection["elements"] assert len(returned_collections) == 1, dataset_collection pair_1_element = returned_collections[0] self._assert_has_keys(pair_1_element, "element_identifier", "element_index", "object") assert pair_1_element["element_identifier"] == "test_level_1", pair_1_element assert pair_1_element["element_index"] == 0, pair_1_element pair_1_object = pair_1_element["object"] self._assert_has_keys(pair_1_object, "collection_type", "elements", "element_count") assert pair_1_object["collection_type"] == "paired" assert pair_1_object["populated"] is True pair_elements = pair_1_object["elements"] assert len(pair_elements) == 2 pair_1_element_1 = pair_elements[0] assert pair_1_element_1["element_index"] == 0
[docs] def test_list_download(self): with self.dataset_populator.test_history(require_new=False) as history_id: fetch_response = self.dataset_collection_populator.create_list_in_history( history_id, direct_upload=True ).json() dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response) returned_dce = dataset_collection["elements"] assert len(returned_dce) == 3, dataset_collection create_response = self._download_dataset_collection(history_id=history_id, hdca_id=dataset_collection["id"]) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 3, f"Expected 3 elements in [{namelist}]" collection_name = dataset_collection["name"] for element, zip_path in zip(returned_dce, namelist): assert f"{collection_name}/{element['element_identifier']}.{element['object']['file_ext']}" == zip_path
[docs] def test_pair_download(self): with self.dataset_populator.test_history(require_new=False) as history_id: fetch_response = self.dataset_collection_populator.create_pair_in_history( history_id, direct_upload=True ).json() dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response) returned_dce = dataset_collection["elements"] assert len(returned_dce) == 2, dataset_collection hdca_id = dataset_collection["id"] create_response = self._download_dataset_collection(history_id=history_id, hdca_id=hdca_id) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 2, f"Expected 2 elements in [{namelist}]" collection_name = dataset_collection["name"] for element, zip_path in zip(returned_dce, namelist): assert f"{collection_name}/{element['element_identifier']}.{element['object']['file_ext']}" == zip_path
[docs] def test_list_pair_download(self): with self.dataset_populator.test_history(require_new=False) as history_id: fetch_response = self.dataset_collection_populator.create_list_of_pairs_in_history(history_id).json() dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response) returned_dce = dataset_collection["elements"] assert len(returned_dce) == 1, dataset_collection list_collection_name = dataset_collection["name"] pair = returned_dce[0] create_response = self._download_dataset_collection(history_id=history_id, hdca_id=dataset_collection["id"]) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 2, f"Expected 2 elements in [{namelist}]" pair_collection_name = pair["element_identifier"] for element, zip_path in zip(pair["object"]["elements"], namelist): assert ( f"{list_collection_name}/{pair_collection_name}/{element['element_identifier']}.{element['object']['file_ext']}" == zip_path )
[docs] def test_list_list_download(self): with self.dataset_populator.test_history(require_new=False) as history_id: dataset_collection = self.dataset_collection_populator.create_list_of_list_in_history( history_id, wait=True ).json() returned_dce = dataset_collection["elements"] assert len(returned_dce) == 1, dataset_collection create_response = self._download_dataset_collection(history_id=history_id, hdca_id=dataset_collection["id"]) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 3, f"Expected 3 elements in [{namelist}]"
[docs] def test_list_list_list_download(self): with self.dataset_populator.test_history(require_new=False) as history_id: dataset_collection = self.dataset_collection_populator.create_list_of_list_in_history( history_id, collection_type="list:list:list", wait=True, ).json() returned_dce = dataset_collection["elements"] assert len(returned_dce) == 1, dataset_collection create_response = self._download_dataset_collection(history_id=history_id, hdca_id=dataset_collection["id"]) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 3, f"Expected 3 elements in [{namelist}]"
[docs] @requires_new_user def test_hda_security(self): with self.dataset_populator.test_history(require_new=False) as history_id: element_identifiers = self.dataset_collection_populator.pair_identifiers(history_id, wait=True) self.dataset_populator.make_private(history_id, element_identifiers[0]["id"]) with self._different_user(): history_id = self.dataset_populator.new_history() payload = dict( instance_type="history", history_id=history_id, element_identifiers=element_identifiers, collection_type="paired", ) create_response = self._post("dataset_collections", payload, json=True) self._assert_status_code_is(create_response, 403)
[docs] def test_dataset_collection_element_security(self): with self.dataset_populator.test_history(require_new=False) as history_id: dataset_collection = self.dataset_collection_populator.create_list_of_list_in_history( history_id, collection_type="list:list:list", wait=True, ).json() first_element = dataset_collection["elements"][0] assert first_element["model_class"] == "DatasetCollectionElement" assert first_element["element_type"] == "dataset_collection" first_element_url = f"/api/dataset_collection_element/{first_element['id']}" # Make one dataset private to check that access permissions are respected first_dataset_element = first_element["object"]["elements"][0]["object"]["elements"][0] self.dataset_populator.make_private(history_id, first_dataset_element["object"]["id"]) with self._different_user(): assert self._get(first_element_url).status_code == 403 collection_dce_response = self._get(first_element_url) collection_dce_response.raise_for_status() collection_dce = collection_dce_response.json() assert collection_dce["model_class"] == "DatasetCollectionElement" assert collection_dce["element_type"] == "dataset_collection" first_dataset_element = first_element["object"]["elements"][0]["object"]["elements"][0] assert first_dataset_element["model_class"] == "DatasetCollectionElement" assert first_dataset_element["element_type"] == "hda" first_dataset_element_url = f"/api/dataset_collection_element/{first_dataset_element['id']}" with self._different_user(): assert self._get(first_dataset_element_url).status_code == 403 dataset_dce_response = self._get(first_dataset_element_url) dataset_dce_response.raise_for_status() dataset_dce = dataset_dce_response.json() assert dataset_dce["model_class"] == "DatasetCollectionElement" assert dataset_dce["element_type"] == "hda" assert dataset_dce["object"]["model_class"] == "HistoryDatasetAssociation"
[docs] def test_enforces_unique_names(self): with self.dataset_populator.test_history(require_new=False) as history_id: element_identifiers = self.dataset_collection_populator.list_identifiers(history_id) element_identifiers[2]["name"] = element_identifiers[0]["name"] payload = dict( instance_type="history", history_id=history_id, element_identifiers=element_identifiers, collection_type="list", ) create_response = self._post("dataset_collections", payload, json=True) self._assert_status_code_is(create_response, 400)
[docs] def test_upload_collection(self): with self.dataset_populator.test_history(require_new=False) as history_id: elements = [ { "src": "files", "dbkey": "hg19", "info": "my cool bed", "tags": ["name:data1", "group:condition:treated", "machine:illumina"], } ] targets = [ { "destination": {"type": "hdca"}, "elements": elements, "collection_type": "list", "name": "Test upload", "tags": ["name:collection1"], } ] payload = { "history_id": history_id, "targets": targets, "__files": {"files_0|file_data": open(self.test_data_resolver.get_filename("4.bed"))}, } self.dataset_populator.fetch(payload) hdca = self._assert_one_collection_created_in_history(history_id) assert hdca["name"] == "Test upload" hdca_tags = hdca["tags"] assert len(hdca_tags) == 1 assert "name:collection1" in hdca_tags assert len(hdca["elements"]) == 1, hdca element0 = hdca["elements"][0] assert element0["element_identifier"] == "4.bed" dataset0 = element0["object"] assert dataset0["file_size"] == 61 dataset_tags = dataset0["tags"] assert len(dataset_tags) == 3, dataset0
[docs] def test_upload_nested(self): with self.dataset_populator.test_history(require_new=False) as history_id: elements = [{"name": "samp1", "elements": [{"src": "files", "dbkey": "hg19", "info": "my cool bed"}]}] targets = [ { "destination": {"type": "hdca"}, "elements": elements, "collection_type": "list:list", "name": "Test upload", } ] payload = { "history_id": history_id, "targets": targets, "__files": {"files_0|file_data": open(self.test_data_resolver.get_filename("4.bed"))}, } self.dataset_populator.fetch(payload) hdca = self._assert_one_collection_created_in_history(history_id) assert hdca["name"] == "Test upload" assert len(hdca["elements"]) == 1, hdca element0 = hdca["elements"][0] assert element0["element_identifier"] == "samp1"
[docs] @skip_if_github_down def test_upload_collection_from_url(self): with self.dataset_populator.test_history(require_new=False) as history_id: elements = [ { "src": "url", "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed", "info": "my cool bed", } ] targets = [ { "destination": {"type": "hdca"}, "elements": elements, "collection_type": "list", } ] payload = { "history_id": history_id, "targets": targets, } self.dataset_populator.fetch(payload) hdca = self._assert_one_collection_created_in_history(history_id) assert len(hdca["elements"]) == 1, hdca element0 = hdca["elements"][0] assert element0["element_identifier"] == "4.bed" assert element0["object"]["file_size"] == 61
[docs] def test_upload_collection_deferred(self): with self.dataset_populator.test_history(require_new=False) as history_id: elements = [ { "src": "url", "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed", "info": "my cool bed", "deferred": True, } ] targets = [ { "destination": {"type": "hdca"}, "elements": elements, "collection_type": "list", } ] payload = { "history_id": history_id, "targets": targets, } self.dataset_populator.fetch(payload) hdca = self._assert_one_collection_created_in_history(history_id) assert len(hdca["elements"]) == 1, hdca element0 = hdca["elements"][0] assert element0["element_identifier"] == "4.bed" object0 = element0["object"] assert object0["state"] == "deferred"
[docs] @skip_if_github_down def test_upload_collection_failed_expansion_url(self): with self.dataset_populator.test_history(require_new=False) as history_id: targets = [ { "destination": {"type": "hdca"}, "elements_from": "bagit", "collection_type": "list", "src": "url", "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed", } ] payload = { "history_id": history_id, "targets": targets, } self.dataset_populator.fetch(payload, assert_ok=False, wait=True) hdca = self._assert_one_collection_created_in_history(history_id) assert hdca["populated"] is False assert "bagit.txt" in hdca["populated_state_message"], hdca
def _assert_one_collection_created_in_history(self, history_id: str): contents_response = self._get(f"histories/{history_id}/contents/dataset_collections") self._assert_status_code_is(contents_response, 200) contents = contents_response.json() assert len(contents) == 1 hdca = contents[0] assert hdca["history_content_type"] == "dataset_collection" hdca_id = hdca["id"] collection_response = self._get(f"histories/{history_id}/contents/dataset_collections/{hdca_id}") self._assert_status_code_is(collection_response, 200) return collection_response.json() def _check_create_response(self, create_response): self._assert_status_code_is(create_response, 200) dataset_collection = create_response.json() if "output_collections" in dataset_collection: # fetch data response, we'll have to check the final response dataset_collection = dataset_collection["output_collections"][0] dataset_collection = self._get(f"dataset_collections/{dataset_collection['id']}").json() self._assert_has_keys(dataset_collection, "elements", "url", "name", "collection_type", "element_count") return dataset_collection def _download_dataset_collection(self, history_id: str, hdca_id: str): return self._get(f"histories/{history_id}/contents/dataset_collections/{hdca_id}/download")
[docs] @requires_new_user def test_collection_contents_security(self, history_id): # request contents on an hdca that doesn't belong to user hdca, contents_url = self._create_collection_contents_pair(history_id) with self._different_user(): contents_response = self._get(contents_url) self._assert_status_code_is(contents_response, 403)
[docs] @requires_new_user def test_published_collection_contents_accessible(self, history_id): # request contents on an hdca that is in a published history hdca, contents_url = self._create_collection_contents_pair(history_id) with self._different_user(): contents_response = self._get(contents_url) self._assert_status_code_is(contents_response, 403) self.dataset_populator.make_public(history_id) with self._different_user(): contents_response = self._get(contents_url) self._assert_status_code_is(contents_response, 200)
[docs] def test_collection_contents_invalid_collection(self, history_id): # request an invalid collection from a valid hdca, should get 404 hdca, contents_url = self._create_collection_contents_pair(history_id) response = self._get(contents_url) self._assert_status_code_is(response, 200) fake_collection_id = "5d7db0757a2eb7ef" fake_contents_url = f"/api/dataset_collections/{hdca['id']}/contents/{fake_collection_id}" error_response = self._get(fake_contents_url) assert_object_id_error(error_response)
[docs] def test_show_dataset_collection(self, history_id): fetch_response = self.dataset_collection_populator.create_list_in_history(history_id, direct_upload=True).json() dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response) returned_dce = dataset_collection["elements"] assert len(returned_dce) == 3, dataset_collection hdca_id = dataset_collection["id"] dataset_collection_url = f"/api/dataset_collections/{hdca_id}" dataset_collection = self._get(dataset_collection_url).json() assert dataset_collection["id"] == hdca_id assert dataset_collection["collection_type"] == "list"
[docs] def test_show_dataset_collection_contents(self, history_id): # Get contents_url from history contents, use it to show the first level # of collection contents in the created HDCA, then use it again to drill # down into the nested collection contents hdca = self.dataset_collection_populator.create_list_of_list_in_history(history_id).json() root_contents_url = self._get_contents_url_for_hdca(history_id, hdca) # check root contents for this collection root_contents = self._get(root_contents_url).json() assert len(root_contents) == len(hdca["elements"]) self._compare_collection_contents_elements(root_contents, hdca["elements"]) # drill down, retrieve nested collection contents assert "object" in root_contents[0] assert "contents_url" in root_contents[0]["object"] drill_contents_url = root_contents[0]["object"]["contents_url"] drill_contents = self._get(drill_contents_url).json() assert len(drill_contents) == len(hdca["elements"][0]["object"]["elements"]) self._compare_collection_contents_elements(drill_contents, hdca["elements"][0]["object"]["elements"])
[docs] def test_collection_contents_limit_offset(self, history_id): # check limit/offset params for collection contents endpoint hdca, root_contents_url = self._create_collection_contents_pair(history_id) # check limit limited_contents = self._get(f"{root_contents_url}?limit=1").json() assert len(limited_contents) == 1 assert limited_contents[0]["element_index"] == 0 # check offset offset_contents = self._get(f"{root_contents_url}?offset=1").json() assert len(offset_contents) == 1 assert offset_contents[0]["element_index"] == 1
[docs] def test_collection_contents_empty_root(self, history_id): create_response = self.dataset_collection_populator.create_list_in_history( history_id, contents=[], wait=True ).json() hdca = create_response["output_collections"][0] assert hdca["elements"] == [] root_contents_url = hdca["contents_url"] response = self._get(root_contents_url) response.raise_for_status() assert response.json() == []
[docs] def test_get_suitable_converters_single_datatype(self, history_id): response = self.dataset_collection_populator.upload_collection( history_id, "list:paired", elements=[ { "name": "test0", "elements": [ {"src": "pasted", "paste_content": "123\n", "name": "forward", "ext": "bed"}, {"src": "pasted", "paste_content": "456\n", "name": "reverse", "ext": "bed"}, ], }, { "name": "test1", "elements": [ {"src": "pasted", "paste_content": "789\n", "name": "forward", "ext": "bed"}, {"src": "pasted", "paste_content": "0ab\n", "name": "reverse", "ext": "bed"}, ], }, ], wait=True, ) self._assert_status_code_is(response, 200) hdca_list_id = response.json()["outputs"][0]["id"] converters = self._get("dataset_collections/" + hdca_list_id + "/suitable_converters") expected = [ # This list is subject to change, but it's unlikely we'll be removing converters "CONVERTER_bed_to_fli_0", "CONVERTER_bed_gff_or_vcf_to_bigwig_0", "CONVERTER_bed_to_gff_0", "CONVERTER_interval_to_bgzip_0", "tabular_to_csv", "CONVERTER_interval_to_bed6_0", "CONVERTER_interval_to_bedstrict_0", "CONVERTER_interval_to_tabix_0", "CONVERTER_interval_to_bed12_0", ] actual = [] for converter in converters.json(): actual.append(converter["tool_id"]) missing_expected_converters = set(expected) - set(actual) assert ( not missing_expected_converters ), f"Expected converter(s) {', '.join(missing_expected_converters)} missing from response"
[docs] def test_get_suitable_converters_different_datatypes_matches(self, history_id): response = self.dataset_collection_populator.upload_collection( history_id, "list:paired", elements=[ { "name": "test0", "elements": [ {"src": "pasted", "paste_content": "123\n", "name": "forward", "ext": "bed"}, {"src": "pasted", "paste_content": "456\n", "name": "reverse", "ext": "bed"}, ], }, { "name": "test1", "elements": [ {"src": "pasted", "paste_content": "789\n", "name": "forward", "ext": "tabular"}, {"src": "pasted", "paste_content": "0ab\n", "name": "reverse", "ext": "tabular"}, ], }, ], wait=True, ) self._assert_status_code_is(response, 200) hdca_list_id = response.json()["outputs"][0]["id"] converters = self._get("dataset_collections/" + hdca_list_id + "/suitable_converters") expected = "tabular_to_csv" actual = [] for converter in converters.json(): actual.append(converter["tool_id"]) assert expected in actual
[docs] def test_get_suitable_converters_different_datatypes_no_matches(self, history_id): response = self.dataset_collection_populator.upload_collection( history_id, "list:paired", elements=[ { "name": "test0", "elements": [ {"src": "pasted", "paste_content": "123\n", "name": "forward", "ext": "bed"}, {"src": "pasted", "paste_content": "456\n", "name": "reverse", "ext": "bed"}, ], }, { "name": "test1", "elements": [ {"src": "pasted", "paste_content": "789\n", "name": "forward", "ext": "fasta"}, {"src": "pasted", "paste_content": "0ab\n", "name": "reverse", "ext": "fasta"}, ], }, ], wait=True, ) self._assert_status_code_is(response, 200) hdca_list_id = response.json()["outputs"][0]["id"] converters = self._get("dataset_collections/" + hdca_list_id + "/suitable_converters") actual: List[str] = [] for converter in converters.json(): actual.append(converter["tool_id"]) assert actual == []
[docs] def test_collection_tools_tag_propagation(self, history_id): elements = [{"src": "files", "tags": ["name:element_tag"]}] targets = [ { "destination": {"type": "hdca"}, "elements": elements, "collection_type": "list", "name": "Test collection", "tags": ["name:collection_tag"], } ] payload = { "history_id": history_id, "targets": targets, "__files": {"files_0|file_data": open(self.test_data_resolver.get_filename("4.bed"))}, } hdca_id = self.dataset_populator.fetch(payload).json()["output_collections"][0]["id"] inputs = { "input": {"batch": False, "src": "hdca", "id": hdca_id}, } payload = self.dataset_populator.run_tool_payload( tool_id="__FILTER_FAILED_DATASETS__", inputs=inputs, history_id=history_id, input_format="legacy", ) response = self._post("tools", payload).json() self.dataset_populator.wait_for_history(history_id, assert_ok=False) output_collection = response["output_collections"][0] # collection should not inherit tags from input collection elements, only parent collection assert output_collection["tags"] == ["name:collection_tag"] element = output_collection["elements"][0] # new element hda should have tags copied from old hda assert element["object"]["tags"] == ["name:element_tag"]
def _compare_collection_contents_elements(self, contents_elements, hdca_elements): # compare collection api results to existing hdca element contents fields = ["element_identifier", "element_index", "element_type", "id", "model_class"] for content_element, hdca_element in zip(contents_elements, hdca_elements): for f in fields: assert content_element[f] == hdca_element[f] def _create_collection_contents_pair(self, history_id: str): # Create a simple collection, return hdca and contents_url payload = self.dataset_collection_populator.create_pair_payload(history_id, instance_type="history") create_response = self.dataset_populator.fetch(payload=payload, wait=True) hdca = self._check_create_response(create_response) root_contents_url = self._get_contents_url_for_hdca(history_id, hdca) return hdca, root_contents_url def _get_contents_url_for_hdca(self, history_id: str, hdca): # look up the history contents using optional serialization key history_contents_url = f"histories/{history_id}/contents?v=dev&view=summary&keys=contents_url" json = self._get(history_contents_url).json() # filter out the collection we just made id = hdca.id # make sure the contents_url appears def find_hdca(c): return c["history_content_type"] == "dataset_collection" and c["id"] == hdca["id"] matches = list(filter(find_hdca, json)) assert len(matches) == 1 assert "contents_url" in matches[0] return matches[0]["contents_url"]