Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy_test.api.test_tools_upload

import json

from galaxy.tool_util.verify.test_data import TestDataResolver
from galaxy_test.base.constants import (
    ONE_TO_SIX_ON_WINDOWS,
    ONE_TO_SIX_WITH_SPACES,
    ONE_TO_SIX_WITH_TABS,
)
from galaxy_test.base.populators import (
    DatasetPopulator,
    skip_if_site_down,
    skip_without_datatype,
    uses_test_history,
)
from ._framework import ApiTestCase


[docs]class ToolsUploadTestCase(ApiTestCase):
[docs] def setUp(self): super(ToolsUploadTestCase, self).setUp() self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
[docs] def test_upload1_paste(self): with self.dataset_populator.test_history() as history_id: payload = self.dataset_populator.upload_payload(history_id, 'Hello World') create_response = self._post("tools", data=payload) self._assert_has_keys(create_response.json(), 'outputs')
[docs] def test_upload1_paste_bad_datatype(self): # Check that you get a nice message if you upload an incorrect datatype with self.dataset_populator.test_history() as history_id: file_type = "johnsawesomebutfakedatatype" payload = self.dataset_populator.upload_payload(history_id, 'Hello World', file_type=file_type) create = self._post("tools", data=payload).json() self._assert_has_keys(create, 'err_msg') assert file_type in create['err_msg']
# upload1 rewrites content with posix lines by default but this can be disabled by setting # to_posix_lines=None in the request. Newer fetch API does not do this by default prefering # to keep content unaltered if possible but it can be enabled with a simple JSON boolean switch # of the same name (to_posix_lines).
[docs] def test_upload_posix_newline_fixes_by_default(self): windows_content = ONE_TO_SIX_ON_WINDOWS result_content = self._upload_and_get_content(windows_content) self.assertEqual(result_content, ONE_TO_SIX_WITH_TABS)
[docs] def test_fetch_posix_unaltered(self): windows_content = ONE_TO_SIX_ON_WINDOWS result_content = self._upload_and_get_content(windows_content, api="fetch") self.assertEqual(result_content, ONE_TO_SIX_ON_WINDOWS)
[docs] def test_upload_disable_posix_fix(self): windows_content = ONE_TO_SIX_ON_WINDOWS result_content = self._upload_and_get_content(windows_content, to_posix_lines=None) self.assertEqual(result_content, windows_content)
[docs] def test_fetch_post_lines_option(self): windows_content = ONE_TO_SIX_ON_WINDOWS result_content = self._upload_and_get_content(windows_content, api="fetch", to_posix_lines=True) self.assertEqual(result_content, ONE_TO_SIX_WITH_TABS)
[docs] def test_upload_tab_to_space_off_by_default(self): table = ONE_TO_SIX_WITH_SPACES result_content = self._upload_and_get_content(table) self.assertEqual(result_content, table)
[docs] def test_fetch_tab_to_space_off_by_default(self): table = ONE_TO_SIX_WITH_SPACES result_content = self._upload_and_get_content(table, api='fetch') self.assertEqual(result_content, table)
[docs] def test_upload_tab_to_space(self): table = ONE_TO_SIX_WITH_SPACES result_content = self._upload_and_get_content(table, space_to_tab="Yes") self.assertEqual(result_content, ONE_TO_SIX_WITH_TABS)
[docs] def test_fetch_tab_to_space(self): table = ONE_TO_SIX_WITH_SPACES result_content = self._upload_and_get_content(table, api="fetch", space_to_tab=True) self.assertEqual(result_content, ONE_TO_SIX_WITH_TABS)
[docs] def test_fetch_compressed_with_explicit_type(self): fastqgz_path = TestDataResolver().get_filename("1.fastqsanger.gz") with open(fastqgz_path, "rb") as fh: details = self._upload_and_get_details(fh, api="fetch", ext="fastqsanger.gz") assert details["state"] == "ok" assert details["file_ext"] == "fastqsanger.gz"
[docs] def test_fetch_compressed_default(self): fastqgz_path = TestDataResolver().get_filename("1.fastqsanger.gz") with open(fastqgz_path, "rb") as fh: details = self._upload_and_get_details(fh, api="fetch", assert_ok=False) assert details["state"] == "ok" assert details["file_ext"] == "fastqsanger.gz", details
[docs] @uses_test_history(require_new=True) def test_fetch_compressed_auto_decompress_target(self, history_id): # TODO: this should definitely be fixed to allow auto decompression via that API. fastqgz_path = TestDataResolver().get_filename("1.fastqsanger.gz") with open(fastqgz_path, "rb") as fh: details = self._upload_and_get_details(fh, api="fetch", history_id=history_id, assert_ok=False, auto_decompress=True) assert details["state"] == "ok" assert details["file_ext"] == "fastqsanger.gz", details
[docs] def test_upload_decompress_off_with_auto_by_default(self): # UNSTABLE_FLAG: This might default to a bed.gz datatype in the future. bedgz_path = TestDataResolver().get_filename("4.bed.gz") with open(bedgz_path, "rb") as fh: details = self._upload_and_get_details(fh, file_type="auto") assert details["state"] == "ok" assert details["file_ext"] == "bed", details
[docs] def test_upload_decompresses_if_uncompressed_type_selected(self): fastqgz_path = TestDataResolver().get_filename("1.fastqsanger.gz") with open(fastqgz_path, "rb") as fh: details = self._upload_and_get_details(fh, file_type="fastqsanger") assert details["state"] == "ok" assert details["file_ext"] == "fastqsanger", details assert details["file_size"] == 178, details
[docs] def test_upload_decompress_off_if_compressed_type_selected(self): fastqgz_path = TestDataResolver().get_filename("1.fastqsanger.gz") with open(fastqgz_path, "rb") as fh: details = self._upload_and_get_details(fh, file_type="fastqsanger.gz") assert details["state"] == "ok" assert details["file_ext"] == "fastqsanger.gz", details assert details["file_size"] == 161, details
[docs] def test_upload_auto_decompress_off(self): # UNSTABLE_FLAG: This might default to a bed.gz datatype in the future. bedgz_path = TestDataResolver().get_filename("4.bed.gz") with open(bedgz_path, "rb") as fh: details = self._upload_and_get_details(fh, file_type="auto", assert_ok=False, auto_decompress=False) assert details["file_ext"] == "binary", details
[docs] @uses_test_history(require_new=True) def test_fetch_compressed_with_auto(self, history_id): # UNSTABLE_FLAG: This might default to a bed.gz datatype in the future. # TODO: this should definitely be fixed to allow auto decompression via that API. bedgz_path = TestDataResolver().get_filename("4.bed.gz") with open(bedgz_path, "rb") as fh: details = self._upload_and_get_details(fh, api="fetch", history_id=history_id, auto_decompress=True, assert_ok=False) assert details["state"] == "ok" assert details["file_ext"] == "bed"
[docs] @skip_without_datatype("rdata") def test_rdata_not_decompressed(self): # Prevent regression of https://github.com/galaxyproject/galaxy/issues/753 rdata_path = TestDataResolver().get_filename("1.RData") with open(rdata_path, "rb") as fh: rdata_metadata = self._upload_and_get_details(fh, file_type="auto") self.assertEqual(rdata_metadata["file_ext"], "rdata")
[docs] @skip_without_datatype("csv") def test_csv_upload(self): csv_path = TestDataResolver().get_filename("1.csv") with open(csv_path, "rb") as fh: csv_metadata = self._upload_and_get_details(fh, file_type="csv") self.assertEqual(csv_metadata["file_ext"], "csv")
[docs] @skip_without_datatype("csv") def test_csv_upload_auto(self): csv_path = TestDataResolver().get_filename("1.csv") with open(csv_path, "rb") as fh: csv_metadata = self._upload_and_get_details(fh, file_type="auto") self.assertEqual(csv_metadata["file_ext"], "csv")
[docs] @skip_without_datatype("csv") def test_csv_fetch(self): csv_path = TestDataResolver().get_filename("1.csv") with open(csv_path, "rb") as fh: csv_metadata = self._upload_and_get_details(fh, api="fetch", ext="csv", to_posix_lines=True) self.assertEqual(csv_metadata["file_ext"], "csv")
[docs] @skip_without_datatype("csv") def test_csv_sniff_fetch(self): csv_path = TestDataResolver().get_filename("1.csv") with open(csv_path, "rb") as fh: csv_metadata = self._upload_and_get_details(fh, api="fetch", ext="auto", to_posix_lines=True) self.assertEqual(csv_metadata["file_ext"], "csv")
[docs] @skip_without_datatype("tiff") def test_image_upload_auto(self): tiff_path = TestDataResolver().get_filename("1.tiff") with open(tiff_path, "rb") as fh: tiff_metadata = self._upload_and_get_details(fh, file_type="auto") self.assertEqual(tiff_metadata["file_ext"], "tiff")
[docs] @skip_without_datatype("velvet") def test_composite_datatype(self): with self.dataset_populator.test_history() as history_id: dataset = self._velvet_upload(history_id, extra_inputs={ "files_1|url_paste": "roadmaps content", "files_1|type": "upload_dataset", "files_2|url_paste": "log content", "files_2|type": "upload_dataset", }) roadmaps_content = self._get_roadmaps_content(history_id, dataset) assert roadmaps_content.strip() == "roadmaps content", roadmaps_content
[docs] @skip_without_datatype("velvet") def test_composite_datatype_space_to_tab(self): # Like previous test but set one upload with space_to_tab to True to # verify that works. with self.dataset_populator.test_history() as history_id: dataset = self._velvet_upload(history_id, extra_inputs={ "files_1|url_paste": "roadmaps content", "files_1|type": "upload_dataset", "files_1|space_to_tab": "Yes", "files_2|url_paste": "log content", "files_2|type": "upload_dataset", }) roadmaps_content = self._get_roadmaps_content(history_id, dataset) assert roadmaps_content.strip() == "roadmaps\tcontent", roadmaps_content
[docs] @skip_without_datatype("velvet") def test_composite_datatype_posix_lines(self): # Like previous test but set one upload with space_to_tab to True to # verify that works. with self.dataset_populator.test_history() as history_id: dataset = self._velvet_upload(history_id, extra_inputs={ "files_1|url_paste": "roadmaps\rcontent", "files_1|type": "upload_dataset", "files_1|space_to_tab": "Yes", "files_2|url_paste": "log\rcontent", "files_2|type": "upload_dataset", }) roadmaps_content = self._get_roadmaps_content(history_id, dataset) assert roadmaps_content.strip() == "roadmaps\ncontent", roadmaps_content
[docs] @skip_without_datatype("isa-tab") def test_composite_datatype_isatab(self): isatab_zip_path = TestDataResolver().get_filename("MTBLS6.zip") details = self._upload_and_get_details(open(isatab_zip_path, "rb"), file_type="isa-tab") assert details["state"] == "ok" assert details["file_ext"] == "isa-tab", details assert details["file_size"] == 85, details
[docs] def test_upload_dbkey(self): with self.dataset_populator.test_history() as history_id: payload = self.dataset_populator.upload_payload(history_id, "Test123", dbkey="hg19") run_response = self.dataset_populator.tools_post(payload) self.dataset_populator.wait_for_tool_run(history_id, run_response) datasets = run_response.json()["outputs"] assert datasets[0].get("genome_build") == "hg19", datasets[0]
[docs] @uses_test_history(require_new=False) def test_fetch_bam_file(self, history_id): bam_path = TestDataResolver().get_filename("1.bam") with open(bam_path, "rb") as fh: details = self._upload_and_get_details(fh, api="fetch", history_id=history_id, assert_ok=False) assert details["state"] == "ok" assert details["file_ext"] == "bam", details
[docs] def test_upload_bam_file(self): bam_path = TestDataResolver().get_filename("1.bam") with open(bam_path, "rb") as fh: details = self._upload_and_get_details(fh, file_type="auto") assert details["state"] == "ok" assert details["file_ext"] == "bam", details
[docs] def test_fetch_metadata(self): table = ONE_TO_SIX_WITH_SPACES details = self._upload_and_get_details(table, api='fetch', dbkey="hg19", info="cool upload", tags=["name:data", "group:type:paired-end"]) assert details.get("genome_build") == "hg19" assert details.get("misc_info") == "cool upload", details tags = details.get("tags") assert len(tags) == 2, details assert "group:type:paired-end" in tags assert "name:data" in tags
[docs] def test_upload_multiple_files_1(self): with self.dataset_populator.test_history() as history_id: payload = self.dataset_populator.upload_payload(history_id, "Test123", dbkey="hg19", extra_inputs={ "files_1|url_paste": "SecondOutputContent", "files_1|NAME": "SecondOutputName", "files_1|file_type": "tabular", "files_1|dbkey": "hg18", "file_count": "2", } ) run_response = self.dataset_populator.tools_post(payload) self.dataset_populator.wait_for_tool_run(history_id, run_response) datasets = run_response.json()["outputs"] assert len(datasets) == 2, datasets content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[0]) assert content.strip() == "Test123" assert datasets[0]["file_ext"] == "txt" assert datasets[0]["genome_build"] == "hg19", datasets content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[1]) assert content.strip() == "SecondOutputContent" assert datasets[1]["file_ext"] == "tabular" assert datasets[1]["genome_build"] == "hg18", datasets
[docs] def test_upload_multiple_files_2(self): with self.dataset_populator.test_history() as history_id: payload = self.dataset_populator.upload_payload(history_id, "Test123", file_type="tabular", dbkey="hg19", extra_inputs={ "files_1|url_paste": "SecondOutputContent", "files_1|NAME": "SecondOutputName", "files_1|file_type": "txt", "files_1|dbkey": "hg18", "file_count": "2", } ) run_response = self.dataset_populator.tools_post(payload) self.dataset_populator.wait_for_tool_run(history_id, run_response) datasets = run_response.json()["outputs"] assert len(datasets) == 2, datasets content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[0]) assert content.strip() == "Test123" assert datasets[0]["file_ext"] == "tabular", datasets assert datasets[0]["genome_build"] == "hg19", datasets content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[1]) assert content.strip() == "SecondOutputContent" assert datasets[1]["file_ext"] == "txt" assert datasets[1]["genome_build"] == "hg18", datasets
[docs] def test_upload_multiple_files_3(self): with self.dataset_populator.test_history() as history_id: payload = self.dataset_populator.upload_payload(history_id, "Test123", file_type="tabular", dbkey="hg19", extra_inputs={ "files_0|file_type": "txt", "files_0|dbkey": "hg18", "files_1|url_paste": "SecondOutputContent", "files_1|NAME": "SecondOutputName", "files_1|file_type": "txt", "files_1|dbkey": "hg18", "file_count": "2", } ) run_response = self.dataset_populator.tools_post(payload) self.dataset_populator.wait_for_tool_run(history_id, run_response) datasets = run_response.json()["outputs"] assert len(datasets) == 2, datasets content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[0]) assert content.strip() == "Test123" assert datasets[0]["file_ext"] == "txt", datasets assert datasets[0]["genome_build"] == "hg18", datasets content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[1]) assert content.strip() == "SecondOutputContent" assert datasets[1]["file_ext"] == "txt" assert datasets[1]["genome_build"] == "hg18", datasets
[docs] def test_upload_multiple_files_no_dbkey(self): with self.dataset_populator.test_history() as history_id: payload = self.dataset_populator.upload_payload(history_id, "Test123", file_type="tabular", dbkey=None, extra_inputs={ "files_0|file_type": "txt", "files_1|url_paste": "SecondOutputContent", "files_1|NAME": "SecondOutputName", "files_1|file_type": "txt", "file_count": "2", } ) run_response = self.dataset_populator.tools_post(payload) self.dataset_populator.wait_for_tool_run(history_id, run_response) datasets = run_response.json()["outputs"] assert len(datasets) == 2, datasets content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[0]) assert content.strip() == "Test123" assert datasets[0]["file_ext"] == "txt", datasets assert datasets[0]["genome_build"] == "?", datasets content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[1]) assert content.strip() == "SecondOutputContent" assert datasets[1]["file_ext"] == "txt" assert datasets[1]["genome_build"] == "?", datasets
[docs] def test_upload_multiple_files_space_to_tab(self): with self.dataset_populator.test_history() as history_id: payload = self.dataset_populator.upload_payload(history_id, content=ONE_TO_SIX_WITH_SPACES, file_type="tabular", dbkey="hg19", extra_inputs={ "files_0|file_type": "txt", "files_0|space_to_tab": "Yes", "files_1|url_paste": ONE_TO_SIX_WITH_SPACES, "files_1|NAME": "SecondOutputName", "files_1|file_type": "txt", "files_2|url_paste": ONE_TO_SIX_WITH_SPACES, "files_2|NAME": "ThirdOutputName", "files_2|file_type": "txt", "files_2|space_to_tab": "Yes", "file_count": "3", } ) run_response = self.dataset_populator.tools_post(payload) self.dataset_populator.wait_for_tool_run(history_id, run_response) datasets = run_response.json()["outputs"] assert len(datasets) == 3, datasets content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[0]) assert content == ONE_TO_SIX_WITH_TABS content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[1]) assert content == ONE_TO_SIX_WITH_SPACES content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[2]) assert content == ONE_TO_SIX_WITH_TABS
[docs] def test_multiple_files_posix_lines(self): with self.dataset_populator.test_history() as history_id: payload = self.dataset_populator.upload_payload(history_id, content=ONE_TO_SIX_ON_WINDOWS, file_type="tabular", dbkey="hg19", extra_inputs={ "files_0|file_type": "txt", "files_0|to_posix_lines": "Yes", "files_1|url_paste": ONE_TO_SIX_ON_WINDOWS, "files_1|NAME": "SecondOutputName", "files_1|file_type": "txt", "files_1|to_posix_lines": None, "files_2|url_paste": ONE_TO_SIX_ON_WINDOWS, "files_2|NAME": "ThirdOutputName", "files_2|file_type": "txt", "file_count": "3", } ) run_response = self.dataset_populator.tools_post(payload) self.dataset_populator.wait_for_tool_run(history_id, run_response) datasets = run_response.json()["outputs"] assert len(datasets) == 3, datasets content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[0]) assert content == ONE_TO_SIX_WITH_TABS content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[1]) assert content == ONE_TO_SIX_ON_WINDOWS content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[2]) assert content == ONE_TO_SIX_WITH_TABS
[docs] def test_upload_force_composite(self): with self.dataset_populator.test_history() as history_id: payload = self.dataset_populator.upload_payload(history_id, "Test123", extra_inputs={ "files_1|url_paste": "CompositeContent", "files_1|NAME": "composite", "file_count": "2", "force_composite": "True", } ) run_response = self.dataset_populator.tools_post(payload) self.dataset_populator.wait_for_tool_run(history_id, run_response) dataset = run_response.json()["outputs"][0] content = self.dataset_populator.get_history_dataset_content(history_id, dataset=dataset) assert content.strip() == "Test123" extra_files = self.dataset_populator.get_history_dataset_extra_files(history_id, dataset_id=dataset["id"]) assert len(extra_files) == 1, extra_files # [{u'path': u'1', u'class': u'File'}] extra_file = extra_files[0] assert extra_file["path"] == "composite" assert extra_file["class"] == "File"
[docs] @skip_if_site_down("https://usegalaxy.org") def test_upload_from_invalid_url(self): history_id, new_dataset = self._upload('https://usegalaxy.org/bla123', assert_ok=False) dataset_details = self.dataset_populator.get_history_dataset_details(history_id, dataset_id=new_dataset["id"], assert_ok=False) assert dataset_details['state'] == 'error', "expected dataset state to be 'error', but got '%s'" % dataset_details['state']
[docs] @skip_if_site_down("https://usegalaxy.org") def test_upload_from_valid_url(self): history_id, new_dataset = self._upload('https://usegalaxy.org/api/version') self.dataset_populator.get_history_dataset_details(history_id, dataset_id=new_dataset["id"], assert_ok=True)
[docs] def test_upload_and_validate_invalid(self): path = TestDataResolver().get_filename("1.fastqsanger") with open(path, "rb") as fh: metadata = self._upload_and_get_details(fh, file_type="fastqcssanger") assert "validated_state" in metadata assert metadata['validated_state'] == 'unknown' history_id = metadata["history_id"] dataset_id = metadata["id"] terminal_validated_state = self.dataset_populator.validate_dataset_and_wait(history_id, dataset_id) assert terminal_validated_state == 'invalid', terminal_validated_state
[docs] def test_upload_and_validate_valid(self): path = TestDataResolver().get_filename("1.fastqsanger") with open(path, "rb") as fh: metadata = self._upload_and_get_details(fh, file_type="fastqsanger") assert "validated_state" in metadata assert metadata['validated_state'] == 'unknown' history_id = metadata["history_id"] dataset_id = metadata["id"] terminal_validated_state = self.dataset_populator.validate_dataset_and_wait(history_id, dataset_id) assert terminal_validated_state == 'ok', terminal_validated_state
def _velvet_upload(self, history_id, extra_inputs): payload = self.dataset_populator.upload_payload( history_id, "sequences content", file_type="velvet", extra_inputs=extra_inputs, ) run_response = self.dataset_populator.tools_post(payload) self.dataset_populator.wait_for_tool_run(history_id, run_response) datasets = run_response.json()["outputs"] assert len(datasets) == 1 dataset = datasets[0] return dataset def _get_roadmaps_content(self, history_id, dataset): roadmaps_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=dataset, filename="Roadmaps") return roadmaps_content def _upload_and_get_content(self, content, **upload_kwds): history_id, new_dataset = self._upload(content, **upload_kwds) return self.dataset_populator.get_history_dataset_content(history_id, dataset=new_dataset) def _upload_and_get_details(self, content, **upload_kwds): history_id, new_dataset = self._upload(content, **upload_kwds) assert_ok = upload_kwds.get("assert_ok", True) return self.dataset_populator.get_history_dataset_details(history_id, dataset=new_dataset, assert_ok=assert_ok) def _upload(self, content, api="upload1", history_id=None, **upload_kwds): assert_ok = upload_kwds.get("assert_ok", True) history_id = history_id or self.dataset_populator.new_history() if api == "upload1": new_dataset = self.dataset_populator.new_dataset(history_id, content=content, **upload_kwds) else: assert api == "fetch" element = dict(src="files", **upload_kwds) target = { "destination": {"type": "hdas"}, "elements": [element], } targets = json.dumps([target]) payload = { "history_id": history_id, "targets": targets, "__files": {"files_0|file_data": content} } new_dataset = self.dataset_populator.fetch(payload, assert_ok=assert_ok).json()["outputs"][0] self.dataset_populator.wait_for_history(history_id, assert_ok=assert_ok) return history_id, new_dataset