Source code for galaxy_test.api.test_tools_upload

import json
import os
import tempfile
import urllib.parse
from base64 import b64encode
from typing import cast

import pytest
from tusclient import client

from galaxy.tool_util.verify.test_data import TestDataResolver
from galaxy.util import UNKNOWN
from galaxy.util.compression_utils import decompress_bytes_to_directory
from galaxy.util.hash_util import md5_hash_file
from galaxy.util.unittest_utils import (
skip_if_github_down,
skip_if_site_down,
)
from galaxy_test.base.constants import (
ONE_TO_SIX_ON_WINDOWS,
ONE_TO_SIX_WITH_SPACES,
ONE_TO_SIX_WITH_SPACES_ON_WINDOWS,
ONE_TO_SIX_WITH_TABS,
ONE_TO_SIX_WITH_TABS_NO_TRAILING_NEWLINE,
)
from galaxy_test.base.populators import (
DatasetPopulator,
skip_without_datatype,
stage_inputs,
)
from ._framework import ApiTestCase
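
# Inline "1 2 3" payload encoded as a base64:// URI, used by the fetch hash-validation tests below.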
B64_FOR_1_2_3 = b64encode(b"1 2 3").decode("utf-8")
URI_FOR_1_2_3 = f"base64://{B64_FOR_1_2_3}"
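
# Expected layout of the testdir.tar test archive, checked by the tar and composite-upload tests below.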
EXPECTED_TAR_CONTENTS = {
"testdir": "Directory",
"testdir/c": "Directory",
"testdir/a": "File",
"testdir/b": "File",
"testdir/c/d": "File",
}


class TestToolsUpload(ApiTestCase):
dataset_populator: DatasetPopulator

    def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)

    def test_upload1_paste(self):
with self.dataset_populator.test_history() as history_id:
payload = self.dataset_populator.upload_payload(history_id, "Hello World")
create_response = self._post("tools", data=payload)
self._assert_has_keys(create_response.json(), "outputs")

    def test_upload1_paste_bad_datatype(self):
        # Check that a helpful error message is returned when an invalid datatype is requested
with self.dataset_populator.test_history() as history_id:
file_type = "johnsawesomebutfakedatatype"
payload = self.dataset_populator.upload_payload(history_id, "Hello World", file_type=file_type)
create = self._post("tools", data=payload).json()
self._assert_has_keys(create, "err_msg")
assert file_type in create["err_msg"]

    # upload1 rewrites content with posix lines by default, but this can be disabled by setting
    # to_posix_lines=None in the request. The newer fetch API does not do this by default, preferring
    # to keep content unaltered where possible, but it can be enabled with a JSON boolean option of
    # the same name (to_posix_lines).
    def test_upload_posix_newline_fixes_by_default(self):
windows_content = ONE_TO_SIX_ON_WINDOWS
result_content = self._upload_and_get_content(windows_content)
assert result_content == ONE_TO_SIX_WITH_TABS

    def test_fetch_posix_unaltered(self):
windows_content = ONE_TO_SIX_ON_WINDOWS
result_content = self._upload_and_get_content(windows_content, api="fetch")
assert result_content == ONE_TO_SIX_ON_WINDOWS

    def test_upload_disable_posix_fix(self):
windows_content = ONE_TO_SIX_ON_WINDOWS
result_content = self._upload_and_get_content(windows_content, to_posix_lines=None)
assert result_content == windows_content

    def test_fetch_post_lines_option(self):
windows_content = ONE_TO_SIX_ON_WINDOWS
result_content = self._upload_and_get_content(windows_content, api="fetch", to_posix_lines=True)
assert result_content == ONE_TO_SIX_WITH_TABS

    # Test how trailing newlines are handled:
    # - upload1 adds one by default because to_posix_lines is on by default
    # - fetch doesn't add one by default because to_posix_lines is off by default
    # - fetch does add a trailing newline if to_posix_lines is enabled
    def test_post_lines_trailing(self):
input_content = ONE_TO_SIX_WITH_TABS_NO_TRAILING_NEWLINE
result_content = self._upload_and_get_content(input_content)
assert result_content == ONE_TO_SIX_WITH_TABS

    def test_post_lines_trailing_off(self):
input_content = ONE_TO_SIX_WITH_TABS_NO_TRAILING_NEWLINE
result_content = self._upload_and_get_content(input_content, to_posix_lines=False)
assert result_content == ONE_TO_SIX_WITH_TABS_NO_TRAILING_NEWLINE

    def test_fetch_post_lines_trailing_off_by_default(self):
input_content = ONE_TO_SIX_WITH_TABS_NO_TRAILING_NEWLINE
result_content = self._upload_and_get_content(input_content, api="fetch")
assert result_content == ONE_TO_SIX_WITH_TABS_NO_TRAILING_NEWLINE

    def test_fetch_post_lines_trailing_if_to_posix(self):
input_content = ONE_TO_SIX_WITH_TABS_NO_TRAILING_NEWLINE
result_content = self._upload_and_get_content(input_content, api="fetch", to_posix_lines=True)
assert result_content == ONE_TO_SIX_WITH_TABS

    def test_upload_tab_to_space_off_by_default(self):
table = ONE_TO_SIX_WITH_SPACES
result_content = self._upload_and_get_content(table)
assert result_content == table

    def test_fetch_tab_to_space_off_by_default(self):
table = ONE_TO_SIX_WITH_SPACES
result_content = self._upload_and_get_content(table, api="fetch")
assert result_content == table

    def test_upload_tab_to_space(self):
table = ONE_TO_SIX_WITH_SPACES
result_content = self._upload_and_get_content(table, space_to_tab="Yes")
assert result_content == ONE_TO_SIX_WITH_TABS

    def test_fetch_tab_to_space(self):
table = ONE_TO_SIX_WITH_SPACES
result_content = self._upload_and_get_content(table, api="fetch", space_to_tab=True)
assert result_content == ONE_TO_SIX_WITH_TABS

    def test_fetch_tab_to_space_doesnt_swap_newlines(self):
table = ONE_TO_SIX_WITH_SPACES_ON_WINDOWS
result_content = self._upload_and_get_content(table, api="fetch", space_to_tab=True)
assert result_content == ONE_TO_SIX_ON_WINDOWS

    def test_fetch_compressed_with_explicit_type(self):
fastqgz_path = TestDataResolver().get_filename("1.fastqsanger.gz")
with open(fastqgz_path, "rb") as fh:
details = self._upload_and_get_details(fh, api="fetch", ext="fastqsanger.gz")
assert details["state"] == "ok"
assert details["file_ext"] == "fastqsanger.gz"

    def test_fetch_compressed_default(self):
fastqgz_path = TestDataResolver().get_filename("1.fastqsanger.gz")
with open(fastqgz_path, "rb") as fh:
details = self._upload_and_get_details(fh, api="fetch", assert_ok=False)
assert details["state"] == "ok"
assert details["file_ext"] == "fastqsanger.gz", details

    @pytest.mark.require_new_history
def test_fetch_compressed_auto_decompress_target(self, history_id):
# TODO: this should definitely be fixed to allow auto decompression via that API.
fastqgz_path = TestDataResolver().get_filename("1.fastqsanger.gz")
with open(fastqgz_path, "rb") as fh:
details = self._upload_and_get_details(
fh, api="fetch", history_id=history_id, assert_ok=False, auto_decompress=True
)
assert details["state"] == "ok"
assert details["file_ext"] == "fastqsanger.gz", details

    def test_upload_decompress_off_with_auto_by_default(self):
# UNSTABLE_FLAG: This might default to a bed.gz datatype in the future.
bedgz_path = TestDataResolver().get_filename("4.bed.gz")
with open(bedgz_path, "rb") as fh:
details = self._upload_and_get_details(fh, file_type="auto")
assert details["state"] == "ok"
assert details["file_ext"] == "bed", details

    def test_upload_decompresses_if_uncompressed_type_selected(self):
fastqgz_path = TestDataResolver().get_filename("1.fastqsanger.gz")
with open(fastqgz_path, "rb") as fh:
details = self._upload_and_get_details(fh, file_type="fastqsanger")
assert details["state"] == "ok"
assert details["file_ext"] == "fastqsanger", details
assert details["file_size"] == 178, details

    def test_upload_decompress_off_if_compressed_type_selected(self):
fastqgz_path = TestDataResolver().get_filename("1.fastqsanger.gz")
with open(fastqgz_path, "rb") as fh:
details = self._upload_and_get_details(fh, file_type="fastqsanger.gz")
assert details["state"] == "ok"
assert details["file_ext"] == "fastqsanger.gz", details
assert details["file_size"] == 161, details

    def test_upload_auto_decompress_off(self):
# UNSTABLE_FLAG: This might default to a bed.gz datatype in the future.
bedgz_path = TestDataResolver().get_filename("4.bed.gz")
with open(bedgz_path, "rb") as fh:
details = self._upload_and_get_details(fh, file_type="auto", assert_ok=False, auto_decompress=False)
assert details["file_ext"] == "binary", details

    @pytest.mark.require_new_history
def test_fetch_compressed_with_auto(self, history_id):
# UNSTABLE_FLAG: This might default to a bed.gz datatype in the future.
# TODO: this should definitely be fixed to allow auto decompression via that API.
bedgz_path = TestDataResolver().get_filename("4.bed.gz")
with open(bedgz_path, "rb") as fh:
details = self._upload_and_get_details(
fh, api="fetch", history_id=history_id, auto_decompress=True, assert_ok=False
)
assert details["state"] == "ok"
assert details["file_ext"] == "bed"

    @skip_without_datatype("rdata")
def test_rdata_not_decompressed(self):
# Prevent regression of https://github.com/galaxyproject/galaxy/issues/753
rdata_path = TestDataResolver().get_filename("1.RData")
with open(rdata_path, "rb") as fh:
rdata_metadata = self._upload_and_get_details(fh, file_type="auto")
assert rdata_metadata["file_ext"] == "rdata"

    @skip_without_datatype("csv")
def test_csv_upload(self):
csv_path = TestDataResolver().get_filename("1.csv")
with open(csv_path, "rb") as fh:
csv_metadata = self._upload_and_get_details(fh, file_type="csv")
assert csv_metadata["file_ext"] == "csv"

    @skip_without_datatype("csv")
def test_csv_upload_auto(self):
csv_path = TestDataResolver().get_filename("1.csv")
with open(csv_path, "rb") as fh:
csv_metadata = self._upload_and_get_details(fh, file_type="auto")
assert csv_metadata["file_ext"] == "csv"

    @skip_without_datatype("csv")
def test_csv_fetch(self):
csv_path = TestDataResolver().get_filename("1.csv")
with open(csv_path, "rb") as fh:
csv_metadata = self._upload_and_get_details(fh, api="fetch", ext="csv", to_posix_lines=True)
assert csv_metadata["file_ext"] == "csv"

    @skip_without_datatype("csv")
def test_csv_sniff_fetch(self):
csv_path = TestDataResolver().get_filename("1.csv")
with open(csv_path, "rb") as fh:
csv_metadata = self._upload_and_get_details(fh, api="fetch", ext="auto", to_posix_lines=True)
assert csv_metadata["file_ext"] == "csv"

    @skip_without_datatype("tiff")
def test_image_upload_auto(self):
tiff_path = TestDataResolver().get_filename("1.tiff")
with open(tiff_path, "rb") as fh:
tiff_metadata = self._upload_and_get_details(fh, file_type="auto")
assert tiff_metadata["file_ext"] == "tiff"

    def test_newlines_stage_fetch(self, history_id: str) -> None:
job = {
"input1": {
"class": "File",
"format": "txt",
"path": "test-data/simple_line_no_newline.txt",
"hashes": [{"hash_function": "SHA-1", "hash_value": "f030155d3459c233efd37e13bc1061c1dc744ebf"}],
}
}
inputs, datasets = stage_inputs(self.galaxy_interactor, history_id, job, use_path_paste=False)
dataset = datasets[0]
content = self.dataset_populator.get_history_dataset_content(history_id=history_id, dataset=dataset)
# By default this appends the newline.
assert content == "This is a line of text.\n"
dataset = self.dataset_populator.get_history_dataset_details(history_id, content_id=dataset["id"])
assert dataset["hashes"][0]["hash_value"] == "f030155d3459c233efd37e13bc1061c1dc744ebf"

    def test_stage_object(self, history_id: str) -> None:
job = {"input1": "randomstr"}
inputs, datasets = stage_inputs(
self.galaxy_interactor, history_id, job, use_path_paste=False, use_fetch_api=False
)
dataset = datasets[0]
content = self.dataset_populator.get_history_dataset_content(history_id=history_id, dataset=dataset)
assert content.strip() == '"randomstr"'

    def test_stage_object_fetch(self, history_id: str) -> None:
job = {"input1": "randomstr"}
inputs, datasets = stage_inputs(self.galaxy_interactor, history_id, job, use_path_paste=False)
dataset = datasets[0]
content = self.dataset_populator.get_history_dataset_content(history_id=history_id, dataset=dataset)
assert content == '"randomstr"'

    def test_newlines_stage_fetch_configured(self, history_id: str) -> None:
job = {
"input1": {
"class": "File",
"format": "txt",
"path": "test-data/simple_line_no_newline.txt",
"dbkey": "hg19",
}
}
inputs, datasets = stage_inputs(
self.galaxy_interactor, history_id, job, use_path_paste=False, to_posix_lines=False
)
dataset = datasets[0]
content = self.dataset_populator.get_history_dataset_content(history_id=history_id, dataset=dataset)
# By default this appends the newline, but we disabled with 'to_posix_lines=False' above.
assert content == "This is a line of text."
details = self.dataset_populator.get_history_dataset_details(history_id=history_id, dataset=dataset)
assert details["genome_build"] == "hg19"

    @skip_if_github_down
def test_stage_fetch_decompress_true(self, history_id: str) -> None:
job = {
"input1": {
"class": "File",
"format": "fasta",
"location": "https://github.com/galaxyproject/galaxy/blob/dev/test-data/1.fasta.gz?raw=true",
"decompress": True,
}
}
inputs, datasets = stage_inputs(
self.galaxy_interactor, history_id, job, use_path_paste=False, to_posix_lines=False
)
dataset = datasets[0]
content = self.dataset_populator.get_history_dataset_content(history_id=history_id, dataset=dataset)
assert content.startswith(">hg17")

    @skip_if_github_down
def test_stage_fetch_decompress_false(self, history_id: str) -> None:
job = {
"input1": {
"class": "File",
"format": "fasta",
"location": "https://github.com/galaxyproject/galaxy/blob/dev/test-data/1.fasta.gz?raw=true",
"decompress": False,
}
}
inputs, datasets = stage_inputs(
self.galaxy_interactor, history_id, job, use_path_paste=False, to_posix_lines=False
)
dataset = datasets[0]
content = self.dataset_populator.get_history_dataset_content(history_id=history_id, dataset=dataset)
assert not content.startswith(">hg17")

    @skip_if_github_down
def test_upload_multiple_mixed_success(self, history_id):
destination = {"type": "hdas"}
targets = [
{
"destination": destination,
"items": [
{"src": "url", "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed"},
{
"src": "url",
"url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/12.bed",
},
],
}
]
payload = {
"history_id": history_id,
"targets": targets,
}
fetch_response = self.dataset_populator.fetch(payload)
self._assert_status_code_is(fetch_response, 200)
outputs = fetch_response.json()["outputs"]
assert len(outputs) == 2
output0 = outputs[0]
output1 = outputs[1]
output0 = self.dataset_populator.get_history_dataset_details(history_id, dataset=output0, assert_ok=False)
output1 = self.dataset_populator.get_history_dataset_details(history_id, dataset=output1, assert_ok=False)
assert output0["state"] == "ok"
assert output1["state"] == "error"

    @skip_if_github_down
def test_fetch_bam_file_from_url_with_extension_set(self, history_id):
item = {
"src": "url",
"url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bam",
"ext": "bam",
}
output = self.dataset_populator.fetch_hda(history_id, item)
self.dataset_populator.get_history_dataset_details(history_id, dataset=output, assert_ok=True)

    @skip_if_github_down
def test_fetch_html_from_url(self, history_id):
destination = {"type": "hdas"}
targets = [
{
"destination": destination,
"items": [
{
"src": "url",
"url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/html_file.txt",
},
],
}
]
payload = {
"history_id": history_id,
"targets": targets,
}
fetch_response = self.dataset_populator.fetch(payload)
self._assert_status_code_is(fetch_response, 200)
response = fetch_response.json()
output = response["outputs"][0]
job = response["jobs"][0]
self.dataset_populator.wait_for_job(job["id"])
dataset = self.dataset_populator.get_history_dataset_details(history_id, dataset=output, assert_ok=False)
assert dataset["state"] == "error"
assert dataset["name"] == "html_file.txt"

    def test_abort_fetch_job(self, history_id):
# This should probably be an integration test that also verifies
# that the celery chord is properly canceled.
item = {
"src": "url",
"url": "https://httpstat.us/200?sleep=10000",
"ext": "txt",
}
destination = {"type": "hdas"}
targets = [
{
"destination": destination,
"items": [item],
}
]
payload = {
"history_id": history_id,
"targets": targets,
}
fetch_response = self.dataset_populator.fetch(payload, wait=False)
self._assert_status_code_is(fetch_response, 200)
response = fetch_response.json()
job_id = response["jobs"][0]["id"]
# Wait until state is running
self.dataset_populator.wait_for_job(job_id, ok_states=["running"])
cancel_response = self.dataset_populator.cancel_job(job_id)
self._assert_status_code_is(cancel_response, 200)
dataset = self.dataset_populator.get_history_dataset_details(
history_id, dataset_id=response["outputs"][0]["id"], assert_ok=False
)
assert dataset["file_size"] == 0
assert dataset["state"] == "discarded"

    @skip_without_datatype("velvet")
def test_composite_datatype(self):
with self.dataset_populator.test_history() as history_id:
dataset = self._velvet_upload(
history_id,
extra_inputs={
"files_1|url_paste": "roadmaps content",
"files_1|type": "upload_dataset",
"files_2|url_paste": "log content",
"files_2|type": "upload_dataset",
},
)
roadmaps_content = self._get_roadmaps_content(history_id, dataset)
assert roadmaps_content.strip() == "roadmaps content", roadmaps_content

    @skip_without_datatype("velvet")
def test_composite_datatype_fetch(self, history_id):
item = {
"src": "composite",
"ext": "velvet",
"composite": {
"items": [
{"src": "pasted", "paste_content": "sequences content"},
{"src": "pasted", "paste_content": "roadmaps content"},
{"src": "pasted", "paste_content": "log content"},
]
},
}
output = self.dataset_populator.fetch_hda(history_id, item)
roadmaps_content = self._get_roadmaps_content(history_id, output)
assert roadmaps_content.strip() == "roadmaps content", roadmaps_content

    @skip_without_datatype("velvet")
def test_composite_datatype_stage_fetch(self, history_id: str) -> None:
job = {
"input1": {
"class": "File",
"format": "velvet",
"composite_data": [
"test-data/simple_line.txt",
"test-data/simple_line_alternative.txt",
"test-data/simple_line_x2.txt",
],
}
}
stage_inputs(self.galaxy_interactor, history_id, job, use_path_paste=False)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)

    @skip_without_datatype("velvet")
def test_composite_datatype_pbed_stage_fetch(self, history_id: str) -> None:
job = {
"input1": {
"class": "File",
"format": "pbed",
"composite_data": [
"test-data/rgenetics.bim",
"test-data/rgenetics.bed",
"test-data/rgenetics.fam",
],
}
}
stage_inputs(self.galaxy_interactor, history_id, job, use_path_paste=False)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)

    @skip_without_datatype("velvet")
def test_composite_datatype_stage_upload1(self, history_id: str) -> None:
job = {
"input1": {
"class": "File",
"format": "velvet",
"composite_data": [
"test-data/simple_line.txt",
"test-data/simple_line_alternative.txt",
"test-data/simple_line_x2.txt",
],
}
}
stage_inputs(self.galaxy_interactor, history_id, job, use_path_paste=False, use_fetch_api=False)
self.dataset_populator.wait_for_history(history_id, assert_ok=True)

    @skip_without_datatype("velvet")
def test_composite_datatype_space_to_tab(self, history_id):
# Like previous test but set one upload with space_to_tab to True to
# verify that works.
dataset = self._velvet_upload(
history_id,
extra_inputs={
"files_1|url_paste": "roadmaps content",
"files_1|type": "upload_dataset",
"files_1|space_to_tab": "Yes",
"files_2|url_paste": "log content",
"files_2|type": "upload_dataset",
},
)
roadmaps_content = self._get_roadmaps_content(history_id, dataset)
assert roadmaps_content.strip() == "roadmaps\tcontent", roadmaps_content

    @skip_without_datatype("velvet")
def test_composite_datatype_posix_lines(self):
        # Like the previous test, but upload content containing carriage returns to verify
        # that posix-line conversion is applied to composite uploads.
with self.dataset_populator.test_history() as history_id:
dataset = self._velvet_upload(
history_id,
extra_inputs={
"files_1|url_paste": "roadmaps\rcontent",
"files_1|type": "upload_dataset",
"files_1|space_to_tab": "Yes",
"files_2|url_paste": "log\rcontent",
"files_2|type": "upload_dataset",
},
)
roadmaps_content = self._get_roadmaps_content(history_id, dataset)
assert roadmaps_content.strip() == "roadmaps\ncontent", roadmaps_content

    @skip_without_datatype("isa-tab")
def test_composite_datatype_isatab(self):
isatab_zip_path = TestDataResolver().get_filename("MTBLS6.zip")
details = self._upload_and_get_details(open(isatab_zip_path, "rb"), file_type="isa-tab")
assert details["state"] == "ok"
assert details["file_ext"] == "isa-tab", details
assert details["file_size"] == 85, details

    def test_upload_composite_as_tar(self, history_id):
tar_path = self.test_data_resolver.get_filename("testdir.tar")
with open(tar_path, "rb") as tar_f:
payload = self.dataset_populator.upload_payload(
history_id,
"Test123",
extra_inputs={
"files_1|file_data": tar_f,
"files_1|NAME": "composite",
"file_count": "2",
"force_composite": "True",
},
)
run_response = self.dataset_populator.tools_post(payload)
self.dataset_populator.wait_for_tool_run(history_id, run_response)
dataset = run_response.json()["outputs"][0]
self._check_testdir_composite(dataset, history_id)

    def test_upload_composite_as_tar_fetch(self, history_id):
tar_path = self.test_data_resolver.get_filename("testdir.tar")
with open(tar_path, "rb") as tar_f:
destination = {"type": "hdas"}
targets = [
{
"destination": destination,
"items": [
{
"src": "pasted",
"paste_content": "Test123\n",
"ext": "txt",
"extra_files": {
"items_from": "archive",
"src": "files",
# Prevent Galaxy from checking for a single file in
# a directory and re-interpreting the archive
"fuzzy_root": False,
},
}
],
}
]
payload = {
"history_id": history_id,
"targets": targets,
}
payload["__files"] = {"files_0|file_data": tar_f}
fetch_response = self.dataset_populator.fetch(payload)
self._assert_status_code_is(fetch_response, 200)
outputs = fetch_response.json()["outputs"]
assert len(outputs) == 1
output = outputs[0]
self._check_testdir_composite(output, history_id)
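
    # Shared check: verify the primary dataset content ("Test123") and the extracted extra files against EXPECTED_TAR_CONTENTS.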
def _check_testdir_composite(self, dataset, history_id):
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=dataset)
assert content.strip() == "Test123"
extra_files = self.dataset_populator.get_history_dataset_extra_files(history_id, dataset_id=dataset["id"])
assert len(extra_files) == 5, extra_files
found_files = set()
for extra_file in extra_files:
path = extra_file["path"]
assert path in EXPECTED_TAR_CONTENTS
assert extra_file["class"] == EXPECTED_TAR_CONTENTS[path]
found_files.add(path)
assert len(found_files) == 5, found_files

    def test_upload_composite_from_bad_tar(self, history_id):
tar_path = self.test_data_resolver.get_filename("unsafe.tar")
with open(tar_path, "rb") as tar_f:
payload = self.dataset_populator.upload_payload(
history_id,
"Test123",
extra_inputs={
"files_1|file_data": tar_f,
"files_1|NAME": "composite",
"file_count": "2",
"force_composite": "True",
},
)
run_response = self.dataset_populator.tools_post(payload)
self.dataset_populator.wait_for_tool_run(history_id, run_response, assert_ok=False)
dataset = run_response.json()["outputs"][0]
details = self.dataset_populator.get_history_dataset_details(history_id, dataset=dataset, assert_ok=False)
assert details["state"] == "error"

    def test_upload_tar_roundtrip(self, history_id):
testdir = TestDataResolver().get_filename("testdir.tar")
expected_size = os.path.getsize(testdir)
with open(testdir, "rb") as fh:
details = self._upload_and_get_details(fh, api="fetch", history_id=history_id, assert_ok=True)
assert details["file_ext"] == "tar"
assert details["file_size"] == expected_size
content = cast(
bytes, self.dataset_populator.get_history_dataset_content(history_id, dataset=details, type="bytes")
)
# Make sure we got the expected content size.
assert len(content) == expected_size
# Make sure we get the expected contents.
dir_path = decompress_bytes_to_directory(content)
assert dir_path.endswith("testdir")
for path, entry_class in EXPECTED_TAR_CONTENTS.items():
path = os.path.join(dir_path, os.path.pardir, path)
if entry_class == "Directory":
assert os.path.isdir(path)
else:
assert os.path.isfile(path)
# Make sure the hash of the content matches the hash of the original file.
expected_hash = md5_hash_file(testdir)
assert expected_hash is not None
self._assert_content_matches_hash(content, expected_hash)
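
    # Write the downloaded bytes to a temporary file and compare its MD5 hash with the expected value.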
def _assert_content_matches_hash(self, content: bytes, expected_hash: str):
with tempfile.NamedTemporaryFile("wb") as temp:
temp.write(content)
temp.flush()
actual_hash = md5_hash_file(temp.name)
assert actual_hash == expected_hash

    def test_upload_zip_roundtrip(self, history_id):
testdir = TestDataResolver().get_filename("testdir1.zip")
expected_size = os.path.getsize(testdir)
with open(testdir, "rb") as fh:
details = self._upload_and_get_details(fh, api="fetch", history_id=history_id, assert_ok=True)
assert details["file_ext"] == "zip"
assert details["file_size"] == expected_size
content = cast(
bytes, self.dataset_populator.get_history_dataset_content(history_id, dataset=details, type="bytes")
)
# Make sure we got the expected content size.
assert len(content) == expected_size
# Make sure we get the expected contents.
dir_path = decompress_bytes_to_directory(content)
assert dir_path.endswith("testdir1")
EXPECTED_ZIP_CONTENTS = {
"file1": "File",
"file2": "File",
"dir1/": "Directory",
"dir1/file3": "File",
}
for path, entry_class in EXPECTED_ZIP_CONTENTS.items():
path = os.path.join(dir_path, path)
if entry_class == "Directory":
assert os.path.isdir(path)
else:
assert os.path.isfile(path)
# Make sure the hash of the content matches the hash of the original file.
expected_hash = md5_hash_file(testdir)
assert expected_hash is not None
self._assert_content_matches_hash(content, expected_hash)

    def test_upload_dbkey(self):
with self.dataset_populator.test_history() as history_id:
payload = self.dataset_populator.upload_payload(history_id, "Test123", dbkey="hg19")
run_response = self.dataset_populator.tools_post(payload)
self.dataset_populator.wait_for_tool_run(history_id, run_response)
datasets = run_response.json()["outputs"]
assert datasets[0].get("genome_build") == "hg19", datasets[0]

    def test_fetch_bam_file(self, history_id):
bam_path = TestDataResolver().get_filename("1.bam")
with open(bam_path, "rb") as fh:
details = self._upload_and_get_details(fh, api="fetch", history_id=history_id, assert_ok=False)
assert details["state"] == "ok"
assert details["file_ext"] == "bam", details

    def test_upload_bam_file(self):
bam_path = TestDataResolver().get_filename("1.bam")
with open(bam_path, "rb") as fh:
details = self._upload_and_get_details(fh, file_type="auto")
assert details["state"] == "ok"
assert details["file_ext"] == "bam", details

    def test_fetch_metadata(self):
table = ONE_TO_SIX_WITH_SPACES
details = self._upload_and_get_details(
table, api="fetch", dbkey="hg19", info="cool upload", tags=["name:data", "group:type:paired-end"]
)
assert details.get("genome_build") == "hg19"
assert details.get("misc_info") == "cool upload", details
tags = details.get("tags")
assert len(tags) == 2, details
assert "group:type:paired-end" in tags
assert "name:data" in tags

    def test_upload_multiple_files_1(self):
with self.dataset_populator.test_history() as history_id:
payload = self.dataset_populator.upload_payload(
history_id,
"Test123",
dbkey="hg19",
extra_inputs={
"files_1|url_paste": "SecondOutputContent",
"files_1|NAME": "SecondOutputName",
"files_1|file_type": "tabular",
"files_1|dbkey": "hg18",
"file_count": "2",
},
)
run_response = self.dataset_populator.tools_post(payload)
self.dataset_populator.wait_for_tool_run(history_id, run_response)
datasets = run_response.json()["outputs"]
assert len(datasets) == 2, datasets
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[0])
assert content.strip() == "Test123"
assert datasets[0]["file_ext"] == "txt"
assert datasets[0]["genome_build"] == "hg19", datasets
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[1])
assert content.strip() == "SecondOutputContent"
assert datasets[1]["file_ext"] == "tabular"
assert datasets[1]["genome_build"] == "hg18", datasets

    def test_upload_multiple_files_2(self):
with self.dataset_populator.test_history() as history_id:
payload = self.dataset_populator.upload_payload(
history_id,
"Test123",
file_type="tabular",
dbkey="hg19",
extra_inputs={
"files_1|url_paste": "SecondOutputContent",
"files_1|NAME": "SecondOutputName",
"files_1|file_type": "txt",
"files_1|dbkey": "hg18",
"file_count": "2",
},
)
run_response = self.dataset_populator.tools_post(payload)
self.dataset_populator.wait_for_tool_run(history_id, run_response)
datasets = run_response.json()["outputs"]
assert len(datasets) == 2, datasets
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[0])
assert content.strip() == "Test123"
assert datasets[0]["file_ext"] == "tabular", datasets
assert datasets[0]["genome_build"] == "hg19", datasets
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[1])
assert content.strip() == "SecondOutputContent"
assert datasets[1]["file_ext"] == "txt"
assert datasets[1]["genome_build"] == "hg18", datasets

    def test_upload_multiple_files_3(self):
with self.dataset_populator.test_history() as history_id:
payload = self.dataset_populator.upload_payload(
history_id,
"Test123",
file_type="tabular",
dbkey="hg19",
extra_inputs={
"files_0|file_type": "txt",
"files_0|dbkey": "hg18",
"files_1|url_paste": "SecondOutputContent",
"files_1|NAME": "SecondOutputName",
"files_1|file_type": "txt",
"files_1|dbkey": "hg18",
"file_count": "2",
},
)
run_response = self.dataset_populator.tools_post(payload)
self.dataset_populator.wait_for_tool_run(history_id, run_response)
datasets = run_response.json()["outputs"]
assert len(datasets) == 2, datasets
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[0])
assert content.strip() == "Test123"
assert datasets[0]["file_ext"] == "txt", datasets
assert datasets[0]["genome_build"] == "hg18", datasets
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[1])
assert content.strip() == "SecondOutputContent"
assert datasets[1]["file_ext"] == "txt"
assert datasets[1]["genome_build"] == "hg18", datasets

    def test_upload_multiple_files_no_dbkey(self):
with self.dataset_populator.test_history() as history_id:
payload = self.dataset_populator.upload_payload(
history_id,
"Test123",
file_type="tabular",
dbkey=None,
extra_inputs={
"files_0|file_type": "txt",
"files_1|url_paste": "SecondOutputContent",
"files_1|NAME": "SecondOutputName",
"files_1|file_type": "txt",
"file_count": "2",
},
)
run_response = self.dataset_populator.tools_post(payload)
self.dataset_populator.wait_for_tool_run(history_id, run_response)
datasets = run_response.json()["outputs"]
assert len(datasets) == 2, datasets
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[0])
assert content.strip() == "Test123"
assert datasets[0]["file_ext"] == "txt", datasets
assert datasets[0]["genome_build"] == "?", datasets
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[1])
assert content.strip() == "SecondOutputContent"
assert datasets[1]["file_ext"] == "txt"
assert datasets[1]["genome_build"] == "?", datasets

    def test_upload_multiple_files_space_to_tab(self):
with self.dataset_populator.test_history() as history_id:
payload = self.dataset_populator.upload_payload(
history_id,
content=ONE_TO_SIX_WITH_SPACES,
file_type="tabular",
dbkey="hg19",
extra_inputs={
"files_0|file_type": "txt",
"files_0|space_to_tab": "Yes",
"files_1|url_paste": ONE_TO_SIX_WITH_SPACES,
"files_1|NAME": "SecondOutputName",
"files_1|file_type": "txt",
"files_2|url_paste": ONE_TO_SIX_WITH_SPACES,
"files_2|NAME": "ThirdOutputName",
"files_2|file_type": "txt",
"files_2|space_to_tab": "Yes",
"file_count": "3",
},
)
run_response = self.dataset_populator.tools_post(payload)
self.dataset_populator.wait_for_tool_run(history_id, run_response)
datasets = run_response.json()["outputs"]
assert len(datasets) == 3, datasets
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[0])
assert content == ONE_TO_SIX_WITH_TABS
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[1])
assert content == ONE_TO_SIX_WITH_SPACES
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[2])
assert content == ONE_TO_SIX_WITH_TABS

    def test_multiple_files_posix_lines(self):
with self.dataset_populator.test_history() as history_id:
payload = self.dataset_populator.upload_payload(
history_id,
content=ONE_TO_SIX_ON_WINDOWS,
file_type="tabular",
dbkey="hg19",
extra_inputs={
"files_0|file_type": "txt",
"files_0|to_posix_lines": "Yes",
"files_1|url_paste": ONE_TO_SIX_ON_WINDOWS,
"files_1|NAME": "SecondOutputName",
"files_1|file_type": "txt",
"files_1|to_posix_lines": None,
"files_2|url_paste": ONE_TO_SIX_ON_WINDOWS,
"files_2|NAME": "ThirdOutputName",
"files_2|file_type": "txt",
"file_count": "3",
},
)
run_response = self.dataset_populator.tools_post(payload)
self.dataset_populator.wait_for_tool_run(history_id, run_response)
datasets = run_response.json()["outputs"]
assert len(datasets) == 3, datasets
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[0])
assert content == ONE_TO_SIX_WITH_TABS
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[1])
assert content == ONE_TO_SIX_ON_WINDOWS
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=datasets[2])
assert content == ONE_TO_SIX_WITH_TABS

    def test_upload_force_composite(self):
with self.dataset_populator.test_history() as history_id:
payload = self.dataset_populator.upload_payload(
history_id,
"Test123",
extra_inputs={
"files_1|url_paste": "CompositeContent",
"files_1|NAME": "composite",
"file_count": "2",
"force_composite": "True",
},
)
run_response = self.dataset_populator.tools_post(payload)
self.dataset_populator.wait_for_tool_run(history_id, run_response)
dataset = run_response.json()["outputs"][0]
content = self.dataset_populator.get_history_dataset_content(history_id, dataset=dataset)
assert content.strip() == "Test123"
extra_files = self.dataset_populator.get_history_dataset_extra_files(history_id, dataset_id=dataset["id"])
assert len(extra_files) == 1, extra_files # [{u'path': u'1', u'class': u'File'}]
extra_file = extra_files[0]
assert extra_file["path"] == "composite"
assert extra_file["class"] == "File"

    def test_upload_from_invalid_url(self):
with pytest.raises(AssertionError):
self._upload("https://foo.invalid", assert_ok=False)

    @skip_if_site_down("https://usegalaxy.org")
def test_upload_from_404_url(self):
history_id, new_dataset = self._upload("https://usegalaxy.org/bla123", assert_ok=False)
dataset_details = self.dataset_populator.get_history_dataset_details(
history_id, dataset_id=new_dataset["id"], assert_ok=False
)
assert (
dataset_details["state"] == "error"
), f"expected dataset state to be 'error', but got '{dataset_details['state']}'"

    @skip_if_site_down("https://usegalaxy.org")
def test_upload_from_valid_url(self):
history_id, new_dataset = self._upload("https://usegalaxy.org/api/version")
self.dataset_populator.get_history_dataset_details(history_id, dataset_id=new_dataset["id"], assert_ok=True)

    @skip_if_site_down("https://usegalaxy.org")
def test_upload_from_valid_url_spaces(self):
history_id, new_dataset = self._upload(" https://usegalaxy.org/api/version ")
self.dataset_populator.get_history_dataset_details(history_id, dataset_id=new_dataset["id"], assert_ok=True)

    def test_upload_and_validate_invalid(self):
path = TestDataResolver().get_filename("1.fastqsanger")
with open(path, "rb") as fh:
metadata = self._upload_and_get_details(fh, file_type="fastqcssanger")
assert "validated_state" in metadata
assert metadata["validated_state"] == UNKNOWN
history_id = metadata["history_id"]
dataset_id = metadata["id"]
terminal_validated_state = self.dataset_populator.validate_dataset_and_wait(history_id, dataset_id)
assert terminal_validated_state == "invalid", terminal_validated_state

    def test_upload_and_validate_valid(self):
path = TestDataResolver().get_filename("1.fastqsanger")
with open(path, "rb") as fh:
metadata = self._upload_and_get_details(fh, file_type="fastqsanger")
assert "validated_state" in metadata
assert metadata["validated_state"] == UNKNOWN
history_id = metadata["history_id"]
dataset_id = metadata["id"]
terminal_validated_state = self.dataset_populator.validate_dataset_and_wait(history_id, dataset_id)
assert terminal_validated_state == "ok", terminal_validated_state

    def test_upload_and_validate_hash_valid(self):
with self.dataset_populator.test_history() as history_id:
destination = {"type": "hdas"}
targets = [
{
"destination": destination,
"items": [
{
"src": "url",
"url": URI_FOR_1_2_3,
"hashes": [
{"hash_function": "SHA-1", "hash_value": "65e9d53484d28eef5447bc06fe2d754d1090975a"}
],
},
],
}
]
payload = {
"history_id": history_id,
"targets": targets,
}
fetch_response = self.dataset_populator.fetch(payload)
self._assert_status_code_is(fetch_response, 200)
            # history ok implies the dataset upload worked
self.dataset_populator.wait_for_history(history_id, assert_ok=True)

    def test_upload_and_validate_hash_invalid(self):
with self.dataset_populator.test_history() as history_id:
destination = {"type": "hdas"}
targets = [
{
"destination": destination,
"items": [
{
"src": "url",
"url": URI_FOR_1_2_3,
"hashes": [{"hash_function": "SHA-1", "hash_value": "invalidhash"}],
},
],
}
]
payload = {
"history_id": history_id,
"targets": targets,
}
fetch_response = self.dataset_populator.fetch(payload, assert_ok=True, wait=False)
self._assert_status_code_is(fetch_response, 200)
outputs = fetch_response.json()["outputs"]
new_dataset = outputs[0]
self.dataset_populator.wait_for_history(history_id, assert_ok=False)
dataset_details = self.dataset_populator.get_history_dataset_details(
history_id, dataset=new_dataset, assert_ok=False
)
assert dataset_details["state"] == "error"
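
    # Run the upload1 tool to create a composite "velvet" dataset; extra_inputs supplies the additional composite files.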
def _velvet_upload(self, history_id, extra_inputs):
payload = self.dataset_populator.upload_payload(
history_id,
"sequences content",
file_type="velvet",
extra_inputs=extra_inputs,
)
run_response = self.dataset_populator.tools_post(payload)
self.dataset_populator.wait_for_tool_run(history_id, run_response)
datasets = run_response.json()["outputs"]
assert len(datasets) == 1
dataset = datasets[0]
return dataset
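
    # Return the content of the "Roadmaps" composite file of a velvet dataset.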
def _get_roadmaps_content(self, history_id, dataset):
roadmaps_content = self.dataset_populator.get_history_dataset_content(
history_id, dataset=dataset, filename="Roadmaps"
)
return roadmaps_content
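
    # Upload content and return the resulting dataset's content.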
def _upload_and_get_content(self, content, **upload_kwds):
history_id, new_dataset = self._upload(content, **upload_kwds)
return self.dataset_populator.get_history_dataset_content(history_id, dataset=new_dataset)
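
    # Upload content and return the dataset details (state, file_ext, file_size, ...).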
def _upload_and_get_details(self, content, **upload_kwds):
assert_ok = upload_kwds.pop("assert_ok", True)
history_id, new_dataset = self._upload(content, **upload_kwds)
return self.dataset_populator.get_history_dataset_details(history_id, dataset=new_dataset, assert_ok=assert_ok)
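
    # Upload via either the legacy upload1 tool or the fetch API, wait for the history, and return (history_id, dataset).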
def _upload(self, content, api="upload1", history_id=None, **upload_kwds):
assert_ok = upload_kwds.get("assert_ok", True)
history_id = history_id or self.dataset_populator.new_history()
if api == "upload1":
new_dataset = self.dataset_populator.new_dataset(
history_id, content=content, fetch_data=False, **upload_kwds
)
else:
assert api == "fetch"
element = dict(src="files", **upload_kwds)
target = {
"destination": {"type": "hdas"},
"elements": [element],
}
targets = [target]
payload = {"history_id": history_id, "targets": targets, "__files": {"files_0|file_data": content}}
new_dataset = self.dataset_populator.fetch(payload, assert_ok=assert_ok).json()["outputs"][0]
self.dataset_populator.wait_for_history(history_id, assert_ok=assert_ok)
return history_id, new_dataset

    def test_upload_dataset_resumable(self):
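        # Helper: upload the file through the TUS resumable-upload endpoint and return the TUS session id (the last path segment of the upload URL).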
def upload_file(url, path, api_key, history_id):
filename = os.path.basename(path)
metadata = {
"filename": filename,
"history_id": history_id,
}
my_client = client.TusClient(url, headers={"x-api-key": api_key})
# Upload a file to a tus server.
uploader = my_client.uploader(path, metadata=metadata)
uploader.upload()
assert uploader.url
return uploader.url.rsplit("/", 1)[1] # type: ignore[unreachable]
with self.dataset_populator.test_history() as history_id:
session_id = upload_file(
url=urllib.parse.urljoin(self.url, "api/upload/resumable_upload"),
path=TestDataResolver().get_filename("1.fastqsanger.gz"),
api_key=self.galaxy_interactor.api_key,
history_id=history_id,
)
hda = self._upload_and_get_details(
content=json.dumps({"session_id": session_id}),
api="fetch",
ext="fastqsanger.gz",
name="1.fastqsanger.gz",
)
assert hda["name"] == "1.fastqsanger.gz"
assert hda["file_ext"] == "fastqsanger.gz"
assert hda["state"] == "ok"

    def test_upload_deferred(self, history_id):
details = self.dataset_populator.create_deferred_hda(
history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bam", ext="bam"
)
assert details["state"] == "deferred"
assert details["file_ext"] == "bam"