Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy_test.api.test_datasets

import textwrap
import zipfile
from io import BytesIO
from typing import (
    Dict,
    List,
)

from galaxy.model.unittest_utils.store_fixtures import (
    deferred_hda_model_store_dict,
    one_hda_model_store_dict,
    TEST_SOURCE_URI,
)
from galaxy.util.unittest_utils import skip_if_github_down
from galaxy_test.base.api_asserts import assert_has_keys
from galaxy_test.base.decorators import (
    requires_admin,
    requires_new_history,
)
from galaxy_test.base.populators import (
    DatasetCollectionPopulator,
    DatasetPopulator,
    skip_without_datatype,
    skip_without_tool,
)
from ._framework import ApiTestCase

COMPOSITE_DATA_FETCH_REQUEST_1 = {
    "src": "composite",
    "ext": "velvet",
    "composite": {
        "items": [
            {"src": "pasted", "paste_content": "sequences content"},
            {"src": "pasted", "paste_content": "roadmaps content"},
            {"src": "pasted", "paste_content": "log content"},
        ]
    },
}


[docs]class TestDatasetsApi(ApiTestCase): dataset_populator: DatasetPopulator
[docs] def setUp(self): super().setUp() self.dataset_populator = DatasetPopulator(self.galaxy_interactor) self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
[docs] def test_index(self): index_response = self._get("datasets") self._assert_status_code_is(index_response, 200)
[docs] def test_index_using_keys(self, history_id): expected_keys = "id" self.dataset_populator.new_dataset(history_id) index_response = self._get(f"datasets?keys={expected_keys}") self._assert_status_code_is(index_response, 200) datasets = index_response.json() for dataset in datasets: assert len(dataset) == 1 self._assert_has_keys(dataset, "id")
[docs] @requires_new_history def test_index_order_by_size(self): num_datasets = 3 history_id = self.dataset_populator.new_history() dataset_ids_ordered_by_size_asc = [] for index in range(num_datasets): dataset_content = (index + 1) * "content" hda = self.dataset_populator.new_dataset(history_id, content=dataset_content) dataset_ids_ordered_by_size_asc.append(hda["id"]) dataset_ids_ordered_by_size = dataset_ids_ordered_by_size_asc[::-1] self.dataset_populator.wait_for_history(history_id) self._assert_history_datasets_ordered( history_id, order_by="size", expected_ids_order=dataset_ids_ordered_by_size ) self._assert_history_datasets_ordered( history_id, order_by="size-asc", expected_ids_order=dataset_ids_ordered_by_size_asc )
def _assert_history_datasets_ordered(self, history_id, order_by: str, expected_ids_order: List[str]): datasets_response = self._get(f"histories/{history_id}/contents?v=dev&keys=size&order={order_by}") self._assert_status_code_is(datasets_response, 200) datasets = datasets_response.json() assert len(datasets) == len(expected_ids_order) for index, dataset in enumerate(datasets): assert dataset["id"] == expected_ids_order[index]
[docs] @requires_new_history def test_search_datasets(self): with self.dataset_populator.test_history_for(self.test_search_datasets) as history_id: hda_id = self.dataset_populator.new_dataset(history_id)["id"] payload = {"limit": 1, "offset": 0, "history_id": history_id} index_response = self._get("datasets", payload).json() assert len(index_response) == 1 assert index_response[0]["id"] == hda_id fetch_response = self.dataset_collection_populator.create_list_in_history( history_id, contents=["1\n2\n3"] ).json() hdca_id = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)["id"] index_payload_1 = {"limit": 3, "offset": 0, "order": "hid", "history_id": history_id} index_response = self._get("datasets", index_payload_1).json() assert len(index_response) == 3 assert index_response[0]["hid"] == 3 assert index_response[1]["hid"] == 2 assert index_response[2]["hid"] == 1 assert index_response[2]["history_content_type"] == "dataset" assert index_response[2]["id"] == hda_id assert index_response[1]["history_content_type"] == "dataset_collection" assert index_response[1]["id"] == hdca_id index_payload_2 = {"limit": 2, "offset": 0, "q": ["history_content_type"], "qv": ["dataset"]} index_response = self._get("datasets", index_payload_2).json() assert index_response[1]["id"] == hda_id
[docs] @requires_new_history def test_search_by_tag(self): with self.dataset_populator.test_history_for(self.test_search_by_tag) as history_id: hda_id = self.dataset_populator.new_dataset(history_id)["id"] update_payload = { "tags": ["cool:new_tag", "cool:another_tag"], } updated_hda = self._put(f"histories/{history_id}/contents/{hda_id}", update_payload, json=True).json() assert "cool:new_tag" in updated_hda["tags"] assert "cool:another_tag" in updated_hda["tags"] payload = { "limit": 10, "offset": 0, "q": ["history_content_type", "tag"], "qv": ["dataset", "cool:new_tag"], "history_id": history_id, } index_response = self._get("datasets", payload).json() assert len(index_response) == 1 payload = { "limit": 10, "offset": 0, "q": ["history_content_type", "tag-contains"], "qv": ["dataset", "new_tag"], "history_id": history_id, } index_response = self._get("datasets", payload).json() assert len(index_response) == 1 payload = { "limit": 10, "offset": 0, "q": ["history_content_type", "tag-contains"], "qv": ["dataset", "notag"], "history_id": history_id, } index_response = self._get("datasets", payload).json() assert len(index_response) == 0
[docs] @requires_new_history def test_search_by_tag_case_insensitive(self): with self.dataset_populator.test_history_for(self.test_search_by_tag_case_insensitive) as history_id: hda_id = self.dataset_populator.new_dataset(history_id)["id"] update_payload = { "tags": ["name:new_TAG", "cool:another_TAG"], } updated_hda = self._put(f"histories/{history_id}/contents/{hda_id}", update_payload, json=True).json() assert "name:new_TAG" in updated_hda["tags"] assert "cool:another_TAG" in updated_hda["tags"] payload = { "limit": 10, "offset": 0, "q": ["history_content_type", "tag"], "qv": ["dataset", "name:new_tag"], "history_id": history_id, } index_response = self._get("datasets", payload).json() assert len(index_response) == 1 payload = { "limit": 10, "offset": 0, "q": ["history_content_type", "tag-contains"], "qv": ["dataset", "new_tag"], "history_id": history_id, } index_response = self._get("datasets", payload).json() assert len(index_response) == 1 payload = { "limit": 10, "offset": 0, "q": ["history_content_type", "tag-contains"], "qv": ["dataset", "notag"], "history_id": history_id, } index_response = self._get("datasets", payload).json() assert len(index_response) == 0
[docs] @requires_new_history def test_search_by_tool_id(self): with self.dataset_populator.test_history_for(self.test_search_by_tool_id) as history_id: self.dataset_populator.new_dataset(history_id) payload = { "limit": 1, "offset": 0, "q": ["history_content_type", "tool_id"], "qv": ["dataset", "__DATA_FETCH__"], "history_id": history_id, } assert len(self._get("datasets", payload).json()) == 1 payload = { "limit": 1, "offset": 0, "q": ["history_content_type", "tool_id"], "qv": ["dataset", "__DATA_FETCH__X"], "history_id": history_id, } assert len(self._get("datasets", payload).json()) == 0 payload = { "limit": 1, "offset": 0, "q": ["history_content_type", "tool_id-contains"], "qv": ["dataset", "ATA_FETCH"], "history_id": history_id, } assert len(self._get("datasets", payload).json()) == 1 self.dataset_collection_populator.create_list_in_history( history_id, name="search by tool id", contents=["1\n2\n3"], wait=True ) payload = { "limit": 10, "offset": 0, "q": ["name", "tool_id"], "qv": ["search by tool id", "__DATA_FETCH__"], "history_id": history_id, } result = self._get("datasets", payload).json() assert result[0]["name"] == "search by tool id", result payload = { "limit": 1, "offset": 0, "q": ["history_content_type", "tool_id"], "qv": ["dataset_collection", "uploadX"], "history_id": history_id, } result = self._get("datasets", payload).json() assert len(result) == 0
[docs] @requires_new_history def test_search_by_extension(self): with self.dataset_populator.test_history_for(self.test_search_by_extension) as history_id: self.dataset_populator.new_dataset(history_id, wait=True) payload = { "q": ["extension"], "qv": ["txt"], "history_id": history_id, } assert len(self._get("datasets", payload).json()) == 1 payload = { "q": ["extension"], "qv": ["bam"], "history_id": history_id, } assert len(self._get("datasets", payload).json()) == 0 payload = { "q": ["extension-in"], "qv": ["bam,txt"], "history_id": history_id, } assert len(self._get("datasets", payload).json()) == 1 payload = { "q": ["extension-like"], "qv": ["t%t"], "history_id": history_id, } assert len(self._get("datasets", payload).json()) == 1 payload = { "q": ["extension-like"], "qv": ["b%m"], "history_id": history_id, } assert len(self._get("datasets", payload).json()) == 0
[docs] def test_search_returns_only_accessible(self, history_id): hda_id = self.dataset_populator.new_dataset(history_id)["id"] with self._different_user(): payload = {"limit": 10, "offset": 0, "q": ["history_content_type"], "qv": ["dataset"]} index_response = self._get("datasets", payload).json() for item in index_response: assert hda_id != item["id"]
[docs] def test_show(self, history_id): hda1 = self.dataset_populator.new_dataset(history_id) show_response = self._get(f"datasets/{hda1['id']}") self._assert_status_code_is(show_response, 200) self.__assert_matches_hda(hda1, show_response.json())
[docs] def test_show_permission_denied(self, history_id): hda = self.dataset_populator.new_dataset(history_id) self.dataset_populator.make_private(history_id=history_id, dataset_id=hda["id"]) with self._different_user(): show_response = self._get(f"datasets/{hda['id']}") self._assert_status_code_is(show_response, 403)
[docs] @requires_admin def test_admin_can_update_permissions(self, history_id): # Create private dataset hda = self.dataset_populator.new_dataset(history_id) dataset_id = hda["id"] self.dataset_populator.make_private(history_id=history_id, dataset_id=dataset_id) # Admin removes restrictions payload = {"action": "remove_restrictions"} update_response = self._put(f"datasets/{dataset_id}/permissions", payload, admin=True, json=True) self._assert_status_code_is_ok(update_response) # Other users can access the dataset with self._different_user(): show_response = self._get(f"datasets/{hda['id']}") self._assert_status_code_is_ok(show_response)
def __assert_matches_hda(self, input_hda, query_hda): self._assert_has_keys(query_hda, "id", "name") assert input_hda["name"] == query_hda["name"] assert input_hda["id"] == query_hda["id"]
[docs] def test_display(self, history_id): contents = textwrap.dedent( """\ 1 2 3 4 A B C D 10 20 30 40 """ ) hda1 = self.dataset_populator.new_dataset(history_id, content=contents, wait=True) display_response = self._get(f"histories/{history_id}/contents/{hda1['id']}/display", {"raw": "True"}) self._assert_status_code_is(display_response, 200) assert display_response.text == contents
[docs] def test_head(self, history_id): hda1 = self.dataset_populator.new_dataset(history_id, wait=True) display_response = self._head(f"histories/{history_id}/contents/{hda1['id']}/display", {"raw": "True"}) self._assert_status_code_is(display_response, 200) assert display_response.text == "" display_response = self._head( f"histories/{history_id}/contents/{hda1['id']}{hda1['id']}/display", {"raw": "True"} ) self._assert_status_code_is(display_response, 400)
[docs] def test_byte_range_support(self, history_id): hda1 = self.dataset_populator.new_dataset(history_id, wait=True) head_response = self._head(f"histories/{history_id}/contents/{hda1['id']}/display", {"raw": "True"}) self._assert_status_code_is(head_response, 200) assert head_response.headers["content-length"] == "12" assert head_response.text == "" assert head_response.headers["accept-ranges"] == "bytes" valid_headers = {"range": "bytes=0-0"} display_response = self._get( f"histories/{history_id}/contents/{hda1['id']}/display", {"raw": "True"}, headers=valid_headers ) self._assert_status_code_is(display_response, 206) assert len(display_response.text) == 1 assert display_response.headers["content-length"] == "1" assert display_response.headers["content-range"] == "bytes 0-0/12" invalid_headers = {"range": "bytes=-1-1"} display_response = self._get( f"histories/{history_id}/contents/{hda1['id']}/display", {"raw": "True"}, headers=invalid_headers ) self._assert_status_code_is(display_response, 416)
[docs] def test_tag_change(self, history_id): hda_id = self.dataset_populator.new_dataset(history_id)["id"] payload = { "item_id": hda_id, "item_class": "HistoryDatasetAssociation", "item_tags": ["cool:tag_a", "cool:tag_b", "tag_c", "name:tag_d", "#tag_e"], } put_response = self._put("tags", data=payload, json=True) self._assert_status_code_is_ok(put_response) updated_hda = self._get(f"histories/{history_id}/contents/{hda_id}").json() assert "cool:tag_a" in updated_hda["tags"] assert "cool:tag_b" in updated_hda["tags"] assert "tag_c" in updated_hda["tags"] assert "name:tag_d" in updated_hda["tags"] assert "name:tag_e" in updated_hda["tags"]
[docs] @skip_without_tool("cat_data_and_sleep") def test_update_datatype(self, history_id): hda_id = self.dataset_populator.new_dataset(history_id)["id"] original_hda = self._get(f"histories/{history_id}/contents/{hda_id}").json() assert original_hda["extension"] == "txt" assert original_hda["data_type"] == "galaxy.datatypes.data.Text" inputs = { "input1": {"src": "hda", "id": hda_id}, "sleep_time": 10, } run_response = self.dataset_populator.run_tool_raw( "cat_data_and_sleep", inputs, history_id, ) queued_id = run_response.json()["outputs"][0]["id"] update_while_incomplete_response = self._put( # try updating datatype while used as output of a running job f"histories/{history_id}/contents/{queued_id}", data={"datatype": "tabular"}, json=True ) self._assert_status_code_is(update_while_incomplete_response, 400) self.dataset_populator.wait_for_history_jobs(history_id) # now wait for upload to complete successful_updated_hda_response = self._put( f"histories/{history_id}/contents/{hda_id}", data={"datatype": "tabular"}, json=True ).json() assert successful_updated_hda_response["extension"] == "tabular" assert successful_updated_hda_response["data_type"] == "galaxy.datatypes.tabular.Tabular" invalidly_updated_hda_response = self._put( # try updating with invalid datatype f"histories/{history_id}/contents/{hda_id}", data={"datatype": "invalid"}, json=True ) self._assert_status_code_is(invalidly_updated_hda_response, 400)
[docs] @skip_without_tool("cat_data_and_sleep") def test_delete_cancels_job(self, history_id): self._run_cancel_job(history_id, use_query_params=False)
[docs] @skip_without_tool("cat_data_and_sleep") def test_delete_cancels_job_with_query_params(self, history_id): self._run_cancel_job(history_id, use_query_params=True)
def _run_cancel_job(self, history_id: str, use_query_params: bool = False): hda_id = self.dataset_populator.new_dataset(history_id)["id"] inputs = { "input1": {"src": "hda", "id": hda_id}, "sleep_time": 10, } run_response = self.dataset_populator.run_tool_raw( "cat_data_and_sleep", inputs, history_id, ).json() output_hda_id = run_response["outputs"][0]["id"] job_id = run_response["jobs"][0]["id"] job_details = self.dataset_populator.get_job_details(job_id).json() assert job_details["state"] in ("new", "queued", "running"), job_details # Use stop_job to cancel the creating job delete_response = self.dataset_populator.delete_dataset( history_id, output_hda_id, stop_job=True, use_query_params=use_query_params ) self._assert_status_code_is_ok(delete_response) deleted_hda = delete_response.json() assert deleted_hda["deleted"], deleted_hda # The job should be cancelled deleted_job_details = self.dataset_populator.get_job_details(job_id).json() assert deleted_job_details["state"] in ("deleting", "deleted"), deleted_job_details
[docs] def test_delete_batch(self): num_datasets = 4 dataset_map: Dict[int, str] = {} history_id = self.dataset_populator.new_history() for index in range(num_datasets): hda = self.dataset_populator.new_dataset(history_id) dataset_map[index] = hda["id"] self.dataset_populator.wait_for_history(history_id) expected_deleted_source_ids = [ {"id": dataset_map[1], "src": "hda"}, {"id": dataset_map[2], "src": "hda"}, ] delete_payload = {"datasets": expected_deleted_source_ids} deleted_result = self._delete_batch_with_payload(delete_payload) assert deleted_result["success_count"] == len(expected_deleted_source_ids) for deleted_source_id in expected_deleted_source_ids: dataset = self._get(f"histories/{history_id}/contents/{deleted_source_id['id']}").json() assert dataset["deleted"] is True expected_purged_source_ids = [ {"id": dataset_map[0], "src": "hda"}, {"id": dataset_map[2], "src": "hda"}, ] purge_payload = {"purge": True, "datasets": expected_purged_source_ids} deleted_result = self._delete_batch_with_payload(purge_payload) assert deleted_result["success_count"] == len(expected_purged_source_ids) for purged_source_id in expected_purged_source_ids: self.dataset_populator.wait_for_purge(history_id, purged_source_id["id"])
[docs] def test_delete_batch_error(self): num_datasets = 4 dataset_map: Dict[int, str] = {} with self._different_user(): history_id = self.dataset_populator.new_history() for index in range(num_datasets): hda = self.dataset_populator.new_dataset(history_id) dataset_map[index] = hda["id"] # Trying to delete datasets of wrong type will error expected_errored_source_ids = [ {"id": dataset_map[0], "src": "ldda"}, {"id": dataset_map[3], "src": "ldda"}, ] delete_payload = {"datasets": expected_errored_source_ids} deleted_result = self._delete_batch_with_payload(delete_payload) assert deleted_result["success_count"] == 0 assert len(deleted_result["errors"]) == len(expected_errored_source_ids) # Trying to delete datasets that we don't own will error expected_errored_source_ids = [ {"id": dataset_map[1], "src": "hda"}, {"id": dataset_map[2], "src": "hda"}, ] delete_payload = {"datasets": expected_errored_source_ids} deleted_result = self._delete_batch_with_payload(delete_payload) assert deleted_result["success_count"] == 0 assert len(deleted_result["errors"]) == len(expected_errored_source_ids) for error in deleted_result["errors"]: self._assert_has_keys(error, "dataset", "error_message") self._assert_has_keys(error["dataset"], "id", "src")
def _delete_batch_with_payload(self, payload): delete_response = self._delete("datasets", data=payload, json=True) self._assert_status_code_is_ok(delete_response) deleted_result = delete_response.json() return deleted_result
[docs] @skip_without_datatype("velvet") def test_composite_datatype_download(self, history_id): output = self.dataset_populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True) response = self._get(f"histories/{history_id}/contents/{output['id']}/display?to_ext=zip") self._assert_status_code_is(response, 200) archive = zipfile.ZipFile(BytesIO(response.content)) namelist = archive.namelist() assert len(namelist) == 4, f"Expected 3 elements in [{namelist}]"
[docs] def test_compute_md5_on_primary_dataset(self, history_id): hda = self.dataset_populator.new_dataset(history_id, wait=True) hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda) assert "hashes" in hda_details, str(hda_details.keys()) hashes = hda_details["hashes"] assert len(hashes) == 0 self.dataset_populator.compute_hash(hda["id"]) hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda) self.assert_hash_value(hda_details, "940cbe15c94d7e339dc15550f6bdcf4d", "MD5")
[docs] def test_compute_sha1_on_composite_dataset(self, history_id): output = self.dataset_populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True) hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output) assert "hashes" in hda_details, str(hda_details.keys()) hashes = hda_details["hashes"] assert len(hashes) == 0 self.dataset_populator.compute_hash(hda_details["id"], hash_function="SHA-256", extra_files_path="Roadmaps") hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output) self.assert_hash_value( hda_details, "3cbd311889963528954fe03b28b68a09685ea7a75660bd2268d5b44cafbe0d22", "SHA-256", extra_files_path="Roadmaps", )
[docs] def test_duplicated_hash_requests_on_primary(self, history_id): hda = self.dataset_populator.new_dataset(history_id, wait=True) hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda) assert "hashes" in hda_details, str(hda_details.keys()) hashes = hda_details["hashes"] assert len(hashes) == 0 self.dataset_populator.compute_hash(hda["id"]) self.dataset_populator.compute_hash(hda["id"]) hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda) self.assert_hash_value(hda_details, "940cbe15c94d7e339dc15550f6bdcf4d", "MD5")
[docs] def test_duplicated_hash_requests_on_extra_files(self, history_id): output = self.dataset_populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True) hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output) assert "hashes" in hda_details, str(hda_details.keys()) hashes = hda_details["hashes"] assert len(hashes) == 0 # 4 unique requests, but make them twice... for _ in range(2): self.dataset_populator.compute_hash(hda_details["id"], hash_function="SHA-256", extra_files_path="Roadmaps") self.dataset_populator.compute_hash(hda_details["id"], hash_function="SHA-1", extra_files_path="Roadmaps") self.dataset_populator.compute_hash(hda_details["id"], hash_function="MD5", extra_files_path="Roadmaps") self.dataset_populator.compute_hash( hda_details["id"], hash_function="SHA-256", extra_files_path="Sequences" ) hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output) self.assert_hash_value(hda_details, "ce0c0ef1073317ff96c896c249b002dc", "MD5", extra_files_path="Roadmaps") self.assert_hash_value( hda_details, "fe2e06cdd03922a1ddf3fe6c7e0d299c8044fc8e", "SHA-1", extra_files_path="Roadmaps" ) self.assert_hash_value( hda_details, "3cbd311889963528954fe03b28b68a09685ea7a75660bd2268d5b44cafbe0d22", "SHA-256", extra_files_path="Roadmaps", ) self.assert_hash_value( hda_details, "4688dca47fe3214516c35acd284a79d97bd6df2bc1c55981b556d995495b91b6", "SHA-256", extra_files_path="Sequences", )
[docs] def assert_hash_value(self, dataset_details, expected_hash_value, hash_function, extra_files_path=None): assert "hashes" in dataset_details, str(dataset_details.keys()) hashes = dataset_details["hashes"] matching_hashes = [ h for h in hashes if h["extra_files_path"] == extra_files_path and h["hash_function"] == hash_function ] assert len(matching_hashes) == 1 hash_value = matching_hashes[0]["hash_value"] assert expected_hash_value == hash_value
[docs] def test_storage_show(self, history_id): hda = self.dataset_populator.new_dataset(history_id, wait=True) hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda) dataset_id = hda_details["dataset_id"] storage_info_dict = self.dataset_populator.dataset_storage_info(dataset_id) assert_has_keys(storage_info_dict, "object_store_id", "name", "description")
[docs] def test_storage_show_on_discarded(self, history_id): as_list = self.dataset_populator.create_contents_from_store( history_id, store_dict=one_hda_model_store_dict(), ) assert len(as_list) == 1 hda_id = as_list[0]["id"] storage_info_dict = self.dataset_populator.dataset_storage_info(hda_id) assert_has_keys(storage_info_dict, "object_store_id", "name", "description", "sources", "hashes") assert storage_info_dict["object_store_id"] is None sources = storage_info_dict["sources"] assert len(sources) == 1 assert sources[0]["source_uri"] == TEST_SOURCE_URI
[docs] def test_storage_show_on_deferred(self, history_id): as_list = self.dataset_populator.create_contents_from_store( history_id, store_dict=deferred_hda_model_store_dict(), ) assert len(as_list) == 1 hda_id = as_list[0]["id"] storage_info_dict = self.dataset_populator.dataset_storage_info(hda_id) assert_has_keys(storage_info_dict, "object_store_id", "name", "description", "sources", "hashes") assert storage_info_dict["object_store_id"] is None sources = storage_info_dict["sources"] assert len(sources) == 1 assert sources[0]["source_uri"] == TEST_SOURCE_URI