import urllib.parse
from typing import (
Any,
List,
Optional,
Tuple,
)
from galaxy_test.api._framework import ApiTestCase
from galaxy_test.base.decorators import (
requires_admin,
requires_celery,
requires_new_library,
requires_new_user,
)
from galaxy_test.base.populators import (
DatasetCollectionPopulator,
DatasetPopulator,
LibraryPopulator,
skip_without_tool,
)
TEST_SOURCE_URI = "http://google.com/dataset.txt"
TEST_HASH_FUNCTION = "MD5"
TEST_HASH_VALUE = "moocowpretendthisisahash"
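# Placeholder fixture values; deliberately fake, presumably consumed by
# dataset source/hash tests elsewhere in this module.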
# TODO: Test anonymous access.
class TestHistoryContentsApi(ApiTestCase):
dataset_populator: DatasetPopulator
    def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
self.library_populator = LibraryPopulator(self.galaxy_interactor)
    def test_index_hda_summary(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
contents_response = self._get(f"histories/{history_id}/contents")
hda_summary = self.__check_for_hda(contents_response, hda1)
assert "display_types" not in hda_summary # Quick summary, not full details
    @requires_admin
def test_make_private_and_public(self, history_id):
hda1 = self._wait_for_new_hda(history_id)
update_url = f"histories/{history_id}/contents/{hda1['id']}/permissions"
role_id = self.dataset_populator.user_private_role_id()
# Give manage permission to the user.
payload = {
"access": [],
"manage": [role_id],
}
update_response = self._update_permissions(update_url, payload, admin=True)
self._assert_status_code_is(update_response, 200)
self._assert_other_user_can_access(history_id, hda1["id"])
# Then we restrict access.
payload = {
"action": "make_private",
}
update_response = self._update_permissions(update_url, payload)
self._assert_status_code_is(update_response, 200)
self._assert_other_user_cannot_access(history_id, hda1["id"])
        # Then we remove the restrictions.
payload = {
"action": "remove_restrictions",
}
update_response = self._update_permissions(update_url, payload)
self._assert_status_code_is(update_response, 200)
self._assert_other_user_can_access(history_id, hda1["id"])
    @requires_new_user
def test_set_permissions_add_admin_history_contents(self, history_id):
self._verify_dataset_permissions(history_id, "history_contents")
    @requires_new_user
def test_set_permissions_add_admin_datasets(self, history_id):
self._verify_dataset_permissions(history_id, "dataset")
def _verify_dataset_permissions(self, history_id: str, api_endpoint):
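        # Shared permission workflow for both permissions endpoints: verify
        # non-owners cannot change permissions, restrict access as admin,
        # check the other user is locked out while the owner is not, then
        # re-grant access through a role shared by both users.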
hda1 = self._wait_for_new_hda(history_id)
hda_id = hda1["id"]
if api_endpoint == "history_contents":
update_url = f"histories/{history_id}/contents/{hda_id}/permissions"
else:
update_url = f"datasets/{hda_id}/permissions"
role_id = self.dataset_populator.user_private_role_id()
payload = {
"access": [role_id],
"manage": [role_id],
}
# Other users cannot modify permissions.
with self._different_user():
update_response = self._update_permissions(update_url, payload)
self._assert_status_code_is(update_response, 403)
# First the details render for another user.
self._assert_other_user_can_access(history_id, hda_id)
# Then we restrict access.
update_response = self._update_permissions(update_url, payload, admin=True)
self._assert_status_code_is(update_response, 200)
# Finally the details don't render.
self._assert_other_user_cannot_access(history_id, hda_id)
# But they do for the original user.
contents_response = self._get(f"histories/{history_id}/contents/{hda_id}").json()
assert "name" in contents_response
update_response = self._update_permissions(update_url, payload)
self._assert_status_code_is(update_response, 200)
payload = {
"access": [role_id],
"manage": [role_id],
}
update_response = self._update_permissions(update_url, payload)
self._assert_status_code_is(update_response, 200)
self._assert_other_user_cannot_access(history_id, hda_id)
user_id = self.dataset_populator.user_id()
with self._different_user():
different_user_id = self.dataset_populator.user_id()
combined_user_role = self.dataset_populator.create_role(
[user_id, different_user_id], description="role for testing permissions"
)
payload = {
"access": [combined_user_role["id"]],
"manage": [role_id],
}
update_response = self._update_permissions(update_url, payload)
self._assert_status_code_is(update_response, 200)
# Now other user can see dataset again with access permission.
self._assert_other_user_can_access(history_id, hda_id)
# access doesn't imply management though...
with self._different_user():
update_response = self._update_permissions(update_url, payload)
self._assert_status_code_is(update_response, 403)
def _assert_other_user_cannot_access(self, history_id: str, history_content_id: str):
with self._different_user():
contents_response = self.dataset_populator.get_history_dataset_details_raw(
history_id=history_id, dataset_id=history_content_id
)
assert contents_response.status_code == 403
def _assert_other_user_can_access(self, history_id: str, history_content_id: str):
with self._different_user():
contents_response = self.dataset_populator.get_history_dataset_details_raw(
history_id=history_id, dataset_id=history_content_id
)
contents_response.raise_for_status()
assert "name" in contents_response.json()
    def test_index_hda_all_details(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
contents_response = self._get(f"histories/{history_id}/contents?details=all")
hda_details = self.__check_for_hda(contents_response, hda1)
self.__assert_hda_has_full_details(hda_details)
    def test_index_hda_detail_by_id(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
contents_response = self._get(f"histories/{history_id}/contents?details={hda1['id']}")
hda_details = self.__check_for_hda(contents_response, hda1)
self.__assert_hda_has_full_details(hda_details)
    def test_index_detail_parameter_error(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
# Invalid details should return 400
contents_response = self._get(f"histories/{history_id}/contents?v=dev&details= ")
self._assert_status_code_is(contents_response, 400)
# Empty IDs should return 400
contents_response = self._get(f"histories/{history_id}/contents?v=dev&details=,,{hda1['id']}")
self._assert_status_code_is(contents_response, 400)
# Invalid IDs should return 400
contents_response = self._get(f"histories/{history_id}/contents?v=dev&details={hda1['id']}, ,{hda1['id']}")
self._assert_status_code_is(contents_response, 400)
    def test_view_and_keys_parameters_for_datasets(self, history_id):
created_hda = self.dataset_populator.new_dataset(history_id)
hda_id = created_hda["id"]
item_type = "dataset"
summary_view_keys = [
"id",
"name",
"history_id",
"hid",
"history_content_type",
"deleted",
"visible",
"type_id",
"type",
"create_time",
"update_time",
"url",
"tags",
"dataset_id",
"state",
"extension",
"purged",
"genome_build",
]
detailed_view_only_keys = [
"created_from_basename",
"api_type",
"accessible",
"misc_info",
"resubmitted",
"misc_blurb",
"hda_ldda",
"file_size",
"hashes",
"drs_id",
"validated_state_message",
"creating_job",
"file_ext",
"copied_from_ldda_id",
"peek",
"validated_state",
"permissions",
"uuid",
"model_class",
"sources",
"annotation",
"display_apps",
"display_types",
"file_name",
"download_url",
"rerunnable",
"data_type",
"meta_files",
]
detailed_view_keys = summary_view_keys + detailed_view_only_keys
# Expect summary view to be returned.
view = "summary"
keys = None
item = self._get_history_item_with_custom_serialization(history_id, hda_id, item_type, view, keys)
self._assert_has_keys(item, *summary_view_keys)
for key in detailed_view_only_keys:
assert key not in item
# Expect "dynamic" metadata fields to NOT be returned.
metadata_keys = [key for key in item.keys() if key.startswith("metadata_")]
assert len(metadata_keys) == 0
# Expect detailed view to be returned.
view = "detailed"
keys = None
item = self._get_history_item_with_custom_serialization(history_id, hda_id, item_type, view, keys)
self._assert_has_keys(item, *detailed_view_keys)
# Expect also "dynamic" metadata fields to be returned.
metadata_keys = [key for key in item.keys() if key.startswith("metadata_")]
assert len(metadata_keys) > 0
# Expect only specific keys to be returned.
view = None
keys = detailed_view_only_keys + ["id"]
item = self._get_history_item_with_custom_serialization(history_id, hda_id, item_type, view, keys)
self._assert_has_keys(item, *keys)
assert len(item) == len(keys)
# Make sure the id is encoded in the response.
assert isinstance(item["id"], str)
assert item["id"] == hda_id
# Expect combined view and keys to be returned.
view = "summary"
keys = ["file_size"]
item = self._get_history_item_with_custom_serialization(history_id, hda_id, item_type, view, keys)
self._assert_has_keys(item, *summary_view_keys, *keys)
assert "peek" not in item
    def test_view_and_keys_parameters_for_collections(self, history_id):
fetch_response = self.dataset_collection_populator.create_list_in_history(history_id, direct_upload=True).json()
created_dataset_collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
hdca_id = created_dataset_collection["id"]
item_type = "dataset_collection"
        # Collections seem to have 3 different views: "collection", "element" and "element-reference".
        # The keys parameter cannot be used with collections, so only the view parameter is tested here.
collection_view_keys = [
"hid",
"history_id",
"history_content_type",
"visible",
"deleted",
"job_source_id",
"job_source_type",
"job_state_summary",
"create_time",
"update_time",
"id",
"name",
"collection_id",
"collection_type",
"populated",
"populated_state",
"populated_state_message",
"element_count",
"elements_datatypes",
"type",
"model_class",
"tags",
"url",
"contents_url",
]
element_view_only_keys = ["elements", "implicit_collection_jobs_id"]
element_view_keys = collection_view_keys + element_view_only_keys
        # Expect collection view to be returned.
view = "collection"
item = self._get_history_item_with_custom_serialization(history_id, hdca_id, item_type, view)
self._assert_has_keys(item, *collection_view_keys)
for key in element_view_only_keys:
assert key not in item
        # Expect element view to be returned.
view = "element"
item = self._get_history_item_with_custom_serialization(history_id, hdca_id, item_type, view)
self._assert_has_keys(item, *element_view_keys)
# The `elements` field should be populated for the "element" view.
assert len(item["elements"]) > 0
def _get_history_item_with_custom_serialization(
self,
history_id: str,
content_id: str,
item_type: str,
expected_view: Optional[str] = None,
expected_keys: Optional[List[str]] = None,
):
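        """Fetch a single history item using the dev serialization controls.

        `view` selects a predefined set of keys ("summary", "detailed", ...)
        and `keys` requests individual attributes; both can be combined.
        """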
view = f"&view={expected_view}" if expected_view else ""
keys = f"&keys={','.join(expected_keys)}" if expected_keys else ""
response = self._get(f"histories/{history_id}/contents/{item_type}s/{content_id}?v=dev{view}{keys}")
self._assert_status_code_is_ok(response)
return response.json()
    def test_show_hda(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
show_response = self.__show(history_id, hda1)
self._assert_status_code_is(show_response, 200)
self.__assert_matches_hda(hda1, show_response.json())
def _create_copy(self, history_id: str):
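        # Copy an HDA into a freshly created history by POSTing a
        # source="hda" payload to that history's contents endpoint.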
hda1 = self.dataset_populator.new_dataset(history_id)
create_data = dict(
source="hda",
content=hda1["id"],
)
second_history_id = self.dataset_populator.new_history()
assert self.__count_contents(second_history_id) == 0
create_response = self._post(f"histories/{second_history_id}/contents", create_data, json=True)
self._assert_status_code_is(create_response, 200)
return create_response.json()
    def test_hda_copy(self, history_id):
response = self._create_copy(history_id)
assert self.__count_contents(response["history_id"]) == 1
    def test_inheritance_chain(self, history_id):
response = self._create_copy(history_id)
inheritance_chain_response = self._get(f"datasets/{response['id']}/inheritance_chain")
self._assert_status_code_is_ok(inheritance_chain_response)
inheritance_chain = inheritance_chain_response.json()
assert len(inheritance_chain) == 1
    @requires_new_library
def test_library_copy(self, history_id):
ld = self.library_populator.new_library_dataset("lda_test_library")
create_data = dict(
source="library",
content=ld["id"],
)
assert self.__count_contents(history_id) == 0
create_response = self._post(f"histories/{history_id}/contents", create_data, json=True)
self._assert_status_code_is(create_response, 200)
assert self.__count_contents(history_id) == 1
    def test_update(self, history_id):
hda1 = self._wait_for_new_hda(history_id)
assert str(hda1["deleted"]).lower() == "false"
update_response = self._update(history_id, hda1["id"], dict(deleted=True))
self._assert_status_code_is(update_response, 200)
show_response = self.__show(history_id, hda1)
assert str(show_response.json()["deleted"]).lower() == "true"
update_response = self._update(history_id, hda1["id"], dict(name="Updated Name"))
assert self.__show(history_id, hda1).json()["name"] == "Updated Name"
update_response = self._update(history_id, hda1["id"], dict(name="Updated Name"))
assert self.__show(history_id, hda1).json()["name"] == "Updated Name"
unicode_name = "ржевский сапоги"
update_response = self._update(history_id, hda1["id"], dict(name=unicode_name))
updated_hda = self.__show(history_id, hda1).json()
assert updated_hda["name"] == unicode_name, updated_hda
quoted_name = '"Mooo"'
update_response = self._update(history_id, hda1["id"], dict(name=quoted_name))
updated_hda = self.__show(history_id, hda1).json()
assert updated_hda["name"] == quoted_name, quoted_name
data = {
"dataset_id": hda1["id"],
"name": "moocow",
"dbkey": "?",
"annotation": None,
"info": "my info is",
"operation": "attributes",
}
update_response = self._set_edit_update(data)
        # No key or anything supplied, expect a permission problem.
        # A bit questionable, but I think this is a 400 instead of a 403 so
        # that we don't distinguish between "this is a valid ID you don't have
        # access to" and "this is an invalid ID".
assert update_response.status_code == 400, update_response.content
    def test_update_batch(self, history_id):
hda1 = self._wait_for_new_hda(history_id)
assert str(hda1["deleted"]).lower() == "false"
assert str(hda1["visible"]).lower() == "true"
# update deleted flag => true
payload = dict(items=[{"history_content_type": "dataset", "id": hda1["id"]}], deleted=True)
update_response = self._update_batch(history_id, payload)
objects = update_response.json()
assert objects[0]["deleted"] is True
assert objects[0]["visible"] is True
# update visibility flag => false
payload = dict(items=[{"history_content_type": "dataset", "id": hda1["id"]}], visible=False)
update_response = self._update_batch(history_id, payload)
objects = update_response.json()
assert objects[0]["deleted"] is True
assert objects[0]["visible"] is False
# update both flags
payload = dict(items=[{"history_content_type": "dataset", "id": hda1["id"]}], deleted=False, visible=True)
update_response = self._update_batch(history_id, payload)
objects = update_response.json()
assert objects[0]["deleted"] is False
assert objects[0]["visible"] is True
    def test_update_batch_collections(self, history_id):
hdca = self._create_pair_collection(history_id)
assert hdca["deleted"] is False
assert hdca["visible"] is True
# update deleted flag => true
payload = dict(items=[{"history_content_type": "dataset_collection", "id": hdca["id"]}], deleted=True)
update_response = self._update_batch(history_id, payload)
objects = update_response.json()
assert objects[0]["deleted"] is True
assert objects[0]["visible"] is True
# update visibility flag => false
payload = dict(items=[{"history_content_type": "dataset_collection", "id": hdca["id"]}], visible=False)
update_response = self._update_batch(history_id, payload)
objects = update_response.json()
assert objects[0]["deleted"] is True
assert objects[0]["visible"] is False
# update both flags
payload = dict(
items=[{"history_content_type": "dataset_collection", "id": hdca["id"]}], deleted=False, visible=True
)
update_response = self._update_batch(history_id, payload)
objects = update_response.json()
assert objects[0]["deleted"] is False
assert objects[0]["visible"] is True
    def test_update_type_failures(self, history_id):
hda1 = self._wait_for_new_hda(history_id)
update_response = self._update(history_id, hda1["id"], dict(deleted="not valid"))
self._assert_status_code_is(update_response, 400)
def _wait_for_new_hda(self, history_id: str):
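        # Upload a dataset and block until the history reaches a terminal state.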
hda1 = self.dataset_populator.new_dataset(history_id)
self.dataset_populator.wait_for_history(history_id)
return hda1
def _set_edit_update(self, data):
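        # `dataset/set_edit` appears to be a legacy (non-REST) route, so the
        # URL is built against the server root rather than the API prefix.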
update_response = self._put(urllib.parse.urljoin(self.url, "dataset/set_edit"), data=data, json=True)
return update_response
def _update(self, history_id: str, item_id, data, admin=False):
update_response = self._put(f"histories/{history_id}/contents/{item_id}", data=data, json=True, admin=admin)
return update_response
def _update_permissions(self, url, data, admin=False):
update_response = self._put(url, data=data, json=True, admin=admin)
return update_response
def _update_batch(self, history_id: str, data):
update_response = self._put(f"histories/{history_id}/contents", data=data, json=True)
return update_response
    def test_delete(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
self.dataset_populator.wait_for_history(history_id)
assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "false"
delete_response = self._delete(f"histories/{history_id}/contents/{hda1['id']}")
assert delete_response.status_code < 300 # Something in the 200s :).
assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "true"
    def test_delete_anon(self):
with self._different_user(anon=True):
history_id = self._get(urllib.parse.urljoin(self.url, "history/current_history_json")).json()["id"]
hda1 = self.dataset_populator.new_dataset(history_id)
self.dataset_populator.wait_for_history(history_id)
assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "false"
delete_response = self._delete(f"histories/{history_id}/contents/{hda1['id']}")
assert delete_response.status_code < 300 # Something in the 200s :).
assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "true"
    def test_delete_permission_denied(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
with self._different_user(anon=True):
delete_response = self._delete(f"histories/{history_id}/contents/{hda1['id']}")
assert delete_response.status_code == 403
assert delete_response.json()["err_msg"] == "HistoryDatasetAssociation is not owned by user"
    def test_purge(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id)
self.dataset_populator.wait_for_history(history_id)
assert str(self.__show(history_id, hda1).json()["deleted"]).lower() == "false"
assert str(self.__show(history_id, hda1).json()["purged"]).lower() == "false"
data = {"purge": True}
delete_response = self._delete(f"histories/{history_id}/contents/{hda1['id']}", data=data, json=True)
assert delete_response.status_code < 300 # Something in the 200s :).
# Purging and deleting the dataset may or may not happen asynchronously.
# On 202 the request was accepted and purging will happen later.
if delete_response.status_code == 202:
self.dataset_populator.wait_for_purge(history_id, hda1["id"])
else:
assert self.__show(history_id, hda1).json()["deleted"]
assert self.__show(history_id, hda1).json()["purged"]
    def test_dataset_collection_creation_on_contents(self, history_id):
payload = self.dataset_collection_populator.create_pair_payload(
history_id, type="dataset_collection", wait=True
)
endpoint = "tools/fetch"
self._check_pair_creation(history_id, endpoint, payload)
    def test_dataset_collection_creation_on_typed_contents(self, history_id):
payload = self.dataset_collection_populator.create_pair_payload(history_id, wait=True)
endpoint = "tools/fetch"
self._check_pair_creation(history_id, endpoint, payload)
    def test_dataset_collection_create_from_existing_datasets_with_new_tags(self):
with self.dataset_populator.test_history() as history_id:
hda_id = self.dataset_populator.new_dataset(history_id, content="1 2 3")["id"]
hda2_id = self.dataset_populator.new_dataset(history_id, content="1 2 3")["id"]
update_response = self._update(history_id, hda2_id, dict(tags=["existing:tag"])).json()
assert update_response["tags"] == ["existing:tag"]
creation_payload = {
"collection_type": "list",
"history_id": history_id,
"element_identifiers": [
{"id": hda_id, "src": "hda", "name": "element_id1", "tags": ["my_new_tag"]},
{"id": hda2_id, "src": "hda", "name": "element_id2", "tags": ["another_new_tag"]},
],
"type": "dataset_collection",
"copy_elements": True,
}
r = self._post(f"histories/{history_id}/contents", creation_payload, json=True).json()
assert r["elements"][0]["object"]["id"] != hda_id, "HDA has not been copied"
assert len(r["elements"][0]["object"]["tags"]) == 1
assert r["elements"][0]["object"]["tags"][0] == "my_new_tag"
assert len(r["elements"][1]["object"]["tags"]) == 2, r["elements"][1]["object"]["tags"]
original_hda = self.dataset_populator.get_history_dataset_details(history_id=history_id, dataset_id=hda_id)
assert len(original_hda["tags"]) == 0, original_hda["tags"]
def _check_pair_creation(self, history_id: str, endpoint, payload):
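        # Create a pair collection through the given endpoint, then verify
        # the contents counts, the show endpoint and deletion of the HDCA.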
pre_collection_count = self.__count_contents(history_id, type="dataset_collection")
pre_dataset_count = self.__count_contents(history_id, type="dataset")
pre_combined_count = self.__count_contents(history_id, type="dataset,dataset_collection")
dataset_collection_response = self._post(endpoint, payload, json=True)
dataset_collection = self.__check_create_collection_response(dataset_collection_response)
post_collection_count = self.__count_contents(history_id, type="dataset_collection")
post_dataset_count = self.__count_contents(history_id, type="dataset")
post_combined_count = self.__count_contents(history_id, type="dataset,dataset_collection")
# Test filtering types with index.
assert pre_collection_count == 0
assert post_collection_count == 1
assert post_combined_count == pre_dataset_count + 1
assert post_combined_count == pre_combined_count + 1
assert pre_dataset_count == post_dataset_count
# Test show dataset collection.
collection_url = f"histories/{history_id}/contents/dataset_collections/{dataset_collection['id']}"
show_response = self._get(collection_url)
self._assert_status_code_is(show_response, 200)
dataset_collection = show_response.json()
self._assert_has_keys(dataset_collection, "url", "name", "deleted")
assert not dataset_collection["deleted"]
delete_response = self._delete(collection_url)
self._assert_status_code_is(delete_response, 200)
show_response = self._get(collection_url)
dataset_collection = show_response.json()
assert dataset_collection["deleted"]
[docs] @skip_without_tool("collection_creates_list")
def test_jobs_summary_simple_hdca(self, history_id):
fetch_response = self.dataset_collection_populator.create_list_in_history(
history_id, contents=["a\nb\nc\nd", "e\nf\ng\nh"]
).json()
hdca_id = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)["id"]
run = self.dataset_populator.run_collection_creates_list(history_id, hdca_id)
collections = run["output_collections"]
collection = collections[0]
jobs_summary_url = f"histories/{history_id}/contents/dataset_collections/{collection['id']}/jobs_summary"
jobs_summary_response = self._get(jobs_summary_url)
self._assert_status_code_is(jobs_summary_response, 200)
jobs_summary = jobs_summary_response.json()
self._assert_has_keys(jobs_summary, "populated_state", "states")
[docs] @skip_without_tool("cat1")
def test_jobs_summary_implicit_hdca(self, history_id):
create_response = self.dataset_collection_populator.create_pair_in_history(
history_id, contents=["123", "456"], wait=True
)
hdca_id = create_response.json()["outputs"][0]["id"]
inputs = {
"input1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
}
run = self.dataset_populator.run_tool("cat1", inputs=inputs, history_id=history_id)
self.dataset_populator.wait_for_history_jobs(history_id)
collections = run["implicit_collections"]
collection = collections[0]
jobs_summary_url = f"histories/{history_id}/contents/dataset_collections/{collection['id']}/jobs_summary"
jobs_summary_response = self._get(jobs_summary_url)
self._assert_status_code_is(jobs_summary_response, 200)
jobs_summary = jobs_summary_response.json()
self._assert_has_keys(jobs_summary, "populated_state", "states")
states = jobs_summary["states"]
assert states.get("ok") == 2, states
    def test_dataset_collection_hide_originals(self, history_id):
payload = self.dataset_collection_populator.create_pair_payload(
history_id, type="dataset_collection", direct_upload=False, copy_elements=False
)
payload["hide_source_items"] = True
dataset_collection_response = self._post(f"histories/{history_id}/contents", payload, json=True)
self.__check_create_collection_response(dataset_collection_response)
contents_response = self._get(f"histories/{history_id}/contents")
datasets = [d for d in contents_response.json() if d["history_content_type"] == "dataset"]
        # Assert the two source datasets were hidden.
assert len(datasets) == 2
assert not datasets[0]["visible"]
assert not datasets[1]["visible"]
    def test_update_dataset_collection(self, history_id):
hdca = self._create_pair_collection(history_id)
body = dict(name="newnameforpair")
update_response = self._put(
f"histories/{history_id}/contents/dataset_collections/{hdca['id']}", data=body, json=True
)
self._assert_status_code_is(update_response, 200)
show_response = self.__show(history_id, hdca)
assert str(show_response.json()["name"]) == "newnameforpair"
    def test_update_batch_dataset_collection(self, history_id):
hdca = self._create_pair_collection(history_id)
body = {"items": [{"history_content_type": "dataset_collection", "id": hdca["id"]}], "name": "newnameforpair"}
update_response = self._put(f"histories/{history_id}/contents", data=body, json=True)
self._assert_status_code_is(update_response, 200)
show_response = self.__show(history_id, hdca)
assert str(show_response.json()["name"]) == "newnameforpair"
def _create_pair_collection(self, history_id: str):
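        # Create a pair collection via tools/fetch and return the new HDCA.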
payload = self.dataset_collection_populator.create_pair_payload(history_id, type="dataset_collection")
dataset_collection_response = self._post("tools/fetch", payload, json=True)
self._assert_status_code_is(dataset_collection_response, 200)
hdca = dataset_collection_response.json()["output_collections"][0]
return hdca
    def test_hdca_copy(self, history_id):
hdca = self.dataset_collection_populator.create_pair_in_history(history_id, wait=True).json()
hdca_id = hdca["outputs"][0]["id"]
second_history_id = self.dataset_populator.new_history()
create_data = dict(
source="hdca",
content=hdca_id,
)
assert len(self._get(f"histories/{second_history_id}/contents/dataset_collections").json()) == 0
create_response = self._post(
f"histories/{second_history_id}/contents/dataset_collections", create_data, json=True
)
self.__check_create_collection_response(create_response)
contents = self._get(f"histories/{second_history_id}/contents/dataset_collections").json()
assert len(contents) == 1
new_forward, _ = self.__get_paired_response_elements(history_id, contents[0])
self._assert_has_keys(new_forward, "history_id")
assert new_forward["history_id"] == second_history_id
    def test_hdca_copy_with_new_dbkey(self, history_id):
fetch_response = self.dataset_collection_populator.create_pair_in_history(history_id, wait=True).json()
hdca = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
hdca_id = hdca["id"]
assert hdca["elements"][0]["object"]["metadata_dbkey"] == "?"
assert hdca["elements"][0]["object"]["genome_build"] == "?"
create_data = {"source": "hdca", "content": hdca_id, "dbkey": "hg19"}
create_response = self._post(f"histories/{history_id}/contents/dataset_collections", create_data, json=True)
collection = self.__check_create_collection_response(create_response)
new_forward = collection["elements"][0]["object"]
assert new_forward["metadata_dbkey"] == "hg19"
assert new_forward["genome_build"] == "hg19"
    def test_hdca_copy_and_elements(self, history_id):
hdca = self.dataset_collection_populator.create_pair_in_history(history_id, wait=True).json()["outputs"][0]
hdca_id = hdca["id"]
second_history_id = self.dataset_populator.new_history()
create_data = dict(
source="hdca",
content=hdca_id,
copy_elements=True,
)
assert len(self._get(f"histories/{second_history_id}/contents/dataset_collections").json()) == 0
create_response = self._post(
f"histories/{second_history_id}/contents/dataset_collections", create_data, json=True
)
self.__check_create_collection_response(create_response)
contents = self._get(f"histories/{second_history_id}/contents/dataset_collections").json()
assert len(contents) == 1
new_forward, _ = self.__get_paired_response_elements(history_id, contents[0])
self._assert_has_keys(new_forward, "history_id")
assert new_forward["history_id"] == second_history_id
def __get_paired_response_elements(self, history_id: str, contents):
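        # Show the HDCA and return the objects of its two paired elements
        # (forward and reverse).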
hdca = self.__show(history_id, contents).json()
self._assert_has_keys(hdca, "name", "deleted", "visible", "elements")
elements = hdca["elements"]
assert len(elements) == 2
element0 = elements[0]
element1 = elements[1]
self._assert_has_keys(element0, "object")
self._assert_has_keys(element1, "object")
return element0["object"], element1["object"]
    @requires_new_library
def test_hdca_from_library_datasets(self, history_id):
ld = self.library_populator.new_library_dataset("el1")
ldda_id = ld["ldda_id"]
element_identifiers = [{"name": "el1", "src": "ldda", "id": ldda_id}]
create_data = dict(
history_id=history_id,
type="dataset_collection",
name="Test From Library",
element_identifiers=element_identifiers,
collection_type="list",
)
create_response = self._post(f"histories/{history_id}/contents/dataset_collections", create_data, json=True)
hdca = self.__check_create_collection_response(create_response)
elements = hdca["elements"]
assert len(elements) == 1
hda = elements[0]["object"]
assert hda["hda_ldda"] == "hda"
assert hda["history_content_type"] == "dataset"
assert hda["copied_from_ldda_id"] == ldda_id
assert hda["history_id"] == history_id
    @requires_new_library
def test_hdca_from_inaccessible_library_datasets(self, history_id):
library, library_dataset = self.library_populator.new_library_dataset_in_private_library(
"HDCACreateInaccesibleLibrary"
)
ldda_id = library_dataset["id"]
element_identifiers = [{"name": "el1", "src": "ldda", "id": ldda_id}]
create_data = dict(
history_id=history_id,
type="dataset_collection",
name="Test From Library",
element_identifiers=element_identifiers,
collection_type="list",
)
with self._different_user():
second_history_id = self.dataset_populator.new_history()
create_response = self._post(
f"histories/{second_history_id}/contents/dataset_collections", create_data, json=True
)
self._assert_status_code_is(create_response, 403)
def __check_create_collection_response(self, response):
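        # Tool/fetch responses wrap the new HDCA in `output_collections`;
        # direct contents-API responses return it at the top level.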
self._assert_status_code_is(response, 200)
dataset_collection = response.json()
if "output_collections" in dataset_collection:
dataset_collection = dataset_collection["output_collections"][0]
self._assert_has_keys(dataset_collection, "url", "name", "deleted", "visible", "elements")
return dataset_collection
def __show(self, history_id, contents):
show_response = self._get(
f"histories/{history_id}/contents/{contents['history_content_type']}s/{contents['id']}"
)
return show_response
def __count_contents(self, history_id: str, **kwds):
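        # kwds are forwarded as query parameters, e.g. type="dataset,dataset_collection".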
contents_response = self._get(f"histories/{history_id}/contents", kwds)
return len(contents_response.json())
def __assert_hda_has_full_details(self, hda_details):
self._assert_has_keys(hda_details, "display_types", "display_apps")
def __check_for_hda(self, contents_response, hda):
self._assert_status_code_is(contents_response, 200)
contents = contents_response.json()
assert len(contents) == 1
hda_summary = contents[0]
self.__assert_matches_hda(hda, hda_summary)
return hda_summary
def __assert_matches_hda(self, input_hda, query_hda):
self._assert_has_keys(query_hda, "id", "name")
assert input_hda["name"] == query_hda["name"]
assert input_hda["id"] == query_hda["id"]
    def test_job_state_summary_field(self, history_id):
create_response = self.dataset_collection_populator.create_pair_in_history(
history_id,
contents=["123", "456"],
)
self._assert_status_code_is(create_response, 200)
contents_response = self._get(f"histories/{history_id}/contents?v=dev&keys=job_state_summary&view=summary")
self._assert_status_code_is(contents_response, 200)
contents = contents_response.json()
for c in contents:
if c["history_content_type"] == "dataset_collection":
assert isinstance(c, dict)
assert "job_state_summary" in c
assert isinstance(c["job_state_summary"], dict)
    def test_index_filter_by_type(self, history_id):
self.dataset_populator.new_dataset(history_id)
self.dataset_collection_populator.create_list_in_history(history_id=history_id, wait=True)
contents_response = self._get(f"histories/{history_id}/contents").json()
num_items = len(contents_response)
expected_num_collections = 1
expected_num_datasets = num_items - expected_num_collections
contents_response = self._get(f"histories/{history_id}/contents?types=dataset").json()
assert len(contents_response) == expected_num_datasets
contents_response = self._get(f"histories/{history_id}/contents?types=dataset_collection").json()
assert len(contents_response) == expected_num_collections
contents_response = self._get(f"histories/{history_id}/contents?types=dataset,dataset_collection").json()
assert len(contents_response) == expected_num_datasets + expected_num_collections
contents_response = self._get(f"histories/{history_id}/contents?types=dataset&types=dataset_collection").json()
assert len(contents_response) == expected_num_datasets + expected_num_collections
    def test_index_filter_by_name_ignores_case(self, history_id):
self.dataset_populator.new_dataset(history_id, name="AC")
self.dataset_populator.new_dataset(history_id, name="ac")
self.dataset_populator.new_dataset(history_id, name="Bc")
contains_text = "a"
contents_response = self._get(
f"histories/{history_id}/contents?v=dev&q=name-contains&qv={contains_text}"
).json()
assert len(contents_response) == 2
contains_text = "b"
contents_response = self._get(
f"histories/{history_id}/contents?v=dev&q=name-contains&qv={contains_text}"
).json()
assert len(contents_response) == 1
contains_text = "c"
contents_response = self._get(
f"histories/{history_id}/contents?v=dev&q=name-contains&qv={contains_text}"
).json()
assert len(contents_response) == 3
contains_text = "%"
contents_response = self._get(
f"histories/{history_id}/contents?v=dev&q=name-contains&qv={contains_text}"
).json()
assert len(contents_response) == 0
[docs] @skip_without_tool("cat_data_and_sleep")
def test_index_filter_by_related_items(self, history_id):
        # Initialise history with 2 datasets
input_hda_id = self.dataset_populator.new_dataset(history_id)["id"]
unrelated_hid = self.dataset_populator.new_dataset(history_id)["hid"]
        # Run a tool on the first dataset to produce a third, related dataset
inputs = {
"input1": {"src": "hda", "id": input_hda_id},
"sleep_time": 0,
}
run_response = self.dataset_populator.run_tool_raw(
"cat_data_and_sleep",
inputs,
history_id,
)
related_hid = run_response.json()["outputs"][0]["hid"]
# Test q = related-eq, for related items
contents_response = self._get(f"histories/{history_id}/contents?v=dev&q=related-eq&qv={related_hid}").json()
assert len(contents_response) == 2
# Test q = related, for unrelated item
contents_response = self._get(f"histories/{history_id}/contents?v=dev&q=related&qv={unrelated_hid}").json()
assert len(contents_response) == 1
        # Test error case: qv is a string
related_qv = "one"
contents_response = self._get(f"histories/{history_id}/contents?v=dev&q=related-eq&qv={related_qv}")
assert contents_response.status_code == 400
assert contents_response.json()["err_msg"] == "unparsable value for related filter"
    def test_elements_datatypes_field(self, history_id):
collection_name = "homogeneous"
expected_datatypes = ["txt"]
elements = [ # List with all elements of txt datatype (homogeneous)
{"name": "test1", "src": "pasted", "paste_content": "abc", "ext": "txt"},
{"name": "test2", "src": "pasted", "paste_content": "abc", "ext": "txt"},
]
self._upload_collection_list_with_elements(history_id, collection_name, elements)
self._assert_collection_has_expected_elements_datatypes(history_id, collection_name, expected_datatypes)
collection_name = "heterogeneous"
expected_datatypes = ["txt", "tabular"]
elements = [ # List with txt and tabular datatype (heterogeneous)
{"name": "test2", "src": "pasted", "paste_content": "abc", "ext": "txt"},
{"name": "test3", "src": "pasted", "paste_content": "a,b,c\n", "ext": "tabular"},
]
self._upload_collection_list_with_elements(history_id, collection_name, elements)
self._assert_collection_has_expected_elements_datatypes(history_id, collection_name, expected_datatypes)
def _upload_collection_list_with_elements(self, history_id: str, collection_name: str, elements: List[Any]):
create_homogeneous_response = self.dataset_collection_populator.upload_collection(
history_id, "list", elements=elements, name=collection_name, wait=True
)
self._assert_status_code_is_ok(create_homogeneous_response)
def _assert_collection_has_expected_elements_datatypes(self, history_id, collection_name, expected_datatypes):
contents_response = self._get(
f"histories/{history_id}/contents?v=dev&view=detailed&q=name-eq&qv={collection_name}"
)
self._assert_status_code_is(contents_response, 200)
collection = contents_response.json()[0]
assert sorted(collection["elements_datatypes"]) == sorted(expected_datatypes)
[docs] @skip_without_tool("cat1")
def test_cannot_run_tools_on_immutable_histories(self, history_id):
create_response = self.dataset_collection_populator.create_pair_in_history(
history_id, contents=["123", "456"], wait=True
)
hdca_id = create_response.json()["outputs"][0]["id"]
inputs = {
"input1": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
}
# once we purge the history, it becomes immutable
self._delete(f"histories/{history_id}", data={"purge": True}, json=True)
with self.assertRaisesRegex(AssertionError, "History is immutable"):
self.dataset_populator.run_tool("cat1", inputs=inputs, history_id=history_id)
    def test_cannot_update_dataset_collection_on_immutable_history(self, history_id):
hdca = self._create_pair_collection(history_id)
# once we purge the history, it becomes immutable
self._delete(f"histories/{history_id}", data={"purge": True}, json=True)
body = dict(name="newnameforpair")
update_response = self._put(
f"histories/{history_id}/contents/dataset_collections/{hdca['id']}", data=body, json=True
)
self._assert_status_code_is(update_response, 403)
assert update_response.json()["err_msg"] == "History is immutable"
    def test_cannot_update_dataset_on_immutable_history(self, history_id):
hda1 = self._wait_for_new_hda(history_id)
# once we purge the history, it becomes immutable
self._delete(f"histories/{history_id}", data={"purge": True}, json=True)
update_response = self._update(history_id, hda1["id"], dict(name="Updated Name"))
self._assert_status_code_is(update_response, 403)
assert update_response.json()["err_msg"] == "History is immutable"
class TestHistoryContentsApiBulkOperation(ApiTestCase):
"""
Test the `/api/histories/{history_id}/contents/bulk` endpoint and the new
`count` special view for `/api/histories/{history_id}/contents?v=dev`
"""
    def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
    def test_explicit_items_selection(self):
with self.dataset_populator.test_history() as history_id:
datasets_ids, collection_ids, history_contents = self._create_test_history_contents(history_id)
# Hide 2 collections and 3 datasets, 5 in total
payload = {
"operation": "hide",
"items": [
{
"id": datasets_ids[0],
"history_content_type": "dataset",
},
{
"id": collection_ids[0],
"history_content_type": "dataset_collection",
},
{
"id": datasets_ids[1],
"history_content_type": "dataset",
},
{
"id": collection_ids[1],
"history_content_type": "dataset_collection",
},
{
"id": datasets_ids[2],
"history_content_type": "dataset",
},
],
}
expected_hidden_item_ids = [item["id"] for item in payload["items"]]
expected_hidden_item_count = len(expected_hidden_item_ids)
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
history_contents = self._get_history_contents(history_id)
hidden_items = self._get_hidden_items_from_history_contents(history_contents)
self._assert_bulk_success(bulk_operation_result, expected_hidden_item_count)
assert len(hidden_items) == expected_hidden_item_count
for item in hidden_items:
assert item["id"] in expected_hidden_item_ids
    def test_dynamic_query_selection(self):
with self.dataset_populator.test_history() as history_id:
_, collection_ids, history_contents = self._create_test_history_contents(history_id)
# Hide all collections using query
payload = {"operation": "hide"}
query = "q=history_content_type-eq&qv=dataset_collection"
bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
history_contents = self._get_history_contents(history_id)
hidden_items = self._get_hidden_items_from_history_contents(history_contents)
self._assert_bulk_success(bulk_operation_result, len(collection_ids))
assert len(hidden_items) == len(collection_ids)
for item in hidden_items:
assert item["id"] in collection_ids
    def test_bulk_operations(self):
with self.dataset_populator.test_history() as history_id:
datasets_ids, collection_ids, history_contents = self._create_test_history_contents(history_id)
# Hide all datasets using query
payload = {"operation": "hide"}
query = "q=history_content_type-eq&qv=dataset"
bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
history_contents = self._get_history_contents(history_id)
hidden_items = self._get_hidden_items_from_history_contents(history_contents)
self._assert_bulk_success(bulk_operation_result, len(datasets_ids))
assert len(hidden_items) == len(datasets_ids)
# Unhide datasets_ids[0] and datasets_ids[3]
payload = {
"operation": "unhide",
"items": [
{
"id": datasets_ids[0],
"history_content_type": "dataset",
},
{
"id": datasets_ids[3],
"history_content_type": "dataset",
},
],
}
expected_unhidden_count = len(payload["items"])
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
history_contents = self._get_history_contents(history_id)
self._assert_bulk_success(bulk_operation_result, expected_unhidden_count)
for item in history_contents:
if item["id"] in [datasets_ids[0], datasets_ids[3]]:
assert item["visible"] is True
            # Delete all hidden datasets (total datasets minus the 2 previously unhidden)
expected_hidden_item_count = len(datasets_ids) - expected_unhidden_count
payload = {"operation": "delete"}
query = "q=history_content_type-eq&qv=dataset&q=visible&qv=False"
bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
history_contents = self._get_history_contents(history_id)
hidden_items = self._get_hidden_items_from_history_contents(history_contents)
self._assert_bulk_success(bulk_operation_result, expected_hidden_item_count)
for item in hidden_items:
assert item["deleted"] is True
# Undelete all items in history
payload = {
"operation": "undelete",
}
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
history_contents = self._get_history_contents(history_id)
self._assert_bulk_success(bulk_operation_result, len(history_contents))
for item in history_contents:
assert item["deleted"] is False
# Purge datasets_ids[0] and collection_ids[0]
payload = {
"operation": "purge",
"items": [
{
"id": datasets_ids[0],
"history_content_type": "dataset",
},
{
"id": collection_ids[0],
"history_content_type": "dataset_collection",
},
],
}
expected_purged_count = len(payload["items"])
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
history_contents = self._get_history_contents(history_id)
self._assert_bulk_success(bulk_operation_result, expected_purged_count)
purged_dataset = self._get_dataset_with_id_from_history_contents(history_contents, datasets_ids[0])
self.dataset_populator.wait_for_purge(history_id=history_id, content_id=purged_dataset["id"])
assert purged_dataset["deleted"] is True
purged_collection = self._get_collection_with_id_from_history_contents(history_contents, collection_ids[0])
# collections don't have a `purged` attribute but they should be marked deleted on purge
assert purged_collection["deleted"] is True
            # Un-deleting a purged dataset should have no effect and should raise an error
payload = {
"operation": "undelete",
"items": [
{
"id": datasets_ids[0],
"history_content_type": "dataset",
},
],
}
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
history_contents = self._get_history_contents(history_id)
assert bulk_operation_result["success_count"] == 0
assert len(bulk_operation_result["errors"]) == 1
error = bulk_operation_result["errors"][0]
assert error["item"]["id"] == datasets_ids[0]
purged_dataset = self._get_dataset_with_id_from_history_contents(history_contents, datasets_ids[0])
assert purged_dataset["deleted"] is True
assert purged_dataset["purged"] is True
    def test_purging_collection_should_purge_contents(self):
with self.dataset_populator.test_history() as history_id:
datasets_ids, collection_ids, history_contents = self._create_test_history_contents(history_id)
# Purge all collections
payload = {"operation": "purge"}
query = "q=history_content_type-eq&qv=dataset_collection"
bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
history_contents = self._get_history_contents(history_id)
self._assert_bulk_success(bulk_operation_result, len(collection_ids))
for item in history_contents:
assert item["deleted"] is True
if item["history_content_type"] == "dataset":
self.dataset_populator.wait_for_purge(history_id=history_id, content_id=item["id"])
    def test_deleting_collection_should_delete_contents(self):
with self.dataset_populator.test_history() as history_id:
num_expected_datasets = 2
# Create collection and datasets
collection_ids = self._create_collection_in_history(history_id, num_collections=1)
original_collection_id = collection_ids[0]
# Check datasets are hidden and not deleted
history_contents = self._get_history_contents(history_id)
datasets = list(filter(lambda item: item["history_content_type"] == "dataset", history_contents))
assert len(datasets) == num_expected_datasets
for dataset in datasets:
assert dataset["deleted"] is False
assert dataset["visible"] is False
# Delete the collection
payload = {
"operation": "delete",
"items": [
{
"id": original_collection_id,
"history_content_type": "dataset_collection",
},
],
}
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
self._assert_bulk_success(bulk_operation_result, 1)
# We expect the original collection and the datasets to be deleted
num_expected_history_contents = num_expected_datasets + 1
history_contents = self._get_history_contents(history_id)
assert len(history_contents) == num_expected_history_contents
for item in history_contents:
assert item["deleted"] is True
    @requires_new_user
def test_only_owner_can_apply_bulk_operations(self):
with self.dataset_populator.test_history() as history_id:
self._create_test_history_contents(history_id)
with self._different_user():
payload = {"operation": "hide"}
bulk_operation_result = self._apply_bulk_operation(history_id, payload, expected_status_code=403)
assert bulk_operation_result["err_msg"]
    def test_bulk_tag_changes(self):
with self.dataset_populator.test_history() as history_id:
_, collection_ids, history_contents = self._create_test_history_contents(history_id)
expected_tags = ["cool_tag", "tag01"]
            # Add the same tags to all items
payload = {
"operation": "add_tags",
"params": {
"type": "add_tags",
"tags": expected_tags,
},
}
expected_success_count = len(history_contents)
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
self._assert_bulk_success(bulk_operation_result, expected_success_count)
history_contents = self._get_history_contents(history_id)
for item in history_contents:
for expected_tag in expected_tags:
assert expected_tag in item["tags"]
            # Remove the tags from all collections
payload = {
"operation": "remove_tags",
"params": {
"type": "remove_tags",
"tags": expected_tags,
},
}
query = "q=history_content_type-eq&qv=dataset_collection"
expected_success_count = len(collection_ids)
bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
self._assert_bulk_success(bulk_operation_result, expected_success_count)
history_contents = self._get_history_contents(history_id)
for item in history_contents:
if item["history_content_type"] == "dataset_collection":
assert not item["tags"]
else:
for expected_tag in expected_tags:
assert expected_tag in item["tags"]
    @requires_celery
def test_bulk_dbkey_change(self):
with self.dataset_populator.test_history() as history_id:
_, _, history_contents = self._create_test_history_contents(history_id)
expected_dbkey = "apiMel3"
# Change dbkey of all items
payload = {
"operation": "change_dbkey",
"params": {
"type": "change_dbkey",
"dbkey": expected_dbkey,
},
}
# All items should succeed
expected_success_count = len(history_contents)
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
self._assert_bulk_success(bulk_operation_result, expected_success_count)
history_contents = self._get_history_contents(history_id, query="?v=dev&keys=dbkey")
for item in history_contents:
if item["history_content_type"] == "dataset":
assert item["dbkey"] == expected_dbkey
    @requires_celery
def test_bulk_dbkey_change_dataset_collection(self):
with self.dataset_populator.test_history() as history_id:
_, collection_ids, history_contents = self._create_test_history_contents(history_id)
expected_dbkey = "apiMel3"
# Change dbkey of all items
payload = {
"operation": "change_dbkey",
"params": {
"type": "change_dbkey",
"dbkey": expected_dbkey,
},
}
            # All selected collections should succeed
expected_success_count = len(collection_ids)
query = "q=history_content_type-eq&qv=dataset_collection"
bulk_operation_result = self._apply_bulk_operation(history_id, payload, query)
self._assert_bulk_success(bulk_operation_result, expected_success_count)
history_contents = self._get_history_contents(history_id, query="?v=dev&keys=dbkey")
            # Now verify that datasets within collections have the expected dbkey
for item in history_contents:
if item["history_content_type"] == "dataset":
assert item["dbkey"] == expected_dbkey
    def test_bulk_datatype_change(self):
with self.dataset_populator.test_history() as history_id:
num_datasets = 3
dataset_ids = []
for _ in range(num_datasets):
hda_id = self.dataset_populator.new_dataset(history_id)["id"]
dataset_ids.append(hda_id)
history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
for item in history_contents:
assert item["extension"] == "txt"
assert item["data_type"] == "galaxy.datatypes.data.Text"
assert "metadata_column_names" not in item
self.dataset_populator.wait_for_history_jobs(history_id)
expected_datatype = "tabular"
# Change datatype of all datasets
payload = {
"operation": "change_datatype",
"params": {
"type": "change_datatype",
"datatype": expected_datatype,
},
}
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
self._assert_bulk_success(bulk_operation_result, expected_success_count=num_datasets)
# Wait for celery tasks to finish
self.dataset_populator.wait_for_history(history_id)
history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
for item in history_contents:
assert item["extension"] == "tabular"
assert item["data_type"] == "galaxy.datatypes.tabular.Tabular"
assert "metadata_column_names" in item
    def test_bulk_datatype_change_collection(self):
with self.dataset_populator.test_history() as history_id:
_, collection_ids, history_contents = self._create_test_history_contents(history_id)
history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
original_collection_update_times = []
for item in history_contents:
if item["history_content_type"] == "dataset":
assert item["extension"] == "txt"
assert item["data_type"] == "galaxy.datatypes.data.Text"
assert "metadata_column_names" not in item
if item["history_content_type"] == "dataset_collection":
original_collection_update_times.append(item["update_time"])
expected_datatype = "tabular"
            # Change datatype of all collections, which should also update their datasets
payload = {
"operation": "change_datatype",
"params": {
"type": "change_datatype",
"datatype": expected_datatype,
},
}
bulk_operation_result = self._apply_bulk_operation(
history_id, payload, query="q=history_content_type-eq&qv=dataset_collection"
)
self._assert_bulk_success(bulk_operation_result, expected_success_count=len(collection_ids))
# Wait for celery tasks to finish
self.dataset_populator.wait_for_history(history_id)
history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
new_collection_update_times = []
for item in history_contents:
if item["history_content_type"] == "dataset":
assert item["extension"] == "tabular"
assert item["data_type"] == "galaxy.datatypes.tabular.Tabular"
assert "metadata_column_names" in item
if item["history_content_type"] == "dataset_collection":
new_collection_update_times.append(item["update_time"])
assert original_collection_update_times != new_collection_update_times
    def test_bulk_datatype_change_should_skip_set_metadata_on_deferred_data(self):
with self.dataset_populator.test_history() as history_id:
details = self.dataset_populator.create_deferred_hda(
history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed", ext="bed"
)
assert details["state"] == "deferred"
assert details["extension"] == "bed"
assert details["data_type"] == "galaxy.datatypes.interval.Bed"
assert "metadata_columns" in details
assert "metadata_delimiter" in details
assert "metadata_comment_lines" in details
new_datatype = "txt"
payload = {
"operation": "change_datatype",
"params": {
"type": "change_datatype",
"datatype": new_datatype,
},
}
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
self._assert_bulk_success(bulk_operation_result, expected_success_count=1)
history_contents = self._get_history_contents(history_id, query="?v=dev&view=detailed")
for item in history_contents:
assert item["state"] == "deferred"
assert item["extension"] == "txt"
assert item["data_type"] == "galaxy.datatypes.data.Text"
# It should discard old metadata
assert "metadata_columns" not in item
assert "metadata_delimiter" not in item
assert "metadata_comment_lines" not in item
[docs] @skip_without_tool("cat_data_and_sleep")
def test_bulk_datatype_change_errors(self):
with self.dataset_populator.test_history() as history_id:
num_datasets = 3
dataset_ids = []
for _ in range(num_datasets):
hda_id = self.dataset_populator.new_dataset(history_id)["id"]
dataset_ids.append(hda_id)
self.dataset_populator.wait_for_history_jobs(history_id)
# Run tool on last dataset
input_hda_id = hda_id
inputs = {
"input1": {"src": "hda", "id": input_hda_id},
"sleep_time": 10,
}
run_response = self.dataset_populator.run_tool_raw(
"cat_data_and_sleep",
inputs,
history_id,
)
output_hda_id = run_response.json()["outputs"][0]["id"]
num_datasets += 1 # the new output dataset
dataset_ids_in_use = [input_hda_id, output_hda_id]
expected_datatype = "tabular"
# Change datatype of all datasets (4 in total)
payload = {
"operation": "change_datatype",
"params": {
"type": "change_datatype",
"datatype": expected_datatype,
},
}
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
# First 2 datasets are ok
assert bulk_operation_result["success_count"] == 2
# Last 2 are in use (input and output) and must fail
assert len(bulk_operation_result["errors"]) == 2
for error in bulk_operation_result["errors"]:
assert error["item"]["id"] in dataset_ids_in_use
    def test_bulk_datatype_change_auto(self):
with self.dataset_populator.test_history() as history_id:
tabular_contents = "1\t2\t3\na\tb\tc\n"
dataset_ids = [
self.dataset_populator.new_dataset(history_id, content=tabular_contents)["id"],
self.dataset_populator.new_dataset(history_id, content=tabular_contents)["id"],
]
self.dataset_populator.wait_for_history_jobs(history_id)
history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
for item in history_contents:
assert item["extension"] == "txt"
assert item["data_type"] == "galaxy.datatypes.data.Text"
assert "metadata_delimiter" not in item
# Change datatype of all datasets to auto
payload = {
"operation": "change_datatype",
"params": {
"type": "change_datatype",
"datatype": "auto",
},
}
bulk_operation_result = self._apply_bulk_operation(history_id, payload)
self._assert_bulk_success(bulk_operation_result, expected_success_count=len(dataset_ids))
# Wait for celery tasks to finish
self.dataset_populator.wait_for_history(history_id)
history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata")
# Should be detected as `tabular` and set the metadata correctly
for item in history_contents:
assert item["extension"] == "tabular"
assert item["data_type"] == "galaxy.datatypes.tabular.Tabular"
assert "metadata_delimiter" in item
assert item["metadata_delimiter"] == "\t"
    def test_index_returns_expected_total_matches(self):
with self.dataset_populator.test_history() as history_id:
datasets_ids, collection_ids, history_contents = self._create_test_history_contents(history_id)
self._test_index_total_matches(history_id, expected_total_matches=len(history_contents))
self._test_index_total_matches(
history_id,
search_query="&q=history_content_type-eq&qv=dataset_collection",
expected_total_matches=len(collection_ids),
)
self._test_index_total_matches(
history_id,
search_query="&q=history_content_type-eq&qv=dataset",
expected_total_matches=len(datasets_ids),
)
    def test_index_with_stats_fails_with_non_orm_filters(self):
with self.dataset_populator.test_history() as history_id:
self._create_test_history_contents(history_id)
invalid_filter_keys_with_stats = ["data_type", "annotation"]
for filter_key in invalid_filter_keys_with_stats:
response = self._get_contents_with_stats(
history_id,
search_query=f"&q={filter_key}-contains&qv=anything",
)
self._assert_status_code_is(response, 400)
def _get_contents_with_stats(self, history_id: str, search_query: str = ""):
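        # The stats flavor of the index is selected via content negotiation
        # with this custom media type rather than a dedicated URL.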
headers = {"accept": "application/vnd.galaxy.history.contents.stats+json"}
search_response = self._get(f"histories/{history_id}/contents?v=dev{search_query}", headers=headers)
return search_response
def _test_index_total_matches(self, history_id: str, expected_total_matches: int, search_query: str = ""):
search_response = self._get_contents_with_stats(history_id, search_query)
self._assert_status_code_is(search_response, 200)
self._assert_total_matches_is(search_response.json(), expected_total_matches)
def _assert_total_matches_is(self, response, expected_total_matches: int):
assert response["stats"]
assert response["stats"]["total_matches"]
assert response["stats"]["total_matches"] == expected_total_matches
def _create_test_history_contents(self, history_id) -> Tuple[List[str], List[str], List[Any]]:
"""Creates 3 collections (pairs) and their corresponding datasets (6 in total)
Returns a tuple with the list of ids for the datasets and the collections and the
complete history contents
"""
num_expected_collections = 3
num_expected_datasets = num_expected_collections * 2
collection_ids = self._create_collection_in_history(history_id, num_expected_collections)
history_contents = self._get_history_contents(history_id)
datasets = filter(lambda item: item["history_content_type"] == "dataset", history_contents)
datasets_ids = [dataset["id"] for dataset in datasets]
assert len(history_contents) == num_expected_datasets + num_expected_collections
assert len(datasets_ids) == num_expected_datasets
for dataset_id in datasets_ids:
self._put(f"histories/{history_id}/contents/{dataset_id}", {"visible": True}, json=True).json()
        # All items should now be visible
history_contents = self._get_history_contents(history_id)
for item in history_contents:
assert item["visible"]
return datasets_ids, collection_ids, history_contents
def _create_collection_in_history(self, history_id, num_collections=1) -> List[str]:
collection_ids = []
for _ in range(num_collections):
collection_id = self.dataset_collection_populator.create_pair_in_history(
history_id=history_id, wait=True
).json()["outputs"][0]["id"]
collection_ids.append(collection_id)
return collection_ids
def _get_history_contents(self, history_id: str, query: str = ""):
return self._get(f"histories/{history_id}/contents{query}").json()
def _get_hidden_items_from_history_contents(self, history_contents) -> List[Any]:
return [content for content in history_contents if not content["visible"]]
def _get_collection_with_id_from_history_contents(self, history_contents, collection_id: str) -> Optional[Any]:
return self._get_item_with_id_from_history_contents(history_contents, "dataset_collection", collection_id)
def _get_dataset_with_id_from_history_contents(self, history_contents, dataset_id: str) -> Optional[Any]:
return self._get_item_with_id_from_history_contents(history_contents, "dataset", dataset_id)
def _get_item_with_id_from_history_contents(
self, history_contents, history_content_type: str, dataset_id: str
) -> Optional[Any]:
for item in history_contents:
if item["history_content_type"] == history_content_type and item["id"] == dataset_id:
return item
return None
def _apply_bulk_operation(self, history_id: str, payload, query: str = "", expected_status_code: int = 200):
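        # PUT the operation to the bulk endpoint; an optional query string
        # narrows the selection server-side. On success, also verify that the
        # history's update_time was bumped so the frontend can detect changes.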
original_history_update_time = self._get_history_update_time(history_id)
if query:
query = f"?{query}"
response = self._put(
f"histories/{history_id}/contents/bulk{query}",
data=payload,
json=True,
)
self._assert_status_code_is(response, expected_status_code)
result = response.json()
if "err_msg" in result or result.get("success_count", 0) == 0:
# We don't need to check the history update time if there was an error or no items were updated
return result
# After a successful operation, history update time should be updated so the changes can be detected by the frontend
after_bulk_operation_history_update_time = self._get_history_update_time(history_id)
assert after_bulk_operation_history_update_time > original_history_update_time
return result
def _assert_bulk_success(self, bulk_operation_result, expected_success_count: int):
assert bulk_operation_result["success_count"] == expected_success_count, bulk_operation_result
assert not bulk_operation_result["errors"]
def _get_history_update_time(self, history_id: str):
history = self._get(f"histories/{history_id}").json()
return history.get("update_time")