Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy_test.api.test_datasets

import textwrap

from galaxy_test.base.populators import (
    DatasetCollectionPopulator,
    DatasetPopulator,
    skip_without_tool,
)
from ._framework import ApiTestCase


[docs]class DatasetsApiTestCase(ApiTestCase):
[docs] def setUp(self): super().setUp() self.dataset_populator = DatasetPopulator(self.galaxy_interactor) self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor) self.history_id = self.dataset_populator.new_history()
[docs] def test_index(self): index_response = self._get("datasets") self._assert_status_code_is(index_response, 200)
[docs] def test_search_datasets(self): hda_id = self.dataset_populator.new_dataset(self.history_id)['id'] payload = {'limit': 1, 'offset': 0} index_response = self._get("datasets", payload).json() assert len(index_response) == 1 assert index_response[0]['id'] == hda_id hdca_id = self.dataset_collection_populator.create_list_in_history(self.history_id, contents=["1\n2\n3"]).json()['id'] payload = {'limit': 3, 'offset': 0} index_response = self._get("datasets", payload).json() assert len(index_response) == 3 assert index_response[0]['id'] == hdca_id assert index_response[0]['history_content_type'] == 'dataset_collection' assert index_response[2]['id'] == hda_id assert index_response[2]['history_content_type'] == 'dataset' payload = {'limit': 2, 'offset': 0, 'q': ['history_content_type'], 'qv': ['dataset']} index_response = self._get("datasets", payload).json() assert index_response[1]['id'] == hda_id
[docs] def test_search_by_tag(self): hda_id = self.dataset_populator.new_dataset(self.history_id)['id'] update_payload = { 'tags': ['cool:new_tag', 'cool:another_tag'], } updated_hda = self._put( "histories/{history_id}/contents/{hda_id}".format(history_id=self.history_id, hda_id=hda_id), update_payload).json() assert 'cool:new_tag' in updated_hda['tags'] assert 'cool:another_tag' in updated_hda['tags'] payload = {'limit': 10, 'offset': 0, 'q': ['history_content_type', 'tag'], 'qv': ['dataset', 'cool:new_tag']} index_response = self._get("datasets", payload).json() assert len(index_response) == 1 payload = {'limit': 10, 'offset': 0, 'q': ['history_content_type', 'tag-contains'], 'qv': ['dataset', 'new_tag']} index_response = self._get("datasets", payload).json() assert len(index_response) == 1 payload = {'limit': 10, 'offset': 0, 'q': ['history_content_type', 'tag-contains'], 'qv': ['dataset', 'notag']} index_response = self._get("datasets", payload).json() assert len(index_response) == 0
[docs] def test_search_by_tool_id(self): self.dataset_populator.new_dataset(self.history_id) payload = {'limit': 1, 'offset': 0, 'q': ['history_content_type', 'tool_id'], 'qv': ['dataset', 'upload1']} assert len(self._get("datasets", payload).json()) == 1 payload = {'limit': 1, 'offset': 0, 'q': ['history_content_type', 'tool_id'], 'qv': ['dataset', 'uploadX']} assert len(self._get("datasets", payload).json()) == 0 payload = {'limit': 1, 'offset': 0, 'q': ['history_content_type', 'tool_id-contains'], 'qv': ['dataset', 'pload1']} assert len(self._get("datasets", payload).json()) == 1 self.dataset_collection_populator.create_list_in_history(self.history_id, name="search by tool id", contents=["1\n2\n3"]).json() self.dataset_populator.wait_for_history(self.history_id) payload = {'limit': 10, 'offset': 0, 'history_id': self.history_id, 'q': ['name', 'tool_id'], 'qv': ['search by tool id', 'upload1']} result = self._get("datasets", payload).json() assert result[0]['name'] == 'search by tool id', result payload = {'limit': 1, 'offset': 0, 'q': ['history_content_type', 'tool_id'], 'qv': ['dataset_collection', 'uploadX']} result = self._get("datasets", payload).json() assert len(result) == 0
[docs] def test_search_returns_only_accessible(self): hda_id = self.dataset_populator.new_dataset(self.history_id)['id'] with self._different_user(): payload = {'limit': 10, 'offset': 0, 'q': ['history_content_type'], 'qv': ['dataset']} index_response = self._get("datasets", payload).json() for item in index_response: assert hda_id != item['id']
[docs] def test_show(self): hda1 = self.dataset_populator.new_dataset(self.history_id) show_response = self._get("datasets/%s" % (hda1["id"])) self._assert_status_code_is(show_response, 200) self.__assert_matches_hda(hda1, show_response.json())
def __assert_matches_hda(self, input_hda, query_hda): self._assert_has_keys(query_hda, "id", "name") assert input_hda["name"] == query_hda["name"] assert input_hda["id"] == query_hda["id"]
[docs] def test_display(self): contents = textwrap.dedent("""\ 1 2 3 4 A B C D 10 20 30 40 """) hda1 = self.dataset_populator.new_dataset(self.history_id, content=contents) self.dataset_populator.wait_for_history(self.history_id) display_response = self._get("histories/{}/contents/{}/display".format(self.history_id, hda1["id"]), { 'raw': 'True' }) self._assert_status_code_is(display_response, 200) assert display_response.text == contents
[docs] def test_tag_change(self): hda_id = self.dataset_populator.new_dataset(self.history_id)['id'] payload = { 'item_id': hda_id, 'item_class': 'HistoryDatasetAssociation', 'item_tags': ['cool:tag_a', 'cool:tag_b', 'tag_c', 'name:tag_d', '#tag_e'], } self._put("tags", payload).json() updated_hda = self._get( "histories/{history_id}/contents/{hda_id}".format(history_id=self.history_id, hda_id=hda_id)).json() assert 'cool:tag_a' in updated_hda['tags'] assert 'cool:tag_b' in updated_hda['tags'] assert 'tag_c' in updated_hda['tags'] assert 'name:tag_d' in updated_hda['tags'] assert 'name:tag_e' in updated_hda['tags']
[docs] @skip_without_tool("cat_data_and_sleep") def test_update_datatype(self): hda_id = self.dataset_populator.new_dataset(self.history_id)['id'] original_hda = self._get( "histories/{history_id}/contents/{hda_id}".format(history_id=self.history_id, hda_id=hda_id)).json() assert original_hda['extension'] == 'txt' assert original_hda['data_type'] == 'galaxy.datatypes.data.Text' assert 'scatterplot' not in [viz['name'] for viz in original_hda['visualizations']] inputs = { 'input1': {'src': 'hda', 'id': hda_id}, 'sleep_time': 10, } run_response = self.dataset_populator.run_tool( "cat_data_and_sleep", inputs, self.history_id, assert_ok=False, ) queued_id = run_response.json()["outputs"][0]["id"] update_while_incomplete_response = self._put( # try updating datatype while used as output of a running job "histories/{history_id}/contents/{hda_id}".format(history_id=self.history_id, hda_id=queued_id), {'datatype': 'tabular'}) self._assert_status_code_is(update_while_incomplete_response, 400) self.dataset_populator.wait_for_history_jobs(self.history_id) # now wait for upload to complete successful_updated_hda_response = self._put( "histories/{history_id}/contents/{hda_id}".format(history_id=self.history_id, hda_id=hda_id), {'datatype': 'tabular'}).json() assert successful_updated_hda_response['extension'] == 'tabular' assert successful_updated_hda_response['data_type'] == 'galaxy.datatypes.tabular.Tabular' assert 'scatterplot' in [viz['name'] for viz in successful_updated_hda_response['visualizations']] invalidly_updated_hda_response = self._put( # try updating with invalid datatype "histories/{history_id}/contents/{hda_id}".format(history_id=self.history_id, hda_id=hda_id), {'datatype': 'invalid'}) self._assert_status_code_is(invalidly_updated_hda_response, 400)