Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release, if it exists, or view the top of the latest release's documentation.
Source code for galaxy_test.api.test_history_contents
import urllib.parse
from typing import (
Any,
List,
Optional,
Tuple,
)
from galaxy_test.api._framework import ApiTestCase
from galaxy_test.base.decorators import (
requires_admin,
requires_celery,
requires_new_library,
requires_new_user,
)
from galaxy_test.base.populators import (
DatasetCollectionPopulator,
DatasetPopulator,
LibraryPopulator,
skip_without_tool,
)
# Fixture constants — presumably consumed by source-URI/hash tests later in this
# file; they are not referenced in the portion visible here (TODO confirm).
TEST_SOURCE_URI = "http://google.com/dataset.txt"
TEST_HASH_FUNCTION = "MD5"
TEST_HASH_VALUE = "moocowpretendthisisahas"  # deliberately not a real MD5 digest
# TODO: Test anonymous access.
class TestHistoryContentsApi(ApiTestCase):
    """API tests for the ``histories/{history_id}/contents`` family of endpoints."""

    # Re-created for every test in setUp(); annotated for type checkers.
    dataset_populator: DatasetPopulator

    def setUp(self):
        """Bind fresh populator helpers to this test's Galaxy interactor."""
        super().setUp()
        self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
        self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
        self.library_populator = LibraryPopulator(self.galaxy_interactor)
[docs] def test_index_hda_summary(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
contents_response = self._get(f"histories/{history_id}/contents")
hda_summary = self.__check_for_hda(contents_response, hda1)
assert "display_types" not in hda_summary # Quick summary, not full details
    @requires_admin
    def test_make_private_and_public(self, history_id):
        """Exercise the permissions endpoint's action payloads: grant manage,
        then make_private, then remove_restrictions — checking another user's
        visibility after each step."""
        hda1 = self._wait_for_new_hda(history_id)
        update_url = f"histories/{history_id}/contents/{hda1['id']}/permissions"
        role_id = self.dataset_populator.user_private_role_id()
        # Give manage permission to the user.
        payload = {
            "access": [],
            "manage": [role_id],
        }
        update_response = self._update_permissions(update_url, payload, admin=True)
        self._assert_status_code_is(update_response, 200)
        # With no access restriction, another user can still see the dataset.
        self._assert_other_user_can_access(history_id, hda1["id"])
        # Then we restrict access.
        payload = {
            "action": "make_private",
        }
        update_response = self._update_permissions(update_url, payload)
        self._assert_status_code_is(update_response, 200)
        self._assert_other_user_cannot_access(history_id, hda1["id"])
        # Then we lift the restrictions again.
        payload = {
            "action": "remove_restrictions",
        }
        update_response = self._update_permissions(update_url, payload)
        self._assert_status_code_is(update_response, 200)
        self._assert_other_user_can_access(history_id, hda1["id"])
    @requires_new_user
    def test_set_permissions_add_admin_history_contents(self, history_id):
        # Same scenario as the datasets flavor below, driven through the
        # histories/{id}/contents/{id}/permissions endpoint.
        self._verify_dataset_permissions(history_id, "history_contents")
    @requires_new_user
    def test_set_permissions_add_admin_datasets(self, history_id):
        # Same scenario as the history_contents flavor above, driven through
        # the datasets/{id}/permissions endpoint.
        self._verify_dataset_permissions(history_id, "dataset")
    def _verify_dataset_permissions(self, history_id: str, api_endpoint: str):
        """Drive a permissions endpoint through restrict/share scenarios.

        ``api_endpoint`` selects between the history-contents and datasets
        flavors of the same permissions API; the payloads are identical.
        The sequence of requests is order-sensitive — each step depends on
        the permission state left by the previous one.
        """
        hda1 = self._wait_for_new_hda(history_id)
        hda_id = hda1["id"]
        if api_endpoint == "history_contents":
            update_url = f"histories/{history_id}/contents/{hda_id}/permissions"
        else:
            update_url = f"datasets/{hda_id}/permissions"
        role_id = self.dataset_populator.user_private_role_id()
        payload = {
            "access": [role_id],
            "manage": [role_id],
        }
        # Other users cannot modify permissions.
        with self._different_user():
            update_response = self._update_permissions(update_url, payload)
            self._assert_status_code_is(update_response, 403)
        # First the details render for another user.
        self._assert_other_user_can_access(history_id, hda_id)
        # Then we restrict access (requires admin at this point).
        update_response = self._update_permissions(update_url, payload, admin=True)
        self._assert_status_code_is(update_response, 200)
        # Finally the details don't render.
        self._assert_other_user_cannot_access(history_id, hda_id)
        # But they do for the original user.
        contents_response = self._get(f"histories/{history_id}/contents/{hda_id}").json()
        assert "name" in contents_response
        # The owner now holds manage permission, so a non-admin update succeeds.
        update_response = self._update_permissions(update_url, payload)
        self._assert_status_code_is(update_response, 200)
        payload = {
            "access": [role_id],
            "manage": [role_id],
        }
        update_response = self._update_permissions(update_url, payload)
        self._assert_status_code_is(update_response, 200)
        self._assert_other_user_cannot_access(history_id, hda_id)
        user_id = self.dataset_populator.user_id()
        with self._different_user():
            different_user_id = self.dataset_populator.user_id()
        # A role shared by both users grants the other user access again.
        combined_user_role = self.dataset_populator.create_role(
            [user_id, different_user_id], description="role for testing permissions"
        )
        payload = {
            "access": [combined_user_role["id"]],
            "manage": [role_id],
        }
        update_response = self._update_permissions(update_url, payload)
        self._assert_status_code_is(update_response, 200)
        # Now other user can see dataset again with access permission.
        self._assert_other_user_can_access(history_id, hda_id)
        # access doesn't imply management though...
        with self._different_user():
            update_response = self._update_permissions(update_url, payload)
            self._assert_status_code_is(update_response, 403)
def _assert_other_user_cannot_access(self, history_id: str, history_content_id: str):
with self._different_user():
contents_response = self.dataset_populator.get_history_dataset_details_raw(
history_id=history_id, dataset_id=history_content_id
)
assert contents_response.status_code == 403
def _assert_other_user_can_access(self, history_id: str, history_content_id: str):
with self._different_user():
contents_response = self.dataset_populator.get_history_dataset_details_raw(
history_id=history_id, dataset_id=history_content_id
)
contents_response.raise_for_status()
assert "name" in contents_response.json()
[docs] def test_index_hda_all_details(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
contents_response = self._get(f"histories/{history_id}/contents?details=all")
hda_details = self.__check_for_hda(contents_response, hda1)
self.__assert_hda_has_full_details(hda_details)
[docs] def test_index_hda_detail_by_id(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
contents_response = self._get(f"histories/{history_id}/contents?details={hda1['id']}")
hda_details = self.__check_for_hda(contents_response, hda1)
self.__assert_hda_has_full_details(hda_details)
[docs] def test_index_detail_parameter_error(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
# Invalid details should return 400
contents_response = self._get(f"histories/{history_id}/contents?v=dev&details= ")
self._assert_status_code_is(contents_response, 400)
# Empty IDs should return 400
contents_response = self._get(f"histories/{history_id}/contents?v=dev&details=,,{hda1['id']}")
self._assert_status_code_is(contents_response, 400)
# Invalid IDs should return 400
contents_response = self._get(f"histories/{history_id}/contents?v=dev&details={hda1['id']}, ,{hda1['id']}")
self._assert_status_code_is(contents_response, 400)
    def test_view_and_keys_parameters_for_datasets(self, history_id):
        """Check the v=dev serialization controls (``view=`` and ``keys=``) on a
        single dataset: summary view, detailed view, keys-only selection, and a
        combined view+keys request."""
        created_hda = self.dataset_populator.new_dataset(history_id)
        hda_id = created_hda["id"]
        item_type = "dataset"
        # Keys present in the summary serialization.
        summary_view_keys = [
            "id",
            "name",
            "history_id",
            "hid",
            "history_content_type",
            "deleted",
            "visible",
            "type_id",
            "type",
            "create_time",
            "update_time",
            "url",
            "tags",
            "dataset_id",
            "state",
            "extension",
            "purged",
            "genome_build",
        ]
        # Keys that only appear when view=detailed.
        detailed_view_only_keys = [
            "created_from_basename",
            "api_type",
            "accessible",
            "misc_info",
            "resubmitted",
            "misc_blurb",
            "hda_ldda",
            "file_size",
            "hashes",
            "drs_id",
            "validated_state_message",
            "creating_job",
            "file_ext",
            "copied_from_ldda_id",
            "peek",
            "validated_state",
            "permissions",
            "uuid",
            "model_class",
            "sources",
            "annotation",
            "display_apps",
            "display_types",
            "file_name",
            "download_url",
            "rerunnable",
            "data_type",
            "meta_files",
        ]
        detailed_view_keys = summary_view_keys + detailed_view_only_keys
        # Expect summary view to be returned.
        view = "summary"
        keys = None
        item = self._get_history_item_with_custom_serialization(history_id, hda_id, item_type, view, keys)
        self._assert_has_keys(item, *summary_view_keys)
        for key in detailed_view_only_keys:
            assert key not in item
        # Expect "dynamic" metadata fields to NOT be returned.
        metadata_keys = [key for key in item.keys() if key.startswith("metadata_")]
        assert len(metadata_keys) == 0
        # Expect detailed view to be returned.
        view = "detailed"
        keys = None
        item = self._get_history_item_with_custom_serialization(history_id, hda_id, item_type, view, keys)
        self._assert_has_keys(item, *detailed_view_keys)
        # Expect also "dynamic" metadata fields to be returned.
        metadata_keys = [key for key in item.keys() if key.startswith("metadata_")]
        assert len(metadata_keys) > 0
        # Expect only specific keys to be returned (no view, keys only).
        view = None
        keys = detailed_view_only_keys + ["id"]
        item = self._get_history_item_with_custom_serialization(history_id, hda_id, item_type, view, keys)
        self._assert_has_keys(item, *keys)
        assert len(item) == len(keys)
        # Make sure the id is encoded in the response.
        assert isinstance(item["id"], str)
        assert item["id"] == hda_id
        # Expect combined view and keys to be returned.
        view = "summary"
        keys = ["file_size"]
        item = self._get_history_item_with_custom_serialization(history_id, hda_id, item_type, view, keys)
        self._assert_has_keys(item, *summary_view_keys, *keys)
        # A detailed-only key that was NOT requested must stay absent.
        assert "peek" not in item
    def test_view_and_keys_parameters_for_collections(self, history_id):
        """Check the ``view=`` parameter for dataset collections ("collection"
        vs "element" views)."""
        fetch_response = self.dataset_collection_populator.create_list_in_history(history_id, direct_upload=True).json()
        created_dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
        hdca_id = created_dataset_collection["id"]
        item_type = "dataset_collection"
        # Collections seems to have 3 different views, "collection", "element" and "element-reference".
        # We cannot use the keys parameter with collections, so we will only test the view parameter.
        collection_view_keys = [
            "hid",
            "history_id",
            "history_content_type",
            "visible",
            "deleted",
            "job_source_id",
            "job_source_type",
            "job_state_summary",
            "create_time",
            "update_time",
            "id",
            "name",
            "collection_id",
            "collection_type",
            "populated",
            "populated_state",
            "populated_state_message",
            "element_count",
            "elements_datatypes",
            "type",
            "model_class",
            "tags",
            "url",
            "contents_url",
        ]
        element_view_only_keys = ["elements", "implicit_collection_jobs_id"]
        element_view_keys = collection_view_keys + element_view_only_keys
        # Expect summary ("collection") view to be returned.
        view = "collection"
        item = self._get_history_item_with_custom_serialization(history_id, hdca_id, item_type, view)
        self._assert_has_keys(item, *collection_view_keys)
        for key in element_view_only_keys:
            assert key not in item
        # Expect detailed ("element") view to be returned.
        view = "element"
        item = self._get_history_item_with_custom_serialization(history_id, hdca_id, item_type, view)
        self._assert_has_keys(item, *element_view_keys)
        # The `elements` field should be populated for the "element" view.
        assert len(item["elements"]) > 0
def _get_history_item_with_custom_serialization(
self,
history_id: str,
content_id: str,
item_type: str,
expected_view: Optional[str] = None,
expected_keys: Optional[List[str]] = None,
):
view = f"&view={expected_view}" if expected_view else ""
keys = f"&keys={','.join(expected_keys)}" if expected_keys else ""
response = self._get(f"histories/{history_id}/contents/{item_type}s/{content_id}?v=dev{view}{keys}")
self._assert_status_code_is_ok(response)
return response.json()
[docs] def test_show_hda(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
show_response = self.__show(history_id, hda1)
self._assert_status_code_is(show_response, 200)
self.__assert_matches_hda(hda1, show_response.json())
def _create_copy(self, history_id: str):
hda1 = self.dataset_populator.new_dataset(history_id)
create_data = dict(
source="hda",
content=hda1["id"],
)
second_history_id = self.dataset_populator.new_history()
assert self.__count_contents(second_history_id) == 0
create_response = self._post(f"histories/{second_history_id}/contents", create_data, json=True)
self._assert_status_code_is(create_response, 200)
return create_response.json()
[docs] def test_hda_copy(self, history_id):
response = self._create_copy(history_id)
assert self.__count_contents(response["history_id"]) == 1
[docs] def test_inheritance_chain(self, history_id):
response = self._create_copy(history_id)
inheritance_chain_response = self._get(f"datasets/{response['id']}/inheritance_chain")
self._assert_status_code_is_ok(inheritance_chain_response)
inheritance_chain = inheritance_chain_response.json()
assert len(inheritance_chain) == 1
[docs] @requires_new_library
def test_library_copy(self, history_id):
ld = self.library_populator.new_library_dataset("lda_test_library")
create_data = dict(
source="library",
content=ld["id"],
)
assert self.__count_contents(history_id) == 0
create_response = self._post(f"histories/{history_id}/contents", create_data, json=True)
self._assert_status_code_is(create_response, 200)
assert self.__count_contents(history_id) == 1
[docs] def test_update(self, history_id):
hda1 = self._wait_for_new_hda(history_id)
assert str(hda1["deleted"]).lower() == "false"
update_response = self._update(history_id, hda1["id"], dict(deleted=True))
self._assert_status_code_is(update_response, 200)
show_response = self.__show(history_id, hda1)
assert str(show_response.json()["deleted"]).lower() == "true"
update_response = self._update(history_id, hda1["id"], dict(name="Updated Name"))
assert self.__show(history_id, hda1).json()["name"] == "Updated Name"
update_response = self._update(history_id, hda1["id"], dict(name="Updated Name"))
assert self.__show(history_id, hda1).json()["name"] == "Updated Name"
unicode_name = "ржевский сапоги"
update_response = self._update(history_id, hda1["id"], dict(name=unicode_name))
updated_hda = self.__show(history_id, hda1).json()
assert updated_hda["name"] == unicode_name, updated_hda
quoted_name = '"Mooo"'
update_response = self._update(history_id, hda1["id"], dict(name=quoted_name))
updated_hda = self.__show(history_id, hda1).json()
assert updated_hda["name"] == quoted_name, quoted_name
data = {
"dataset_id": hda1["id"],
"name": "moocow",
"dbkey": "?",
"annotation": None,
"info": "my info is",
"operation": "attributes",
}
update_response = self._set_edit_update(data)
# No key or anything supplied, expect a permission problem.
# A bit questionable but I think this is a 400 instead of a 403 so that
# we don't distinguish between this is a valid ID you don't have access to
# and this is an invalid ID.
assert update_response.status_code == 400, update_response.content
[docs] def test_update_batch(self, history_id):
hda1 = self._wait_for_new_hda(history_id)
assert str(hda1["deleted"]).lower() == "false"
assert str(hda1["visible"]).lower() == "true"
# update deleted flag => true
payload = dict(items=[{"history_content_type": "dataset", "id": hda1["id"]}], deleted=True)
update_response = self._update_batch(history_id, payload)
objects = update_response.json()
assert objects[0]["deleted"] is True
assert objects[0]["visible"] is True
# update visibility flag => false
payload = dict(items=[{"history_content_type": "dataset", "id": hda1["id"]}], visible=False)
update_response = self._update_batch(history_id, payload)
objects = update_response.json()
assert objects[0]["deleted"] is True
assert objects[0]["visible"] is False
# update both flags
payload = dict(items=[{"history_content_type": "dataset", "id": hda1["id"]}], deleted=False, visible=True)
update_response = self._update_batch(history_id, payload)
objects = update_response.json()
assert objects[0]["deleted"] is False
assert objects[0]["visible"] is True
[docs] def test_update_batch_collections(self, history_id):
hdca = self._create_pair_collection(history_id)
assert hdca["deleted"] is False
assert hdca["visible"] is True
# update deleted flag => true
payload = dict(items=[{"history_content_type": "dataset_collection", "id": hdca["id"]}], deleted=True)
update_response = self._update_batch(history_id, payload)
objects = update_response.json()
assert objects[0]["deleted"] is True
assert objects[0]["visible"] is True
# update visibility flag => false
payload = dict(items=[{"history_content_type": "dataset_collection", "id": hdca["id"]}], visible=False)
update_response = self._update_batch(history_id, payload)
objects = update_response.json()
assert objects[0]["deleted"] is True
assert objects[0]["visible"] is False
# update both flags
payload = dict(
items=[{"history_content_type": "dataset_collection", "id": hdca["id"]}], deleted=False, visible=True
)
update_response = self._update_batch(history_id, payload)
objects = update_response.json()
assert objects[0]["deleted"] is False
assert objects[0]["visible"] is True
[docs] def test_update_type_failures(self, history_id):
hda1 = self._wait_for_new_hda(history_id)
update_response = self._update(history_id, hda1["id"], dict(deleted="not valid"))
self._assert_status_code_is(update_response, 400)
def _wait_for_new_hda(self, history_id: str):
hda1 = self.dataset_populator.new_dataset(history_id)
self.dataset_populator.wait_for_history(history_id)
return hda1
def _set_edit_update(self, data):
update_response = self._put(urllib.parse.urljoin(self.url, "dataset/set_edit"), data=data, json=True)
return update_response
def _update(self, history_id: str, item_id, data, admin=False):
update_response = self._put(f"histories/{history_id}/contents/{item_id}", data=data, json=True, admin=admin)
return update_response
def _update_permissions(self, url, data, admin=False):
update_response = self._put(url, data=data, json=True, admin=admin)
return update_response
def _update_batch(self, history_id: str, data):
update_response = self._put(f"histories/{history_id}/contents", data=data, json=True)
return update_response
[docs] def test_delete(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
self.dataset_populator.wait_for_history(history_id)
assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "false"
delete_response = self._delete(f"histories/{history_id}/contents/{hda1['id']}")
assert delete_response.status_code < 300 # Something in the 200s :).
assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "true"
[docs] def test_delete_anon(self):
with self._different_user(anon=True):
history_id = self._get(urllib.parse.urljoin(self.url, "history/current_history_json")).json()["id"]
hda1 = self.dataset_populator.new_dataset(history_id)
self.dataset_populator.wait_for_history(history_id)
assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "false"
delete_response = self._delete(f"histories/{history_id}/contents/{hda1['id']}")
assert delete_response.status_code < 300 # Something in the 200s :).
assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "true"
[docs] def test_delete_permission_denied(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
with self._different_user(anon=True):
delete_response = self._delete(f"histories/{history_id}/contents/{hda1['id']}")
assert delete_response.status_code == 403
assert delete_response.json()["err_msg"] == "HistoryDatasetAssociation is not owned by user"
[docs] def test_purge(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
self.dataset_populator.wait_for_history(history_id)
assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "false"
assert str(self.__show(history_id, hda1).json()["purged"]).lower() == "false"
data = {"purge": True}
delete_response = self._delete(f"histories/{history_id}/contents/{hda1['id']}", data=data, json=True)
assert delete_response.status_code < 300 # Something in the 200s :).
# Purging and deleting the dataset may or may not happen asynchronously.
# On 202 the request was accepted and purging will happen later.
if delete_response.status_code == 202:
self.dataset_populator.wait_for_purge(history_id, hda1["id"])
else:
assert self.__show(history_id, hda1).json()["deleted"]
assert self.__show(history_id, hda1).json()["purged"]
[docs] def test_dataset_collection_creation_on_contents(self, history_id):
payload = self.dataset_collection_populator.create_pair_payload(
history_id, type="dataset_collection", wait=True
)
endpoint = "tools/fetch"
self._check_pair_creation(history_id, endpoint, payload)
[docs] def test_dataset_collection_creation_on_typed_contents(self, history_id):
payload = self.dataset_collection_populator.create_pair_payload(history_id, wait=True)
endpoint = "tools/fetch"
self._check_pair_creation(history_id, endpoint, payload)
    def test_dataset_collection_create_from_exisiting_datasets_with_new_tags(self):
        """Copying datasets into a new list applies per-element tags to the
        copies while leaving the source datasets' tags untouched.
        (The "exisiting" typo is kept — renaming would change the test id.)"""
        with self.dataset_populator.test_history() as history_id:
            hda_id = self.dataset_populator.new_dataset(history_id, content="1 2 3")["id"]
            hda2_id = self.dataset_populator.new_dataset(history_id, content="1 2 3")["id"]
            # Pre-tag the second dataset so we can check tags are merged on its copy.
            update_response = self._update(history_id, hda2_id, dict(tags=["existing:tag"])).json()
            assert update_response["tags"] == ["existing:tag"]
            creation_payload = {
                "collection_type": "list",
                "history_id": history_id,
                "element_identifiers": [
                    {"id": hda_id, "src": "hda", "name": "element_id1", "tags": ["my_new_tag"]},
                    {"id": hda2_id, "src": "hda", "name": "element_id2", "tags": ["another_new_tag"]},
                ],
                "type": "dataset_collection",
                "copy_elements": True,
            }
            r = self._post(f"histories/{history_id}/contents", creation_payload, json=True).json()
            assert r["elements"][0]["object"]["id"] != hda_id, "HDA has not been copied"
            assert len(r["elements"][0]["object"]["tags"]) == 1
            assert r["elements"][0]["object"]["tags"][0] == "my_new_tag"
            # The second copy keeps its pre-existing tag plus the new one.
            assert len(r["elements"][1]["object"]["tags"]) == 2, r["elements"][1]["object"]["tags"]
            original_hda = self.dataset_populator.get_history_dataset_details(history_id=history_id, dataset_id=hda_id)
            # The source dataset must be untouched.
            assert len(original_hda["tags"]) == 0, original_hda["tags"]
    def _check_pair_creation(self, history_id: str, endpoint, payload):
        """Post a pair-creation payload to ``endpoint`` and verify: type-filtered
        index counts before/after, the show endpoint, and delete semantics."""
        pre_collection_count = self.__count_contents(history_id, type="dataset_collection")
        pre_dataset_count = self.__count_contents(history_id, type="dataset")
        pre_combined_count = self.__count_contents(history_id, type="dataset,dataset_collection")
        dataset_collection_response = self._post(endpoint, payload, json=True)
        dataset_collection = self.__check_create_collection_response(dataset_collection_response)
        post_collection_count = self.__count_contents(history_id, type="dataset_collection")
        post_dataset_count = self.__count_contents(history_id, type="dataset")
        post_combined_count = self.__count_contents(history_id, type="dataset,dataset_collection")
        # Test filtering types with index: exactly one collection was added,
        # and the dataset count did not change.
        assert pre_collection_count == 0
        assert post_collection_count == 1
        assert post_combined_count == pre_dataset_count + 1
        assert post_combined_count == pre_combined_count + 1
        assert pre_dataset_count == post_dataset_count
        # Test show dataset collection.
        collection_url = f"histories/{history_id}/contents/dataset_collections/{dataset_collection['id']}"
        show_response = self._get(collection_url)
        self._assert_status_code_is(show_response, 200)
        dataset_collection = show_response.json()
        self._assert_has_keys(dataset_collection, "url", "name", "deleted")
        assert not dataset_collection["deleted"]
        # Deleting the collection should be reflected by a subsequent show.
        delete_response = self._delete(collection_url)
        self._assert_status_code_is(delete_response, 200)
        show_response = self._get(collection_url)
        dataset_collection = show_response.json()
        assert dataset_collection["deleted"]
[docs] @skip_without_tool("collection_creates_list")
def test_jobs_summary_simple_hdca(self, history_id):
fetch_response = self.dataset_collection_populator.create_list_in_history(
history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"]
).json()
hdca_id = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)["id"]
run = self.dataset_populator.run_collection_creates_list(history_id, hdca_id)
collections = run["output_collections"]
collection = collections[0]
jobs_summary_url = f"histories/{history_id}/contents/dataset_collections/{collection['id']}/jobs_summary"
jobs_summary_response = self._get(jobs_summary_url)
self._assert_status_code_is(jobs_summary_response, 200)
jobs_summary = jobs_summary_response.json()
self._assert_has_keys(jobs_summary, "populated_state", "states")
[docs] @skip_without_tool("cat1")
def test_jobs_summary_implicit_hdca(self, history_id):
create_response = self.dataset_collection_populator.create_pair_in_history(
history_id, contents=["123", "456"], wait=True
)
hdca_id = create_response.json()["outputs"][0]["id"]
inputs = {
"input1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
}
run = self.dataset_populator.run_tool("cat1", inputs=inputs, history_id=history_id)
self.dataset_populator.wait_for_history_jobs(history_id)
collections = run["implicit_collections"]
collection = collections[0]
jobs_summary_url = f"histories/{history_id}/contents/dataset_collections/{collection['id']}/jobs_summary"
jobs_summary_response = self._get(jobs_summary_url)
self._assert_status_code_is(jobs_summary_response, 200)
jobs_summary = jobs_summary_response.json()
self._assert_has_keys(jobs_summary, "populated_state", "states")
states = jobs_summary["states"]
assert states.get("ok") == 2, states
[docs] def test_dataset_collection_hide_originals(self, history_id):
payload = self.dataset_collection_populator.create_pair_payload(
history_id, type="dataset_collection", direct_upload=False, copy_elements=False
)
payload["hide_source_items"] = True
dataset_collection_response = self._post(f"histories/{history_id}/contents", payload, json=True)
self.__check_create_collection_response(dataset_collection_response)
contents_response = self._get(f"histories/{history_id}/contents")
datasets = [d for d in contents_response.json() if d["history_content_type"] == "dataset"]
# Assert two datasets in source were hidden.
assert len(datasets) == 2
assert not datasets[0]["visible"]
assert not datasets[1]["visible"]
[docs] def test_update_dataset_collection(self, history_id):
hdca = self._create_pair_collection(history_id)
body = dict(name="newnameforpair")
update_response = self._put(
f"histories/{history_id}/contents/dataset_collections/{hdca['id']}", data=body, json=True
)
self._assert_status_code_is(update_response, 200)
show_response = self.__show(history_id, hdca)
assert str(show_response.json()["name"]) == "newnameforpair"
[docs] def test_update_batch_dataset_collection(self, history_id):
hdca = self._create_pair_collection(history_id)
body = {"items": [{"history_content_type": "dataset_collection", "id": hdca["id"]}], "name": "newnameforpair"}
update_response = self._put(f"histories/{history_id}/contents", data=body, json=True)
self._assert_status_code_is(update_response, 200)
show_response = self.__show(history_id, hdca)
assert str(show_response.json()["name"]) == "newnameforpair"
def _create_pair_collection(self, history_id: str):
payload = self.dataset_collection_populator.create_pair_payload(history_id, type="dataset_collection")
dataset_collection_response = self._post("tools/fetch", payload, json=True)
self._assert_status_code_is(dataset_collection_response, 200)
hdca = dataset_collection_response.json()["output_collections"][0]
return hdca
[docs] def test_hdca_copy(self, history_id):
hdca = self.dataset_collection_populator.create_pair_in_history(history_id, wait=True).json()
hdca_id = hdca["outputs"][0]["id"]
second_history_id = self.dataset_populator.new_history()
create_data = dict(
source="hdca",
content=hdca_id,
)
assert len(self._get(f"histories/{second_history_id}/contents/dataset_collections").json()) == 0
create_response = self._post(
f"histories/{second_history_id}/contents/dataset_collections", create_data, json=True
)
self.__check_create_collection_response(create_response)
contents = self._get(f"histories/{second_history_id}/contents/dataset_collections").json()
assert len(contents) == 1
new_forward, _ = self.__get_paired_response_elements(history_id, contents[0])
self._assert_has_keys(new_forward, "history_id")
assert new_forward["history_id"] == second_history_id
[docs] def test_hdca_copy_with_new_dbkey(self, history_id):
fetch_response = self.dataset_collection_populator.create_pair_in_history(history_id, wait=True).json()
hdca = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
hdca_id = hdca["id"]
assert hdca["elements"][0]["object"]["metadata_dbkey"] == "?"
assert hdca["elements"][0]["object"]["genome_build"] == "?"
create_data = {"source": "hdca", "content": hdca_id, "dbkey": "hg19"}
create_response = self._post(f"histories/{history_id}/contents/dataset_collections", create_data, json=True)
collection = self.__check_create_collection_response(create_response)
new_forward = collection["elements"][0]["object"]
assert new_forward["metadata_dbkey"] == "hg19"
assert new_forward["genome_build"] == "hg19"
[docs] def test_hdca_copy_and_elements(self, history_id):
hdca = self.dataset_collection_populator.create_pair_in_history(history_id, wait=True).json()["outputs"][0]
hdca_id = hdca["id"]
second_history_id = self.dataset_populator.new_history()
create_data = dict(
source="hdca",
content=hdca_id,
copy_elements=True,
)
assert len(self._get(f"histories/{second_history_id}/contents/dataset_collections").json()) == 0
create_response = self._post(
f"histories/{second_history_id}/contents/dataset_collections", create_data, json=True
)
self.__check_create_collection_response(create_response)
contents = self._get(f"histories/{second_history_id}/contents/dataset_collections").json()
assert len(contents) == 1
new_forward, _ = self.__get_paired_response_elements(history_id, contents[0])
self._assert_has_keys(new_forward, "history_id")
assert new_forward["history_id"] == second_history_id
def __get_paired_response_elements(self, history_id: str, contents):
hdca = self.__show(history_id, contents).json()
self._assert_has_keys(hdca, "name", "deleted", "visible", "elements")
elements = hdca["elements"]
assert len(elements) == 2
element0 = elements[0]
element1 = elements[1]
self._assert_has_keys(element0, "object")
self._assert_has_keys(element1, "object")
return element0["object"], element1["object"]
[docs] @requires_new_library
def test_hdca_from_library_datasets(self, history_id):
ld = self.library_populator.new_library_dataset("el1")
ldda_id = ld["ldda_id"]
element_identifiers = [{"name": "el1", "src": "ldda", "id": ldda_id}]
create_data = dict(
history_id=history_id,
type="dataset_collection",
name="Test From Library",
element_identifiers=element_identifiers,
collection_type="list",
)
create_response = self._post(f"histories/{history_id}/contents/dataset_collections", create_data, json=True)
hdca = self.__check_create_collection_response(create_response)
elements = hdca["elements"]
assert len(elements) == 1
hda = elements[0]["object"]
assert hda["hda_ldda"] == "hda"
assert hda["history_content_type"] == "dataset"
assert hda["copied_from_ldda_id"] == ldda_id
assert hda["history_id"] == history_id
[docs] @requires_new_library
def test_hdca_from_inaccessible_library_datasets(self, history_id):
library, library_dataset = self.library_populator.new_library_dataset_in_private_library(
"HDCACreateInaccesibleLibrary"
)
ldda_id = library_dataset["id"]
element_identifiers = [{"name": "el1", "src": "ldda", "id": ldda_id}]
create_data = dict(
history_id=history_id,
type="dataset_collection",
name="Test From Library",
element_identifiers=element_identifiers,
collection_type="list",
)
with self._different_user():
second_history_id = self.dataset_populator.new_history()
create_response = self._post(
f"histories/{second_history_id}/contents/dataset_collections", create_data, json=True
)
self._assert_status_code_is(create_response, 403)
def __check_create_collection_response(self, response):
self._assert_status_code_is(response, 200)
dataset_collection = response.json()
if "output_collections" in dataset_collection:
dataset_collection = dataset_collection["output_collections"][0]
self._assert_has_keys(dataset_collection, "url", "name", "deleted", "visible", "elements")
return dataset_collection
def __show(self, history_id, contents):
show_response = self._get(
f"histories/{history_id}/contents/{contents['history_content_type']}s/{contents['id']}"
)
return show_response
def __count_contents(self, history_id: str, **kwds):
contents_response = self._get(f"histories/{history_id}/contents", kwds)
return len(contents_response.json())
def __assert_hda_has_full_details(self, hda_details):
self._assert_has_keys(hda_details, "display_types", "display_apps")
def __check_for_hda(self, contents_response, hda):
self._assert_status_code_is(contents_response, 200)
contents = contents_response.json()
assert len(contents) == 1
hda_summary = contents[0]
self.__assert_matches_hda(hda, hda_summary)
return hda_summary
def __assert_matches_hda(self, input_hda, query_hda):
self._assert_has_keys(query_hda, "id", "name")
assert input_hda["name"] == query_hda["name"]
assert input_hda["id"] == query_hda["id"]
[docs] def test_job_state_summary_field(self, history_id):
create_response = self.dataset_collection_populator.create_pair_in_history(
history_id,
contents=["123", "456"],
)
self._assert_status_code_is(create_response, 200)
contents_response = self._get(f"histories/{history_id}/contents?v=dev&keys=job_state_summary&view=summary")
self._assert_status_code_is(contents_response, 200)
contents = contents_response.json()
for c in contents:
if c["history_content_type"] == "dataset_collection":
assert isinstance(c, dict)
assert "job_state_summary" in c
assert isinstance(c["job_state_summary"], dict)
[docs] def test_index_filter_by_type(self, history_id):
self.dataset_populator.new_dataset(history_id)
self.dataset_collection_populator.create_list_in_history(history_id=history_id, wait=True)
contents_response = self._get(f"histories/{history_id}/contents").json()
num_items = len(contents_response)
expected_num_collections = 1
expected_num_datasets = num_items - expected_num_collections
contents_response = self._get(f"histories/{history_id}/contents?types=dataset").json()
assert len(contents_response) == expected_num_datasets
contents_response = self._get(f"histories/{history_id}/contents?types=dataset_collection").json()
assert len(contents_response) == expected_num_collections
contents_response = self._get(f"histories/{history_id}/contents?types=dataset,dataset_collection").json()
assert len(contents_response) == expected_num_datasets + expected_num_collections
contents_response = self._get(f"histories/{history_id}/contents?types=dataset&types=dataset_collection").json()
assert len(contents_response) == expected_num_datasets + expected_num_collections
[docs] def test_index_filter_by_name_ignores_case(self, history_id):
self.dataset_populator.new_dataset(history_id, name="AC")
self.dataset_populator.new_dataset(history_id, name="ac")
self.dataset_populator.new_dataset(history_id, name="Bc")
contains_text = "a"
contents_response = self._get(
f"histories/{history_id}/contents?v=dev&q=name-contains&qv={contains_text}"
).json()
assert len(contents_response) == 2
contains_text = "b"
contents_response = self._get(
f"histories/{history_id}/contents?v=dev&q=name-contains&qv={contains_text}"
).json()
assert len(contents_response) == 1
contains_text = "c"
contents_response = self._get(
f"histories/{history_id}/contents?v=dev&q=name-contains&qv={contains_text}"
).json()
assert len(contents_response) == 3
contains_text = "%"
contents_response = self._get(
f"histories/{history_id}/contents?v=dev&q=name-contains&qv={contains_text}"
).json()
assert len(contents_response) == 0
[docs] def test_elements_datatypes_field(self, history_id):
collection_name = "homogeneous"
expected_datatypes = ["txt"]
elements = [ # List with all elements of txt datatype (homogeneous)
{"name": "test1", "src": "pasted", "paste_content": "abc", "ext": "txt"},
{"name": "test2", "src": "pasted", "paste_content": "abc", "ext": "txt"},
]
self._upload_collection_list_with_elements(history_id, collection_name, elements)
self._assert_collection_has_expected_elements_datatypes(history_id, collection_name, expected_datatypes)
collection_name = "heterogeneous"
expected_datatypes = ["txt", "tabular"]
elements = [ # List with txt and tabular datatype (heterogeneous)
{"name": "test2", "src": "pasted", "paste_content": "abc", "ext": "txt"},
{"name": "test3", "src": "pasted", "paste_content": "a,b,c\n", "ext": "tabular"},
]
self._upload_collection_list_with_elements(history_id, collection_name, elements)
self._assert_collection_has_expected_elements_datatypes(history_id, collection_name, expected_datatypes)
def _upload_collection_list_with_elements(self, history_id: str, collection_name: str, elements: List[Any]):
create_homogeneous_response = self.dataset_collection_populator.upload_collection(
history_id, "list", elements=elements, name=collection_name, wait=True
)
self._assert_status_code_is_ok(create_homogeneous_response)
def _assert_collection_has_expected_elements_datatypes(self, history_id, collection_name, expected_datatypes):
contents_response = self._get(
f"histories/{history_id}/contents?v=dev&view=detailed&q=name-eq&qv={collection_name}"
)
self._assert_status_code_is(contents_response, 200)
collection = contents_response.json()[0]
assert sorted(collection["elements_datatypes"]) == sorted(expected_datatypes)
[docs] @skip_without_tool("cat1")
def test_cannot_run_tools_on_immutable_histories(self, history_id):
create_response = self.dataset_collection_populator.create_pair_in_history(
history_id, contents=["123", "456"], wait=True
)
hdca_id = create_response.json()["outputs"][0]["id"]
inputs = {
"input1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
}
# once we purge the history, it becomes immutable
self._delete(f"histories/{history_id}", data={"purge": True}, json=True)
with self.assertRaisesRegex(AssertionError, "History is immutable"):
self.dataset_populator.run_tool("cat1", inputs=inputs, history_id=history_id)
[docs] def test_cannot_update_dataset_collection_on_immutable_history(self, history_id):
hdca = self._create_pair_collection(history_id)
# once we purge the history, it becomes immutable
self._delete(f"histories/{history_id}", data={"purge": True}, json=True)
body = dict(name="newnameforpair")
update_response = self._put(
f"histories/{history_id}/contents/dataset_collections/{hdca['id']}", data=body, json=True
)
self._assert_status_code_is(update_response, 403)
assert update_response.json()["err_msg"] == "History is immutable"
[docs] def test_cannot_update_dataset_on_immutable_history(self, history_id):
hda1 = self._wait_for_new_hda(history_id)
# once we purge the history, it becomes immutable
self._delete(f"histories/{history_id}", data={"purge": True}, json=True)
update_response = self._update(history_id, hda1["id"], dict(name="Updated Name"))
self._assert_status_code_is(update_response, 403)
assert update_response.json()["err_msg"] == "History is immutable"
class TestHistoryContentsApiBulkOperation(ApiTestCase):
    """
    Test the `/api/histories/{history_id}/contents/bulk` endpoint and the new
    `count` special view for `/api/histories/{history_id}/contents?v=dev`

    Bulk operations may target an explicit list of items or a dynamic filter
    query; the tests below cover both selection modes.
    """
[docs] def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
[docs] def test_explicit_items_selection(self):
with self.dataset_populator.test_history() as history_id:
datasets_ids, collection_ids, history_contents = self._create_test_history_contents(history_id)
# Hide 2 collections and 3 datasets, 5 in total
payload = {
"operation": "hide",
"items": [
{
"id": datasets_ids[0],
"history_content_type": "dataset",
},
{
"id": collection_ids[0],
"history_content_type": "dataset_collection",
},
{
"id": datasets_ids[1],
"history_content_type": "dataset",
},
{
"id": collection_ids[1],
"history_content_type": "dataset_collection",
},
{
"id": datasets_ids[2],
"history_content_type": "dataset",
},
],
}
expected_hidden_item_ids = list(map(lambda item: item["id"], payload["items"]))
expected_hidden_item_count = len(expected_hidden_item_ids)
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
history_contents = self._get_history_contents(history_id)
hidden_items = self._get_hidden_items_from_history_contents(history_contents)
self._assert_bulk_success(bulk_operation_result, expected_hidden_item_count)
assert len(hidden_items) == expected_hidden_item_count
for item in hidden_items:
assert item["id"] in expected_hidden_item_ids
[docs] def test_dynamic_query_selection(self):
with self.dataset_populator.test_history() as history_id:
_, collection_ids, history_contents = self._create_test_history_contents(history_id)
# Hide all collections using query
payload = {"operation": "hide"}
query = "q=history_content_type-eq&qv=dataset_collection"
bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
history_contents = self._get_history_contents(history_id)
hidden_items = self._get_hidden_items_from_history_contents(history_contents)
self._assert_bulk_success(bulk_operation_result, len(collection_ids))
assert len(hidden_items) == len(collection_ids)
for item in hidden_items:
assert item["id"] in collection_ids
    def test_bulk_operations(self):
        """End-to-end scenario exercising hide/unhide, delete/undelete and purge.

        The steps are strictly order-dependent: each bulk operation builds on the
        state left behind by the previous one.
        """
        with self.dataset_populator.test_history() as history_id:
            datasets_ids, collection_ids, history_contents = self._create_test_history_contents(history_id)
            # Hide all datasets using query
            payload = {"operation": "hide"}
            query = "q=history_content_type-eq&qv=dataset"
            bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
            history_contents = self._get_history_contents(history_id)
            hidden_items = self._get_hidden_items_from_history_contents(history_contents)
            self._assert_bulk_success(bulk_operation_result, len(datasets_ids))
            assert len(hidden_items) == len(datasets_ids)
            # Unhide datasets_ids[0] and datasets_ids[3]
            payload = {
                "operation": "unhide",
                "items": [
                    {
                        "id": datasets_ids[0],
                        "history_content_type": "dataset",
                    },
                    {
                        "id": datasets_ids[3],
                        "history_content_type": "dataset",
                    },
                ],
            }
            expected_unhidden_count = len(payload["items"])
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            history_contents = self._get_history_contents(history_id)
            self._assert_bulk_success(bulk_operation_result, expected_unhidden_count)
            for item in history_contents:
                if item["id"] in [datasets_ids[0], datasets_ids[3]]:
                    assert item["visible"] is True
            # Delete all hidden datasets (total dataset - 2 previously unhidden)
            expected_hidden_item_count = len(datasets_ids) - expected_unhidden_count
            payload = {"operation": "delete"}
            query = "q=history_content_type-eq&qv=dataset&q=visible&qv=False"
            bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
            history_contents = self._get_history_contents(history_id)
            hidden_items = self._get_hidden_items_from_history_contents(history_contents)
            self._assert_bulk_success(bulk_operation_result, expected_hidden_item_count)
            for item in hidden_items:
                assert item["deleted"] is True
            # Undelete all items in history
            payload = {
                "operation": "undelete",
            }
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            history_contents = self._get_history_contents(history_id)
            self._assert_bulk_success(bulk_operation_result, len(history_contents))
            for item in history_contents:
                assert item["deleted"] is False
            # Purge datasets_ids[0] and collection_ids[0]
            payload = {
                "operation": "purge",
                "items": [
                    {
                        "id": datasets_ids[0],
                        "history_content_type": "dataset",
                    },
                    {
                        "id": collection_ids[0],
                        "history_content_type": "dataset_collection",
                    },
                ],
            }
            expected_purged_count = len(payload["items"])
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            history_contents = self._get_history_contents(history_id)
            self._assert_bulk_success(bulk_operation_result, expected_purged_count)
            purged_dataset = self._get_dataset_with_id_from_history_contents(history_contents, datasets_ids[0])
            # Dataset purging is asynchronous; wait until it really completes.
            self.dataset_populator.wait_for_purge(history_id=history_id, content_id=purged_dataset["id"])
            assert purged_dataset["deleted"] is True
            purged_collection = self._get_collection_with_id_from_history_contents(history_contents, collection_ids[0])
            # collections don't have a `purged` attribute but they should be marked deleted on purge
            assert purged_collection["deleted"] is True
            # Un-deleting a purged dataset should not have any effect and raise an error
            payload = {
                "operation": "undelete",
                "items": [
                    {
                        "id": datasets_ids[0],
                        "history_content_type": "dataset",
                    },
                ],
            }
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            history_contents = self._get_history_contents(history_id)
            assert bulk_operation_result["success_count"] == 0
            assert len(bulk_operation_result["errors"]) == 1
            error = bulk_operation_result["errors"][0]
            assert error["item"]["id"] == datasets_ids[0]
            purged_dataset = self._get_dataset_with_id_from_history_contents(history_contents, datasets_ids[0])
            assert purged_dataset["deleted"] is True
            assert purged_dataset["purged"] is True
[docs] def test_purging_collection_should_purge_contents(self):
with self.dataset_populator.test_history() as history_id:
datasets_ids, collection_ids, history_contents = self._create_test_history_contents(history_id)
# Purge all collections
payload = {"operation": "purge"}
query = "q=history_content_type-eq&qv=dataset_collection"
bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
history_contents = self._get_history_contents(history_id)
self._assert_bulk_success(bulk_operation_result, len(collection_ids))
for item in history_contents:
assert item["deleted"] is True
if item["history_content_type"] == "dataset":
self.dataset_populator.wait_for_purge(history_id=history_id, content_id=item["id"])
[docs] def test_deleting_collection_should_delete_contents(self):
with self.dataset_populator.test_history() as history_id:
num_expected_datasets = 2
# Create collection and datasets
collection_ids = self._create_collection_in_history(history_id, num_collections=1)
original_collection_id = collection_ids[0]
# Check datasets are hidden and not deleted
history_contents = self._get_history_contents(history_id)
datasets = list(filter(lambda item: item["history_content_type"] == "dataset", history_contents))
assert len(datasets) == num_expected_datasets
for dataset in datasets:
assert dataset["deleted"] is False
assert dataset["visible"] is False
# Delete the collection
payload = {
"operation": "delete",
"items": [
{
"id": original_collection_id,
"history_content_type": "dataset_collection",
},
],
}
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
self._assert_bulk_success(bulk_operation_result, 1)
# We expect the original collection and the datasets to be deleted
num_expected_history_contents = num_expected_datasets + 1
history_contents = self._get_history_contents(history_id)
assert len(history_contents) == num_expected_history_contents
for item in history_contents:
assert item["deleted"] is True
[docs] @requires_new_user
def test_only_owner_can_apply_bulk_operations(self):
with self.dataset_populator.test_history() as history_id:
self._create_test_history_contents(history_id)
with self._different_user():
payload = {"operation": "hide"}
bulk_operation_result = self._apply_bulk_operation(history_id, payload, expected_status_code=403)
assert bulk_operation_result["err_msg"]
[docs] def test_bulk_tag_changes(self):
with self.dataset_populator.test_history() as history_id:
_, collection_ids, history_contents = self._create_test_history_contents(history_id)
expected_tags = ["cool_tag", "tag01"]
# Add same tag to all items
payload = {
"operation": "add_tags",
"params": {
"type": "add_tags",
"tags": expected_tags,
},
}
expected_success_count = len(history_contents)
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
self._assert_bulk_success(bulk_operation_result, expected_success_count)
history_contents = self._get_history_contents(history_id)
for item in history_contents:
for expected_tag in expected_tags:
assert expected_tag in item["tags"]
# Remove tag from all collections
payload = {
"operation": "remove_tags",
"params": {
"type": "remove_tags",
"tags": expected_tags,
},
}
query = "q=history_content_type-eq&qv=dataset_collection"
expected_success_count = len(collection_ids)
bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
self._assert_bulk_success(bulk_operation_result, expected_success_count)
history_contents = self._get_history_contents(history_id)
for item in history_contents:
if item["history_content_type"] == "dataset_collection":
assert not item["tags"]
else:
for expected_tag in expected_tags:
assert expected_tag in item["tags"]
[docs] @requires_celery
def test_bulk_dbkey_change(self):
with self.dataset_populator.test_history() as history_id:
_, _, history_contents = self._create_test_history_contents(history_id)
expected_dbkey = "apiMel3"
# Change dbkey of all items
payload = {
"operation": "change_dbkey",
"params": {
"type": "change_dbkey",
"dbkey": expected_dbkey,
},
}
# All items should succeed
expected_success_count = len(history_contents)
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
self._assert_bulk_success(bulk_operation_result, expected_success_count)
history_contents = self._get_history_contents(history_id, query="?v=dev&keys=dbkey")
for item in history_contents:
if item["history_content_type"] == "dataset":
assert item["dbkey"] == expected_dbkey
[docs] @requires_celery
def test_bulk_dbkey_change_dataset_collection(self):
with self.dataset_populator.test_history() as history_id:
_, collection_ids, history_contents = self._create_test_history_contents(history_id)
expected_dbkey = "apiMel3"
# Change dbkey of all items
payload = {
"operation": "change_dbkey",
"params": {
"type": "change_dbkey",
"dbkey": expected_dbkey,
},
}
# All items should succeed
expected_success_count = len(collection_ids)
query = "q=history_content_type-eq&qv=dataset_collection"
bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
self._assert_bulk_success(bulk_operation_result, expected_success_count)
history_contents = self._get_history_contents(history_id, query="?v=dev&keys=dbkey")
# now verify that datasets within collections have the expected dbkey
for item in history_contents:
if item["history_content_type"] == "dataset":
assert item["dbkey"] == expected_dbkey
[docs] def test_bulk_datatype_change(self):
with self.dataset_populator.test_history() as history_id:
num_datasets = 3
dataset_ids = []
for _ in range(num_datasets):
hda_id = self.dataset_populator.new_dataset(history_id)["id"]
dataset_ids.append(hda_id)
history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
for item in history_contents:
assert item["extension"] == "txt"
assert item["data_type"] == "galaxy.datatypes.data.Text"
assert "metadata_column_names" not in item
self.dataset_populator.wait_for_history_jobs(history_id)
expected_datatype = "tabular"
# Change datatype of all datasets
payload = {
"operation": "change_datatype",
"params": {
"type": "change_datatype",
"datatype": expected_datatype,
},
}
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
self._assert_bulk_success(bulk_operation_result, expected_success_count=num_datasets)
# Wait for celery tasks to finish
self.dataset_populator.wait_for_history(history_id)
history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
for item in history_contents:
assert item["extension"] == "tabular"
assert item["data_type"] == "galaxy.datatypes.tabular.Tabular"
assert "metadata_column_names" in item
    def test_bulk_datatype_change_collection(self):
        """Bulk datatype change targeted at collections updates the contained datasets
        and touches the collections' ``update_time`` so clients can detect the change.
        """
        with self.dataset_populator.test_history() as history_id:
            _, collection_ids, history_contents = self._create_test_history_contents(history_id)
            history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
            original_collection_update_times = []
            for item in history_contents:
                if item["history_content_type"] == "dataset":
                    # Uploads start out as plain text with no tabular metadata.
                    assert item["extension"] == "txt"
                    assert item["data_type"] == "galaxy.datatypes.data.Text"
                    assert "metadata_column_names" not in item
                if item["history_content_type"] == "dataset_collection":
                    original_collection_update_times.append(item["update_time"])
            expected_datatype = "tabular"
            # Change datatype of all datasets
            payload = {
                "operation": "change_datatype",
                "params": {
                    "type": "change_datatype",
                    "datatype": expected_datatype,
                },
            }
            bulk_operation_result = self._apply_bulk_operation(
                history_id, payload, query="q=history_content_type-eq&qv=dataset_collection"
            )
            self._assert_bulk_success(bulk_operation_result, expected_success_count=len(collection_ids))
            # Wait for celery tasks to finish
            self.dataset_populator.wait_for_history(history_id)
            history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
            new_collection_update_times = []
            for item in history_contents:
                if item["history_content_type"] == "dataset":
                    assert item["extension"] == "tabular"
                    assert item["data_type"] == "galaxy.datatypes.tabular.Tabular"
                    assert "metadata_column_names" in item
                if item["history_content_type"] == "dataset_collection":
                    new_collection_update_times.append(item["update_time"])
            # The collections themselves must have been touched by the change.
            assert original_collection_update_times != new_collection_update_times
[docs] def test_bulk_datatype_change_should_skip_set_metadata_on_deferred_data(self):
with self.dataset_populator.test_history() as history_id:
details = self.dataset_populator.create_deferred_hda(
history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed", ext="bed"
)
assert details["state"] == "deferred"
assert details["extension"] == "bed"
assert details["data_type"] == "galaxy.datatypes.interval.Bed"
assert "metadata_columns" in details
assert "metadata_delimiter" in details
assert "metadata_comment_lines" in details
new_datatype = "txt"
payload = {
"operation": "change_datatype",
"params": {
"type": "change_datatype",
"datatype": new_datatype,
},
}
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
self._assert_bulk_success(bulk_operation_result, expected_success_count=1)
history_contents = self._get_history_contents(history_id, query="?v=dev&view=detailed")
for item in history_contents:
assert item["state"] == "deferred"
assert item["extension"] == "txt"
assert item["data_type"] == "galaxy.datatypes.data.Text"
# It should discard old metadata
assert "metadata_columns" not in item
assert "metadata_delimiter" not in item
assert "metadata_comment_lines" not in item
    @skip_without_tool("cat_data_and_sleep")
    def test_bulk_datatype_change_errors(self):
        """Datasets currently in use by a running job cannot change datatype and are
        reported in the bulk result's ``errors`` list instead.
        """
        with self.dataset_populator.test_history() as history_id:
            num_datasets = 3
            dataset_ids = []
            for _ in range(num_datasets):
                hda_id = self.dataset_populator.new_dataset(history_id)["id"]
                dataset_ids.append(hda_id)
            self.dataset_populator.wait_for_history_jobs(history_id)
            # Run tool on last dataset
            # NOTE: relies on `hda_id` still holding the last loop iteration's value.
            input_hda_id = hda_id
            inputs = {
                "input1": {"src": "hda", "id": input_hda_id},
                "sleep_time": 10,
            }
            # The sleep keeps the job running while the bulk operation is applied below.
            run_response = self.dataset_populator.run_tool_raw(
                "cat_data_and_sleep",
                inputs,
                history_id,
            )
            output_hda_id = run_response.json()["outputs"][0]["id"]
            num_datasets += 1  # the new output dataset
            dataset_ids_in_use = [input_hda_id, output_hda_id]
            expected_datatype = "tabular"
            # Change datatype of all datasets (4 in total)
            payload = {
                "operation": "change_datatype",
                "params": {
                    "type": "change_datatype",
                    "datatype": expected_datatype,
                },
            }
            bulk_operation_result = self._apply_bulk_operation(history_id, payload)
            # First 2 datasets are ok
            assert bulk_operation_result["success_count"] == 2
            # Last 2 are in use (input and output) and must fail
            assert len(bulk_operation_result["errors"]) == 2
            for error in bulk_operation_result["errors"]:
                assert error["item"]["id"] in dataset_ids_in_use
[docs] def test_bulk_datatype_change_auto(self):
with self.dataset_populator.test_history() as history_id:
tabular_contents = "1\t2\t3\na\tb\tc\n"
dataset_ids = [
self.dataset_populator.new_dataset(history_id, content=tabular_contents)["id"],
self.dataset_populator.new_dataset(history_id, content=tabular_contents)["id"],
]
self.dataset_populator.wait_for_history_jobs(history_id)
history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
for item in history_contents:
assert item["extension"] == "txt"
assert item["data_type"] == "galaxy.datatypes.data.Text"
assert "metadata_delimiter" not in item
# Change datatype of all datasets to auto
payload = {
"operation": "change_datatype",
"params": {
"type": "change_datatype",
"datatype": "auto",
},
}
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
self._assert_bulk_success(bulk_operation_result, expected_success_count=len(dataset_ids))
# Wait for celery tasks to finish
self.dataset_populator.wait_for_history(history_id)
history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
# Should be detected as `tabular` and set the metadata correctly
for item in history_contents:
assert item["extension"] == "tabular"
assert item["data_type"] == "galaxy.datatypes.tabular.Tabular"
assert "metadata_delimiter" in item
assert item["metadata_delimiter"] == "\t"
[docs] def test_index_returns_expected_total_matches(self):
with self.dataset_populator.test_history() as history_id:
datasets_ids, collection_ids, history_contents = self._create_test_history_contents(history_id)
self._test_index_total_matches(history_id, expected_total_matches=len(history_contents))
self._test_index_total_matches(
history_id,
search_query="&q=history_content_type-eq&qv=dataset_collection",
expected_total_matches=len(collection_ids),
)
self._test_index_total_matches(
history_id,
search_query="&q=history_content_type-eq&qv=dataset",
expected_total_matches=len(datasets_ids),
)
[docs] def test_index_with_stats_fails_with_non_orm_filters(self):
with self.dataset_populator.test_history() as history_id:
self._create_test_history_contents(history_id)
invalid_filter_keys_with_stats = ["data_type", "annotation"]
for filter_key in invalid_filter_keys_with_stats:
response = self._get_contents_with_stats(
history_id,
search_query=f"&q={filter_key}-contains&qv=anything",
)
self._assert_status_code_is(response, 400)
[docs] def test_index_with_stats_has_extra_serialization(self):
expected_extra_keys_in_collections = ["elements_datatypes"]
with self.dataset_populator.test_history() as history_id:
self._create_collection_in_history(history_id)
response = self._get_contents_with_stats(
history_id,
search_query="&q=history_content_type-eq&qv=dataset_collection",
)
self._assert_status_code_is(response, 200)
contents_with_stats = response.json()
assert contents_with_stats["contents"]
collection = contents_with_stats["contents"][0]
self._assert_has_keys(collection, *expected_extra_keys_in_collections)
def _get_contents_with_stats(self, history_id: str, search_query: str = ""):
headers = {"accept": "application/vnd.galaxy.history.contents.stats+json"}
search_response = self._get(f"histories/{history_id}/contents?v=dev{search_query}", headers=headers)
return search_response
def _test_index_total_matches(self, history_id: str, expected_total_matches: int, search_query: str = ""):
search_response = self._get_contents_with_stats(history_id, search_query)
self._assert_status_code_is(search_response, 200)
self._assert_total_matches_is(search_response.json(), expected_total_matches)
def _assert_total_matches_is(self, response, expected_total_matches: int):
assert response["stats"]
assert response["stats"]["total_matches"]
assert response["stats"]["total_matches"] == expected_total_matches
def _create_test_history_contents(self, history_id) -> Tuple[List[str], List[str], List[Any]]:
"""Creates 3 collections (pairs) and their corresponding datasets (6 in total)
Returns a tuple with the list of ids for the datasets and the collections and the
complete history contents
"""
num_expected_collections = 3
num_expected_datasets = num_expected_collections * 2
collection_ids = self._create_collection_in_history(history_id, num_expected_collections)
history_contents = self._get_history_contents(history_id)
datasets = filter(lambda item: item["history_content_type"] == "dataset", history_contents)
datasets_ids = list(map(lambda dataset: dataset["id"], datasets))
assert len(history_contents) == num_expected_datasets + num_expected_collections
assert len(datasets_ids) == num_expected_datasets
for dataset_id in datasets_ids:
self._put(f"histories/{history_id}/contents/{dataset_id}", {"visible": True}, json=True).json()
# All items are visible
history_contents = self._get_history_contents(history_id)
for item in history_contents:
assert item["visible"]
return datasets_ids, collection_ids, history_contents
def _create_collection_in_history(self, history_id, num_collections=1) -> List[str]:
collection_ids = []
for _ in range(num_collections):
collection_id = self.dataset_collection_populator.create_pair_in_history(
history_id=history_id, wait=True
).json()["outputs"][0]["id"]
collection_ids.append(collection_id)
return collection_ids
def _get_history_contents(self, history_id: str, query: str = ""):
return self._get(f"histories/{history_id}/contents{query}").json()
def _get_hidden_items_from_history_contents(self, history_contents) -> List[Any]:
return [content for content in history_contents if not content["visible"]]
def _get_collection_with_id_from_history_contents(self, history_contents, collection_id: str) -> Optional[Any]:
return self._get_item_with_id_from_history_contents(history_contents, "dataset_collection", collection_id)
def _get_dataset_with_id_from_history_contents(self, history_contents, dataset_id: str) -> Optional[Any]:
return self._get_item_with_id_from_history_contents(history_contents, "dataset", dataset_id)
def _get_item_with_id_from_history_contents(
self, history_contents, history_content_type: str, dataset_id: str
) -> Optional[Any]:
for item in history_contents:
if item["history_content_type"] == history_content_type and item["id"] == dataset_id:
return item
return None
    def _apply_bulk_operation(self, history_id: str, payload, query: str = "", expected_status_code: int = 200):
        """PUT the bulk operation `payload` (optionally scoped by `query`) and return the parsed result.

        Also verifies that any successful operation bumps the history's ``update_time``.
        """
        original_history_update_time = self._get_history_update_time(history_id)
        if query:
            query = f"?{query}"
        response = self._put(
            f"histories/{history_id}/contents/bulk{query}",
            data=payload,
            json=True,
        )
        self._assert_status_code_is(response, expected_status_code)
        result = response.json()
        if "err_msg" in result or result.get("success_count", 0) == 0:
            # We don't need to check the history update time if there was an error or no items were updated
            return result
        # After a successful operation, history update time should be updated so the changes can be detected by the frontend
        after_bulk_operation_history_update_time = self._get_history_update_time(history_id)
        assert after_bulk_operation_history_update_time > original_history_update_time
        return result
def _assert_bulk_success(self, bulk_operation_result, expected_success_count: int):
assert bulk_operation_result["success_count"] == expected_success_count, bulk_operation_result
assert not bulk_operation_result["errors"]
def _get_history_update_time(self, history_id: str):
history = self._get(f"histories/{history_id}").json()
return history.get("update_time")