Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy_test.api.test_dataset_collections

import json
import zipfile
from io import BytesIO
from typing import List

from galaxy_test.base.api_asserts import assert_object_id_error
from galaxy_test.base.populators import DatasetCollectionPopulator, DatasetPopulator, skip_if_github_down
from ._framework import ApiTestCase


[docs]class DatasetCollectionApiTestCase(ApiTestCase): history_id: str
[docs] def setUp(self): super().setUp() self.dataset_populator = DatasetPopulator(self.galaxy_interactor) self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor) self.history_id = self.dataset_populator.new_history()
[docs] def test_create_pair_from_history(self): payload = self.dataset_collection_populator.create_pair_payload( self.history_id, instance_type="history", ) create_response = self._post("dataset_collections", payload, json=True) dataset_collection = self._check_create_response(create_response) returned_datasets = dataset_collection["elements"] assert len(returned_datasets) == 2, dataset_collection
[docs] def test_create_list_from_history(self): element_identifiers = self.dataset_collection_populator.list_identifiers(self.history_id) payload = dict( instance_type="history", history_id=self.history_id, element_identifiers=element_identifiers, collection_type="list", ) create_response = self._post("dataset_collections", payload, json=True) dataset_collection = self._check_create_response(create_response) returned_datasets = dataset_collection["elements"] assert len(returned_datasets) == 3, dataset_collection
[docs] def test_create_list_of_existing_pairs(self): pair_payload = self.dataset_collection_populator.create_pair_payload( self.history_id, instance_type="history", ) pair_create_response = self._post("dataset_collections", pair_payload, json=True) dataset_collection = self._check_create_response(pair_create_response) hdca_id = dataset_collection["id"] element_identifiers = [ dict(name="test1", src="hdca", id=hdca_id) ] payload = dict( instance_type="history", history_id=self.history_id, element_identifiers=element_identifiers, collection_type="list", ) create_response = self._post("dataset_collections", payload, json=True) dataset_collection = self._check_create_response(create_response) returned_collections = dataset_collection["elements"] assert len(returned_collections) == 1, dataset_collection
[docs] def test_create_list_of_new_pairs(self): identifiers = self.dataset_collection_populator.nested_collection_identifiers(self.history_id, "list:paired") payload = dict( collection_type="list:paired", instance_type="history", history_id=self.history_id, name="a nested collection", element_identifiers=identifiers, ) create_response = self._post("dataset_collections", payload, json=True) dataset_collection = self._check_create_response(create_response) assert dataset_collection["collection_type"] == "list:paired" assert dataset_collection["name"] == "a nested collection" returned_collections = dataset_collection["elements"] assert len(returned_collections) == 1, dataset_collection pair_1_element = returned_collections[0] self._assert_has_keys(pair_1_element, "element_identifier", "element_index", "object") assert pair_1_element["element_identifier"] == "test_level_1", pair_1_element assert pair_1_element["element_index"] == 0, pair_1_element pair_1_object = pair_1_element["object"] self._assert_has_keys(pair_1_object, "collection_type", "elements", "element_count") self.assertEqual(pair_1_object["collection_type"], "paired") self.assertEqual(pair_1_object["populated"], True) pair_elements = pair_1_object["elements"] assert len(pair_elements) == 2 pair_1_element_1 = pair_elements[0] assert pair_1_element_1["element_index"] == 0
[docs] def test_list_download(self): fetch_response = self.dataset_collection_populator.create_list_in_history(self.history_id, direct_upload=True).json() dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response) returned_dce = dataset_collection["elements"] assert len(returned_dce) == 3, dataset_collection create_response = self._download_dataset_collection(history_id=self.history_id, hdca_id=dataset_collection['id']) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 3, f"Expected 3 elements in [{namelist}]" collection_name = dataset_collection['name'] for element, zip_path in zip(returned_dce, namelist): assert f"{collection_name}/{element['element_identifier']}.{element['object']['file_ext']}" == zip_path
[docs] def test_pair_download(self): fetch_response = self.dataset_collection_populator.create_pair_in_history(self.history_id, direct_upload=True).json() dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response) returned_dce = dataset_collection["elements"] assert len(returned_dce) == 2, dataset_collection hdca_id = dataset_collection['id'] create_response = self._download_dataset_collection(history_id=self.history_id, hdca_id=hdca_id) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 2, f"Expected 2 elements in [{namelist}]" collection_name = dataset_collection['name'] for element, zip_path in zip(returned_dce, namelist): assert f"{collection_name}/{element['element_identifier']}.{element['object']['file_ext']}" == zip_path
[docs] def test_list_pair_download(self): fetch_response = self.dataset_collection_populator.create_list_of_pairs_in_history(self.history_id).json() dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response) returned_dce = dataset_collection["elements"] assert len(returned_dce) == 1, dataset_collection list_collection_name = dataset_collection['name'] pair = returned_dce[0] create_response = self._download_dataset_collection(history_id=self.history_id, hdca_id=dataset_collection['id']) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 2, f"Expected 2 elements in [{namelist}]" pair_collection_name = pair['element_identifier'] for element, zip_path in zip(pair['object']['elements'], namelist): assert f"{list_collection_name}/{pair_collection_name}/{element['element_identifier']}.{element['object']['file_ext']}" == zip_path
[docs] def test_list_list_download(self): dataset_collection = self.dataset_collection_populator.create_list_of_list_in_history(self.history_id).json() self.dataset_collection_populator.wait_for_dataset_collection(dataset_collection, assert_ok=True) returned_dce = dataset_collection["elements"] assert len(returned_dce) == 1, dataset_collection create_response = self._download_dataset_collection(history_id=self.history_id, hdca_id=dataset_collection['id']) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 3, f"Expected 3 elements in [{namelist}]"
[docs] def test_list_list_list_download(self): dataset_collection = self.dataset_collection_populator.create_list_of_list_in_history(self.history_id, collection_type='list:list:list').json() self.dataset_collection_populator.wait_for_dataset_collection(dataset_collection, assert_ok=True) returned_dce = dataset_collection["elements"] assert len(returned_dce) == 1, dataset_collection create_response = self._download_dataset_collection(history_id=self.history_id, hdca_id=dataset_collection['id']) self._assert_status_code_is(create_response, 200) archive = zipfile.ZipFile(BytesIO(create_response.content)) namelist = archive.namelist() assert len(namelist) == 3, f"Expected 3 elements in [{namelist}]"
[docs] def test_hda_security(self): element_identifiers = self.dataset_collection_populator.pair_identifiers(self.history_id) self.dataset_populator.make_private(self.history_id, element_identifiers[0]["id"]) with self._different_user(): history_id = self.dataset_populator.new_history() payload = dict( instance_type="history", history_id=history_id, element_identifiers=element_identifiers, collection_type="paired", ) create_response = self._post("dataset_collections", payload, json=True) self._assert_status_code_is(create_response, 403)
[docs] def test_enforces_unique_names(self): element_identifiers = self.dataset_collection_populator.list_identifiers(self.history_id) element_identifiers[2]["name"] = element_identifiers[0]["name"] payload = dict( instance_type="history", history_id=self.history_id, element_identifiers=element_identifiers, collection_type="list", ) create_response = self._post("dataset_collections", payload, json=True) self._assert_status_code_is(create_response, 400)
[docs] def test_upload_collection(self): elements = [{"src": "files", "dbkey": "hg19", "info": "my cool bed", "tags": ["name:data1", "group:condition:treated", "machine:illumina"]}] targets = [{ "destination": {"type": "hdca"}, "elements": elements, "collection_type": "list", "name": "Test upload", "tags": ["name:collection1"] }] payload = { "history_id": self.history_id, "targets": json.dumps(targets), "__files": {"files_0|file_data": open(self.test_data_resolver.get_filename("4.bed"))}, } self.dataset_populator.fetch(payload) hdca = self._assert_one_collection_created_in_history() self.assertEqual(hdca["name"], "Test upload") hdca_tags = hdca["tags"] assert len(hdca_tags) == 1 assert "name:collection1" in hdca_tags assert len(hdca["elements"]) == 1, hdca element0 = hdca["elements"][0] assert element0["element_identifier"] == "4.bed" dataset0 = element0["object"] assert dataset0["file_size"] == 61 dataset_tags = dataset0["tags"] assert len(dataset_tags) == 3, dataset0
[docs] def test_upload_nested(self): elements = [{"name": "samp1", "elements": [{"src": "files", "dbkey": "hg19", "info": "my cool bed"}]}] targets = [{ "destination": {"type": "hdca"}, "elements": elements, "collection_type": "list:list", "name": "Test upload", }] payload = { "history_id": self.history_id, "targets": json.dumps(targets), "__files": {"files_0|file_data": open(self.test_data_resolver.get_filename("4.bed"))}, } self.dataset_populator.fetch(payload) hdca = self._assert_one_collection_created_in_history() self.assertEqual(hdca["name"], "Test upload") assert len(hdca["elements"]) == 1, hdca element0 = hdca["elements"][0] assert element0["element_identifier"] == "samp1"
[docs] @skip_if_github_down def test_upload_collection_from_url(self): elements = [{"src": "url", "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed", "info": "my cool bed"}] targets = [{ "destination": {"type": "hdca"}, "elements": elements, "collection_type": "list", }] payload = { "history_id": self.history_id, "targets": json.dumps(targets), "__files": {"files_0|file_data": open(self.test_data_resolver.get_filename("4.bed"))}, } self.dataset_populator.fetch(payload) hdca = self._assert_one_collection_created_in_history() assert len(hdca["elements"]) == 1, hdca element0 = hdca["elements"][0] assert element0["element_identifier"] == "4.bed" assert element0["object"]["file_size"] == 61
[docs] @skip_if_github_down def test_upload_collection_failed_expansion_url(self): targets = [{ "destination": {"type": "hdca"}, "elements_from": "bagit", "collection_type": "list", "src": "url", "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed", }] payload = { "history_id": self.history_id, "targets": json.dumps(targets), "__files": {"files_0|file_data": open(self.test_data_resolver.get_filename("4.bed"))}, } self.dataset_populator.fetch(payload, assert_ok=False, wait=True) hdca = self._assert_one_collection_created_in_history() assert hdca["populated"] is False assert "bagit.txt" in hdca["populated_state_message"], hdca
def _assert_one_collection_created_in_history(self): contents_response = self._get(f"histories/{self.history_id}/contents/dataset_collections") self._assert_status_code_is(contents_response, 200) contents = contents_response.json() assert len(contents) == 1 hdca = contents[0] assert hdca["history_content_type"] == "dataset_collection" hdca_id = hdca["id"] collection_response = self._get(f"histories/{self.history_id}/contents/dataset_collections/{hdca_id}") self._assert_status_code_is(collection_response, 200) return collection_response.json() def _check_create_response(self, create_response): self._assert_status_code_is(create_response, 200) dataset_collection = create_response.json() self._assert_has_keys(dataset_collection, "elements", "url", "name", "collection_type", "element_count") return dataset_collection def _download_dataset_collection(self, history_id, hdca_id): return self._get(f"histories/{history_id}/contents/dataset_collections/{hdca_id}/download")
[docs] def test_collection_contents_security(self): # request contents on an hdca that doesn't belong to user hdca, contents_url = self._create_collection_contents_pair() with self._different_user(): contents_response = self._get(contents_url) self._assert_status_code_is(contents_response, 403)
[docs] def test_collection_contents_invalid_collection(self): # request an invalid collection from a valid hdca, should get 404 hdca, contents_url = self._create_collection_contents_pair() response = self._get(contents_url) self._assert_status_code_is(response, 200) fake_collection_id = '5d7db0757a2eb7ef' fake_contents_url = f"/api/dataset_collections/{hdca['id']}/contents/{fake_collection_id}" error_response = self._get(fake_contents_url) assert_object_id_error(error_response)
[docs] def test_show_dataset_collection(self): fetch_response = self.dataset_collection_populator.create_list_in_history(self.history_id, direct_upload=True).json() dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response) returned_dce = dataset_collection["elements"] assert len(returned_dce) == 3, dataset_collection hdca_id = dataset_collection['id'] dataset_collection_url = f"/api/dataset_collections/{hdca_id}" dataset_collection = self._get(dataset_collection_url).json() assert dataset_collection['id'] == hdca_id assert dataset_collection['collection_type'] == 'list'
[docs] def test_show_dataset_collection_contents(self): # Get contents_url from history contents, use it to show the first level # of collection contents in the created HDCA, then use it again to drill # down into the nested collection contents hdca = self.dataset_collection_populator.create_list_of_list_in_history(self.history_id).json() root_contents_url = self._get_contents_url_for_hdca(hdca) # check root contents for this collection root_contents = self._get(root_contents_url).json() assert len(root_contents) == len(hdca['elements']) self._compare_collection_contents_elements(root_contents, hdca['elements']) # drill down, retrieve nested collection contents assert 'object' in root_contents[0] assert 'contents_url' in root_contents[0]['object'] drill_contents_url = root_contents[0]['object']['contents_url'] drill_contents = self._get(drill_contents_url).json() assert len(drill_contents) == len(hdca['elements'][0]['object']['elements']) self._compare_collection_contents_elements(drill_contents, hdca['elements'][0]['object']['elements'])
[docs] def test_collection_contents_limit_offset(self): # check limit/offset params for collection contents endpoint hdca, root_contents_url = self._create_collection_contents_pair() # check limit limited_contents = self._get(f"{root_contents_url}?limit=1").json() assert len(limited_contents) == 1 assert limited_contents[0]['element_index'] == 0 # check offset offset_contents = self._get(f"{root_contents_url}?offset=1").json() assert len(offset_contents) == 1 assert offset_contents[0]['element_index'] == 1
[docs] def test_collection_contents_empty_root(self): hdca = self.dataset_collection_populator.create_list_in_history(self.history_id, contents=[]).json() assert hdca["elements"] == [] root_contents_url = hdca["contents_url"] response = self._get(root_contents_url) response.raise_for_status() assert response.json() == []
[docs] def test_get_suitable_converters_single_datatype(self): response = self.dataset_collection_populator.upload_collection(self.history_id, "list:paired", elements=[ { "name": "test0", "elements": [ {"src": "pasted", "paste_content": "123\n", "name": "forward", "ext": "bed"}, {"src": "pasted", "paste_content": "456\n", "name": "reverse", "ext": "bed"}, ] }, { "name": "test1", "elements": [ {"src": "pasted", "paste_content": "789\n", "name": "forward", "ext": "bed"}, {"src": "pasted", "paste_content": "0ab\n", "name": "reverse", "ext": "bed"}, ] } ]) self._assert_status_code_is(response, 200) hdca_list_id = response.json()["outputs"][0]["id"] converters = self._get("dataset_collections/" + hdca_list_id + "/suitable_converters") expected = [ # This list is subject to change, but it's unlikely we'll be removing converters 'CONVERTER_bed_to_fli_0', 'CONVERTER_bed_gff_or_vcf_to_bigwig_0', 'CONVERTER_bed_to_gff_0', 'CONVERTER_interval_to_bgzip_0', 'tabular_to_csv', 'CONVERTER_interval_to_bed6_0', 'CONVERTER_interval_to_bedstrict_0', 'CONVERTER_interval_to_tabix_0', 'CONVERTER_interval_to_bed12_0'] actual = [] for converter in converters.json(): actual.append(converter["tool_id"]) missing_expected_converters = set(expected) - set(actual) assert not missing_expected_converters, f"Expected converter(s) {', '.join(missing_expected_converters)} missing from response"
[docs] def test_get_suitable_converters_different_datatypes_matches(self): response = self.dataset_collection_populator.upload_collection(self.history_id, "list:paired", elements=[ { "name": "test0", "elements": [ {"src": "pasted", "paste_content": "123\n", "name": "forward", "ext": "bed"}, {"src": "pasted", "paste_content": "456\n", "name": "reverse", "ext": "bed"}, ] }, { "name": "test1", "elements": [ {"src": "pasted", "paste_content": "789\n", "name": "forward", "ext": "tabular"}, {"src": "pasted", "paste_content": "0ab\n", "name": "reverse", "ext": "tabular"}, ] } ]) self._assert_status_code_is(response, 200) hdca_list_id = response.json()["outputs"][0]["id"] converters = self._get("dataset_collections/" + hdca_list_id + "/suitable_converters") expected = 'tabular_to_csv' actual = [] for converter in converters.json(): actual.append(converter["tool_id"]) assert expected in actual
[docs] def test_get_suitable_converters_different_datatypes_no_matches(self): response = self.dataset_collection_populator.upload_collection(self.history_id, "list:paired", elements=[ { "name": "test0", "elements": [ {"src": "pasted", "paste_content": "123\n", "name": "forward", "ext": "bed"}, {"src": "pasted", "paste_content": "456\n", "name": "reverse", "ext": "bed"}, ] }, { "name": "test1", "elements": [ {"src": "pasted", "paste_content": "789\n", "name": "forward", "ext": "fasta"}, {"src": "pasted", "paste_content": "0ab\n", "name": "reverse", "ext": "fasta"}, ] } ]) self._assert_status_code_is(response, 200) hdca_list_id = response.json()["outputs"][0]["id"] converters = self._get("dataset_collections/" + hdca_list_id + "/suitable_converters") actual: List[str] = [] for converter in converters.json(): actual.append(converter["tool_id"]) assert actual == []
[docs] def test_collection_tools_tag_propagation(self): elements = [{"src": "files", "tags": ["name:element_tag"]}] targets = [{ "destination": {"type": "hdca"}, "elements": elements, "collection_type": "list", "name": "Test collection", "tags": ["name:collection_tag"] }] payload = { "history_id": self.history_id, "targets": json.dumps(targets), "__files": {"files_0|file_data": open(self.test_data_resolver.get_filename("4.bed"))}, } hdca_id = self.dataset_populator.fetch(payload).json()['output_collections'][0]['id'] inputs = { "input": {"batch": False, "src": "hdca", "id": hdca_id}, } payload = self.dataset_populator.run_tool_payload( tool_id='__FILTER_FAILED_DATASETS__', inputs=inputs, history_id=self.history_id, input_format='legacy', ) response = self._post("tools", payload).json() self.dataset_populator.wait_for_history(self.history_id, assert_ok=False) output_collection = response["output_collections"][0] # collection should not inherit tags from input collection elements, only parent collection assert output_collection['tags'] == ["name:collection_tag"] element = output_collection['elements'][0] # new element hda should have tags copied from old hda assert element['object']['tags'] == ['name:element_tag']
def _compare_collection_contents_elements(self, contents_elements, hdca_elements): # compare collection api results to existing hdca element contents fields = ['element_identifier', 'element_index', 'element_type', 'id', 'model_class'] for (content_element, hdca_element) in zip(contents_elements, hdca_elements): for f in fields: assert content_element[f] == hdca_element[f] def _create_collection_contents_pair(self): # Create a simple collection, return hdca and contents_url payload = self.dataset_collection_populator.create_pair_payload(self.history_id, instance_type="history") create_response = self._post("dataset_collections", payload, json=True) hdca = self._check_create_response(create_response) root_contents_url = self._get_contents_url_for_hdca(hdca) return hdca, root_contents_url def _get_contents_url_for_hdca(self, hdca): # look up the history contents using optional serialization key history_contents_url = f"histories/{self.history_id}/contents?v=dev&view=summary&keys=contents_url" json = self._get(history_contents_url).json() # filter out the collection we just made id = hdca.id # make sure the contents_url appears def find_hdca(c): return c['history_content_type'] == 'dataset_collection' and c['id'] == hdca['id'] matches = list(filter(find_hdca, json)) assert len(matches) == 1 assert 'contents_url' in matches[0] return matches[0]['contents_url']