# Warning: this listing was captured from the documentation of an in-development
# version of Galaxy; the latest release's documentation may differ from it.
# Source code for galaxy_test.api.test_datasets
import textwrap
import urllib
import zipfile
from io import BytesIO
from typing import (
Dict,
List,
)
from galaxy.model.unittest_utils.store_fixtures import (
deferred_hda_model_store_dict,
one_hda_model_store_dict,
TEST_SOURCE_URI,
)
from galaxy.tool_util.verify.test_data import TestDataResolver
from galaxy.util.unittest_utils import skip_if_github_down
from galaxy_test.base.api_asserts import assert_has_keys
from galaxy_test.base.decorators import (
requires_admin,
requires_new_history,
)
from galaxy_test.base.populators import (
DatasetCollectionPopulator,
DatasetPopulator,
skip_without_datatype,
skip_without_tool,
)
from ._framework import ApiTestCase
# Fetch request describing a composite dataset with extension "velvet" whose
# three component items are pasted inline; each item's text is "<label> content".
COMPOSITE_DATA_FETCH_REQUEST_1 = {
    "src": "composite",
    "ext": "velvet",
    "composite": {
        "items": [
            {"src": "pasted", "paste_content": f"{component} content"}
            for component in ("sequences", "roadmaps", "log")
        ]
    },
}
[docs]class TestDatasetsApi(ApiTestCase):
dataset_populator: DatasetPopulator
def setUp(self):
    """Run the base set-up, then build dataset and collection populators on this test's interactor."""
    super().setUp()
    interactor = self.galaxy_interactor
    self.dataset_populator = DatasetPopulator(interactor)
    self.dataset_collection_populator = DatasetCollectionPopulator(interactor)
def test_index(self):
    """A plain GET on ``datasets`` is expected to answer with HTTP 200."""
    self._assert_status_code_is(self._get("datasets"), 200)
def test_index_using_keys(self, history_id):
    """Listing with ``keys=id`` is expected to return entries carrying only the ``id`` key."""
    self.dataset_populator.new_dataset(history_id)
    response = self._get("datasets?keys=id")
    self._assert_status_code_is(response, 200)
    for entry in response.json():
        # Exactly one key per entry, and that key is "id".
        assert len(entry) == 1
        self._assert_has_keys(entry, "id")
@requires_new_history
def test_index_order_by_size(self):
    """History contents can be ordered by ``size`` and by ``size-asc``."""
    history_id = self.dataset_populator.new_history()
    # Each upload repeats "content" once more than the previous one, so the
    # creation order is also the ascending-size order.
    ascending_ids = [
        self.dataset_populator.new_dataset(history_id, content=repetitions * "content")["id"]
        for repetitions in range(1, 4)
    ]
    descending_ids = list(reversed(ascending_ids))
    self.dataset_populator.wait_for_history(history_id)
    self._assert_history_datasets_ordered(history_id, order_by="size", expected_ids_order=descending_ids)
    self._assert_history_datasets_ordered(history_id, order_by="size-asc", expected_ids_order=ascending_ids)
def _assert_history_datasets_ordered(self, history_id, order_by: str, expected_ids_order: List[str]):
    """Assert that listing the history's contents with ``order=order_by`` yields exactly the given ids, in order."""
    response = self._get(f"histories/{history_id}/contents?v=dev&keys=size&order={order_by}")
    self._assert_status_code_is(response, 200)
    listed = response.json()
    assert len(listed) == len(expected_ids_order)
    # Lengths are equal at this point, so walking the expected ids covers every entry.
    for position, expected_id in enumerate(expected_ids_order):
        assert listed[position]["id"] == expected_id
@requires_new_history
def test_search_datasets(self):
    """Exercise paging, ``hid`` ordering and content-type filtering on the ``datasets`` index."""
    with self.dataset_populator.test_history_for(self.test_search_datasets) as history_id:
        hda_id = self.dataset_populator.new_dataset(history_id)["id"]

        # A one-item page scoped to the history returns the dataset just created.
        single_page = self._get("datasets", {"limit": 1, "offset": 0, "history_id": history_id}).json()
        assert len(single_page) == 1
        assert single_page[0]["id"] == hda_id

        collections = self.dataset_collection_populator
        fetch_response = collections.create_list_in_history(history_id, contents=["1\n2\n3"]).json()
        hdca_id = collections.wait_for_fetched_collection(fetch_response)["id"]

        # Ordered by hid, the listing is asserted to run from hid 3 down to hid 1.
        ordered_query = {"limit": 3, "offset": 0, "order": "hid", "history_id": history_id}
        ordered = self._get("datasets", ordered_query).json()
        assert len(ordered) == 3
        first, second, third = ordered
        assert first["hid"] == 3
        assert second["hid"] == 2
        assert third["hid"] == 1
        assert third["history_content_type"] == "dataset"
        assert third["id"] == hda_id
        assert second["history_content_type"] == "dataset_collection"
        assert second["id"] == hdca_id

        # This last query carries no history_id filter.
        type_query = {"limit": 2, "offset": 0, "q": ["history_content_type"], "qv": ["dataset"]}
        typed = self._get("datasets", type_query).json()
        assert typed[1]["id"] == hda_id
@requires_new_history
def test_search_by_tag(self):
    """Datasets can be found by exact tag (``tag``) and by tag substring (``tag-contains``)."""
    with self.dataset_populator.test_history_for(self.test_search_by_tag) as history_id:
        hda_id = self.dataset_populator.new_dataset(history_id)["id"]
        update_payload = {"tags": ["cool:new_tag", "cool:another_tag"]}
        updated_hda = self._put(f"histories/{history_id}/contents/{hda_id}", update_payload, json=True).json()
        for tag in ("cool:new_tag", "cool:another_tag"):
            assert tag in updated_hda["tags"]

        def count_datasets(tag_filter, tag_value):
            # Number of datasets in this history matching the given tag filter.
            query = {
                "limit": 10,
                "offset": 0,
                "q": ["history_content_type", tag_filter],
                "qv": ["dataset", tag_value],
                "history_id": history_id,
            }
            return len(self._get("datasets", query).json())

        assert count_datasets("tag", "cool:new_tag") == 1
        assert count_datasets("tag-contains", "new_tag") == 1
        assert count_datasets("tag-contains", "notag") == 0
@requires_new_history
def test_search_by_tag_case_insensitive(self):
    """Tags stored with upper-case letters are matched by lower-case ``tag`` / ``tag-contains`` queries."""
    with self.dataset_populator.test_history_for(self.test_search_by_tag_case_insensitive) as history_id:
        hda_id = self.dataset_populator.new_dataset(history_id)["id"]
        update_payload = {"tags": ["name:new_TAG", "cool:another_TAG"]}
        updated_hda = self._put(f"histories/{history_id}/contents/{hda_id}", update_payload, json=True).json()
        for tag in ("name:new_TAG", "cool:another_TAG"):
            assert tag in updated_hda["tags"]

        def count_datasets(tag_filter, tag_value):
            # Number of datasets in this history matching the given tag filter.
            query = {
                "limit": 10,
                "offset": 0,
                "q": ["history_content_type", tag_filter],
                "qv": ["dataset", tag_value],
                "history_id": history_id,
            }
            return len(self._get("datasets", query).json())

        # The queries use lower case although the stored tags end in "TAG".
        assert count_datasets("tag", "name:new_tag") == 1
        assert count_datasets("tag-contains", "new_tag") == 1
        assert count_datasets("tag-contains", "notag") == 0
@requires_new_history
def test_search_by_tool_id(self):
    """Filter the ``datasets`` index by ``tool_id`` (exact) and ``tool_id-contains`` (substring)."""
    with self.dataset_populator.test_history_for(self.test_search_by_tool_id) as history_id:
        self.dataset_populator.new_dataset(history_id)

        def search(limit, fields, values):
            # Run one filtered query against this history and return the decoded JSON.
            query = {"limit": limit, "offset": 0, "q": fields, "qv": values, "history_id": history_id}
            return self._get("datasets", query).json()

        assert len(search(1, ["history_content_type", "tool_id"], ["dataset", "__DATA_FETCH__"])) == 1
        assert len(search(1, ["history_content_type", "tool_id"], ["dataset", "__DATA_FETCH__X"])) == 0
        assert len(search(1, ["history_content_type", "tool_id-contains"], ["dataset", "ATA_FETCH"])) == 1

        self.dataset_collection_populator.create_list_in_history(
            history_id, name="search by tool id", contents=["1\n2\n3"], wait=True
        )
        result = search(10, ["name", "tool_id"], ["search by tool id", "__DATA_FETCH__"])
        assert result[0]["name"] == "search by tool id", result

        result = search(1, ["history_content_type", "tool_id"], ["dataset_collection", "uploadX"])
        assert len(result) == 0
@requires_new_history
def test_search_by_extension(self):
    """Filter by ``extension`` using the exact, ``-in`` and ``-like`` operators."""
    with self.dataset_populator.test_history_for(self.test_search_by_extension) as history_id:
        self.dataset_populator.new_dataset(history_id, wait=True)

        def count_matches(operator, value):
            # Number of entries in this history matching a single extension filter.
            query = {"q": [operator], "qv": [value], "history_id": history_id}
            return len(self._get("datasets", query).json())

        expectations = (
            ("extension", "txt", 1),
            ("extension", "bam", 0),
            ("extension-in", "bam,txt", 1),
            ("extension-like", "t%t", 1),
            ("extension-like", "b%m", 0),
        )
        for operator, value, expected_count in expectations:
            assert count_matches(operator, value) == expected_count
def test_invalid_search(self):
    """An unknown filter operator is expected to be rejected with HTTP 400 and ``bad op in filter``."""
    query = {
        "limit": 10,
        "offset": 0,
        "q": ["history_content_type", "tag-invalid_op"],
        "qv": ["dataset", "notag"],
    }
    response = self._get("datasets", query)
    self._assert_status_code_is(response, 400)
    error_message = response.json()["err_msg"]
    assert error_message == "bad op in filter"
def test_search_returns_only_accessible(self, history_id):
    """A dataset created by this user must not appear in a different user's dataset search."""
    hda_id = self.dataset_populator.new_dataset(history_id)["id"]
    with self._different_user():
        query = {"limit": 10, "offset": 0, "q": ["history_content_type"], "qv": ["dataset"]}
        visible_items = self._get("datasets", query).json()
        assert all(hda_id != item["id"] for item in visible_items)
def test_show(self, history_id):
    """``datasets/{id}`` returns HTTP 200 and a payload matching the created dataset."""
    created = self.dataset_populator.new_dataset(history_id)
    response = self._get(f"datasets/{created['id']}")
    self._assert_status_code_is(response, 200)
    shown = response.json()
    self.__assert_matches_hda(created, shown)
def test_show_permission_denied(self, history_id):
    """A dataset made private is expected to answer HTTP 403 for a different user."""
    dataset_id = self.dataset_populator.new_dataset(history_id)["id"]
    self.dataset_populator.make_private(history_id=history_id, dataset_id=dataset_id)
    with self._different_user():
        response = self._get(f"datasets/{dataset_id}")
        self._assert_status_code_is(response, 403)
@requires_admin
def test_admin_can_update_permissions(self, history_id):
    """An admin can lift the restrictions of a private dataset so other users can read it."""
    # Start from a private dataset.
    dataset_id = self.dataset_populator.new_dataset(history_id)["id"]
    self.dataset_populator.make_private(history_id=history_id, dataset_id=dataset_id)

    # The request is issued with admin=True and asks to remove the restrictions.
    update_response = self._put(
        f"datasets/{dataset_id}/permissions", {"action": "remove_restrictions"}, admin=True, json=True
    )
    self._assert_status_code_is_ok(update_response)

    # A different user is now expected to be able to fetch the dataset.
    with self._different_user():
        self._assert_status_code_is_ok(self._get(f"datasets/{dataset_id}"))
def __assert_matches_hda(self, input_hda, query_hda):
    """Assert ``query_hda`` has ``id`` and ``name`` keys and that both equal those of ``input_hda``."""
    self._assert_has_keys(query_hda, "id", "name")
    for field in ("name", "id"):
        assert input_hda[field] == query_hda[field]
def test_display(self, history_id):
    """The raw ``display`` endpoint returns the uploaded text unchanged."""
    # NOTE(review): whitespace inside this literal may have been collapsed when the
    # listing was extracted — confirm against the repository copy.
    expected_text = textwrap.dedent(
        """\
        1 2 3 4
        A B C D
        10 20 30 40
        """
    )
    uploaded = self.dataset_populator.new_dataset(history_id, content=expected_text, wait=True)
    response = self._get(f"histories/{history_id}/contents/{uploaded['id']}/display", {"raw": "True"})
    self._assert_status_code_is(response, 200)
    assert response.text == expected_text
def test_display_error_handling(self, history_id):
    """Displaying a deferred dataset is expected to fail with HTTP 409 and an explanatory message."""
    deferred_hda = self.dataset_populator.create_deferred_hda(
        history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed"
    )
    response = self._get(f"histories/{history_id}/contents/{deferred_hda['id']}/display", {"raw": "True"})
    self._assert_status_code_is(response, 409)
    expected_message = (
        "The dataset you are attempting to view has deferred data. "
        "You can only use this dataset as input for jobs."
    )
    assert response.json()["err_msg"] == expected_message
def test_get_content_as_text(self, history_id):
    """``get_content_as_text`` returns the uploaded text under the ``item_data`` key."""
    # NOTE(review): whitespace inside this literal may have been collapsed when the
    # listing was extracted — confirm against the repository copy.
    expected_text = textwrap.dedent(
        """\
        1 2 3 4
        A B C D
        10 20 30 40
        """
    )
    uploaded = self.dataset_populator.new_dataset(history_id, content=expected_text, wait=True)
    response = self._get(f"datasets/{uploaded['id']}/get_content_as_text")
    self._assert_status_code_is(response, 200)
    body = response.json()
    self._assert_has_key(body, "item_data")
    assert body.get("item_data") == expected_text
def test_get_content_as_text_with_compressed_text_data(self, history_id):
    """``get_content_as_text`` on a ``fasta.gz`` upload returns text containing ``>hg17``."""
    fasta_gz_path = TestDataResolver().get_filename("1.fasta.gz")
    with open(fasta_gz_path, mode="rb") as fh:
        uploaded = self.dataset_populator.new_dataset(history_id, content=fh, ftype="fasta.gz", wait=True)
    response = self._get(f"datasets/{uploaded['id']}/get_content_as_text")
    self._assert_status_code_is(response, 200)
    body = response.json()
    self._assert_has_key(body, "item_data")
    assert ">hg17" in body.get("item_data")
def test_anon_get_content_as_text(self, history_id):
    """An anonymous user is expected to get HTTP 200 for the text of a dataset that was not made private."""
    uploaded = self.dataset_populator.new_dataset(history_id, content="accessible data", wait=True)
    with self._different_user(anon=True):
        response = self._get(f"datasets/{uploaded['id']}/get_content_as_text")
        self._assert_status_code_is(response, 200)
def test_anon_private_get_content_as_text(self, history_id):
    """An anonymous user is expected to get HTTP 403 for the text of a private dataset."""
    uploaded = self.dataset_populator.new_dataset(history_id, content="private data", wait=True)
    dataset_id = uploaded["id"]
    self.dataset_populator.make_private(history_id=history_id, dataset_id=dataset_id)
    with self._different_user(anon=True):
        response = self._get(f"datasets/{dataset_id}/get_content_as_text")
        self._assert_status_code_is(response, 403)
def test_dataprovider_chunk(self, history_id):
    """Read raw dataset content through ``datasets/{id}`` with ``provider=chunk``.

    Three requests are made against the same dataset: chunk 0 and chunk 1 with
    ``chunk_size`` 5, then chunk 0 with ``chunk_size`` 20 to check that a line
    break is included in the returned data.
    """
    # NOTE(review): with the literal exactly as shown, a 5-character first chunk
    # would be "1 2 3" rather than the "1 2" asserted below; runs of whitespace in
    # these literals were presumably collapsed during extraction — confirm against
    # the repository copy before relying on the exact values.
    contents = textwrap.dedent(
        """\
1 2 3 4
A B C D
10 20 30 40
"""
    )
    # test first chunk
    hda1 = self.dataset_populator.new_dataset(history_id, content=contents, wait=True)
    kwds = {
        "data_type": "raw_data",
        "provider": "chunk",
        "chunk_index": "0",
        "chunk_size": "5",
    }
    display_response = self._get(f"datasets/{hda1['id']}", kwds)
    self._assert_status_code_is(display_response, 200)
    display = display_response.json()
    self._assert_has_key(display, "data")
    assert display["data"] == ["1 2"]
    # test index
    kwds = {
        "data_type": "raw_data",
        "provider": "chunk",
        "chunk_index": "1",
        "chunk_size": "5",
    }
    display_response = self._get(f"datasets/{hda1['id']}", kwds)
    self._assert_status_code_is(display_response, 200)
    display = display_response.json()
    self._assert_has_key(display, "data")
    assert display["data"] == [" 3 "]
    # test line breaks
    kwds = {
        "data_type": "raw_data",
        "provider": "chunk",
        "chunk_index": "0",
        "chunk_size": "20",
    }
    display_response = self._get(f"datasets/{hda1['id']}", kwds)
    self._assert_status_code_is(display_response, 200)
    display = display_response.json()
    self._assert_has_key(display, "data")
    # The first line's terminator followed by the start of the second line must be present.
    assert "\nA" in display["data"][0]
def test_bam_chunking_through_display_endpoint(self, history_id):
    """Page through a BAM dataset one and two lines at a time via the display endpoint."""
    # This endpoint does not use data providers and instead overrides display_data
    # in the bam datatype. This endpoint is very close to the legacy non-API
    # controller endpoint used by the UI to produce these chunks.
    bam_id = self.dataset_populator.new_bam_dataset(history_id, self.test_data_resolver)["id"]

    header_chunk = self._display_chunk(bam_id, 0, 1)
    self._assert_has_keys(header_chunk, "offset", "ck_data")
    next_offset = header_chunk["offset"]

    single_line_chunk = self._display_chunk(bam_id, next_offset, 1)
    assert single_line_chunk["offset"] > next_offset
    # The first chunk just contains all the headers, so the line count is only
    # checked from the second chunk onwards.
    assert len(single_line_chunk["ck_data"].split("\n")) == 1

    two_line_chunk = self._display_chunk(bam_id, next_offset, 2)
    assert len(two_line_chunk["ck_data"].split("\n")) == 2
def _display_chunk(self, dataset_id: str, offset: int, ck_size: int):
    """Delegate to ``self.dataset_populator.display_chunk`` with the same three arguments and return its result."""
    return self.dataset_populator.display_chunk(dataset_id, offset, ck_size)
def test_tabular_chunking_through_display_endpoint(self, history_id):
    """Fetch a ``tabular`` dataset one line per chunk, feeding each returned offset into the next request."""
    # NOTE(review): the offsets asserted below (14, then 28) do not match the
    # 8-byte lines of the literal as shown; runs of whitespace in these literals
    # were presumably collapsed during extraction — confirm against the
    # repository copy before relying on the exact values.
    contents = textwrap.dedent(
        """\
1 2 3 4
A B C D
10 20 30 40
"""
    )
    # test first chunk
    hda1 = self.dataset_populator.new_dataset(history_id, content=contents, wait=True, file_type="tabular")
    dataset_id = hda1["id"]
    chunk_1 = self._display_chunk(dataset_id, 0, 1)
    self._assert_has_keys(chunk_1, "offset", "ck_data")
    assert chunk_1["ck_data"] == "1 2 3 4"
    assert chunk_1["offset"] == 14
    # second line: start at the offset reported for the first chunk
    chunk_2 = self._display_chunk(dataset_id, 14, 1)
    assert chunk_2["ck_data"] == "A B C D"
    assert chunk_2["offset"] == 28
    # third line: start at the offset reported for the second chunk
    chunk_3 = self._display_chunk(dataset_id, 28, 1)
    assert chunk_3["ck_data"] == "10 20 30 40"
def test_connectivity_table_chunking_through_display_endpoint(self, history_id):
    """Fetch the first two one-line chunks of the ``1.ct`` test file uploaded with file type ``ct``."""
    # Open the fixture in a context manager so the handle is closed after the
    # upload call returns instead of being left open for the rest of the run.
    with open(self.test_data_resolver.get_filename("1.ct"), "rb") as ct_file:
        ct_dataset = self.dataset_populator.new_dataset(history_id, content=ct_file, file_type="ct", wait=True)
    dataset_id = ct_dataset["id"]
    chunk_1 = self._display_chunk(dataset_id, 0, 1)
    self._assert_has_keys(chunk_1, "offset", "ck_data")
    assert chunk_1["ck_data"] == "363 tmRNA"
    assert chunk_1["offset"] == 10, chunk_1
    # Continue from the offset asserted for the first chunk.
    chunk_2 = self._display_chunk(dataset_id, 10, 1)
    # NOTE(review): separators inside this expected line may have been collapsed
    # when the listing was extracted — confirm against the repository copy.
    assert chunk_2["ck_data"] == "1 G 0 2 359 1"
def test_head(self, history_id):
    """HEAD on the display endpoint returns 200 with an empty body, and 400 for a malformed id."""
    dataset_id = self.dataset_populator.new_dataset(history_id, wait=True)["id"]
    raw_params = {"raw": "True"}
    response = self._head(f"histories/{history_id}/contents/{dataset_id}/display", raw_params)
    self._assert_status_code_is(response, 200)
    assert response.text == ""

    # Writing the id twice back-to-back produces an id that is asserted to be rejected.
    malformed_id = f"{dataset_id}{dataset_id}"
    response = self._head(f"histories/{history_id}/contents/{malformed_id}/display", {"raw": "True"})
    self._assert_status_code_is(response, 400)
def test_byte_range_support(self, history_id):
    """The display endpoint advertises byte ranges, serves a valid range and rejects an invalid one."""
    dataset_id = self.dataset_populator.new_dataset(history_id, wait=True)["id"]
    display_url = f"histories/{history_id}/contents/{dataset_id}/display"

    # HEAD: empty body, a 12-byte content length and range support advertised.
    head_response = self._head(display_url, {"raw": "True"})
    self._assert_status_code_is(head_response, 200)
    assert head_response.headers["content-length"] == "12"
    assert head_response.text == ""
    assert head_response.headers["accept-ranges"] == "bytes"

    # A one-byte range is answered with 206 and matching length/range headers.
    partial_response = self._get(display_url, {"raw": "True"}, headers={"range": "bytes=0-0"})
    self._assert_status_code_is(partial_response, 206)
    assert len(partial_response.text) == 1
    assert partial_response.headers["content-length"] == "1"
    assert partial_response.headers["content-range"] == "bytes 0-0/12"

    # This range header is asserted to be answered with 416.
    rejected_response = self._get(display_url, {"raw": "True"}, headers={"range": "bytes=-1-1"})
    self._assert_status_code_is(rejected_response, 416)
def test_tag_change(self, history_id):
    """Tags set through the ``tags`` endpoint show up on the dataset afterwards."""
    hda_id = self.dataset_populator.new_dataset(history_id)["id"]
    payload = {
        "item_id": hda_id,
        "item_class": "HistoryDatasetAssociation",
        "item_tags": ["cool:tag_a", "cool:tag_b", "tag_c", "name:tag_d", "#tag_e"],
    }
    self._assert_status_code_is_ok(self._put("tags", data=payload, json=True))
    stored_tags = self._get(f"histories/{history_id}/contents/{hda_id}").json()["tags"]
    # "#tag_e" was submitted, but "name:tag_e" is what is asserted to be stored.
    for expected_tag in ("cool:tag_a", "cool:tag_b", "tag_c", "name:tag_d", "name:tag_e"):
        assert expected_tag in stored_tags
def test_anon_tag_permissions(self):
    """An anonymous user can add and clear tags on their own dataset; another anonymous user's tagging has no effect."""
    # A bare ``import urllib`` does not guarantee that the ``urllib.parse``
    # submodule has been loaded, so import the function used here explicitly.
    from urllib.parse import urljoin

    def tags_payload(item_id, item_tags):
        # Body accepted by the ``tags`` endpoint for a history dataset.
        return {
            "item_id": item_id,
            "item_class": "HistoryDatasetAssociation",
            "item_tags": item_tags,
        }

    with self._different_user(anon=True):
        history_id = self._get(urljoin(self.url, "history/current_history_json")).json()["id"]
        hda_id = self.dataset_populator.new_dataset(history_id, content="abc", wait=True)["id"]
        payload = tags_payload(hda_id, ["cool:tag_a", "cool:tag_b", "tag_c", "name:tag_d", "#tag_e"])
        self._put("tags", data=payload, json=True)
        updated_hda = self._get(f"histories/{history_id}/contents/{hda_id}").json()
        assert len(updated_hda["tags"]) == 5
        # ensure we can remove these tags again
        put_response = self._put("tags", data=tags_payload(hda_id, []), json=True)
        put_response.raise_for_status()
        updated_hda = self._get(f"histories/{history_id}/contents/{hda_id}").json()
        assert len(updated_hda["tags"]) == 0
    with self._different_user(anon=True):
        # another anon user can't modify tags; the response status is not
        # checked here, only that the dataset still has no tags afterwards
        payload = tags_payload(hda_id, ["cool:tag_a", "cool:tag_b", "tag_c", "name:tag_d", "#tag_e"])
        self._put("tags", data=payload, json=True)
        updated_hda = self._get(f"histories/{history_id}/contents/{hda_id}").json()
        assert len(updated_hda["tags"]) == 0
@skip_without_tool("cat_data_and_sleep")
def test_update_datatype(self, history_id):
    """Datatype updates are rejected on a running job's output and for unknown types, and accepted otherwise."""
    hda_id = self.dataset_populator.new_dataset(history_id)["id"]
    hda_url = f"histories/{history_id}/contents/{hda_id}"
    original_hda = self._get(hda_url).json()
    assert original_hda["extension"] == "txt"
    assert original_hda["data_type"] == "galaxy.datatypes.data.Text"

    tool_inputs = {
        "input1": {"src": "hda", "id": hda_id},
        "sleep_time": 10,
    }
    run_response = self.dataset_populator.run_tool_raw("cat_data_and_sleep", tool_inputs, history_id)
    queued_id = run_response.json()["outputs"][0]["id"]

    # Try updating the datatype of the tool output while its job has not finished.
    rejected_while_running = self._put(
        f"histories/{history_id}/contents/{queued_id}", data={"datatype": "tabular"}, json=True
    )
    self._assert_status_code_is(rejected_while_running, 400)

    # Wait for the history's jobs before updating the input dataset.
    self.dataset_populator.wait_for_history_jobs(history_id)
    updated_hda = self._put(hda_url, data={"datatype": "tabular"}, json=True).json()
    assert updated_hda["extension"] == "tabular"
    assert updated_hda["data_type"] == "galaxy.datatypes.tabular.Tabular"

    # Try updating with a datatype named "invalid".
    rejected_invalid = self._put(hda_url, data={"datatype": "invalid"}, json=True)
    self._assert_status_code_is(rejected_invalid, 400)
@skip_without_tool("cat_data_and_sleep")
def test_delete_cancels_job(self, history_id):
    """Run the shared cancel-job scenario with ``use_query_params=False``."""
    self._run_cancel_job(history_id, use_query_params=False)
@skip_without_tool("cat_data_and_sleep")
def test_delete_cancels_job_with_query_params(self, history_id):
    """Run the shared cancel-job scenario with ``use_query_params=True``."""
    self._run_cancel_job(history_id, use_query_params=True)
def _run_cancel_job(self, history_id: str, use_query_params: bool = False):
    """Start ``cat_data_and_sleep``, delete its output with ``stop_job=True`` and check the job gets cancelled.

    ``use_query_params`` is forwarded unchanged to ``dataset_populator.delete_dataset``.
    """
    populator = self.dataset_populator
    hda_id = populator.new_dataset(history_id)["id"]
    tool_inputs = {
        "input1": {"src": "hda", "id": hda_id},
        "sleep_time": 10,
    }
    run_result = populator.run_tool_raw("cat_data_and_sleep", tool_inputs, history_id).json()
    output_hda_id = run_result["outputs"][0]["id"]
    job_id = run_result["jobs"][0]["id"]

    # The job must not have finished yet when the delete is issued.
    details_before = populator.get_job_details(job_id).json()
    assert details_before["state"] in ("new", "queued", "running"), details_before

    # Use stop_job to cancel the creating job
    delete_response = populator.delete_dataset(
        history_id, output_hda_id, stop_job=True, use_query_params=use_query_params
    )
    self._assert_status_code_is_ok(delete_response)
    deleted_hda = delete_response.json()
    assert deleted_hda["deleted"], deleted_hda

    # The job should be cancelled
    details_after = populator.get_job_details(job_id).json()
    assert details_after["state"] in ("deleting", "deleted"), details_after
def test_purge_does_not_reset_file_size(self):
    """A purged dataset still reports the ``file_size`` it had before the purge."""
    populator = self.dataset_populator
    with populator.test_history() as history_id:
        dataset = populator.new_dataset(history_id=history_id, content="ABC", wait=True)
        size_before_purge = dataset["file_size"]
        assert size_before_purge
        populator.delete_dataset(history_id=history_id, content_id=dataset["id"], purge=True, wait_for_purge=True)
        purged_dataset = populator.get_history_dataset_details(history_id=history_id, content_id=dataset["id"])
        assert purged_dataset["purged"]
        assert size_before_purge == purged_dataset["file_size"]
def test_delete_batch(self):
    """Batch-delete two datasets, then batch-purge two (one of them already deleted)."""
    history_id = self.dataset_populator.new_history()
    dataset_ids = [self.dataset_populator.new_dataset(history_id)["id"] for _ in range(4)]
    self.dataset_populator.wait_for_history(history_id)

    def hda_sources(*positions):
        # Build the {"id", "src"} entries for the datasets at the given positions.
        return [{"id": dataset_ids[position], "src": "hda"} for position in positions]

    sources_to_delete = hda_sources(1, 2)
    deleted_result = self._delete_batch_with_payload({"datasets": sources_to_delete})
    assert deleted_result["success_count"] == len(sources_to_delete)
    for source in sources_to_delete:
        dataset = self._get(f"histories/{history_id}/contents/{source['id']}").json()
        assert dataset["deleted"] is True

    # Position 2 was deleted above and is purged here together with position 0.
    sources_to_purge = hda_sources(0, 2)
    purged_result = self._delete_batch_with_payload({"purge": True, "datasets": sources_to_purge})
    assert purged_result["success_count"] == len(sources_to_purge)
    for source in sources_to_purge:
        self.dataset_populator.wait_for_purge(history_id, source["id"])
def test_delete_batch_error(self):
    """Batch deletion reports one error per entry for a wrong ``src`` and for datasets of another user."""
    # The datasets are created while acting as a different user.
    with self._different_user():
        history_id = self.dataset_populator.new_history()
        dataset_ids = [self.dataset_populator.new_dataset(history_id)["id"] for _ in range(4)]

    def attempt_delete(positions, src):
        # Issue a batch delete for the datasets at ``positions`` and return (sources, result).
        sources = [{"id": dataset_ids[position], "src": src} for position in positions]
        return sources, self._delete_batch_with_payload({"datasets": sources})

    # Trying to delete datasets of wrong type will error
    sources, result = attempt_delete((0, 3), "ldda")
    assert result["success_count"] == 0
    assert len(result["errors"]) == len(sources)

    # Trying to delete datasets that we don't own will error
    sources, result = attempt_delete((1, 2), "hda")
    assert result["success_count"] == 0
    assert len(result["errors"]) == len(sources)
    for error in result["errors"]:
        self._assert_has_keys(error, "dataset", "error_message")
        self._assert_has_keys(error["dataset"], "id", "src")
def _delete_batch_with_payload(self, payload):
    """Send ``payload`` as a JSON DELETE to ``datasets``, assert an OK status and return the decoded body."""
    response = self._delete("datasets", data=payload, json=True)
    self._assert_status_code_is_ok(response)
    return response.json()
@skip_without_datatype("velvet")
def test_composite_datatype_download(self, history_id):
    """Downloading a composite dataset with ``to_ext=zip`` yields a zip archive with four entries."""
    output = self.dataset_populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True)
    response = self._get(f"histories/{history_id}/contents/{output['id']}/display?to_ext=zip")
    self._assert_status_code_is(response, 200)
    # The fetch request supplies three pasted components; the archive is asserted to
    # hold four entries (presumably the extra one is the composite's primary file —
    # TODO confirm).
    expected_entry_count = 4
    # Close the archive once its listing has been read.
    with zipfile.ZipFile(BytesIO(response.content)) as archive:
        namelist = archive.namelist()
    # The failure message now reports the same count that the assertion checks.
    assert len(namelist) == expected_entry_count, f"Expected {expected_entry_count} elements in [{namelist}]"
def test_compute_md5_on_primary_dataset(self, history_id):
    """A new dataset has no hashes; after ``compute_hash`` it carries the expected MD5."""
    populator = self.dataset_populator
    hda = populator.new_dataset(history_id, wait=True)
    details_before = populator.get_history_dataset_details(history_id, dataset=hda)
    assert "hashes" in details_before, str(details_before.keys())
    assert len(details_before["hashes"]) == 0

    populator.compute_hash(hda["id"])
    details_after = populator.get_history_dataset_details(history_id, dataset=hda)
    self.assert_hash_value(details_after, "940cbe15c94d7e339dc15550f6bdcf4d", "MD5")
def test_compute_sha1_on_composite_dataset(self, history_id):
    """Compute a hash for the ``Roadmaps`` extra file of a composite dataset.

    Despite the test's name, the hash function requested and checked here is SHA-256.
    """
    populator = self.dataset_populator
    output = populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True)
    details_before = populator.get_history_dataset_details(history_id, dataset=output)
    assert "hashes" in details_before, str(details_before.keys())
    assert len(details_before["hashes"]) == 0

    populator.compute_hash(details_before["id"], hash_function="SHA-256", extra_files_path="Roadmaps")
    details_after = populator.get_history_dataset_details(history_id, dataset=output)
    expected_sha256 = "3cbd311889963528954fe03b28b68a09685ea7a75660bd2268d5b44cafbe0d22"
    self.assert_hash_value(details_after, expected_sha256, "SHA-256", extra_files_path="Roadmaps")
def test_duplicated_hash_requests_on_primary(self, history_id):
    """Requesting the same hash twice still leaves exactly one matching MD5 entry."""
    populator = self.dataset_populator
    hda = populator.new_dataset(history_id, wait=True)
    details_before = populator.get_history_dataset_details(history_id, dataset=hda)
    assert "hashes" in details_before, str(details_before.keys())
    assert len(details_before["hashes"]) == 0

    # Issue the identical request twice.
    for _ in range(2):
        populator.compute_hash(hda["id"])
    details_after = populator.get_history_dataset_details(history_id, dataset=hda)
    self.assert_hash_value(details_after, "940cbe15c94d7e339dc15550f6bdcf4d", "MD5")
def test_duplicated_hash_requests_on_extra_files(self, history_id):
    """Repeating hash requests for extra files leaves exactly one entry per (function, path) pair."""
    populator = self.dataset_populator
    output = populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True)
    hda_details = populator.get_history_dataset_details(history_id, dataset=output)
    assert "hashes" in hda_details, str(hda_details.keys())
    assert len(hda_details["hashes"]) == 0

    # 4 unique requests, but make them twice...
    unique_requests = (
        ("SHA-256", "Roadmaps"),
        ("SHA-1", "Roadmaps"),
        ("MD5", "Roadmaps"),
        ("SHA-256", "Sequences"),
    )
    for _ in range(2):
        for hash_function, extra_files_path in unique_requests:
            populator.compute_hash(hda_details["id"], hash_function=hash_function, extra_files_path=extra_files_path)

    hda_details = populator.get_history_dataset_details(history_id, dataset=output)
    expected_hashes = (
        ("ce0c0ef1073317ff96c896c249b002dc", "MD5", "Roadmaps"),
        ("fe2e06cdd03922a1ddf3fe6c7e0d299c8044fc8e", "SHA-1", "Roadmaps"),
        ("3cbd311889963528954fe03b28b68a09685ea7a75660bd2268d5b44cafbe0d22", "SHA-256", "Roadmaps"),
        ("4688dca47fe3214516c35acd284a79d97bd6df2bc1c55981b556d995495b91b6", "SHA-256", "Sequences"),
    )
    for expected_value, hash_function, extra_files_path in expected_hashes:
        self.assert_hash_value(hda_details, expected_value, hash_function, extra_files_path=extra_files_path)
def assert_hash_value(self, dataset_details, expected_hash_value, hash_function, extra_files_path=None):
    """Assert ``dataset_details`` holds exactly one hash for the given function and path, with the expected value.

    ``dataset_details["hashes"]`` is read as a list of dicts with the keys
    ``extra_files_path``, ``hash_function`` and ``hash_value``.
    """
    assert "hashes" in dataset_details, str(dataset_details.keys())
    matching_hashes = []
    for entry in dataset_details["hashes"]:
        if entry["extra_files_path"] == extra_files_path and entry["hash_function"] == hash_function:
            matching_hashes.append(entry)
    assert len(matching_hashes) == 1
    (only_match,) = matching_hashes
    assert expected_hash_value == only_match["hash_value"]
def test_storage_show(self, history_id):
    """Storage info for a regular dataset exposes ``object_store_id``, ``name`` and ``description``."""
    populator = self.dataset_populator
    hda = populator.new_dataset(history_id, wait=True)
    dataset_id = populator.get_history_dataset_details(history_id, dataset=hda)["dataset_id"]
    storage_info = populator.dataset_storage_info(dataset_id)
    assert_has_keys(storage_info, "object_store_id", "name", "description")
def test_storage_show_on_discarded(self, history_id):
    """Storage info for content created from ``one_hda_model_store_dict()`` has no object store but one source URI."""
    created = self.dataset_populator.create_contents_from_store(
        history_id,
        store_dict=one_hda_model_store_dict(),
    )
    assert len(created) == 1
    storage_info = self.dataset_populator.dataset_storage_info(created[0]["id"])
    assert_has_keys(storage_info, "object_store_id", "name", "description", "sources", "hashes")
    assert storage_info["object_store_id"] is None
    sources = storage_info["sources"]
    assert len(sources) == 1
    assert sources[0]["source_uri"] == TEST_SOURCE_URI
def test_storage_show_on_deferred(self, history_id):
    """Storage info for content created from ``deferred_hda_model_store_dict()`` has no object store but one source URI."""
    created = self.dataset_populator.create_contents_from_store(
        history_id,
        store_dict=deferred_hda_model_store_dict(),
    )
    assert len(created) == 1
    storage_info = self.dataset_populator.dataset_storage_info(created[0]["id"])
    assert_has_keys(storage_info, "object_store_id", "name", "description", "sources", "hashes")
    assert storage_info["object_store_id"] is None
    sources = storage_info["sources"]
    assert len(sources) == 1
    assert sources[0]["source_uri"] == TEST_SOURCE_URI
@skip_if_github_down
def test_display_application_link(self, history_id):
    """A BAM dataset fetched by URL lists a display application whose first link points at ``display_application/``."""
    fetch_item = {
        "src": "url",
        "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bam",
        "ext": "bam",
    }
    fetched = self.dataset_populator.fetch_hda(history_id, fetch_item)
    details = self.dataset_populator.get_history_dataset_details(history_id, dataset=fetched, assert_ok=True)
    first_link = details["display_apps"][0]["links"][0]
    assert "display_application/" in first_link["href"]
def test_cannot_update_datatype_on_immutable_history(self, history_id):
    """After the history is purged, a datatype update is rejected with HTTP 403 ``History is immutable``."""
    hda_id = self.dataset_populator.new_dataset(history_id)["id"]
    self.dataset_populator.wait_for_history_jobs(history_id)

    # once we purge the history, it becomes immutable
    self._delete(f"histories/{history_id}", data={"purge": True}, json=True)

    # now we can't update the datatype
    update_response = self._put(
        f"histories/{history_id}/contents/{hda_id}", data={"datatype": "tabular"}, json=True
    )
    self._assert_status_code_is(update_response, 403)
    error_message = update_response.json()["err_msg"]
    assert error_message == "History is immutable"