Source code for galaxy_test.api.test_history_contents

import urllib.parse
from typing import (
    Any,
    List,
    Optional,
    Tuple,
)

from galaxy_test.api._framework import ApiTestCase
from galaxy_test.base.decorators import (
    requires_admin,
    requires_celery,
    requires_new_library,
    requires_new_user,
)
from galaxy_test.base.populators import (
    DatasetCollectionPopulator,
    DatasetPopulator,
    LibraryPopulator,
    skip_without_tool,
)

TEST_SOURCE_URI = "http://google.com/dataset.txt"
TEST_HASH_FUNCTION = "MD5"
TEST_HASH_VALUE = "moocowpretendthisisahas"
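# Note: TEST_HASH_VALUE is a deliberate placeholder, not a real MD5 digest; these
# module-level constants presumably feed source/hash metadata assertions elsewhere
# in the full test module.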


# TODO: Test anonymous access.
class TestHistoryContentsApi(ApiTestCase):
    dataset_populator: DatasetPopulator
    def setUp(self):
        super().setUp()
        self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
        self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
        self.library_populator = LibraryPopulator(self.galaxy_interactor)
    def test_index_hda_summary(self, history_id):
        hda1 = self.dataset_populator.new_dataset(history_id)
        contents_response = self._get(f"histories/{history_id}/contents")
        hda_summary = self.__check_for_hda(contents_response, hda1)
        assert "display_types" not in hda_summary  # Quick summary, not full details
    @requires_admin
    def test_make_private_and_public(self, history_id):
        hda1 = self._wait_for_new_hda(history_id)
        update_url = f"histories/{history_id}/contents/{hda1['id']}/permissions"

        role_id = self.dataset_populator.user_private_role_id()
        # Give manage permission to the user.
        payload = {
            "access": [],
            "manage": [role_id],
        }
        update_response = self._update_permissions(update_url, payload, admin=True)
        self._assert_status_code_is(update_response, 200)
        self._assert_other_user_can_access(history_id, hda1["id"])

        # Then we restrict access.
        payload = {
            "action": "make_private",
        }
        update_response = self._update_permissions(update_url, payload)
        self._assert_status_code_is(update_response, 200)
        self._assert_other_user_cannot_access(history_id, hda1["id"])

        # Then we remove the restrictions again.
        payload = {
            "action": "remove_restrictions",
        }
        update_response = self._update_permissions(update_url, payload)
        self._assert_status_code_is(update_response, 200)
        self._assert_other_user_can_access(history_id, hda1["id"])
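    # For reference, the permissions endpoint exercised above accepts two payload
    # shapes (both seen in test_make_private_and_public): explicit role lists, e.g.
    #     {"access": [], "manage": [role_id]}
    # or a named action, e.g.
    #     {"action": "make_private"}  # or "remove_restrictions"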
    @requires_new_user
    def test_set_permissions_add_admin_history_contents(self, history_id):
        self._verify_dataset_permissions(history_id, "history_contents")
    @requires_new_user
    def test_set_permissions_add_admin_datasets(self, history_id):
        self._verify_dataset_permissions(history_id, "dataset")
    def _verify_dataset_permissions(self, history_id: str, api_endpoint):
        hda1 = self._wait_for_new_hda(history_id)
        hda_id = hda1["id"]
        if api_endpoint == "history_contents":
            update_url = f"histories/{history_id}/contents/{hda_id}/permissions"
        else:
            update_url = f"datasets/{hda_id}/permissions"

        role_id = self.dataset_populator.user_private_role_id()

        payload = {
            "access": [role_id],
            "manage": [role_id],
        }

        # Other users cannot modify permissions.
        with self._different_user():
            update_response = self._update_permissions(update_url, payload)
            self._assert_status_code_is(update_response, 403)

        # First the details render for another user.
        self._assert_other_user_can_access(history_id, hda_id)

        # Then we restrict access.
        update_response = self._update_permissions(update_url, payload, admin=True)
        self._assert_status_code_is(update_response, 200)

        # Finally the details don't render.
        self._assert_other_user_cannot_access(history_id, hda_id)

        # But they do for the original user.
        contents_response = self._get(f"histories/{history_id}/contents/{hda_id}").json()
        assert "name" in contents_response

        update_response = self._update_permissions(update_url, payload)
        self._assert_status_code_is(update_response, 200)

        payload = {
            "access": [role_id],
            "manage": [role_id],
        }
        update_response = self._update_permissions(update_url, payload)
        self._assert_status_code_is(update_response, 200)
        self._assert_other_user_cannot_access(history_id, hda_id)

        user_id = self.dataset_populator.user_id()
        with self._different_user():
            different_user_id = self.dataset_populator.user_id()
        combined_user_role = self.dataset_populator.create_role(
            [user_id, different_user_id], description="role for testing permissions"
        )

        payload = {
            "access": [combined_user_role["id"]],
            "manage": [role_id],
        }
        update_response = self._update_permissions(update_url, payload)
        self._assert_status_code_is(update_response, 200)
        # Now the other user can see the dataset again with access permission.
        self._assert_other_user_can_access(history_id, hda_id)
        # access doesn't imply management though...
        with self._different_user():
            update_response = self._update_permissions(update_url, payload)
            self._assert_status_code_is(update_response, 403)

    def _assert_other_user_cannot_access(self, history_id: str, history_content_id: str):
        with self._different_user():
            contents_response = self.dataset_populator.get_history_dataset_details_raw(
                history_id=history_id, dataset_id=history_content_id
            )
            assert contents_response.status_code == 403

    def _assert_other_user_can_access(self, history_id: str, history_content_id: str):
        with self._different_user():
            contents_response = self.dataset_populator.get_history_dataset_details_raw(
                history_id=history_id, dataset_id=history_content_id
            )
            contents_response.raise_for_status()
            assert "name" in contents_response.json()
    def test_index_hda_all_details(self, history_id):
        hda1 = self.dataset_populator.new_dataset(history_id)
        contents_response = self._get(f"histories/{history_id}/contents?details=all")
        hda_details = self.__check_for_hda(contents_response, hda1)
        self.__assert_hda_has_full_details(hda_details)
    def test_index_hda_detail_by_id(self, history_id):
        hda1 = self.dataset_populator.new_dataset(history_id)
        contents_response = self._get(f"histories/{history_id}/contents?details={hda1['id']}")
        hda_details = self.__check_for_hda(contents_response, hda1)
        self.__assert_hda_has_full_details(hda_details)
    def test_index_detail_parameter_error(self, history_id):
        hda1 = self.dataset_populator.new_dataset(history_id)
        # Invalid details should return 400
        contents_response = self._get(f"histories/{history_id}/contents?v=dev&details= ")
        self._assert_status_code_is(contents_response, 400)
        # Empty IDs should return 400
        contents_response = self._get(f"histories/{history_id}/contents?v=dev&details=,,{hda1['id']}")
        self._assert_status_code_is(contents_response, 400)
        # Invalid IDs should return 400
        contents_response = self._get(f"histories/{history_id}/contents?v=dev&details={hda1['id']}, ,{hda1['id']}")
        self._assert_status_code_is(contents_response, 400)
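    # For contrast with the malformed values above, a well-formed request passes one
    # or more comma-separated encoded ids, or the literal "all", e.g. (sketch):
    #     histories/{history_id}/contents?v=dev&details={id1},{id2}
    # as exercised by test_index_hda_detail_by_id and test_index_hda_all_details.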
    def test_view_and_keys_parameters_for_datasets(self, history_id):
        created_hda = self.dataset_populator.new_dataset(history_id)
        hda_id = created_hda["id"]
        item_type = "dataset"
        summary_view_keys = [
            "id", "name", "history_id", "hid", "history_content_type", "deleted", "visible",
            "type_id", "type", "create_time", "update_time", "url", "tags", "dataset_id",
            "state", "extension", "purged", "genome_build",
        ]
        detailed_view_only_keys = [
            "created_from_basename", "api_type", "accessible", "misc_info", "resubmitted",
            "misc_blurb", "hda_ldda", "file_size", "hashes", "drs_id", "validated_state_message",
            "creating_job", "file_ext", "copied_from_ldda_id", "peek", "validated_state",
            "permissions", "uuid", "model_class", "sources", "annotation", "display_apps",
            "display_types", "file_name", "download_url", "rerunnable", "data_type", "meta_files",
        ]
        detailed_view_keys = summary_view_keys + detailed_view_only_keys

        # Expect summary view to be returned.
        view = "summary"
        keys = None
        item = self._get_history_item_with_custom_serialization(history_id, hda_id, item_type, view, keys)
        self._assert_has_keys(item, *summary_view_keys)
        for key in detailed_view_only_keys:
            assert key not in item
        # Expect "dynamic" metadata fields to NOT be returned.
        metadata_keys = [key for key in item.keys() if key.startswith("metadata_")]
        assert len(metadata_keys) == 0

        # Expect detailed view to be returned.
        view = "detailed"
        keys = None
        item = self._get_history_item_with_custom_serialization(history_id, hda_id, item_type, view, keys)
        self._assert_has_keys(item, *detailed_view_keys)
        # Expect also "dynamic" metadata fields to be returned.
        metadata_keys = [key for key in item.keys() if key.startswith("metadata_")]
        assert len(metadata_keys) > 0

        # Expect only specific keys to be returned.
        view = None
        keys = detailed_view_only_keys + ["id"]
        item = self._get_history_item_with_custom_serialization(history_id, hda_id, item_type, view, keys)
        self._assert_has_keys(item, *keys)
        assert len(item) == len(keys)
        # Make sure the id is encoded in the response.
        assert isinstance(item["id"], str)
        assert item["id"] == hda_id

        # Expect combined view and keys to be returned.
        view = "summary"
        keys = ["file_size"]
        item = self._get_history_item_with_custom_serialization(history_id, hda_id, item_type, view, keys)
        self._assert_has_keys(item, *summary_view_keys, *keys)
        assert "peek" not in item
    def test_view_and_keys_parameters_for_collections(self, history_id):
        fetch_response = self.dataset_collection_populator.create_list_in_history(history_id, direct_upload=True).json()
        created_dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
        hdca_id = created_dataset_collection["id"]
        item_type = "dataset_collection"
        # Collections seem to have 3 different views: "collection", "element" and "element-reference".
        # We cannot use the keys parameter with collections, so we only test the view parameter.
        collection_view_keys = [
            "hid", "history_id", "history_content_type", "visible", "deleted", "job_source_id",
            "job_source_type", "job_state_summary", "create_time", "update_time", "id", "name",
            "collection_id", "collection_type", "populated", "populated_state",
            "populated_state_message", "element_count", "elements_datatypes", "type",
            "model_class", "tags", "url", "contents_url",
        ]
        element_view_only_keys = ["elements", "implicit_collection_jobs_id"]
        element_view_keys = collection_view_keys + element_view_only_keys

        # Expect the "collection" view to be returned.
        view = "collection"
        item = self._get_history_item_with_custom_serialization(history_id, hdca_id, item_type, view)
        self._assert_has_keys(item, *collection_view_keys)
        for key in element_view_only_keys:
            assert key not in item

        # Expect the "element" view to be returned.
        view = "element"
        item = self._get_history_item_with_custom_serialization(history_id, hdca_id, item_type, view)
        self._assert_has_keys(item, *element_view_keys)
        # The `elements` field should be populated for the "element" view.
        assert len(item["elements"]) > 0
    def _get_history_item_with_custom_serialization(
        self,
        history_id: str,
        content_id: str,
        item_type: str,
        expected_view: Optional[str] = None,
        expected_keys: Optional[List[str]] = None,
    ):
        view = f"&view={expected_view}" if expected_view else ""
        keys = f"&keys={','.join(expected_keys)}" if expected_keys else ""
        response = self._get(f"histories/{history_id}/contents/{item_type}s/{content_id}?v=dev{view}{keys}")
        self._assert_status_code_is_ok(response)
        return response.json()
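    # The helper above simply assembles a URL of the form (sketch):
    #     histories/{history_id}/contents/datasets/{content_id}?v=dev&view=summary&keys=id,name
    # where the trailing "s" pluralizes the item type ("datasets" or "dataset_collections").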
    def test_show_hda(self, history_id):
        hda1 = self.dataset_populator.new_dataset(history_id)
        show_response = self.__show(history_id, hda1)
        self._assert_status_code_is(show_response, 200)
        self.__assert_matches_hda(hda1, show_response.json())
    def _create_copy(self, history_id: str):
        hda1 = self.dataset_populator.new_dataset(history_id)
        create_data = dict(
            source="hda",
            content=hda1["id"],
        )
        second_history_id = self.dataset_populator.new_history()
        assert self.__count_contents(second_history_id) == 0
        create_response = self._post(f"histories/{second_history_id}/contents", create_data, json=True)
        self._assert_status_code_is(create_response, 200)
        return create_response.json()
    def test_hda_copy(self, history_id):
        response = self._create_copy(history_id)
        assert self.__count_contents(response["history_id"]) == 1
    def test_inheritance_chain(self, history_id):
        response = self._create_copy(history_id)
        inheritance_chain_response = self._get(f"datasets/{response['id']}/inheritance_chain")
        self._assert_status_code_is_ok(inheritance_chain_response)
        inheritance_chain = inheritance_chain_response.json()
        assert len(inheritance_chain) == 1
    @requires_new_library
    def test_library_copy(self, history_id):
        ld = self.library_populator.new_library_dataset("lda_test_library")
        create_data = dict(
            source="library",
            content=ld["id"],
        )
        assert self.__count_contents(history_id) == 0
        create_response = self._post(f"histories/{history_id}/contents", create_data, json=True)
        self._assert_status_code_is(create_response, 200)
        assert self.__count_contents(history_id) == 1
    def test_update(self, history_id):
        hda1 = self._wait_for_new_hda(history_id)
        assert str(hda1["deleted"]).lower() == "false"
        update_response = self._update(history_id, hda1["id"], dict(deleted=True))
        self._assert_status_code_is(update_response, 200)
        show_response = self.__show(history_id, hda1)
        assert str(show_response.json()["deleted"]).lower() == "true"

        update_response = self._update(history_id, hda1["id"], dict(name="Updated Name"))
        assert self.__show(history_id, hda1).json()["name"] == "Updated Name"

        unicode_name = "ржевский сапоги"
        update_response = self._update(history_id, hda1["id"], dict(name=unicode_name))
        updated_hda = self.__show(history_id, hda1).json()
        assert updated_hda["name"] == unicode_name, updated_hda

        quoted_name = '"Mooo"'
        update_response = self._update(history_id, hda1["id"], dict(name=quoted_name))
        updated_hda = self.__show(history_id, hda1).json()
        assert updated_hda["name"] == quoted_name, quoted_name

        data = {
            "dataset_id": hda1["id"],
            "name": "moocow",
            "dbkey": "?",
            "annotation": None,
            "info": "my info is",
            "operation": "attributes",
        }
        update_response = self._set_edit_update(data)
        # No key or anything supplied, so expect a permission problem.
        # A bit questionable, but this is a 400 instead of a 403 so that we don't
        # distinguish between "this is a valid ID you don't have access to" and
        # "this is an invalid ID".
        assert update_response.status_code == 400, update_response.content
    def test_update_batch(self, history_id):
        hda1 = self._wait_for_new_hda(history_id)
        assert str(hda1["deleted"]).lower() == "false"
        assert str(hda1["visible"]).lower() == "true"

        # update deleted flag => true
        payload = dict(items=[{"history_content_type": "dataset", "id": hda1["id"]}], deleted=True)
        update_response = self._update_batch(history_id, payload)
        objects = update_response.json()
        assert objects[0]["deleted"] is True
        assert objects[0]["visible"] is True

        # update visibility flag => false
        payload = dict(items=[{"history_content_type": "dataset", "id": hda1["id"]}], visible=False)
        update_response = self._update_batch(history_id, payload)
        objects = update_response.json()
        assert objects[0]["deleted"] is True
        assert objects[0]["visible"] is False

        # update both flags
        payload = dict(items=[{"history_content_type": "dataset", "id": hda1["id"]}], deleted=False, visible=True)
        update_response = self._update_batch(history_id, payload)
        objects = update_response.json()
        assert objects[0]["deleted"] is False
        assert objects[0]["visible"] is True
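    # As the payloads above show, a batch update is a plain PUT to
    # histories/{history_id}/contents pairing an `items` list with the flags to set, e.g.
    #     {"items": [{"history_content_type": "dataset", "id": hda_id}], "visible": False}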
    def test_update_batch_collections(self, history_id):
        hdca = self._create_pair_collection(history_id)
        assert hdca["deleted"] is False
        assert hdca["visible"] is True

        # update deleted flag => true
        payload = dict(items=[{"history_content_type": "dataset_collection", "id": hdca["id"]}], deleted=True)
        update_response = self._update_batch(history_id, payload)
        objects = update_response.json()
        assert objects[0]["deleted"] is True
        assert objects[0]["visible"] is True

        # update visibility flag => false
        payload = dict(items=[{"history_content_type": "dataset_collection", "id": hdca["id"]}], visible=False)
        update_response = self._update_batch(history_id, payload)
        objects = update_response.json()
        assert objects[0]["deleted"] is True
        assert objects[0]["visible"] is False

        # update both flags
        payload = dict(
            items=[{"history_content_type": "dataset_collection", "id": hdca["id"]}], deleted=False, visible=True
        )
        update_response = self._update_batch(history_id, payload)
        objects = update_response.json()
        assert objects[0]["deleted"] is False
        assert objects[0]["visible"] is True
    def test_update_type_failures(self, history_id):
        hda1 = self._wait_for_new_hda(history_id)
        update_response = self._update(history_id, hda1["id"], dict(deleted="not valid"))
        self._assert_status_code_is(update_response, 400)
    def _wait_for_new_hda(self, history_id: str):
        hda1 = self.dataset_populator.new_dataset(history_id)
        self.dataset_populator.wait_for_history(history_id)
        return hda1

    def _set_edit_update(self, data):
        update_response = self._put(urllib.parse.urljoin(self.url, "dataset/set_edit"), data=data, json=True)
        return update_response

    def _update(self, history_id: str, item_id, data, admin=False):
        update_response = self._put(f"histories/{history_id}/contents/{item_id}", data=data, json=True, admin=admin)
        return update_response

    def _update_permissions(self, url, data, admin=False):
        update_response = self._put(url, data=data, json=True, admin=admin)
        return update_response

    def _update_batch(self, history_id: str, data):
        update_response = self._put(f"histories/{history_id}/contents", data=data, json=True)
        return update_response
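    # Note: unlike the other helpers above, which hit paths under the API prefix,
    # _set_edit_update joins "dataset/set_edit" against the base URL, i.e. it goes
    # through the legacy (non-/api) dataset controller.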
    def test_delete(self, history_id):
        hda1 = self.dataset_populator.new_dataset(history_id)
        self.dataset_populator.wait_for_history(history_id)
        assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "false"
        delete_response = self._delete(f"histories/{history_id}/contents/{hda1['id']}")
        assert delete_response.status_code < 300  # Something in the 200s :).
        assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "true"
    def test_delete_anon(self):
        with self._different_user(anon=True):
            history_id = self._get(urllib.parse.urljoin(self.url, "history/current_history_json")).json()["id"]
            hda1 = self.dataset_populator.new_dataset(history_id)
            self.dataset_populator.wait_for_history(history_id)
            assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "false"
            delete_response = self._delete(f"histories/{history_id}/contents/{hda1['id']}")
            assert delete_response.status_code < 300  # Something in the 200s :).
            assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "true"
    def test_delete_permission_denied(self, history_id):
        hda1 = self.dataset_populator.new_dataset(history_id)
        with self._different_user(anon=True):
            delete_response = self._delete(f"histories/{history_id}/contents/{hda1['id']}")
            assert delete_response.status_code == 403
            assert delete_response.json()["err_msg"] == "HistoryDatasetAssociation is not owned by user"
    def test_purge(self, history_id):
        hda1 = self.dataset_populator.new_dataset(history_id)
        self.dataset_populator.wait_for_history(history_id)
        assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "false"
        assert str(self.__show(history_id, hda1).json()["purged"]).lower() == "false"
        data = {"purge": True}
        delete_response = self._delete(f"histories/{history_id}/contents/{hda1['id']}", data=data, json=True)
        assert delete_response.status_code < 300  # Something in the 200s :).
        # Purging and deleting the dataset may or may not happen asynchronously.
        # On 202 the request was accepted and purging will happen later.
        if delete_response.status_code == 202:
            self.dataset_populator.wait_for_purge(history_id, hda1["id"])
        else:
            assert self.__show(history_id, hda1).json()["deleted"]
            assert self.__show(history_id, hda1).json()["purged"]
    def test_dataset_collection_creation_on_contents(self, history_id):
        payload = self.dataset_collection_populator.create_pair_payload(
            history_id, type="dataset_collection", wait=True
        )
        endpoint = "tools/fetch"
        self._check_pair_creation(history_id, endpoint, payload)
    def test_dataset_collection_creation_on_typed_contents(self, history_id):
        payload = self.dataset_collection_populator.create_pair_payload(history_id, wait=True)
        endpoint = "tools/fetch"
        self._check_pair_creation(history_id, endpoint, payload)
    def test_dataset_collection_create_from_existing_datasets_with_new_tags(self):
        with self.dataset_populator.test_history() as history_id:
            hda_id = self.dataset_populator.new_dataset(history_id, content="1 2 3")["id"]
            hda2_id = self.dataset_populator.new_dataset(history_id, content="1 2 3")["id"]
            update_response = self._update(history_id, hda2_id, dict(tags=["existing:tag"])).json()
            assert update_response["tags"] == ["existing:tag"]
            creation_payload = {
                "collection_type": "list",
                "history_id": history_id,
                "element_identifiers": [
                    {"id": hda_id, "src": "hda", "name": "element_id1", "tags": ["my_new_tag"]},
                    {"id": hda2_id, "src": "hda", "name": "element_id2", "tags": ["another_new_tag"]},
                ],
                "type": "dataset_collection",
                "copy_elements": True,
            }
            r = self._post(f"histories/{history_id}/contents", creation_payload, json=True).json()
            assert r["elements"][0]["object"]["id"] != hda_id, "HDA has not been copied"
            assert len(r["elements"][0]["object"]["tags"]) == 1
            assert r["elements"][0]["object"]["tags"][0] == "my_new_tag"
            assert len(r["elements"][1]["object"]["tags"]) == 2, r["elements"][1]["object"]["tags"]
            original_hda = self.dataset_populator.get_history_dataset_details(history_id=history_id, dataset_id=hda_id)
            assert len(original_hda["tags"]) == 0, original_hda["tags"]
    def _check_pair_creation(self, history_id: str, endpoint, payload):
        pre_collection_count = self.__count_contents(history_id, type="dataset_collection")
        pre_dataset_count = self.__count_contents(history_id, type="dataset")
        pre_combined_count = self.__count_contents(history_id, type="dataset,dataset_collection")

        dataset_collection_response = self._post(endpoint, payload, json=True)
        dataset_collection = self.__check_create_collection_response(dataset_collection_response)

        post_collection_count = self.__count_contents(history_id, type="dataset_collection")
        post_dataset_count = self.__count_contents(history_id, type="dataset")
        post_combined_count = self.__count_contents(history_id, type="dataset,dataset_collection")

        # Test filtering types with index.
        assert pre_collection_count == 0
        assert post_collection_count == 1
        assert post_combined_count == pre_dataset_count + 1
        assert post_combined_count == pre_combined_count + 1
        assert pre_dataset_count == post_dataset_count

        # Test show dataset collection.
        collection_url = f"histories/{history_id}/contents/dataset_collections/{dataset_collection['id']}"
        show_response = self._get(collection_url)
        self._assert_status_code_is(show_response, 200)
        dataset_collection = show_response.json()
        self._assert_has_keys(dataset_collection, "url", "name", "deleted")
        assert not dataset_collection["deleted"]

        delete_response = self._delete(collection_url)
        self._assert_status_code_is(delete_response, 200)

        show_response = self._get(collection_url)
        dataset_collection = show_response.json()
        assert dataset_collection["deleted"]
[docs] @skip_without_tool("collection_creates_list") def test_jobs_summary_simple_hdca(self, history_id): fetch_response = self.dataset_collection_populator.create_list_in_history( history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"] ).json() hdca_id = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)["id"] run = self.dataset_populator.run_collection_creates_list(history_id, hdca_id) collections = run["output_collections"] collection = collections[0] jobs_summary_url = f"histories/{history_id}/contents/dataset_collections/{collection['id']}/jobs_summary" jobs_summary_response = self._get(jobs_summary_url) self._assert_status_code_is(jobs_summary_response, 200) jobs_summary = jobs_summary_response.json() self._assert_has_keys(jobs_summary, "populated_state", "states")
[docs] @skip_without_tool("cat1") def test_jobs_summary_implicit_hdca(self, history_id): create_response = self.dataset_collection_populator.create_pair_in_history( history_id, contents=["123", "456"], wait=True ) hdca_id = create_response.json()["outputs"][0]["id"] inputs = { "input1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]}, } run = self.dataset_populator.run_tool("cat1", inputs=inputs, history_id=history_id) self.dataset_populator.wait_for_history_jobs(history_id) collections = run["implicit_collections"] collection = collections[0] jobs_summary_url = f"histories/{history_id}/contents/dataset_collections/{collection['id']}/jobs_summary" jobs_summary_response = self._get(jobs_summary_url) self._assert_status_code_is(jobs_summary_response, 200) jobs_summary = jobs_summary_response.json() self._assert_has_keys(jobs_summary, "populated_state", "states") states = jobs_summary["states"] assert states.get("ok") == 2, states
    def test_dataset_collection_hide_originals(self, history_id):
        payload = self.dataset_collection_populator.create_pair_payload(
            history_id, type="dataset_collection", direct_upload=False, copy_elements=False
        )
        payload["hide_source_items"] = True
        dataset_collection_response = self._post(f"histories/{history_id}/contents", payload, json=True)
        self.__check_create_collection_response(dataset_collection_response)

        contents_response = self._get(f"histories/{history_id}/contents")
        datasets = [d for d in contents_response.json() if d["history_content_type"] == "dataset"]
        # Assert the two source datasets were hidden.
        assert len(datasets) == 2
        assert not datasets[0]["visible"]
        assert not datasets[1]["visible"]
    def test_update_dataset_collection(self, history_id):
        hdca = self._create_pair_collection(history_id)
        body = dict(name="newnameforpair")
        update_response = self._put(
            f"histories/{history_id}/contents/dataset_collections/{hdca['id']}", data=body, json=True
        )
        self._assert_status_code_is(update_response, 200)
        show_response = self.__show(history_id, hdca)
        assert str(show_response.json()["name"]) == "newnameforpair"
    def test_update_batch_dataset_collection(self, history_id):
        hdca = self._create_pair_collection(history_id)
        body = {"items": [{"history_content_type": "dataset_collection", "id": hdca["id"]}], "name": "newnameforpair"}
        update_response = self._put(f"histories/{history_id}/contents", data=body, json=True)
        self._assert_status_code_is(update_response, 200)
        show_response = self.__show(history_id, hdca)
        assert str(show_response.json()["name"]) == "newnameforpair"
    def _create_pair_collection(self, history_id: str):
        payload = self.dataset_collection_populator.create_pair_payload(history_id, type="dataset_collection")
        dataset_collection_response = self._post("tools/fetch", payload, json=True)
        self._assert_status_code_is(dataset_collection_response, 200)
        hdca = dataset_collection_response.json()["output_collections"][0]
        return hdca
    def test_hdca_copy(self, history_id):
        hdca = self.dataset_collection_populator.create_pair_in_history(history_id, wait=True).json()
        hdca_id = hdca["outputs"][0]["id"]
        second_history_id = self.dataset_populator.new_history()
        create_data = dict(
            source="hdca",
            content=hdca_id,
        )
        assert len(self._get(f"histories/{second_history_id}/contents/dataset_collections").json()) == 0
        create_response = self._post(
            f"histories/{second_history_id}/contents/dataset_collections", create_data, json=True
        )
        self.__check_create_collection_response(create_response)
        contents = self._get(f"histories/{second_history_id}/contents/dataset_collections").json()
        assert len(contents) == 1
        new_forward, _ = self.__get_paired_response_elements(history_id, contents[0])
        self._assert_has_keys(new_forward, "history_id")
        assert new_forward["history_id"] == second_history_id
    def test_hdca_copy_with_new_dbkey(self, history_id):
        fetch_response = self.dataset_collection_populator.create_pair_in_history(history_id, wait=True).json()
        hdca = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
        hdca_id = hdca["id"]
        assert hdca["elements"][0]["object"]["metadata_dbkey"] == "?"
        assert hdca["elements"][0]["object"]["genome_build"] == "?"
        create_data = {"source": "hdca", "content": hdca_id, "dbkey": "hg19"}
        create_response = self._post(f"histories/{history_id}/contents/dataset_collections", create_data, json=True)
        collection = self.__check_create_collection_response(create_response)
        new_forward = collection["elements"][0]["object"]
        assert new_forward["metadata_dbkey"] == "hg19"
        assert new_forward["genome_build"] == "hg19"
    def test_hdca_copy_and_elements(self, history_id):
        hdca = self.dataset_collection_populator.create_pair_in_history(history_id, wait=True).json()["outputs"][0]
        hdca_id = hdca["id"]
        second_history_id = self.dataset_populator.new_history()
        create_data = dict(
            source="hdca",
            content=hdca_id,
            copy_elements=True,
        )
        assert len(self._get(f"histories/{second_history_id}/contents/dataset_collections").json()) == 0
        create_response = self._post(
            f"histories/{second_history_id}/contents/dataset_collections", create_data, json=True
        )
        self.__check_create_collection_response(create_response)
        contents = self._get(f"histories/{second_history_id}/contents/dataset_collections").json()
        assert len(contents) == 1
        new_forward, _ = self.__get_paired_response_elements(history_id, contents[0])
        self._assert_has_keys(new_forward, "history_id")
        assert new_forward["history_id"] == second_history_id
    def __get_paired_response_elements(self, history_id: str, contents):
        hdca = self.__show(history_id, contents).json()
        self._assert_has_keys(hdca, "name", "deleted", "visible", "elements")
        elements = hdca["elements"]
        assert len(elements) == 2
        element0 = elements[0]
        element1 = elements[1]
        self._assert_has_keys(element0, "object")
        self._assert_has_keys(element1, "object")
        return element0["object"], element1["object"]
    @requires_new_library
    def test_hdca_from_library_datasets(self, history_id):
        ld = self.library_populator.new_library_dataset("el1")
        ldda_id = ld["ldda_id"]
        element_identifiers = [{"name": "el1", "src": "ldda", "id": ldda_id}]
        create_data = dict(
            history_id=history_id,
            type="dataset_collection",
            name="Test From Library",
            element_identifiers=element_identifiers,
            collection_type="list",
        )
        create_response = self._post(f"histories/{history_id}/contents/dataset_collections", create_data, json=True)
        hdca = self.__check_create_collection_response(create_response)
        elements = hdca["elements"]
        assert len(elements) == 1
        hda = elements[0]["object"]
        assert hda["hda_ldda"] == "hda"
        assert hda["history_content_type"] == "dataset"
        assert hda["copied_from_ldda_id"] == ldda_id
        assert hda["history_id"] == history_id
    @requires_new_library
    def test_hdca_from_inaccessible_library_datasets(self, history_id):
        library, library_dataset = self.library_populator.new_library_dataset_in_private_library(
            "HDCACreateInaccesibleLibrary"
        )
        ldda_id = library_dataset["id"]
        element_identifiers = [{"name": "el1", "src": "ldda", "id": ldda_id}]
        create_data = dict(
            history_id=history_id,
            type="dataset_collection",
            name="Test From Library",
            element_identifiers=element_identifiers,
            collection_type="list",
        )
        with self._different_user():
            second_history_id = self.dataset_populator.new_history()
            create_response = self._post(
                f"histories/{second_history_id}/contents/dataset_collections", create_data, json=True
            )
            self._assert_status_code_is(create_response, 403)
    def __check_create_collection_response(self, response):
        self._assert_status_code_is(response, 200)
        dataset_collection = response.json()
        if "output_collections" in dataset_collection:
            dataset_collection = dataset_collection["output_collections"][0]
        self._assert_has_keys(dataset_collection, "url", "name", "deleted", "visible", "elements")
        return dataset_collection

    def __show(self, history_id, contents):
        show_response = self._get(
            f"histories/{history_id}/contents/{contents['history_content_type']}s/{contents['id']}"
        )
        return show_response

    def __count_contents(self, history_id: str, **kwds):
        contents_response = self._get(f"histories/{history_id}/contents", kwds)
        return len(contents_response.json())

    def __assert_hda_has_full_details(self, hda_details):
        self._assert_has_keys(hda_details, "display_types", "display_apps")

    def __check_for_hda(self, contents_response, hda):
        self._assert_status_code_is(contents_response, 200)
        contents = contents_response.json()
        assert len(contents) == 1
        hda_summary = contents[0]
        self.__assert_matches_hda(hda, hda_summary)
        return hda_summary

    def __assert_matches_hda(self, input_hda, query_hda):
        self._assert_has_keys(query_hda, "id", "name")
        assert input_hda["name"] == query_hda["name"]
        assert input_hda["id"] == query_hda["id"]
    def test_job_state_summary_field(self, history_id):
        create_response = self.dataset_collection_populator.create_pair_in_history(
            history_id,
            contents=["123", "456"],
        )
        self._assert_status_code_is(create_response, 200)
        contents_response = self._get(f"histories/{history_id}/contents?v=dev&keys=job_state_summary&view=summary")
        self._assert_status_code_is(contents_response, 200)
        contents = contents_response.json()
        for c in contents:
            if c["history_content_type"] == "dataset_collection":
                assert isinstance(c, dict)
                assert "job_state_summary" in c
                assert isinstance(c["job_state_summary"], dict)
    def test_index_filter_by_type(self, history_id):
        self.dataset_populator.new_dataset(history_id)
        self.dataset_collection_populator.create_list_in_history(history_id=history_id, wait=True)

        contents_response = self._get(f"histories/{history_id}/contents").json()
        num_items = len(contents_response)
        expected_num_collections = 1
        expected_num_datasets = num_items - expected_num_collections

        contents_response = self._get(f"histories/{history_id}/contents?types=dataset").json()
        assert len(contents_response) == expected_num_datasets
        contents_response = self._get(f"histories/{history_id}/contents?types=dataset_collection").json()
        assert len(contents_response) == expected_num_collections
        contents_response = self._get(f"histories/{history_id}/contents?types=dataset,dataset_collection").json()
        assert len(contents_response) == expected_num_datasets + expected_num_collections
        contents_response = self._get(f"histories/{history_id}/contents?types=dataset&types=dataset_collection").json()
        assert len(contents_response) == expected_num_datasets + expected_num_collections
    def test_index_filter_by_name_ignores_case(self, history_id):
        self.dataset_populator.new_dataset(history_id, name="AC")
        self.dataset_populator.new_dataset(history_id, name="ac")
        self.dataset_populator.new_dataset(history_id, name="Bc")

        contains_text = "a"
        contents_response = self._get(
            f"histories/{history_id}/contents?v=dev&q=name-contains&qv={contains_text}"
        ).json()
        assert len(contents_response) == 2
        contains_text = "b"
        contents_response = self._get(
            f"histories/{history_id}/contents?v=dev&q=name-contains&qv={contains_text}"
        ).json()
        assert len(contents_response) == 1
        contains_text = "c"
        contents_response = self._get(
            f"histories/{history_id}/contents?v=dev&q=name-contains&qv={contains_text}"
        ).json()
        assert len(contents_response) == 3
        contains_text = "%"
        contents_response = self._get(
            f"histories/{history_id}/contents?v=dev&q=name-contains&qv={contains_text}"
        ).json()
        assert len(contents_response) == 0
    def test_elements_datatypes_field(self, history_id):
        collection_name = "homogeneous"
        expected_datatypes = ["txt"]
        elements = [
            # List with all elements of txt datatype (homogeneous)
            {"name": "test1", "src": "pasted", "paste_content": "abc", "ext": "txt"},
            {"name": "test2", "src": "pasted", "paste_content": "abc", "ext": "txt"},
        ]
        self._upload_collection_list_with_elements(history_id, collection_name, elements)
        self._assert_collection_has_expected_elements_datatypes(history_id, collection_name, expected_datatypes)

        collection_name = "heterogeneous"
        expected_datatypes = ["txt", "tabular"]
        elements = [
            # List with txt and tabular datatypes (heterogeneous)
            {"name": "test2", "src": "pasted", "paste_content": "abc", "ext": "txt"},
            {"name": "test3", "src": "pasted", "paste_content": "a,b,c\n", "ext": "tabular"},
        ]
        self._upload_collection_list_with_elements(history_id, collection_name, elements)
        self._assert_collection_has_expected_elements_datatypes(history_id, collection_name, expected_datatypes)
    def _upload_collection_list_with_elements(self, history_id: str, collection_name: str, elements: List[Any]):
        create_homogeneous_response = self.dataset_collection_populator.upload_collection(
            history_id, "list", elements=elements, name=collection_name, wait=True
        )
        self._assert_status_code_is_ok(create_homogeneous_response)

    def _assert_collection_has_expected_elements_datatypes(self, history_id, collection_name, expected_datatypes):
        contents_response = self._get(
            f"histories/{history_id}/contents?v=dev&view=detailed&q=name-eq&qv={collection_name}"
        )
        self._assert_status_code_is(contents_response, 200)
        collection = contents_response.json()[0]
        assert sorted(collection["elements_datatypes"]) == sorted(expected_datatypes)
[docs] @skip_without_tool("cat1") def test_cannot_run_tools_on_immutable_histories(self, history_id): create_response = self.dataset_collection_populator.create_pair_in_history( history_id, contents=["123", "456"], wait=True ) hdca_id = create_response.json()["outputs"][0]["id"] inputs = { "input1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]}, } # once we purge the history, it becomes immutable self._delete(f"histories/{history_id}", data={"purge": True}, json=True) with self.assertRaisesRegex(AssertionError, "History is immutable"): self.dataset_populator.run_tool("cat1", inputs=inputs, history_id=history_id)
    def test_cannot_update_dataset_collection_on_immutable_history(self, history_id):
        hdca = self._create_pair_collection(history_id)
        # once we purge the history, it becomes immutable
        self._delete(f"histories/{history_id}", data={"purge": True}, json=True)
        body = dict(name="newnameforpair")
        update_response = self._put(
            f"histories/{history_id}/contents/dataset_collections/{hdca['id']}", data=body, json=True
        )
        self._assert_status_code_is(update_response, 403)
        assert update_response.json()["err_msg"] == "History is immutable"
    def test_cannot_update_dataset_on_immutable_history(self, history_id):
        hda1 = self._wait_for_new_hda(history_id)
        # once we purge the history, it becomes immutable
        self._delete(f"histories/{history_id}", data={"purge": True}, json=True)
        update_response = self._update(history_id, hda1["id"], dict(name="Updated Name"))
        self._assert_status_code_is(update_response, 403)
        assert update_response.json()["err_msg"] == "History is immutable"
class TestHistoryContentsApiBulkOperation(ApiTestCase):
    """
    Test the `/api/histories/{history_id}/contents/bulk` endpoint and the new
    `count` special view for `/api/histories/{history_id}/contents?v=dev`
    """
    def setUp(self):
        super().setUp()
        self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
        self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
    def test_explicit_items_selection(self):
        with self.dataset_populator.test_history() as history_id:
            datasets_ids, collection_ids, history_contents = self._create_test_history_contents(history_id)

            # Hide 2 collections and 3 datasets, 5 in total
            payload = {
                "operation": "hide",
                "items": [
                    {
                        "id": datasets_ids[0],
                        "history_content_type": "dataset",
                    },
                    {
                        "id": collection_ids[0],
                        "history_content_type": "dataset_collection",
                    },
                    {
                        "id": datasets_ids[1],
                        "history_content_type": "dataset",
                    },
                    {
                        "id": collection_ids[1],
                        "history_content_type": "dataset_collection",
                    },
                    {
                        "id": datasets_ids[2],
                        "history_content_type": "dataset",
                    },
                ],
            }
            expected_hidden_item_ids = [item["id"] for item in payload["items"]]
            expected_hidden_item_count = len(expected_hidden_item_ids)
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            history_contents = self._get_history_contents(history_id)
            hidden_items = self._get_hidden_items_from_history_contents(history_contents)
            self._assert_bulk_success(bulk_operation_result, expected_hidden_item_count)
            assert len(hidden_items) == expected_hidden_item_count
            for item in hidden_items:
                assert item["id"] in expected_hidden_item_ids
    def test_dynamic_query_selection(self):
        with self.dataset_populator.test_history() as history_id:
            _, collection_ids, history_contents = self._create_test_history_contents(history_id)

            # Hide all collections using a query
            payload = {"operation": "hide"}
            query = "q=history_content_type-eq&qv=dataset_collection"
            bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
            history_contents = self._get_history_contents(history_id)
            hidden_items = self._get_hidden_items_from_history_contents(history_contents)
            self._assert_bulk_success(bulk_operation_result, len(collection_ids))
            assert len(hidden_items) == len(collection_ids)
            for item in hidden_items:
                assert item["id"] in collection_ids
    def test_bulk_operations(self):
        with self.dataset_populator.test_history() as history_id:
            datasets_ids, collection_ids, history_contents = self._create_test_history_contents(history_id)

            # Hide all datasets using a query
            payload = {"operation": "hide"}
            query = "q=history_content_type-eq&qv=dataset"
            bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
            history_contents = self._get_history_contents(history_id)
            hidden_items = self._get_hidden_items_from_history_contents(history_contents)
            self._assert_bulk_success(bulk_operation_result, len(datasets_ids))
            assert len(hidden_items) == len(datasets_ids)

            # Unhide datasets_ids[0] and datasets_ids[3]
            payload = {
                "operation": "unhide",
                "items": [
                    {
                        "id": datasets_ids[0],
                        "history_content_type": "dataset",
                    },
                    {
                        "id": datasets_ids[3],
                        "history_content_type": "dataset",
                    },
                ],
            }
            expected_unhidden_count = len(payload["items"])
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            history_contents = self._get_history_contents(history_id)
            self._assert_bulk_success(bulk_operation_result, expected_unhidden_count)
            for item in history_contents:
                if item["id"] in [datasets_ids[0], datasets_ids[3]]:
                    assert item["visible"] is True

            # Delete all hidden datasets (total datasets - 2 previously unhidden)
            expected_hidden_item_count = len(datasets_ids) - expected_unhidden_count
            payload = {"operation": "delete"}
            query = "q=history_content_type-eq&qv=dataset&q=visible&qv=False"
            bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
            history_contents = self._get_history_contents(history_id)
            hidden_items = self._get_hidden_items_from_history_contents(history_contents)
            self._assert_bulk_success(bulk_operation_result, expected_hidden_item_count)
            for item in hidden_items:
                assert item["deleted"] is True

            # Undelete all items in the history
            payload = {
                "operation": "undelete",
            }
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            history_contents = self._get_history_contents(history_id)
            self._assert_bulk_success(bulk_operation_result, len(history_contents))
            for item in history_contents:
                assert item["deleted"] is False

            # Purge datasets_ids[0] and collection_ids[0]
            payload = {
                "operation": "purge",
                "items": [
                    {
                        "id": datasets_ids[0],
                        "history_content_type": "dataset",
                    },
                    {
                        "id": collection_ids[0],
                        "history_content_type": "dataset_collection",
                    },
                ],
            }
            expected_purged_count = len(payload["items"])
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            history_contents = self._get_history_contents(history_id)
            self._assert_bulk_success(bulk_operation_result, expected_purged_count)
            purged_dataset = self._get_dataset_with_id_from_history_contents(history_contents, datasets_ids[0])
            self.dataset_populator.wait_for_purge(history_id=history_id, content_id=purged_dataset["id"])
            assert purged_dataset["deleted"] is True
            purged_collection = self._get_collection_with_id_from_history_contents(history_contents, collection_ids[0])
            # Collections don't have a `purged` attribute, but they should be marked deleted on purge.
            assert purged_collection["deleted"] is True

            # Un-deleting a purged dataset should have no effect and raise an error
            payload = {
                "operation": "undelete",
                "items": [
                    {
                        "id": datasets_ids[0],
                        "history_content_type": "dataset",
                    },
                ],
            }
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            history_contents = self._get_history_contents(history_id)
            assert bulk_operation_result["success_count"] == 0
            assert len(bulk_operation_result["errors"]) == 1
            error = bulk_operation_result["errors"][0]
            assert error["item"]["id"] == datasets_ids[0]
            purged_dataset = self._get_dataset_with_id_from_history_contents(history_contents, datasets_ids[0])
            assert purged_dataset["deleted"] is True
            assert purged_dataset["purged"] is True
    def test_purging_collection_should_purge_contents(self):
        with self.dataset_populator.test_history() as history_id:
            datasets_ids, collection_ids, history_contents = self._create_test_history_contents(history_id)

            # Purge all collections
            payload = {"operation": "purge"}
            query = "q=history_content_type-eq&qv=dataset_collection"
            bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
            history_contents = self._get_history_contents(history_id)
            self._assert_bulk_success(bulk_operation_result, len(collection_ids))
            for item in history_contents:
                assert item["deleted"] is True
                if item["history_content_type"] == "dataset":
                    self.dataset_populator.wait_for_purge(history_id=history_id, content_id=item["id"])
    def test_deleting_collection_should_delete_contents(self):
        with self.dataset_populator.test_history() as history_id:
            num_expected_datasets = 2
            # Create collection and datasets
            collection_ids = self._create_collection_in_history(history_id, num_collections=1)
            original_collection_id = collection_ids[0]
            # Check datasets are hidden and not deleted
            history_contents = self._get_history_contents(history_id)
            datasets = list(filter(lambda item: item["history_content_type"] == "dataset", history_contents))
            assert len(datasets) == num_expected_datasets
            for dataset in datasets:
                assert dataset["deleted"] is False
                assert dataset["visible"] is False

            # Delete the collection
            payload = {
                "operation": "delete",
                "items": [
                    {
                        "id": original_collection_id,
                        "history_content_type": "dataset_collection",
                    },
                ],
            }
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            self._assert_bulk_success(bulk_operation_result, 1)

            # We expect the original collection and the datasets to be deleted
            num_expected_history_contents = num_expected_datasets + 1
            history_contents = self._get_history_contents(history_id)
            assert len(history_contents) == num_expected_history_contents
            for item in history_contents:
                assert item["deleted"] is True
    @requires_new_user
    def test_only_owner_can_apply_bulk_operations(self):
        with self.dataset_populator.test_history() as history_id:
            self._create_test_history_contents(history_id)

            with self._different_user():
                payload = {"operation": "hide"}
                bulk_operation_result = self._apply_bulk_operation(history_id, payload, expected_status_code=403)
                assert bulk_operation_result["err_msg"]
    def test_bulk_tag_changes(self):
        with self.dataset_populator.test_history() as history_id:
            _, collection_ids, history_contents = self._create_test_history_contents(history_id)

            expected_tags = ["cool_tag", "tag01"]
            # Add the same tags to all items
            payload = {
                "operation": "add_tags",
                "params": {
                    "type": "add_tags",
                    "tags": expected_tags,
                },
            }
            expected_success_count = len(history_contents)
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            self._assert_bulk_success(bulk_operation_result, expected_success_count)
            history_contents = self._get_history_contents(history_id)
            for item in history_contents:
                for expected_tag in expected_tags:
                    assert expected_tag in item["tags"]

            # Remove the tags from all collections
            payload = {
                "operation": "remove_tags",
                "params": {
                    "type": "remove_tags",
                    "tags": expected_tags,
                },
            }
            query = "q=history_content_type-eq&qv=dataset_collection"
            expected_success_count = len(collection_ids)
            bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
            self._assert_bulk_success(bulk_operation_result, expected_success_count)
            history_contents = self._get_history_contents(history_id)
            for item in history_contents:
                if item["history_content_type"] == "dataset_collection":
                    assert not item["tags"]
                else:
                    for expected_tag in expected_tags:
                        assert expected_tag in item["tags"]
    @requires_celery
    def test_bulk_dbkey_change(self):
        with self.dataset_populator.test_history() as history_id:
            _, _, history_contents = self._create_test_history_contents(history_id)

            expected_dbkey = "apiMel3"
            # Change the dbkey of all items
            payload = {
                "operation": "change_dbkey",
                "params": {
                    "type": "change_dbkey",
                    "dbkey": expected_dbkey,
                },
            }
            # All items should succeed
            expected_success_count = len(history_contents)
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            self._assert_bulk_success(bulk_operation_result, expected_success_count)
            history_contents = self._get_history_contents(history_id, query="?v=dev&keys=dbkey")
            for item in history_contents:
                if item["history_content_type"] == "dataset":
                    assert item["dbkey"] == expected_dbkey
    @requires_celery
    def test_bulk_dbkey_change_dataset_collection(self):
        with self.dataset_populator.test_history() as history_id:
            _, collection_ids, history_contents = self._create_test_history_contents(history_id)

            expected_dbkey = "apiMel3"
            # Change the dbkey of all collections
            payload = {
                "operation": "change_dbkey",
                "params": {
                    "type": "change_dbkey",
                    "dbkey": expected_dbkey,
                },
            }
            # All collections should succeed
            expected_success_count = len(collection_ids)
            query = "q=history_content_type-eq&qv=dataset_collection"
            bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
            self._assert_bulk_success(bulk_operation_result, expected_success_count)
            history_contents = self._get_history_contents(history_id, query="?v=dev&keys=dbkey")
            # Now verify that the datasets within the collections have the expected dbkey
            for item in history_contents:
                if item["history_content_type"] == "dataset":
                    assert item["dbkey"] == expected_dbkey
    def test_bulk_datatype_change(self):
        with self.dataset_populator.test_history() as history_id:
            num_datasets = 3
            dataset_ids = []
            for _ in range(num_datasets):
                hda_id = self.dataset_populator.new_dataset(history_id)["id"]
                dataset_ids.append(hda_id)

            history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
            for item in history_contents:
                assert item["extension"] == "txt"
                assert item["data_type"] == "galaxy.datatypes.data.Text"
                assert "metadata_column_names" not in item

            self.dataset_populator.wait_for_history_jobs(history_id)

            expected_datatype = "tabular"
            # Change the datatype of all datasets
            payload = {
                "operation": "change_datatype",
                "params": {
                    "type": "change_datatype",
                    "datatype": expected_datatype,
                },
            }
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            self._assert_bulk_success(bulk_operation_result, expected_success_count=num_datasets)
            # Wait for the celery tasks to finish
            self.dataset_populator.wait_for_history(history_id)
            history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
            for item in history_contents:
                assert item["extension"] == "tabular"
                assert item["data_type"] == "galaxy.datatypes.tabular.Tabular"
                assert "metadata_column_names" in item
    def test_bulk_datatype_change_collection(self):
        with self.dataset_populator.test_history() as history_id:
            _, collection_ids, history_contents = self._create_test_history_contents(history_id)

            history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
            original_collection_update_times = []
            for item in history_contents:
                if item["history_content_type"] == "dataset":
                    assert item["extension"] == "txt"
                    assert item["data_type"] == "galaxy.datatypes.data.Text"
                    assert "metadata_column_names" not in item
                if item["history_content_type"] == "dataset_collection":
                    original_collection_update_times.append(item["update_time"])

            expected_datatype = "tabular"
            # Change the datatype of all collections
            payload = {
                "operation": "change_datatype",
                "params": {
                    "type": "change_datatype",
                    "datatype": expected_datatype,
                },
            }
            bulk_operation_result = self._apply_bulk_operation(
                history_id, payload, query="q=history_content_type-eq&qv=dataset_collection"
            )
            self._assert_bulk_success(bulk_operation_result, expected_success_count=len(collection_ids))
            # Wait for the celery tasks to finish
            self.dataset_populator.wait_for_history(history_id)
            history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
            new_collection_update_times = []
            for item in history_contents:
                if item["history_content_type"] == "dataset":
                    assert item["extension"] == "tabular"
                    assert item["data_type"] == "galaxy.datatypes.tabular.Tabular"
                    assert "metadata_column_names" in item
                if item["history_content_type"] == "dataset_collection":
                    new_collection_update_times.append(item["update_time"])
            assert original_collection_update_times != new_collection_update_times
    def test_bulk_datatype_change_should_skip_set_metadata_on_deferred_data(self):
        with self.dataset_populator.test_history() as history_id:
            details = self.dataset_populator.create_deferred_hda(
                history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed", ext="bed"
            )
            assert details["state"] == "deferred"
            assert details["extension"] == "bed"
            assert details["data_type"] == "galaxy.datatypes.interval.Bed"
            assert "metadata_columns" in details
            assert "metadata_delimiter" in details
            assert "metadata_comment_lines" in details

            new_datatype = "txt"
            payload = {
                "operation": "change_datatype",
                "params": {
                    "type": "change_datatype",
                    "datatype": new_datatype,
                },
            }
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            self._assert_bulk_success(bulk_operation_result, expected_success_count=1)
            history_contents = self._get_history_contents(history_id, query="?v=dev&view=detailed")
            for item in history_contents:
                assert item["state"] == "deferred"
                assert item["extension"] == "txt"
                assert item["data_type"] == "galaxy.datatypes.data.Text"
                # It should discard the old metadata
                assert "metadata_columns" not in item
                assert "metadata_delimiter" not in item
                assert "metadata_comment_lines" not in item
[docs] @skip_without_tool("cat_data_and_sleep") def test_bulk_datatype_change_errors(self): with self.dataset_populator.test_history() as history_id: num_datasets = 3 dataset_ids = [] for _ in range(num_datasets): hda_id = self.dataset_populator.new_dataset(history_id)["id"] dataset_ids.append(hda_id) self.dataset_populator.wait_for_history_jobs(history_id) # Run tool on last dataset input_hda_id = hda_id inputs = { "input1": {"src": "hda", "id": input_hda_id}, "sleep_time": 10, } run_response = self.dataset_populator.run_tool_raw( "cat_data_and_sleep", inputs, history_id, ) output_hda_id = run_response.json()["outputs"][0]["id"] num_datasets += 1 # the new output dataset dataset_ids_in_use = [input_hda_id, output_hda_id] expected_datatype = "tabular" # Change datatype of all datasets (4 in total) payload = { "operation": "change_datatype", "params": { "type": "change_datatype", "datatype": expected_datatype, }, } bulk_operation_result = self._apply_bulk_operation(history_id, payload) # First 2 datasets are ok assert bulk_operation_result["success_count"] == 2 # Last 2 are in use (input and output) and must fail assert len(bulk_operation_result["errors"]) == 2 for error in bulk_operation_result["errors"]: assert error["item"]["id"] in dataset_ids_in_use
    def test_bulk_datatype_change_auto(self):
        with self.dataset_populator.test_history() as history_id:
            tabular_contents = "1\t2\t3\na\tb\tc\n"
            dataset_ids = [
                self.dataset_populator.new_dataset(history_id, content=tabular_contents)["id"],
                self.dataset_populator.new_dataset(history_id, content=tabular_contents)["id"],
            ]
            self.dataset_populator.wait_for_history_jobs(history_id)

            history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
            for item in history_contents:
                assert item["extension"] == "txt"
                assert item["data_type"] == "galaxy.datatypes.data.Text"
                assert "metadata_delimiter" not in item

            # Change the datatype of all datasets to "auto"
            payload = {
                "operation": "change_datatype",
                "params": {
                    "type": "change_datatype",
                    "datatype": "auto",
                },
            }
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            self._assert_bulk_success(bulk_operation_result, expected_success_count=len(dataset_ids))
            # Wait for the celery tasks to finish
            self.dataset_populator.wait_for_history(history_id)
            history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
            # The contents should be detected as `tabular` and the metadata set correctly
            for item in history_contents:
                assert item["extension"] == "tabular"
                assert item["data_type"] == "galaxy.datatypes.tabular.Tabular"
                assert "metadata_delimiter" in item
                assert item["metadata_delimiter"] == "\t"
    def test_index_returns_expected_total_matches(self):
        with self.dataset_populator.test_history() as history_id:
            datasets_ids, collection_ids, history_contents = self._create_test_history_contents(history_id)

            self._test_index_total_matches(history_id, expected_total_matches=len(history_contents))

            self._test_index_total_matches(
                history_id,
                search_query="&q=history_content_type-eq&qv=dataset_collection",
                expected_total_matches=len(collection_ids),
            )

            self._test_index_total_matches(
                history_id,
                search_query="&q=history_content_type-eq&qv=dataset",
                expected_total_matches=len(datasets_ids),
            )
    def test_index_with_stats_fails_with_non_orm_filters(self):
        with self.dataset_populator.test_history() as history_id:
            self._create_test_history_contents(history_id)

            invalid_filter_keys_with_stats = ["data_type", "annotation"]

            for filter_key in invalid_filter_keys_with_stats:
                response = self._get_contents_with_stats(
                    history_id,
                    search_query=f"&q={filter_key}-contains&qv=anything",
                )
                self._assert_status_code_is(response, 400)
    def test_index_with_stats_has_extra_serialization(self):
        expected_extra_keys_in_collections = ["elements_datatypes"]
        with self.dataset_populator.test_history() as history_id:
            self._create_collection_in_history(history_id)
            response = self._get_contents_with_stats(
                history_id,
                search_query="&q=history_content_type-eq&qv=dataset_collection",
            )
            self._assert_status_code_is(response, 200)
            contents_with_stats = response.json()
            assert contents_with_stats["contents"]
            collection = contents_with_stats["contents"][0]
            self._assert_has_keys(collection, *expected_extra_keys_in_collections)
    def _get_contents_with_stats(self, history_id: str, search_query: str = ""):
        headers = {"accept": "application/vnd.galaxy.history.contents.stats+json"}
        search_response = self._get(f"histories/{history_id}/contents?v=dev{search_query}", headers=headers)
        return search_response

    def _test_index_total_matches(self, history_id: str, expected_total_matches: int, search_query: str = ""):
        search_response = self._get_contents_with_stats(history_id, search_query)
        self._assert_status_code_is(search_response, 200)
        self._assert_total_matches_is(search_response.json(), expected_total_matches)

    def _assert_total_matches_is(self, response, expected_total_matches: int):
        assert response["stats"]
        assert response["stats"]["total_matches"]
        assert response["stats"]["total_matches"] == expected_total_matches

    def _create_test_history_contents(self, history_id) -> Tuple[List[str], List[str], List[Any]]:
        """Creates 3 collections (pairs) and their corresponding datasets (6 in total).

        Returns a tuple with the list of ids for the datasets and the collections
        and the complete history contents.
        """
        num_expected_collections = 3
        num_expected_datasets = num_expected_collections * 2
        collection_ids = self._create_collection_in_history(history_id, num_expected_collections)
        history_contents = self._get_history_contents(history_id)
        datasets = filter(lambda item: item["history_content_type"] == "dataset", history_contents)
        datasets_ids = [dataset["id"] for dataset in datasets]
        assert len(history_contents) == num_expected_datasets + num_expected_collections
        assert len(datasets_ids) == num_expected_datasets
        for dataset_id in datasets_ids:
            self._put(f"histories/{history_id}/contents/{dataset_id}", {"visible": True}, json=True).json()
        # All items are visible
        history_contents = self._get_history_contents(history_id)
        for item in history_contents:
            assert item["visible"]
        return datasets_ids, collection_ids, history_contents

    def _create_collection_in_history(self, history_id, num_collections=1) -> List[str]:
        collection_ids = []
        for _ in range(num_collections):
            collection_id = self.dataset_collection_populator.create_pair_in_history(
                history_id=history_id, wait=True
            ).json()["outputs"][0]["id"]
            collection_ids.append(collection_id)
        return collection_ids

    def _get_history_contents(self, history_id: str, query: str = ""):
        return self._get(f"histories/{history_id}/contents{query}").json()

    def _get_hidden_items_from_history_contents(self, history_contents) -> List[Any]:
        return [content for content in history_contents if not content["visible"]]

    def _get_collection_with_id_from_history_contents(self, history_contents, collection_id: str) -> Optional[Any]:
        return self._get_item_with_id_from_history_contents(history_contents, "dataset_collection", collection_id)

    def _get_dataset_with_id_from_history_contents(self, history_contents, dataset_id: str) -> Optional[Any]:
        return self._get_item_with_id_from_history_contents(history_contents, "dataset", dataset_id)

    def _get_item_with_id_from_history_contents(
        self, history_contents, history_content_type: str, dataset_id: str
    ) -> Optional[Any]:
        for item in history_contents:
            if item["history_content_type"] == history_content_type and item["id"] == dataset_id:
                return item
        return None

    def _apply_bulk_operation(self, history_id: str, payload, query: str = "", expected_status_code: int = 200):
        original_history_update_time = self._get_history_update_time(history_id)
        if query:
            query = f"?{query}"
        response = self._put(
            f"histories/{history_id}/contents/bulk{query}",
            data=payload,
            json=True,
        )
        self._assert_status_code_is(response, expected_status_code)
        result = response.json()
        if "err_msg" in result or result.get("success_count", 0) == 0:
            # We don't need to check the history update time if there was an error
            # or no items were updated.
            return result
        # After a successful operation, the history update time should be updated
        # so the changes can be detected by the frontend.
        after_bulk_operation_history_update_time = self._get_history_update_time(history_id)
        assert after_bulk_operation_history_update_time > original_history_update_time
        return result

    def _assert_bulk_success(self, bulk_operation_result, expected_success_count: int):
        assert bulk_operation_result["success_count"] == expected_success_count, bulk_operation_result
        assert not bulk_operation_result["errors"]

    def _get_history_update_time(self, history_id: str):
        history = self._get(f"histories/{history_id}").json()
        return history.get("update_time")
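    # For reference, requesting the contents index with the
    # "application/vnd.galaxy.history.contents.stats+json" accept header (see
    # _get_contents_with_stats above) yields a body shaped like
    #     {"stats": {"total_matches": ...}, "contents": [...]}
    # as asserted by _assert_total_matches_is and test_index_with_stats_has_extra_serialization.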