Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy_test.api.test_history_contents

import json

from requests import delete, put

from galaxy_test.base.populators import (
    DatasetCollectionPopulator,
    DatasetPopulator,
    LibraryPopulator,
    skip_without_tool,
    TestsDatasets,
)
from ._framework import ApiTestCase


# TODO: Test anonymous access.
[docs]class HistoryContentsApiTestCase(ApiTestCase, TestsDatasets):
[docs] def setUp(self): super().setUp() self.history_id = self._new_history() self.dataset_populator = DatasetPopulator(self.galaxy_interactor) self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor) self.library_populator = LibraryPopulator(self.galaxy_interactor)
[docs] def test_index_hda_summary(self): hda1 = self._new_dataset(self.history_id) contents_response = self._get("histories/%s/contents" % self.history_id) hda_summary = self.__check_for_hda(contents_response, hda1) assert "display_types" not in hda_summary # Quick summary, not full details
[docs] def test_make_private_and_public(self): hda1 = self._wait_for_new_hda() update_url = "histories/{}/contents/{}/permissions".format(self.history_id, hda1["id"]) role_id = self.dataset_populator.user_private_role_id() # Give manage permission to the user. payload = { "access": [], "manage": [role_id], } update_response = self._update_permissions(update_url, payload, admin=True) self._assert_status_code_is(update_response, 200) self._assert_other_user_can_access(hda1["id"]) # Then we restrict access. payload = { "action": "make_private", } update_response = self._update_permissions(update_url, payload) self._assert_status_code_is(update_response, 200) self._assert_other_user_cannot_access(hda1["id"]) # Then we restrict access. payload = { "action": "remove_restrictions", } update_response = self._update_permissions(update_url, payload) self._assert_status_code_is(update_response, 200) self._assert_other_user_can_access(hda1["id"])
[docs] def test_set_permissions_add_admin_history_contents(self): self._verify_dataset_permissions("history_contents")
[docs] def test_set_permissions_add_admin_datasets(self): self._verify_dataset_permissions("dataset")
def _verify_dataset_permissions(self, api_endpoint): hda1 = self._wait_for_new_hda() hda_id = hda1["id"] if api_endpoint == "history_contents": update_url = "histories/{}/contents/{}/permissions".format(self.history_id, hda_id) else: update_url = "datasets/%s/permissions" % hda_id role_id = self.dataset_populator.user_private_role_id() payload = { "access": [role_id], "manage": [role_id], } # Other users cannot modify permissions. with self._different_user(): update_response = self._update_permissions(update_url, payload) self._assert_status_code_is(update_response, 403) # First the details render for another user. self._assert_other_user_can_access(hda_id) # Then we restrict access. update_response = self._update_permissions(update_url, payload, admin=True) self._assert_status_code_is(update_response, 200) # Finally the details don't render. self._assert_other_user_cannot_access(hda_id) # But they do for the original user. contents_response = self._get("histories/{}/contents/{}".format(self.history_id, hda_id)).json() assert "name" in contents_response update_response = self._update_permissions(update_url, payload) self._assert_status_code_is(update_response, 200) payload = { "access": [role_id], "manage": [role_id], } update_response = self._update_permissions(update_url, payload) self._assert_status_code_is(update_response, 200) self._assert_other_user_cannot_access(hda_id) user_id = self.dataset_populator.user_id() with self._different_user(): different_user_id = self.dataset_populator.user_id() combined_user_role = self.dataset_populator.create_role([user_id, different_user_id], description="role for testing permissions") payload = { "access": [combined_user_role["id"]], "manage": [role_id], } update_response = self._update_permissions(update_url, payload) self._assert_status_code_is(update_response, 200) # Now other user can see dataset again with access permission. self._assert_other_user_can_access(hda_id) # access doesn't imply management though... with self._different_user(): update_response = self._update_permissions(update_url, payload) self._assert_status_code_is(update_response, 403) def _assert_other_user_cannot_access(self, history_content_id): with self._different_user(): contents_response = self._get("histories/{}/contents/{}".format(self.history_id, history_content_id)).json() assert "name" not in contents_response def _assert_other_user_can_access(self, history_content_id): with self._different_user(): contents_response = self._get("histories/{}/contents/{}".format(self.history_id, history_content_id)).json() assert "name" in contents_response
[docs] def test_index_hda_all_details(self): hda1 = self._new_dataset(self.history_id) contents_response = self._get("histories/%s/contents?details=all" % self.history_id) hda_details = self.__check_for_hda(contents_response, hda1) self.__assert_hda_has_full_details(hda_details)
[docs] def test_index_hda_detail_by_id(self): hda1 = self._new_dataset(self.history_id) contents_response = self._get("histories/{}/contents?details={}".format(self.history_id, hda1["id"])) hda_details = self.__check_for_hda(contents_response, hda1) self.__assert_hda_has_full_details(hda_details)
[docs] def test_show_hda(self): hda1 = self._new_dataset(self.history_id) show_response = self.__show(hda1) self._assert_status_code_is(show_response, 200) self.__assert_matches_hda(hda1, show_response.json())
[docs] def test_hda_copy(self): hda1 = self._new_dataset(self.history_id) create_data = dict( source='hda', content=hda1["id"], ) second_history_id = self._new_history() assert self.__count_contents(second_history_id) == 0 create_response = self._post("histories/%s/contents" % second_history_id, create_data) self._assert_status_code_is(create_response, 200) assert self.__count_contents(second_history_id) == 1
[docs] def test_library_copy(self): ld = self.library_populator.new_library_dataset("lda_test_library") create_data = dict( source='library', content=ld["id"], ) assert self.__count_contents(self.history_id) == 0 create_response = self._post("histories/%s/contents" % self.history_id, create_data) self._assert_status_code_is(create_response, 200) assert self.__count_contents(self.history_id) == 1
[docs] def test_update(self): hda1 = self._wait_for_new_hda() assert str(hda1["deleted"]).lower() == "false" update_response = self._raw_update(hda1["id"], dict(deleted=True)) self._assert_status_code_is(update_response, 200) show_response = self.__show(hda1) assert str(show_response.json()["deleted"]).lower() == "true" update_response = self._raw_update(hda1["id"], dict(name="Updated Name")) assert self.__show(hda1).json()["name"] == "Updated Name" update_response = self._raw_update(hda1["id"], dict(name="Updated Name")) assert self.__show(hda1).json()["name"] == "Updated Name" unicode_name = 'ржевский сапоги' update_response = self._raw_update(hda1["id"], dict(name=unicode_name)) updated_hda = self.__show(hda1).json() assert updated_hda["name"] == unicode_name, updated_hda quoted_name = '"Mooo"' update_response = self._raw_update(hda1["id"], dict(name=quoted_name)) updated_hda = self.__show(hda1).json() assert updated_hda["name"] == quoted_name, quoted_name data = { "dataset_id": hda1["id"], "name": "moocow", "dbkey": "?", "annotation": None, "info": "my info is", "operation": "attributes" } update_response = self._set_edit_update(data) # No key or anything supplied, expect a permission problem. # A bit questionable but I think this is a 400 instead of a 403 so that # we don't distinguish between this is a valid ID you don't have access to # and this is an invalid ID. assert update_response.status_code == 400, update_response.content
[docs] def test_update_batch(self): hda1 = self._wait_for_new_hda() assert str(hda1["deleted"]).lower() == "false" payload = dict(items=[{"history_content_type": "dataset", "id": hda1["id"]}], deleted=True) update_response = self._raw_update_batch(payload) objects = update_response.json() assert objects[0]["deleted"]
[docs] def test_update_type_failures(self): hda1 = self._wait_for_new_hda() update_response = self._raw_update(hda1["id"], dict(deleted='not valid')) self._assert_status_code_is(update_response, 400)
def _wait_for_new_hda(self): hda1 = self._new_dataset(self.history_id) self._wait_for_history(self.history_id) return hda1 def _set_edit_update(self, json): set_edit_url = "%s/dataset/set_edit" % self.url update_response = put(set_edit_url, json=json) return update_response def _raw_update(self, item_id, data, admin=False, history_id=None): history_id = history_id or self.history_id key_param = "use_admin_key" if admin else "use_key" update_url = self._api_url("histories/{}/contents/{}".format(history_id, item_id), **{key_param: True}) update_response = put(update_url, json=data) return update_response def _update_permissions(self, url, data, admin=False): key_param = "use_admin_key" if admin else "use_key" update_url = self._api_url(url, **{key_param: True}) update_response = put(update_url, json=data) return update_response def _raw_update_batch(self, data): update_url = self._api_url("histories/%s/contents" % (self.history_id), use_key=True) update_response = put(update_url, json=data) return update_response
[docs] def test_delete(self): hda1 = self._new_dataset(self.history_id) self._wait_for_history(self.history_id) assert str(self.__show(hda1).json()["deleted"]).lower() == "false" delete_response = self._delete("histories/{}/contents/{}".format(self.history_id, hda1["id"])) assert delete_response.status_code < 300 # Something in the 200s :). assert str(self.__show(hda1).json()["deleted"]).lower() == "true"
[docs] def test_purge(self): hda1 = self._new_dataset(self.history_id) self._wait_for_history(self.history_id) assert str(self.__show(hda1).json()["deleted"]).lower() == "false" assert str(self.__show(hda1).json()["purged"]).lower() == "false" data = {'purge': True} delete_response = self._delete("histories/{}/contents/{}".format(self.history_id, hda1["id"]), data=data) assert delete_response.status_code < 300 # Something in the 200s :). assert str(self.__show(hda1).json()["deleted"]).lower() == "true" assert str(self.__show(hda1).json()["purged"]).lower() == "true"
[docs] def test_dataset_collection_creation_on_contents(self): payload = self.dataset_collection_populator.create_pair_payload( self.history_id, type="dataset_collection" ) endpoint = "histories/%s/contents" % self.history_id self._check_pair_creation(endpoint, payload)
[docs] def test_dataset_collection_creation_on_typed_contents(self): payload = self.dataset_collection_populator.create_pair_payload( self.history_id, ) endpoint = "histories/%s/contents/dataset_collections" % self.history_id self._check_pair_creation(endpoint, payload)
[docs] def test_dataset_collection_create_from_exisiting_datasets_with_new_tags(self): with self.dataset_populator.test_history() as history_id: hda_id = self.dataset_populator.new_dataset(history_id, content="1 2 3")['id'] hda2_id = self.dataset_populator.new_dataset(history_id, content="1 2 3")['id'] update_response = self._raw_update(hda2_id, dict(tags=['existing:tag']), history_id=history_id).json() assert update_response['tags'] == ['existing:tag'] creation_payload = {'collection_type': 'list', 'history_id': history_id, 'element_identifiers': json.dumps([{'id': hda_id, 'src': 'hda', 'name': 'element_id1', 'tags': ['my_new_tag']}, {'id': hda2_id, 'src': 'hda', 'name': 'element_id2', 'tags': ['another_new_tag']} ]), 'type': 'dataset_collection', 'copy_elements': True} r = self._post("histories/%s/contents" % self.history_id, creation_payload).json() assert r['elements'][0]['object']['id'] != hda_id, "HDA has not been copied" assert len(r['elements'][0]['object']['tags']) == 1 assert r['elements'][0]['object']['tags'][0] == 'my_new_tag' assert len(r['elements'][1]['object']['tags']) == 2, r['elements'][1]['object']['tags'] original_hda = self.dataset_populator.get_history_dataset_details(history_id=history_id, dataset_id=hda_id) assert len(original_hda['tags']) == 0, original_hda['tags']
def _check_pair_creation(self, endpoint, payload): pre_collection_count = self.__count_contents(type="dataset_collection") pre_dataset_count = self.__count_contents(type="dataset") pre_combined_count = self.__count_contents(type="dataset,dataset_collection") dataset_collection_response = self._post(endpoint, payload) dataset_collection = self.__check_create_collection_response(dataset_collection_response) post_collection_count = self.__count_contents(type="dataset_collection") post_dataset_count = self.__count_contents(type="dataset") post_combined_count = self.__count_contents(type="dataset,dataset_collection") # Test filtering types with index. assert pre_collection_count == 0 assert post_collection_count == 1 assert post_combined_count == pre_dataset_count + 1 assert post_combined_count == pre_combined_count + 1 assert pre_dataset_count == post_dataset_count # Test show dataset colleciton. collection_url = "histories/{}/contents/dataset_collections/{}".format(self.history_id, dataset_collection["id"]) show_response = self._get(collection_url) self._assert_status_code_is(show_response, 200) dataset_collection = show_response.json() self._assert_has_keys(dataset_collection, "url", "name", "deleted") assert not dataset_collection["deleted"] delete_response = delete(self._api_url(collection_url, use_key=True)) self._assert_status_code_is(delete_response, 200) show_response = self._get(collection_url) dataset_collection = show_response.json() assert dataset_collection["deleted"]
[docs] @skip_without_tool("collection_creates_list") def test_jobs_summary_simple_hdca(self): create_response = self.dataset_collection_populator.create_list_in_history(self.history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"]) hdca_id = create_response.json()["id"] run = self.dataset_populator.run_collection_creates_list(self.history_id, hdca_id) collections = run['output_collections'] collection = collections[0] jobs_summary_url = "histories/{}/contents/dataset_collections/{}/jobs_summary".format(self.history_id, collection["id"]) jobs_summary_response = self._get(jobs_summary_url) self._assert_status_code_is(jobs_summary_response, 200) jobs_summary = jobs_summary_response.json() self._assert_has_keys(jobs_summary, "populated_state", "states")
[docs] @skip_without_tool("cat1") def test_jobs_summary_implicit_hdca(self): create_response = self.dataset_collection_populator.create_pair_in_history(self.history_id, contents=["123", "456"]) hdca_id = create_response.json()["id"] inputs = { "input1": {'batch': True, 'values': [{'src': 'hdca', 'id': hdca_id}]}, } run = self.dataset_populator.run_tool("cat1", inputs=inputs, history_id=self.history_id) self.dataset_populator.wait_for_history_jobs(self.history_id) collections = run['implicit_collections'] collection = collections[0] jobs_summary_url = "histories/{}/contents/dataset_collections/{}/jobs_summary".format(self.history_id, collection["id"]) jobs_summary_response = self._get(jobs_summary_url) self._assert_status_code_is(jobs_summary_response, 200) jobs_summary = jobs_summary_response.json() self._assert_has_keys(jobs_summary, "populated_state", "states") states = jobs_summary["states"] assert states.get("ok") == 2, states
[docs] def test_dataset_collection_hide_originals(self): payload = self.dataset_collection_populator.create_pair_payload( self.history_id, type="dataset_collection" ) payload["hide_source_items"] = True dataset_collection_response = self._post("histories/%s/contents" % self.history_id, payload) self.__check_create_collection_response(dataset_collection_response) contents_response = self._get("histories/%s/contents" % self.history_id) datasets = [d for d in contents_response.json() if d["history_content_type"] == "dataset" and d["hid"] in [1, 2]] # Assert two datasets in source were hidden. assert len(datasets) == 2 assert not datasets[0]["visible"] assert not datasets[1]["visible"]
[docs] def test_update_dataset_collection(self): payload = self.dataset_collection_populator.create_pair_payload( self.history_id, type="dataset_collection" ) dataset_collection_response = self._post("histories/%s/contents" % self.history_id, payload) self._assert_status_code_is(dataset_collection_response, 200) hdca = dataset_collection_response.json() update_url = self._api_url("histories/{}/contents/dataset_collections/{}".format(self.history_id, hdca["id"]), use_key=True) # Awkward json.dumps required here because of https://trello.com/c/CQwmCeG6 body = json.dumps(dict(name="newnameforpair")) update_response = put(update_url, data=body) self._assert_status_code_is(update_response, 200) show_response = self.__show(hdca) assert str(show_response.json()["name"]) == "newnameforpair"
[docs] def test_hdca_copy(self): hdca = self.dataset_collection_populator.create_pair_in_history(self.history_id).json() hdca_id = hdca["id"] second_history_id = self._new_history() create_data = dict( source='hdca', content=hdca_id, ) assert len(self._get("histories/%s/contents/dataset_collections" % second_history_id).json()) == 0 create_response = self._post("histories/%s/contents/dataset_collections" % second_history_id, create_data) self.__check_create_collection_response(create_response) contents = self._get("histories/%s/contents/dataset_collections" % second_history_id).json() assert len(contents) == 1 new_forward, _ = self.__get_paired_response_elements(contents[0]) self._assert_has_keys(new_forward, "history_id") assert new_forward["history_id"] == self.history_id
[docs] def test_hdca_copy_and_elements(self): hdca = self.dataset_collection_populator.create_pair_in_history(self.history_id).json() hdca_id = hdca["id"] second_history_id = self._new_history() create_data = dict( source='hdca', content=hdca_id, copy_elements=True, ) assert len(self._get("histories/%s/contents/dataset_collections" % second_history_id).json()) == 0 create_response = self._post("histories/%s/contents/dataset_collections" % second_history_id, create_data) self.__check_create_collection_response(create_response) contents = self._get("histories/%s/contents/dataset_collections" % second_history_id).json() assert len(contents) == 1 new_forward, _ = self.__get_paired_response_elements(contents[0]) self._assert_has_keys(new_forward, "history_id") assert new_forward["history_id"] == second_history_id
def __get_paired_response_elements(self, contents): hdca = self.__show(contents).json() self._assert_has_keys(hdca, "name", "deleted", "visible", "elements") elements = hdca["elements"] assert len(elements) == 2 element0 = elements[0] element1 = elements[1] self._assert_has_keys(element0, "object") self._assert_has_keys(element1, "object") return element0["object"], element1["object"]
[docs] def test_hdca_from_library_datasets(self): ld = self.library_populator.new_library_dataset("el1") ldda_id = ld["ldda_id"] element_identifiers = [{"name": "el1", "src": "ldda", "id": ldda_id}] history_id = self._new_history() create_data = dict( history_id=history_id, type="dataset_collection", name="Test From Library", element_identifiers=json.dumps(element_identifiers), collection_type="list", ) create_response = self._post("histories/%s/contents/dataset_collections" % history_id, create_data) hdca = self.__check_create_collection_response(create_response) elements = hdca["elements"] assert len(elements) == 1 hda = elements[0]["object"] assert hda["hda_ldda"] == "hda" assert hda["history_content_type"] == "dataset" assert hda["copied_from_ldda_id"] == ldda_id assert hda['history_id'] == history_id
[docs] def test_hdca_from_inaccessible_library_datasets(self): library, library_dataset = self.library_populator.new_library_dataset_in_private_library("HDCACreateInaccesibleLibrary") ldda_id = library_dataset["id"] element_identifiers = [{"name": "el1", "src": "ldda", "id": ldda_id}] create_data = dict( history_id=self.history_id, type="dataset_collection", name="Test From Library", element_identifiers=json.dumps(element_identifiers), collection_type="list", ) with self._different_user(): second_history_id = self._new_history() create_response = self._post("histories/%s/contents/dataset_collections" % second_history_id, create_data) self._assert_status_code_is(create_response, 403)
def __check_create_collection_response(self, response): self._assert_status_code_is(response, 200) dataset_collection = response.json() self._assert_has_keys(dataset_collection, "url", "name", "deleted", "visible", "elements") return dataset_collection def __show(self, contents): show_response = self._get("histories/{}/contents/{}s/{}".format(self.history_id, contents["history_content_type"], contents["id"])) return show_response def __count_contents(self, history_id=None, **kwds): if history_id is None: history_id = self.history_id contents_response = self._get("histories/%s/contents" % history_id, kwds) return len(contents_response.json()) def __assert_hda_has_full_details(self, hda_details): self._assert_has_keys(hda_details, "display_types", "display_apps") def __check_for_hda(self, contents_response, hda): self._assert_status_code_is(contents_response, 200) contents = contents_response.json() assert len(contents) == 1 hda_summary = contents[0] self.__assert_matches_hda(hda, hda_summary) return hda_summary def __assert_matches_hda(self, input_hda, query_hda): self._assert_has_keys(query_hda, "id", "name") assert input_hda["name"] == query_hda["name"] assert input_hda["id"] == query_hda["id"]
[docs] def test_job_state_summary_field(self): create_response = self.dataset_collection_populator.create_pair_in_history(self.history_id, contents=["123", "456"]) self._assert_status_code_is(create_response, 200) contents_response = self._get("histories/%s/contents?v=dev&keys=job_state_summary&view=summary" % self.history_id) self._assert_status_code_is(contents_response, 200) contents = contents_response.json() for c in filter(lambda c: c['history_content_type'] == 'dataset_collection', contents): assert isinstance(c, dict) assert 'job_state_summary' in c assert isinstance(c['job_state_summary'], dict)