Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy_test.api.test_tools_upload
import json
from galaxy.tool_util.verify.test_data import TestDataResolver
from galaxy_test.base.constants import (
ONE_TO_SIX_ON_WINDOWS,
ONE_TO_SIX_WITH_SPACES,
ONE_TO_SIX_WITH_TABS,
ONE_TO_SIX_WITH_TABS_NO_TRAILING_NEWLINE,
)
from galaxy_test.base.populators import (
DatasetPopulator,
skip_if_site_down,
skip_without_datatype,
stage_inputs,
uses_test_history,
)
from ._framework import ApiTestCase
[docs]class ToolsUploadTestCase(ApiTestCase):
def setUp(self):
    # Run the base-class setup first, then attach a DatasetPopulator built
    # from this test case's galaxy_interactor for the tests to use.
    super().setUp()
    populator = DatasetPopulator(self.galaxy_interactor)
    self.dataset_populator = populator
def test_upload1_paste(self):
    # Post pasted text to the tools API and check that the response JSON
    # carries an 'outputs' key.
    with self.dataset_populator.test_history() as history_id:
        upload_request = self.dataset_populator.upload_payload(history_id, 'Hello World')
        response_json = self._post("tools", data=upload_request).json()
        self._assert_has_keys(response_json, 'outputs')
def test_upload1_paste_bad_datatype(self):
    # Uploading with an unknown datatype should produce an error message
    # that names the offending datatype.
    bogus_ext = "johnsawesomebutfakedatatype"
    with self.dataset_populator.test_history() as history_id:
        upload_request = self.dataset_populator.upload_payload(history_id, 'Hello World', file_type=bogus_ext)
        response_json = self._post("tools", data=upload_request).json()
        self._assert_has_keys(response_json, 'err_msg')
        assert bogus_ext in response_json['err_msg']
# upload1 rewrites content with posix lines by default, but this can be disabled by setting
# to_posix_lines=None in the request. The newer fetch API does not do this by default, preferring
# to keep content unaltered if possible, but it can be enabled with a simple JSON boolean switch
# of the same name (to_posix_lines).
def test_upload_posix_newline_fixes_by_default(self):
    # With no options, upload1 is expected to turn the Windows-style sample
    # into ONE_TO_SIX_WITH_TABS.
    uploaded = self._upload_and_get_content(ONE_TO_SIX_ON_WINDOWS)
    self.assertEqual(uploaded, ONE_TO_SIX_WITH_TABS)
def test_fetch_posix_unaltered(self):
    # The fetch API is expected to return the Windows-style sample unchanged
    # when no conversion option is supplied.
    fetched = self._upload_and_get_content(ONE_TO_SIX_ON_WINDOWS, api="fetch")
    self.assertEqual(fetched, ONE_TO_SIX_ON_WINDOWS)
def test_upload_disable_posix_fix(self):
    # Passing to_posix_lines=None to upload1 should leave the content
    # exactly as it was sent.
    sent = ONE_TO_SIX_ON_WINDOWS
    received = self._upload_and_get_content(sent, to_posix_lines=None)
    self.assertEqual(received, sent)
def test_fetch_post_lines_option(self):
    # Enabling to_posix_lines on a fetch request should convert the
    # Windows-style sample into ONE_TO_SIX_WITH_TABS.
    fetched = self._upload_and_get_content(ONE_TO_SIX_ON_WINDOWS, api="fetch", to_posix_lines=True)
    self.assertEqual(fetched, ONE_TO_SIX_WITH_TABS)
# Test how trailing new lines are added
# - upload1 adds by default because to_posix_lines is on by default
# - fetch doesn't add by default because to_posix_lines is off by default
# - fetch does add trailing newline if to_posix_lines is enabled
def test_post_lines_trailing(self):
    # upload1 with default options should return the sample that lacks a
    # trailing newline as ONE_TO_SIX_WITH_TABS.
    uploaded = self._upload_and_get_content(ONE_TO_SIX_WITH_TABS_NO_TRAILING_NEWLINE)
    self.assertEqual(uploaded, ONE_TO_SIX_WITH_TABS)
def test_post_lines_trailing_off(self):
    # With to_posix_lines=False, upload1 should return the content without
    # adding a trailing newline.
    sent = ONE_TO_SIX_WITH_TABS_NO_TRAILING_NEWLINE
    received = self._upload_and_get_content(sent, to_posix_lines=False)
    self.assertEqual(received, ONE_TO_SIX_WITH_TABS_NO_TRAILING_NEWLINE)
def test_fetch_post_lines_trailing_off_by_default(self):
    # The fetch API with default options should not add a trailing newline.
    sent = ONE_TO_SIX_WITH_TABS_NO_TRAILING_NEWLINE
    received = self._upload_and_get_content(sent, api="fetch")
    self.assertEqual(received, ONE_TO_SIX_WITH_TABS_NO_TRAILING_NEWLINE)
def test_fetch_post_lines_trailing_if_to_posix(self):
    # The fetch API with to_posix_lines=True should return the sample that
    # lacks a trailing newline as ONE_TO_SIX_WITH_TABS.
    sent = ONE_TO_SIX_WITH_TABS_NO_TRAILING_NEWLINE
    received = self._upload_and_get_content(sent, api="fetch", to_posix_lines=True)
    self.assertEqual(received, ONE_TO_SIX_WITH_TABS)
def test_upload_tab_to_space_off_by_default(self):
    # upload1 with default options should return the space-separated sample
    # unchanged (no space_to_tab conversion requested).
    spaced = ONE_TO_SIX_WITH_SPACES
    self.assertEqual(self._upload_and_get_content(spaced), spaced)
def test_fetch_tab_to_space_off_by_default(self):
    # The fetch API with default options should return the space-separated
    # sample unchanged (no space_to_tab conversion requested).
    spaced = ONE_TO_SIX_WITH_SPACES
    self.assertEqual(self._upload_and_get_content(spaced, api='fetch'), spaced)
def test_upload_tab_to_space(self):
    # upload1 with space_to_tab="Yes" should return the space-separated
    # sample as ONE_TO_SIX_WITH_TABS.
    converted = self._upload_and_get_content(ONE_TO_SIX_WITH_SPACES, space_to_tab="Yes")
    self.assertEqual(converted, ONE_TO_SIX_WITH_TABS)
def test_fetch_tab_to_space(self):
    # The fetch API with space_to_tab=True should return the space-separated
    # sample as ONE_TO_SIX_WITH_TABS.
    converted = self._upload_and_get_content(ONE_TO_SIX_WITH_SPACES, api="fetch", space_to_tab=True)
    self.assertEqual(converted, ONE_TO_SIX_WITH_TABS)
def test_fetch_compressed_with_explicit_type(self):
    # Fetch the 1.fastqsanger.gz test file while declaring ext="fastqsanger.gz"
    # and check the dataset finishes ok with that extension.
    gz_path = TestDataResolver().get_filename("1.fastqsanger.gz")
    with open(gz_path, "rb") as gz_handle:
        info = self._upload_and_get_details(gz_handle, api="fetch", ext="fastqsanger.gz")
    assert info["state"] == "ok"
    assert info["file_ext"] == "fastqsanger.gz"
def test_fetch_compressed_default(self):
    # Fetch the 1.fastqsanger.gz test file without naming an extension and
    # expect the dataset to end up as fastqsanger.gz.
    gz_path = TestDataResolver().get_filename("1.fastqsanger.gz")
    with open(gz_path, "rb") as gz_handle:
        info = self._upload_and_get_details(gz_handle, api="fetch", assert_ok=False)
    assert info["state"] == "ok"
    assert info["file_ext"] == "fastqsanger.gz", info
@uses_test_history(require_new=True)
def test_fetch_compressed_auto_decompress_target(self, history_id):
    # TODO: this should definitely be fixed to allow auto decompression via that API.
    # As asserted below, the dataset currently keeps the fastqsanger.gz
    # extension even though auto_decompress=True is sent.
    gz_path = TestDataResolver().get_filename("1.fastqsanger.gz")
    fetch_options = dict(api="fetch", history_id=history_id, assert_ok=False, auto_decompress=True)
    with open(gz_path, "rb") as gz_handle:
        info = self._upload_and_get_details(gz_handle, **fetch_options)
    assert info["state"] == "ok"
    assert info["file_ext"] == "fastqsanger.gz", info
def test_upload_decompress_off_with_auto_by_default(self):
    # UNSTABLE_FLAG: This might default to a bed.gz datatype in the future.
    # Upload 4.bed.gz via upload1 with file_type="auto" and expect a "bed" dataset.
    gz_path = TestDataResolver().get_filename("4.bed.gz")
    with open(gz_path, "rb") as gz_handle:
        info = self._upload_and_get_details(gz_handle, file_type="auto")
    assert info["state"] == "ok"
    assert info["file_ext"] == "bed", info
def test_upload_decompresses_if_uncompressed_type_selected(self):
    # Upload 1.fastqsanger.gz while asking for the uncompressed "fastqsanger"
    # type; the resulting dataset should have that extension and 178 bytes.
    gz_path = TestDataResolver().get_filename("1.fastqsanger.gz")
    with open(gz_path, "rb") as gz_handle:
        info = self._upload_and_get_details(gz_handle, file_type="fastqsanger")
    assert info["state"] == "ok"
    assert info["file_ext"] == "fastqsanger", info
    assert info["file_size"] == 178, info
def test_upload_decompress_off_if_compressed_type_selected(self):
    # Upload 1.fastqsanger.gz while asking for the "fastqsanger.gz" type;
    # the resulting dataset should keep that extension and be 161 bytes.
    gz_path = TestDataResolver().get_filename("1.fastqsanger.gz")
    with open(gz_path, "rb") as gz_handle:
        info = self._upload_and_get_details(gz_handle, file_type="fastqsanger.gz")
    assert info["state"] == "ok"
    assert info["file_ext"] == "fastqsanger.gz", info
    assert info["file_size"] == 161, info
def test_upload_auto_decompress_off(self):
    # UNSTABLE_FLAG: This might default to a bed.gz datatype in the future.
    # With auto_decompress=False and file_type="auto", 4.bed.gz is expected
    # to be recorded with the "binary" extension.
    gz_path = TestDataResolver().get_filename("4.bed.gz")
    with open(gz_path, "rb") as gz_handle:
        info = self._upload_and_get_details(gz_handle, file_type="auto", assert_ok=False, auto_decompress=False)
    assert info["file_ext"] == "binary", info
@uses_test_history(require_new=True)
def test_fetch_compressed_with_auto(self, history_id):
    # UNSTABLE_FLAG: This might default to a bed.gz datatype in the future.
    # TODO: this should definitely be fixed to allow auto decompression via that API.
    # Fetch 4.bed.gz with auto_decompress=True and expect a "bed" dataset.
    gz_path = TestDataResolver().get_filename("4.bed.gz")
    fetch_options = dict(api="fetch", history_id=history_id, auto_decompress=True, assert_ok=False)
    with open(gz_path, "rb") as gz_handle:
        info = self._upload_and_get_details(gz_handle, **fetch_options)
    assert info["state"] == "ok"
    assert info["file_ext"] == "bed"
@skip_without_datatype("rdata")
def test_rdata_not_decompressed(self):
    # Prevent regression of https://github.com/galaxyproject/galaxy/issues/753
    # Uploading 1.RData with file_type="auto" should yield an "rdata" dataset.
    source_path = TestDataResolver().get_filename("1.RData")
    with open(source_path, "rb") as source:
        info = self._upload_and_get_details(source, file_type="auto")
    self.assertEqual(info["file_ext"], "rdata")
@skip_without_datatype("csv")
def test_csv_upload(self):
    # Upload 1.csv via upload1 with an explicit "csv" type and check the
    # extension recorded on the dataset.
    source_path = TestDataResolver().get_filename("1.csv")
    with open(source_path, "rb") as source:
        info = self._upload_and_get_details(source, file_type="csv")
    self.assertEqual(info["file_ext"], "csv")
@skip_without_datatype("csv")
def test_csv_upload_auto(self):
    # Upload 1.csv via upload1 with file_type="auto" and expect it to be
    # recorded as "csv".
    source_path = TestDataResolver().get_filename("1.csv")
    with open(source_path, "rb") as source:
        info = self._upload_and_get_details(source, file_type="auto")
    self.assertEqual(info["file_ext"], "csv")
@skip_without_datatype("csv")
def test_csv_fetch(self):
    # Fetch 1.csv with ext="csv" and to_posix_lines=True and check the
    # extension recorded on the dataset.
    source_path = TestDataResolver().get_filename("1.csv")
    with open(source_path, "rb") as source:
        info = self._upload_and_get_details(source, api="fetch", ext="csv", to_posix_lines=True)
    self.assertEqual(info["file_ext"], "csv")
@skip_without_datatype("csv")
def test_csv_sniff_fetch(self):
    # Fetch 1.csv with ext="auto" and to_posix_lines=True and expect it to
    # be recorded as "csv".
    source_path = TestDataResolver().get_filename("1.csv")
    with open(source_path, "rb") as source:
        info = self._upload_and_get_details(source, api="fetch", ext="auto", to_posix_lines=True)
    self.assertEqual(info["file_ext"], "csv")
@skip_without_datatype("tiff")
def test_image_upload_auto(self):
    # Upload 1.tiff via upload1 with file_type="auto" and expect it to be
    # recorded as "tiff".
    source_path = TestDataResolver().get_filename("1.tiff")
    with open(source_path, "rb") as source:
        info = self._upload_and_get_details(source, file_type="auto")
    self.assertEqual(info["file_ext"], "tiff")
@uses_test_history(require_new=False)
def test_newlines_stage_fetch(self, history_id):
    # Stage a file through stage_inputs with default options and check the
    # stored content ends with a newline.
    staged_file = {
        "class": "File",
        "format": "txt",
        "path": "test-data/simple_line_no_newline.txt",
    }
    job = {"input1": staged_file}
    _inputs, staged = stage_inputs(self.galaxy_interactor, history_id, job, use_path_paste=False)
    first_dataset = staged[0][0]
    text = self.dataset_populator.get_history_dataset_content(history_id=history_id, dataset=first_dataset)
    # By default this appends the newline.
    self.assertEqual(text, "This is a line of text.\n")
@uses_test_history(require_new=False)
def test_newlines_stage_fetch_configured(self, history_id):
    # Stage a file through stage_inputs with to_posix_lines=False and check
    # the stored content comes back without a trailing newline.
    staged_file = {
        "class": "File",
        "format": "txt",
        "path": "test-data/simple_line_no_newline.txt",
    }
    job = {"input1": staged_file}
    _inputs, staged = stage_inputs(
        self.galaxy_interactor, history_id, job, use_path_paste=False, to_posix_lines=False
    )
    first_dataset = staged[0][0]
    text = self.dataset_populator.get_history_dataset_content(history_id=history_id, dataset=first_dataset)
    # With to_posix_lines disabled no newline is expected to be appended.
    self.assertEqual(text, "This is a line of text.")
@uses_test_history(require_new=False)
def test_upload_multiple_mixed_success(self, history_id):
    # Fetch two URLs in a single request; the first dataset is expected to
    # finish 'ok' while the second is expected to end up in 'error'.
    bed_urls = [
        "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed",
        "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/12.bed",
    ]
    items = [{"src": "url", "url": bed_url} for bed_url in bed_urls]
    targets = [{"destination": {"type": "hdas"}, "items": items}]
    payload = {"history_id": history_id, "targets": json.dumps(targets)}
    response = self.dataset_populator.fetch(payload)
    self._assert_status_code_is(response, 200)
    outputs = response.json()["outputs"]
    assert len(outputs) == 2
    # Look up both datasets (without asserting ok) before checking either state.
    first, second = [
        self.dataset_populator.get_history_dataset_details(history_id, dataset=output, assert_ok=False)
        for output in outputs
    ]
    assert first["state"] == "ok"
    assert second["state"] == "error"
@skip_without_datatype("velvet")
def test_composite_datatype(self):
    # Upload a velvet composite dataset through upload1 and check that the
    # "Roadmaps" component holds the pasted roadmaps text.
    component_inputs = {
        "files_1|url_paste": "roadmaps content",
        "files_1|type": "upload_dataset",
        "files_2|url_paste": "log content",
        "files_2|type": "upload_dataset",
    }
    with self.dataset_populator.test_history() as history_id:
        velvet_dataset = self._velvet_upload(history_id, extra_inputs=component_inputs)
        roadmaps = self._get_roadmaps_content(history_id, velvet_dataset)
        assert roadmaps.strip() == "roadmaps content", roadmaps
@skip_without_datatype("velvet")
@uses_test_history(require_new=False)
def test_composite_datatype_fetch(self, history_id):
    # Create a velvet composite dataset through the fetch API from three
    # pasted components and check the "Roadmaps" component content.
    pasted_parts = ["sequences content", "roadmaps content", "log content"]
    composite_items = [{"src": "pasted", "paste_content": text} for text in pasted_parts]
    velvet_item = {
        "src": "composite",
        "ext": "velvet",
        "composite": {"items": composite_items},
    }
    targets = [{"destination": {"type": "hdas"}, "items": [velvet_item]}]
    payload = {"history_id": history_id, "targets": json.dumps(targets)}
    response = self.dataset_populator.fetch(payload)
    self._assert_status_code_is(response, 200)
    outputs = response.json()["outputs"]
    assert len(outputs) == 1
    roadmaps = self._get_roadmaps_content(history_id, outputs[0])
    assert roadmaps.strip() == "roadmaps content", roadmaps
@skip_without_datatype("velvet")
@uses_test_history(require_new=False)
def test_composite_datatype_stage_fetch(self, history_id):
    # Stage a velvet composite input with stage_inputs using its default
    # API selection. The test only checks that staging completes and
    # returns a pair; the returned values are not inspected.
    component_paths = [
        "test-data/simple_line.txt",
        "test-data/simple_line_alternative.txt",
        "test-data/simple_line_x2.txt",
    ]
    job = {
        "input1": {
            "class": "File",
            "format": "velvet",
            "composite_data": component_paths,
        }
    }
    _inputs, _datasets = stage_inputs(self.galaxy_interactor, history_id, job, use_path_paste=False)
@skip_without_datatype("velvet")
@uses_test_history(require_new=False)
def test_composite_datatype_stage_upload1(self, history_id):
    # Stage a velvet composite input with stage_inputs and use_fetch_api=False.
    # The test only checks that staging completes and returns a pair; the
    # returned values are not inspected.
    component_paths = [
        "test-data/simple_line.txt",
        "test-data/simple_line_alternative.txt",
        "test-data/simple_line_x2.txt",
    ]
    job = {
        "input1": {
            "class": "File",
            "format": "velvet",
            "composite_data": component_paths,
        }
    }
    _inputs, _datasets = stage_inputs(
        self.galaxy_interactor, history_id, job, use_path_paste=False, use_fetch_api=False
    )
@skip_without_datatype("velvet")
@uses_test_history(require_new=False)
def test_composite_datatype_space_to_tab(self, history_id):
    # Upload a velvet composite dataset with space_to_tab set to "Yes" on
    # the roadmaps component and verify its space is turned into a tab.
    component_inputs = {
        "files_1|url_paste": "roadmaps content",
        "files_1|type": "upload_dataset",
        "files_1|space_to_tab": "Yes",
        "files_2|url_paste": "log content",
        "files_2|type": "upload_dataset",
    }
    velvet_dataset = self._velvet_upload(history_id, extra_inputs=component_inputs)
    roadmaps = self._get_roadmaps_content(history_id, velvet_dataset)
    assert roadmaps.strip() == "roadmaps\tcontent", roadmaps
@skip_without_datatype("velvet")
def test_composite_datatype_posix_lines(self):
    # Upload a velvet composite dataset whose pasted components contain a
    # carriage return and verify the roadmaps component comes back with a
    # newline in its place.
    component_inputs = {
        "files_1|url_paste": "roadmaps\rcontent",
        "files_1|type": "upload_dataset",
        "files_1|space_to_tab": "Yes",
        "files_2|url_paste": "log\rcontent",
        "files_2|type": "upload_dataset",
    }
    with self.dataset_populator.test_history() as history_id:
        velvet_dataset = self._velvet_upload(history_id, extra_inputs=component_inputs)
        roadmaps = self._get_roadmaps_content(history_id, velvet_dataset)
        assert roadmaps.strip() == "roadmaps\ncontent", roadmaps
@skip_without_datatype("isa-tab")
def test_composite_datatype_isatab(self):
    # Upload the MTBLS6.zip test archive as an "isa-tab" dataset and check
    # the resulting state, extension and reported file size.
    isatab_zip_path = TestDataResolver().get_filename("MTBLS6.zip")
    # Open the archive in a context manager so the handle is always closed,
    # even when the upload or one of the assertions fails.
    with open(isatab_zip_path, "rb") as fh:
        details = self._upload_and_get_details(fh, file_type="isa-tab")
    assert details["state"] == "ok"
    assert details["file_ext"] == "isa-tab", details
    assert details["file_size"] == 85, details
@uses_test_history(require_new=False)
def test_upload_composite_as_tar(self, history_id):
    # Upload testdir.tar as the composite part of a forced-composite upload1
    # request and verify the extracted extra files.
    archive_path = self.test_data_resolver.get_filename("testdir.tar")
    # The archive stays open for the whole request/verification sequence.
    with open(archive_path, "rb") as archive:
        composite_inputs = {
            "files_1|file_data": archive,
            "files_1|NAME": "composite",
            "file_count": "2",
            "force_composite": "True",
        }
        payload = self.dataset_populator.upload_payload(history_id, "Test123", extra_inputs=composite_inputs)
        response = self.dataset_populator.tools_post(payload)
        self.dataset_populator.wait_for_tool_run(history_id, response)
        primary = response.json()["outputs"][0]
        self._check_testdir_composite(primary, history_id)
@uses_test_history(require_new=False)
def test_upload_composite_as_tar_fetch(self, history_id):
    # Use the fetch API to create a pasted txt dataset whose extra files
    # come from testdir.tar, then verify the extracted extra files.
    extra_files_spec = {
        "items_from": "archive",
        "src": "files",
        # Prevent Galaxy from checking for a single file in
        # a directory and re-interpreting the archive
        "fuzzy_root": False,
    }
    pasted_item = {
        "src": "pasted",
        "paste_content": "Test123\n",
        "ext": "txt",
        "extra_files": extra_files_spec,
    }
    targets = [{"destination": {"type": "hdas"}, "items": [pasted_item]}]
    archive_path = self.test_data_resolver.get_filename("testdir.tar")
    # The archive stays open for the whole request/verification sequence.
    with open(archive_path, "rb") as archive:
        payload = {
            "history_id": history_id,
            "targets": json.dumps(targets),
            "__files": {"files_0|file_data": archive},
        }
        response = self.dataset_populator.fetch(payload)
        self._assert_status_code_is(response, 200)
        outputs = response.json()["outputs"]
        assert len(outputs) == 1
        self._check_testdir_composite(outputs[0], history_id)
def _check_testdir_composite(self, dataset, history_id):
    """Assert that ``dataset`` holds "Test123" plus the five testdir extra files.

    The primary content (ignoring surrounding whitespace) must equal
    "Test123", and the dataset's extra files must be exactly the five
    paths listed below, each reported with the expected class.
    """
    primary_text = self.dataset_populator.get_history_dataset_content(history_id, dataset=dataset)
    assert primary_text.strip() == "Test123"
    extra_files = self.dataset_populator.get_history_dataset_extra_files(history_id, dataset_id=dataset["id"])
    assert len(extra_files) == 5, extra_files
    expected_classes = {
        "testdir": "Directory",
        "testdir/c": "Directory",
        "testdir/a": "File",
        "testdir/b": "File",
        "testdir/c/d": "File",
    }
    seen_paths = set()
    for entry in extra_files:
        entry_path = entry["path"]
        assert entry_path in expected_classes
        assert entry["class"] == expected_classes[entry_path]
        seen_paths.add(entry_path)
    # Five distinct paths means no expected entry was reported twice.
    assert len(seen_paths) == 5, seen_paths
@uses_test_history(require_new=False)
def test_upload_composite_from_bad_tar(self, history_id):
    # Upload unsafe.tar as the composite part of a forced-composite upload1
    # request and expect the resulting dataset to be in the 'error' state.
    archive_path = self.test_data_resolver.get_filename("unsafe.tar")
    # The archive stays open for the whole request/verification sequence.
    with open(archive_path, "rb") as archive:
        composite_inputs = {
            "files_1|file_data": archive,
            "files_1|NAME": "composite",
            "file_count": "2",
            "force_composite": "True",
        }
        payload = self.dataset_populator.upload_payload(history_id, "Test123", extra_inputs=composite_inputs)
        response = self.dataset_populator.tools_post(payload)
        self.dataset_populator.wait_for_tool_run(history_id, response, assert_ok=False)
        primary = response.json()["outputs"][0]
        info = self.dataset_populator.get_history_dataset_details(history_id, dataset=primary, assert_ok=False)
        assert info["state"] == "error"
def test_upload_dbkey(self):
    # Upload pasted text with dbkey="hg19" and check the first output
    # reports that value as its genome_build.
    with self.dataset_populator.test_history() as history_id:
        payload = self.dataset_populator.upload_payload(history_id, "Test123", dbkey="hg19")
        response = self.dataset_populator.tools_post(payload)
        self.dataset_populator.wait_for_tool_run(history_id, response)
        first_output = response.json()["outputs"][0]
        assert first_output.get("genome_build") == "hg19", first_output
@uses_test_history(require_new=False)
def test_fetch_bam_file(self, history_id):
    # Fetch 1.bam without naming an extension and expect an ok "bam" dataset.
    source_path = TestDataResolver().get_filename("1.bam")
    fetch_options = dict(api="fetch", history_id=history_id, assert_ok=False)
    with open(source_path, "rb") as source:
        info = self._upload_and_get_details(source, **fetch_options)
    assert info["state"] == "ok"
    assert info["file_ext"] == "bam", info
def test_upload_bam_file(self):
    # Upload 1.bam via upload1 with file_type="auto" and expect an ok "bam" dataset.
    source_path = TestDataResolver().get_filename("1.bam")
    with open(source_path, "rb") as source:
        info = self._upload_and_get_details(source, file_type="auto")
    assert info["state"] == "ok"
    assert info["file_ext"] == "bam", info
def test_fetch_metadata(self):
    # Fetch content with dbkey, info and tags set, then check each of them
    # is reflected in the dataset details.
    requested_tags = ["name:data", "group:type:paired-end"]
    info = self._upload_and_get_details(
        ONE_TO_SIX_WITH_SPACES, api='fetch', dbkey="hg19", info="cool upload", tags=requested_tags
    )
    assert info.get("genome_build") == "hg19"
    assert info.get("misc_info") == "cool upload", info
    stored_tags = info.get("tags")
    assert len(stored_tags) == 2, info
    assert "group:type:paired-end" in stored_tags
    assert "name:data" in stored_tags
def test_upload_multiple_files_1(self):
    # Upload two pasted files in one upload1 request: the first uses the
    # request-level dbkey, the second overrides file_type and dbkey.
    second_file = {
        "files_1|url_paste": "SecondOutputContent",
        "files_1|NAME": "SecondOutputName",
        "files_1|file_type": "tabular",
        "files_1|dbkey": "hg18",
        "file_count": "2",
    }
    # (content, file_ext, genome_build) expected for each output, in order.
    expectations = [
        ("Test123", "txt", "hg19"),
        ("SecondOutputContent", "tabular", "hg18"),
    ]
    with self.dataset_populator.test_history() as history_id:
        payload = self.dataset_populator.upload_payload(history_id, "Test123", dbkey="hg19", extra_inputs=second_file)
        response = self.dataset_populator.tools_post(payload)
        self.dataset_populator.wait_for_tool_run(history_id, response)
        datasets = response.json()["outputs"]
        assert len(datasets) == 2, datasets
        for dataset, (text, ext, build) in zip(datasets, expectations):
            content = self.dataset_populator.get_history_dataset_content(history_id, dataset=dataset)
            assert content.strip() == text
            assert dataset["file_ext"] == ext
            assert dataset["genome_build"] == build, datasets
def test_upload_multiple_files_2(self):
    # Upload two pasted files in one upload1 request: the first uses the
    # request-level file_type/dbkey, the second overrides both.
    second_file = {
        "files_1|url_paste": "SecondOutputContent",
        "files_1|NAME": "SecondOutputName",
        "files_1|file_type": "txt",
        "files_1|dbkey": "hg18",
        "file_count": "2",
    }
    read_content = self.dataset_populator.get_history_dataset_content
    with self.dataset_populator.test_history() as history_id:
        payload = self.dataset_populator.upload_payload(
            history_id, "Test123", file_type="tabular", dbkey="hg19", extra_inputs=second_file
        )
        response = self.dataset_populator.tools_post(payload)
        self.dataset_populator.wait_for_tool_run(history_id, response)
        datasets = response.json()["outputs"]
        assert len(datasets) == 2, datasets
        first, second = datasets
        assert read_content(history_id, dataset=first).strip() == "Test123"
        assert first["file_ext"] == "tabular", datasets
        assert first["genome_build"] == "hg19", datasets
        assert read_content(history_id, dataset=second).strip() == "SecondOutputContent"
        assert second["file_ext"] == "txt"
        assert second["genome_build"] == "hg18", datasets
def test_upload_multiple_files_3(self):
    # Upload two pasted files in one upload1 request where per-file
    # file_type/dbkey values are given for both files; the per-file values
    # for the first file are expected to win over the request-level ones.
    per_file_inputs = {
        "files_0|file_type": "txt",
        "files_0|dbkey": "hg18",
        "files_1|url_paste": "SecondOutputContent",
        "files_1|NAME": "SecondOutputName",
        "files_1|file_type": "txt",
        "files_1|dbkey": "hg18",
        "file_count": "2",
    }
    read_content = self.dataset_populator.get_history_dataset_content
    with self.dataset_populator.test_history() as history_id:
        payload = self.dataset_populator.upload_payload(
            history_id, "Test123", file_type="tabular", dbkey="hg19", extra_inputs=per_file_inputs
        )
        response = self.dataset_populator.tools_post(payload)
        self.dataset_populator.wait_for_tool_run(history_id, response)
        datasets = response.json()["outputs"]
        assert len(datasets) == 2, datasets
        first, second = datasets
        assert read_content(history_id, dataset=first).strip() == "Test123"
        assert first["file_ext"] == "txt", datasets
        assert first["genome_build"] == "hg18", datasets
        assert read_content(history_id, dataset=second).strip() == "SecondOutputContent"
        assert second["file_ext"] == "txt"
        assert second["genome_build"] == "hg18", datasets
def test_upload_multiple_files_no_dbkey(self):
    # Upload two pasted files with dbkey=None and no per-file dbkey; both
    # outputs are expected to report "?" as their genome_build.
    per_file_inputs = {
        "files_0|file_type": "txt",
        "files_1|url_paste": "SecondOutputContent",
        "files_1|NAME": "SecondOutputName",
        "files_1|file_type": "txt",
        "file_count": "2",
    }
    read_content = self.dataset_populator.get_history_dataset_content
    with self.dataset_populator.test_history() as history_id:
        payload = self.dataset_populator.upload_payload(
            history_id, "Test123", file_type="tabular", dbkey=None, extra_inputs=per_file_inputs
        )
        response = self.dataset_populator.tools_post(payload)
        self.dataset_populator.wait_for_tool_run(history_id, response)
        datasets = response.json()["outputs"]
        assert len(datasets) == 2, datasets
        first, second = datasets
        assert read_content(history_id, dataset=first).strip() == "Test123"
        assert first["file_ext"] == "txt", datasets
        assert first["genome_build"] == "?", datasets
        assert read_content(history_id, dataset=second).strip() == "SecondOutputContent"
        assert second["file_ext"] == "txt"
        assert second["genome_build"] == "?", datasets
def test_upload_multiple_files_space_to_tab(self):
    # Upload the space-separated sample three times in one request, with
    # space_to_tab enabled for the first and third file only.
    per_file_inputs = {
        "files_0|file_type": "txt",
        "files_0|space_to_tab": "Yes",
        "files_1|url_paste": ONE_TO_SIX_WITH_SPACES,
        "files_1|NAME": "SecondOutputName",
        "files_1|file_type": "txt",
        "files_2|url_paste": ONE_TO_SIX_WITH_SPACES,
        "files_2|NAME": "ThirdOutputName",
        "files_2|file_type": "txt",
        "files_2|space_to_tab": "Yes",
        "file_count": "3",
    }
    # Expected content of each output, in order.
    expected_contents = [ONE_TO_SIX_WITH_TABS, ONE_TO_SIX_WITH_SPACES, ONE_TO_SIX_WITH_TABS]
    with self.dataset_populator.test_history() as history_id:
        payload = self.dataset_populator.upload_payload(
            history_id,
            content=ONE_TO_SIX_WITH_SPACES,
            file_type="tabular",
            dbkey="hg19",
            extra_inputs=per_file_inputs,
        )
        response = self.dataset_populator.tools_post(payload)
        self.dataset_populator.wait_for_tool_run(history_id, response)
        datasets = response.json()["outputs"]
        assert len(datasets) == 3, datasets
        for dataset, wanted in zip(datasets, expected_contents):
            content = self.dataset_populator.get_history_dataset_content(history_id, dataset=dataset)
            assert content == wanted
def test_multiple_files_posix_lines(self):
    # Upload the Windows-style sample three times in one request:
    # to_posix_lines is "Yes" for the first file, None for the second and
    # left unspecified for the third.
    per_file_inputs = {
        "files_0|file_type": "txt",
        "files_0|to_posix_lines": "Yes",
        "files_1|url_paste": ONE_TO_SIX_ON_WINDOWS,
        "files_1|NAME": "SecondOutputName",
        "files_1|file_type": "txt",
        "files_1|to_posix_lines": None,
        "files_2|url_paste": ONE_TO_SIX_ON_WINDOWS,
        "files_2|NAME": "ThirdOutputName",
        "files_2|file_type": "txt",
        "file_count": "3",
    }
    # Expected content of each output, in order.
    expected_contents = [ONE_TO_SIX_WITH_TABS, ONE_TO_SIX_ON_WINDOWS, ONE_TO_SIX_WITH_TABS]
    with self.dataset_populator.test_history() as history_id:
        payload = self.dataset_populator.upload_payload(
            history_id,
            content=ONE_TO_SIX_ON_WINDOWS,
            file_type="tabular",
            dbkey="hg19",
            extra_inputs=per_file_inputs,
        )
        response = self.dataset_populator.tools_post(payload)
        self.dataset_populator.wait_for_tool_run(history_id, response)
        datasets = response.json()["outputs"]
        assert len(datasets) == 3, datasets
        for dataset, wanted in zip(datasets, expected_contents):
            content = self.dataset_populator.get_history_dataset_content(history_id, dataset=dataset)
            assert content == wanted
def test_upload_force_composite(self):
    # Upload two pasted files with force_composite enabled: the first is
    # expected to become the primary content and the second a single extra
    # file named "composite".
    composite_inputs = {
        "files_1|url_paste": "CompositeContent",
        "files_1|NAME": "composite",
        "file_count": "2",
        "force_composite": "True",
    }
    with self.dataset_populator.test_history() as history_id:
        payload = self.dataset_populator.upload_payload(history_id, "Test123", extra_inputs=composite_inputs)
        response = self.dataset_populator.tools_post(payload)
        self.dataset_populator.wait_for_tool_run(history_id, response)
        primary = response.json()["outputs"][0]
        primary_text = self.dataset_populator.get_history_dataset_content(history_id, dataset=primary)
        assert primary_text.strip() == "Test123"
        extra_files = self.dataset_populator.get_history_dataset_extra_files(history_id, dataset_id=primary["id"])
        # Exactly one extra file entry is expected: the "composite" file.
        assert len(extra_files) == 1, extra_files
        only_entry = extra_files[0]
        assert only_entry["path"] == "composite"
        assert only_entry["class"] == "File"
@skip_if_site_down("https://usegalaxy.org")
def test_upload_from_invalid_url(self):
    # Uploading from a URL path that is expected not to exist should leave
    # the dataset in the 'error' state.
    history_id, uploaded = self._upload('https://usegalaxy.org/bla123', assert_ok=False)
    info = self.dataset_populator.get_history_dataset_details(
        history_id, dataset_id=uploaded["id"], assert_ok=False
    )
    state = info['state']
    assert state == 'error', "expected dataset state to be 'error', but got '%s'" % state
@skip_if_site_down("https://usegalaxy.org")
def test_upload_from_valid_url(self):
    # Uploading from a reachable URL should yield a dataset whose details
    # can be fetched with assert_ok=True.
    history_id, uploaded = self._upload('https://usegalaxy.org/api/version')
    uploaded_id = uploaded["id"]
    self.dataset_populator.get_history_dataset_details(history_id, dataset_id=uploaded_id, assert_ok=True)
def test_upload_and_validate_invalid(self):
    # Upload 1.fastqsanger declared as "fastqcssanger"; the dataset starts
    # with validated_state 'unknown' and validation should end as 'invalid'.
    source_path = TestDataResolver().get_filename("1.fastqsanger")
    with open(source_path, "rb") as source:
        info = self._upload_and_get_details(source, file_type="fastqcssanger")
    assert "validated_state" in info
    assert info['validated_state'] == 'unknown'
    final_state = self.dataset_populator.validate_dataset_and_wait(info["history_id"], info["id"])
    assert final_state == 'invalid', final_state
def test_upload_and_validate_valid(self):
    # Upload 1.fastqsanger declared as "fastqsanger"; the dataset starts
    # with validated_state 'unknown' and validation should end as 'ok'.
    source_path = TestDataResolver().get_filename("1.fastqsanger")
    with open(source_path, "rb") as source:
        info = self._upload_and_get_details(source, file_type="fastqsanger")
    assert "validated_state" in info
    assert info['validated_state'] == 'unknown'
    final_state = self.dataset_populator.validate_dataset_and_wait(info["history_id"], info["id"])
    assert final_state == 'ok', final_state
def _velvet_upload(self, history_id, extra_inputs):
    """Run an upload1 request for a velvet dataset and return its single output.

    The primary pasted content is always "sequences content";
    ``extra_inputs`` is passed through to ``upload_payload`` unchanged.
    Asserts that the tool run produced exactly one output dataset.
    """
    request = self.dataset_populator.upload_payload(
        history_id,
        "sequences content",
        file_type="velvet",
        extra_inputs=extra_inputs,
    )
    response = self.dataset_populator.tools_post(request)
    self.dataset_populator.wait_for_tool_run(history_id, response)
    outputs = response.json()["outputs"]
    assert len(outputs) == 1
    return outputs[0]
def _get_roadmaps_content(self, history_id, dataset):
    """Return the content of the "Roadmaps" file of ``dataset`` in ``history_id``."""
    return self.dataset_populator.get_history_dataset_content(history_id, dataset=dataset, filename="Roadmaps")
def _upload_and_get_content(self, content, **upload_kwds):
    """Upload ``content`` via ``_upload`` and return the stored dataset content.

    All keyword arguments are forwarded to ``_upload`` unchanged.
    """
    target_history, uploaded = self._upload(content, **upload_kwds)
    stored = self.dataset_populator.get_history_dataset_content(target_history, dataset=uploaded)
    return stored
def _upload_and_get_details(self, content, **upload_kwds):
    """Upload ``content`` via ``_upload`` and return the dataset details.

    All keyword arguments are forwarded to ``_upload`` unchanged. The
    ``assert_ok`` keyword (default True) is additionally used when
    fetching the details.
    """
    require_ok = upload_kwds.get("assert_ok", True)
    target_history, uploaded = self._upload(content, **upload_kwds)
    return self.dataset_populator.get_history_dataset_details(
        target_history, dataset=uploaded, assert_ok=require_ok
    )
def _upload(self, content, api="upload1", history_id=None, **upload_kwds):
    """Upload ``content`` through the chosen API and wait for the history.

    :param content: data to upload; passed as ``content`` for upload1 or as
        the ``files_0|file_data`` entry for fetch.
    :param api: either "upload1" or "fetch"; any other value fails an assert.
    :param history_id: history to upload into; a new one is created when falsy.
    :param upload_kwds: forwarded to ``new_dataset`` (upload1) or merged into
        the fetch element. ``assert_ok`` (default True) is also read here.
    :returns: ``(history_id, new_dataset)`` tuple.
    """
    require_ok = upload_kwds.get("assert_ok", True)
    if not history_id:
        history_id = self.dataset_populator.new_history()
    if api != "upload1":
        assert api == "fetch"
        # NOTE(review): upload_kwds still contains any "assert_ok" entry here,
        # so it is serialized into the element sent to the server - confirm
        # that this is intended/harmless.
        fetch_target = {
            "destination": {"type": "hdas"},
            "elements": [dict(src="files", **upload_kwds)],
        }
        payload = {
            "history_id": history_id,
            "targets": json.dumps([fetch_target]),
            "__files": {"files_0|file_data": content}
        }
        response = self.dataset_populator.fetch(payload, assert_ok=require_ok)
        new_dataset = response.json()["outputs"][0]
    else:
        new_dataset = self.dataset_populator.new_dataset(history_id, content=content, **upload_kwds)
    self.dataset_populator.wait_for_history(history_id, assert_ok=require_ok)
    return history_id, new_dataset