Source code for galaxy_test.api.test_jobs
import datetime
import json
import os
import time
import urllib.parse
from operator import itemgetter
import requests
from dateutil.parser import isoparse
from galaxy_test.api.test_tools import TestsTools
from galaxy_test.base.api_asserts import assert_status_code_is_ok
from galaxy_test.base.populators import (
DatasetCollectionPopulator,
DatasetPopulator,
skip_without_tool,
uses_test_history,
wait_on,
wait_on_state,
WorkflowPopulator,
)
from ._framework import ApiTestCase
class JobsApiTestCase(ApiTestCase, TestsTools):
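    # Functional tests for the jobs API: index listing and filtering, job
    # detail views, error reporting, output deletion/purge semantics,
    # rerun/resume behaviour, and the jobs/search endpoint.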
    def setUp(self):
super().setUp()
self.workflow_populator = WorkflowPopulator(self.galaxy_interactor)
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
    @uses_test_history(require_new=True)
def test_index(self, history_id):
# Create HDA to ensure at least one job exists...
self.__history_with_new_dataset(history_id)
jobs = self.__jobs_index()
assert "__DATA_FETCH__" in map(itemgetter("tool_id"), jobs)
    @uses_test_history(require_new=True)
def test_system_details_admin_only(self, history_id):
self.__history_with_new_dataset(history_id)
jobs = self.__jobs_index(admin=False)
job = jobs[0]
self._assert_not_has_keys(job, "external_id")
jobs = self.__jobs_index(admin=True)
job = jobs[0]
self._assert_has_keys(job, "command_line", "external_id")
    @uses_test_history(require_new=True)
def test_admin_job_list(self, history_id):
self.__history_with_new_dataset(history_id)
jobs_response = self._get("jobs?view=admin_job_list", admin=False)
assert jobs_response.status_code == 403
assert jobs_response.json()["err_msg"] == "Only admins can use the admin_job_list view"
jobs = self._get("jobs?view=admin_job_list", admin=True).json()
job = jobs[0]
self._assert_has_keys(job, "command_line", "external_id", "handler")
    @uses_test_history(require_new=True)
def test_index_state_filter(self, history_id):
# Initial number of ok jobs
original_count = len(self.__uploads_with_state("ok"))
        # Run through dataset upload to ensure the number of uploads has
        # increased by at least 1.
self.__history_with_ok_dataset(history_id)
# Verify number of ok jobs is actually greater.
count_increased = False
for _ in range(10):
new_count = len(self.__uploads_with_state("ok"))
if original_count < new_count:
count_increased = True
break
time.sleep(0.1)
if not count_increased:
template = "Jobs in ok state did not increase (was %d, now %d)"
message = template % (original_count, new_count)
raise AssertionError(message)
    @uses_test_history(require_new=True)
def test_index_date_filter(self, history_id):
two_weeks_ago = (datetime.datetime.utcnow() - datetime.timedelta(14)).isoformat()
last_week = (datetime.datetime.utcnow() - datetime.timedelta(7)).isoformat()
before = datetime.datetime.utcnow().isoformat()
today = before[:10]
tomorrow = (datetime.datetime.utcnow() + datetime.timedelta(1)).isoformat()[:10]
self.__history_with_new_dataset(history_id)
after = datetime.datetime.utcnow().isoformat()
# Test using dates
jobs = self.__jobs_index(data={"date_range_min": today, "date_range_max": tomorrow})
assert len(jobs) > 0
today_job = jobs[0]
today_job_id = today_job["id"]
# Test using datetimes
jobs = self.__jobs_index(data={"date_range_min": before, "date_range_max": after})
assert today_job_id in map(itemgetter("id"), jobs), f"before: {before}, after: {after}, job: {today_job}"
jobs = self.__jobs_index(data={"date_range_min": two_weeks_ago, "date_range_max": last_week})
assert today_job_id not in map(itemgetter("id"), jobs)
    @uses_test_history(require_new=True)
def test_index_history(self, history_id):
self.__history_with_new_dataset(history_id)
jobs = self.__jobs_index(data={"history_id": history_id})
assert len(jobs) > 0
with self.dataset_populator.test_history() as other_history_id:
jobs = self.__jobs_index(data={"history_id": other_history_id})
assert len(jobs) == 0
    @uses_test_history(require_new=True)
@skip_without_tool("cat1")
def test_index_workflow_and_invocation_filter(self, history_id):
workflow_simple = """
class: GalaxyWorkflow
name: Simple Workflow
inputs:
input1: data
outputs:
wf_output_1:
outputSource: first_cat/out_file1
steps:
first_cat:
tool_id: cat1
in:
input1: input1
"""
summary = self.workflow_populator.run_workflow(
workflow_simple, history_id=history_id, test_data={"input1": "hello world"}
)
invocation_id = summary.invocation_id
workflow_id = self._get(f"invocations/{invocation_id}").json()["workflow_id"]
self.workflow_populator.wait_for_invocation(workflow_id, invocation_id)
jobs1 = self.__jobs_index(data={"workflow_id": workflow_id})
assert len(jobs1) == 1
jobs2 = self.__jobs_index(data={"invocation_id": invocation_id})
assert len(jobs2) == 1
assert jobs1 == jobs2
    @uses_test_history(require_new=True)
@skip_without_tool("multi_data_optional")
def test_index_workflow_filter_implicit_jobs(self, history_id):
workflow_id = self.workflow_populator.upload_yaml_workflow(
"""
class: GalaxyWorkflow
inputs:
input_datasets: collection
steps:
multi_data_optional:
tool_id: multi_data_optional
in:
input1: input_datasets
"""
)
hdca_id = self.dataset_collection_populator.create_list_of_list_in_history(history_id).json()
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
inputs = {
"0": self.dataset_populator.ds_entry(hdca_id),
}
invocation_id = self.workflow_populator.invoke_workflow_and_wait(
workflow_id, history_id=history_id, inputs=inputs
).json()["id"]
jobs1 = self.__jobs_index(data={"workflow_id": workflow_id})
jobs2 = self.__jobs_index(data={"invocation_id": invocation_id})
assert len(jobs1) == len(jobs2) == 1
second_invocation_id = self.workflow_populator.invoke_workflow_and_wait(
workflow_id, history_id=history_id, inputs=inputs
).json()["id"]
workflow_jobs = self.__jobs_index(data={"workflow_id": workflow_id})
second_invocation_jobs = self.__jobs_index(data={"invocation_id": second_invocation_id})
assert len(workflow_jobs) == 2
assert len(second_invocation_jobs) == 1
    @uses_test_history(require_new=True)
def test_index_limit_and_offset_filter(self, history_id):
self.__history_with_new_dataset(history_id)
jobs = self.__jobs_index(data={"history_id": history_id})
assert len(jobs) > 0
length = len(jobs)
jobs = self.__jobs_index(data={"history_id": history_id, "offset": 1})
assert len(jobs) == length - 1
jobs = self.__jobs_index(data={"history_id": history_id, "limit": 0})
assert len(jobs) == 0
    @uses_test_history(require_new=True)
def test_index_search_filter_tool_id(self, history_id):
self.__history_with_new_dataset(history_id)
jobs = self.__jobs_index(data={"history_id": history_id})
assert len(jobs) > 0
length = len(jobs)
jobs = self.__jobs_index(data={"history_id": history_id, "search": "emptyresult"})
assert len(jobs) == 0
jobs = self.__jobs_index(data={"history_id": history_id, "search": "FETCH"})
assert len(jobs) == length
jobs = self.__jobs_index(data={"history_id": history_id, "search": "tool:'FETCH'"})
assert len(jobs) == 0
    @uses_test_history(require_new=True)
def test_index_search_filter_email(self, history_id):
self.__history_with_new_dataset(history_id)
jobs = self.__jobs_index(data={"history_id": history_id, "search": "FETCH"})
user_email = self.dataset_populator.user_email()
jobs = self.__jobs_index(data={"history_id": history_id, "search": user_email})
assert len(jobs) == 0
# we can search on email...
jobs = self.__jobs_index(
data={"history_id": history_id, "search": user_email, "user_details": True}, admin=True
)
assert len(jobs) == 1
# but only if user details are joined in.
jobs = self.__jobs_index(
data={"history_id": history_id, "search": user_email, "user_details": False}, admin=True
)
assert len(jobs) == 0
    def test_index_user_filter(self):
test_user_email = "user_for_jobs_index_test@bx.psu.edu"
user = self._setup_user(test_user_email)
with self._different_user(email=test_user_email):
            # User should be able to see jobs for their own user ID.
jobs = self.__jobs_index(data={"user_id": user["id"]})
assert jobs == []
# Admin should be able to see jobs of another user.
jobs = self.__jobs_index(data={"user_id": user["id"]}, admin=True)
assert jobs == []
# Normal user should not be able to see jobs of another user.
jobs_response = self._get("jobs", data={"user_id": user["id"]})
self._assert_status_code_is(jobs_response, 403)
assert jobs_response.json() == {"err_msg": "Only admins can index the jobs of others", "err_code": 403006}
    @uses_test_history(require_new=True)
def test_index_handler_runner_filters(self, history_id):
self.__history_with_new_dataset(history_id)
jobs = self._get(f"jobs?view=admin_job_list&history_id={history_id}", admin=True).json()
job = jobs[0]
handler = job["handler"]
assert handler
runner = job["job_runner_name"]
assert runner
# Free text search includes handler and runner for admin list view.
jobs = self._get(f"jobs?view=admin_job_list&history_id={history_id}&search={handler}", admin=True).json()
assert jobs
jobs = self._get(
f"jobs?view=admin_job_list&history_id={history_id}&search={handler}suffixnotfound", admin=True
).json()
assert not jobs
jobs = self._get(f"jobs?view=admin_job_list&history_id={history_id}&search={runner}", admin=True).json()
assert jobs
jobs = self._get(
f"jobs?view=admin_job_list&history_id={history_id}&search={runner}suffixnotfound", admin=True
).json()
assert not jobs
# Test tags for runner and handler specifically.
assert runner != handler
jobs = self._get(
f"jobs?view=admin_job_list&history_id={history_id}&search=handler:%27{handler}%27", admin=True
).json()
assert jobs
jobs = self._get(
f"jobs?view=admin_job_list&history_id={history_id}&search=runner:%27{handler}%27", admin=True
).json()
assert not jobs
jobs = self._get(
f"jobs?view=admin_job_list&history_id={history_id}&search=runner:%27{runner}%27", admin=True
).json()
assert jobs
jobs = self._get(
f"jobs?view=admin_job_list&history_id={history_id}&search=handler:%27{runner}%27", admin=True
).json()
assert not jobs
    @uses_test_history(require_new=True)
def test_index_multiple_states_filter(self, history_id):
# Initial number of ok jobs
original_count = len(self.__uploads_with_state("ok", "new"))
        # Run through dataset upload to ensure the number of uploads has
        # increased by at least 1.
self.__history_with_ok_dataset(history_id)
# Verify number of ok jobs is actually greater.
new_count = len(self.__uploads_with_state("new", "ok"))
assert original_count < new_count, new_count
    @uses_test_history(require_new=True)
def test_show(self, history_id):
job_properties_tool_run = self.dataset_populator.run_tool(
tool_id="job_properties",
inputs={},
history_id=history_id,
)
first_job = self.__jobs_index()[0]
self._assert_has_key(first_job, "id", "state", "exit_code", "update_time", "create_time")
job_id = job_properties_tool_run["jobs"][0]["id"]
show_jobs_response = self.dataset_populator.get_job_details(job_id)
self._assert_status_code_is(show_jobs_response, 200)
job_details = show_jobs_response.json()
self._assert_has_key(job_details, "id", "state", "exit_code", "update_time", "create_time")
show_jobs_response = self.dataset_populator.get_job_details(job_id, full=True)
self._assert_status_code_is(show_jobs_response, 200)
job_details = show_jobs_response.json()
self._assert_has_key(
job_details,
"create_time",
"exit_code",
"id",
"job_messages",
"job_stderr",
"job_stdout",
"state",
"stderr",
"stdout",
"tool_stderr",
"tool_stdout",
"update_time",
)
self.dataset_populator.wait_for_job(job_id, assert_ok=True)
show_jobs_response = self.dataset_populator.get_job_details(job_id, full=True)
job_details = show_jobs_response.json()
assert "The bool is not true\n" not in job_details["job_stdout"]
assert "The bool is very not true\n" not in job_details["job_stderr"]
assert job_details["tool_stdout"] == "The bool is not true\n"
assert job_details["tool_stderr"] == "The bool is very not true\n"
assert "The bool is not true\n" in job_details["stdout"]
assert "The bool is very not true\n" in job_details["stderr"]
    @uses_test_history(require_new=True)
def test_show_security(self, history_id):
self.__history_with_new_dataset(history_id)
jobs_response = self._get("jobs", data={"history_id": history_id})
job = jobs_response.json()[0]
job_id = job["id"]
job_lock_response = self._get("job_lock", admin=True)
job_lock_response.raise_for_status()
assert not job_lock_response.json()["active"]
show_jobs_response = self._get(f"jobs/{job_id}", admin=False)
self._assert_not_has_keys(show_jobs_response.json(), "external_id")
# TODO: Re-activate test case when API accepts privacy settings
# with self._different_user():
# show_jobs_response = self._get( "jobs/%s" % job_id, admin=False )
# self._assert_status_code_is( show_jobs_response, 200 )
show_jobs_response = self._get(f"jobs/{job_id}", admin=True)
self._assert_has_keys(show_jobs_response.json(), "command_line", "external_id")
def _run_detect_errors(self, history_id, inputs):
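        # Submit the detect_errors_aggressive tool with the given inputs and
        # return the parsed tool run response.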
payload = self.dataset_populator.run_tool_payload(
tool_id="detect_errors_aggressive",
inputs=inputs,
history_id=history_id,
)
return self._post("tools", data=payload).json()
    @skip_without_tool("detect_errors_aggressive")
def test_unhide_on_error(self):
with self.dataset_populator.test_history() as history_id:
inputs = {"error_bool": "true"}
run_response = self._run_detect_errors(history_id=history_id, inputs=inputs)
job_id = run_response["jobs"][0]["id"]
self.dataset_populator.wait_for_job(job_id)
job = self.dataset_populator.get_job_details(job_id).json()
assert job["state"] == "error"
dataset = self.dataset_populator.get_history_dataset_details(
history_id=history_id, dataset_id=run_response["outputs"][0]["id"], assert_ok=False
)
assert dataset["visible"]
def _run_map_over_error(self, history_id):
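        # Create a one-element list collection and map detect_errors_aggressive
        # over it, so the failing job produces implicit collection outputs.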
fetch_response = self.dataset_collection_populator.create_list_in_history(
history_id, contents=[("sample1-1", "1 2 3")]
).json()
hdca1 = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
inputs = {
"error_bool": "true",
"dataset": {
"batch": True,
"values": [{"src": "hdca", "id": hdca1["id"]}],
},
}
return self._run_detect_errors(history_id=history_id, inputs=inputs)
    @skip_without_tool("detect_errors_aggressive")
def test_no_unhide_on_error_if_mapped_over(self):
with self.dataset_populator.test_history() as history_id:
run_response = self._run_map_over_error(history_id)
job_id = run_response["jobs"][0]["id"]
self.dataset_populator.wait_for_job(job_id)
job = self.dataset_populator.get_job_details(job_id).json()
assert job["state"] == "error"
dataset = self.dataset_populator.get_history_dataset_details(
history_id=history_id, dataset_id=run_response["outputs"][0]["id"], assert_ok=False
)
assert not dataset["visible"]
    def test_no_hide_on_rerun(self):
with self.dataset_populator.test_history() as history_id:
run_response = self._run_map_over_error(history_id)
job_id = run_response["jobs"][0]["id"]
self.dataset_populator.wait_for_job(job_id)
failed_hdca = self.dataset_populator.get_history_collection_details(
history_id=history_id,
content_id=run_response["implicit_collections"][0]["id"],
assert_ok=False,
)
first_update_time = failed_hdca["update_time"]
assert failed_hdca["visible"]
rerun_params = self._get(f"jobs/{job_id}/build_for_rerun").json()
inputs = rerun_params["state_inputs"]
inputs["rerun_remap_job_id"] = job_id
rerun_response = self._run_detect_errors(history_id=history_id, inputs=inputs)
rerun_job_id = rerun_response["jobs"][0]["id"]
self.dataset_populator.wait_for_job(rerun_job_id)
# Verify source hdca is still visible
hdca = self.dataset_populator.get_history_collection_details(
history_id=history_id,
content_id=run_response["implicit_collections"][0]["id"],
assert_ok=False,
)
assert hdca["visible"]
assert isoparse(hdca["update_time"]) > (isoparse(first_update_time))
    @skip_without_tool("empty_output")
def test_common_problems(self):
with self.dataset_populator.test_history() as history_id:
empty_run_response = self.dataset_populator.run_tool(
tool_id="empty_output",
inputs={},
history_id=history_id,
)
empty_hda = empty_run_response["outputs"][0]
cat_empty_twice_run_response = self.dataset_populator.run_tool(
tool_id="cat1",
inputs={
"input1": {"src": "hda", "id": empty_hda["id"]},
"queries_0|input2": {"src": "hda", "id": empty_hda["id"]},
},
history_id=history_id,
)
empty_output_job = empty_run_response["jobs"][0]
cat_empty_job = cat_empty_twice_run_response["jobs"][0]
empty_output_common_problems_response = self._get(f"jobs/{empty_output_job['id']}/common_problems").json()
cat_empty_common_problems_response = self._get(f"jobs/{cat_empty_job['id']}/common_problems").json()
self._assert_has_keys(empty_output_common_problems_response, "has_empty_inputs", "has_duplicate_inputs")
self._assert_has_keys(cat_empty_common_problems_response, "has_empty_inputs", "has_duplicate_inputs")
assert not empty_output_common_problems_response["has_empty_inputs"]
assert cat_empty_common_problems_response["has_empty_inputs"]
assert not empty_output_common_problems_response["has_duplicate_inputs"]
assert cat_empty_common_problems_response["has_duplicate_inputs"]
    @skip_without_tool("detect_errors_aggressive")
def test_report_error(self):
with self.dataset_populator.test_history() as history_id:
self._run_error_report(history_id)
    @skip_without_tool("detect_errors_aggressive")
def test_report_error_anon(self):
with self._different_user(anon=True):
history_id = self._get(urllib.parse.urljoin(self.url, "history/current_history_json")).json()["id"]
self._run_error_report(history_id)
def _run_error_report(self, history_id):
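        # Run a tool that is expected to fail, wait for the job, then submit an
        # error report for one of its output datasets via jobs/{id}/error.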
payload = self.dataset_populator.run_tool_payload(
tool_id="detect_errors_aggressive",
inputs={"error_bool": "true"},
history_id=history_id,
)
run_response = self._post("tools", data=payload).json()
job_id = run_response["jobs"][0]["id"]
self.dataset_populator.wait_for_job(job_id)
dataset_id = run_response["outputs"][0]["id"]
response = self._post(f"jobs/{job_id}/error", data={"dataset_id": dataset_id})
assert response.status_code == 200, response.text
    @skip_without_tool("detect_errors_aggressive")
def test_report_error_bootstrap_admin(self):
with self.dataset_populator.test_history() as history_id:
payload = self.dataset_populator.run_tool_payload(
tool_id="detect_errors_aggressive",
inputs={"error_bool": "true"},
history_id=history_id,
)
run_response = self._post("tools", data=payload, key=self.master_api_key)
self._assert_status_code_is(run_response, 400)
    @skip_without_tool("create_2")
@uses_test_history(require_new=True)
def test_deleting_output_keep_running_until_all_deleted(self, history_id):
job_state, outputs = self._setup_running_two_output_job(history_id, 120)
self._hack_to_skip_test_if_state_ok(job_state)
# Delete one of the two outputs and make sure the job is still running.
self._raw_update_history_item(history_id, outputs[0]["id"], {"deleted": True})
self._hack_to_skip_test_if_state_ok(job_state)
time.sleep(1)
self._hack_to_skip_test_if_state_ok(job_state)
state = job_state().json()["state"]
assert state == "running", state
# Delete the second output and make sure the job is cancelled.
self._raw_update_history_item(history_id, outputs[1]["id"], {"deleted": True})
final_state = wait_on_state(job_state, assert_ok=False, timeout=15)
assert final_state in ["deleting", "deleted"], final_state
    @skip_without_tool("create_2")
@uses_test_history(require_new=True)
def test_purging_output_keep_running_until_all_purged(self, history_id):
job_state, outputs = self._setup_running_two_output_job(history_id, 120)
        # Pretty much right away after the job is running, these paths should be populated -
        # if they are, grab them and make sure they are deleted at the end of the job.
dataset_1 = self._get_history_item_as_admin(history_id, outputs[0]["id"])
dataset_2 = self._get_history_item_as_admin(history_id, outputs[1]["id"])
if "file_name" in dataset_1:
output_dataset_paths = [dataset_1["file_name"], dataset_2["file_name"]]
            # This may or may not exist depending on whether the test is local.
output_dataset_paths_exist = os.path.exists(output_dataset_paths[0])
else:
output_dataset_paths = []
output_dataset_paths_exist = False
self._hack_to_skip_test_if_state_ok(job_state)
current_state = job_state().json()["state"]
assert current_state == "running", current_state
# Purge one of the two outputs and make sure the job is still running.
self._raw_update_history_item(history_id, outputs[0]["id"], {"purged": True})
time.sleep(1)
self._hack_to_skip_test_if_state_ok(job_state)
current_state = job_state().json()["state"]
assert current_state == "running", current_state
# Purge the second output and make sure the job is cancelled.
self._raw_update_history_item(history_id, outputs[1]["id"], {"purged": True})
final_state = wait_on_state(job_state, assert_ok=False, timeout=15)
assert final_state in ["deleting", "deleted"], final_state
def paths_deleted():
if not os.path.exists(output_dataset_paths[0]) and not os.path.exists(output_dataset_paths[1]):
return True
if output_dataset_paths_exist:
wait_on(paths_deleted, "path deletion")
    @skip_without_tool("create_2")
@uses_test_history(require_new=True)
def test_purging_output_cleaned_after_ok_run(self, history_id):
job_state, outputs = self._setup_running_two_output_job(history_id, 10)
        # Pretty much right away after the job is running, these paths should be populated -
        # if they are, grab them and make sure they are deleted at the end of the job.
dataset_1 = self._get_history_item_as_admin(history_id, outputs[0]["id"])
dataset_2 = self._get_history_item_as_admin(history_id, outputs[1]["id"])
if "file_name" in dataset_1:
output_dataset_paths = [dataset_1["file_name"], dataset_2["file_name"]]
            # This may or may not exist depending on whether the test is local.
output_dataset_paths_exist = os.path.exists(output_dataset_paths[0])
else:
output_dataset_paths = []
output_dataset_paths_exist = False
if not output_dataset_paths_exist:
# Given this Galaxy configuration - there is nothing more to be tested here.
# Consider throwing a skip instead.
return
# Purge one of the two outputs and wait for the job to complete.
self._raw_update_history_item(history_id, outputs[0]["id"], {"purged": True})
wait_on_state(job_state, assert_ok=True)
if output_dataset_paths_exist:
time.sleep(0.5)
# Make sure the non-purged dataset is on disk and the purged one is not.
assert os.path.exists(output_dataset_paths[1])
assert not os.path.exists(output_dataset_paths[0])
def _hack_to_skip_test_if_state_ok(self, job_state):
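        # The calling test needs the job to still be running; if it has already
        # reached the "ok" state, skip the test rather than fail it.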
from nose.plugins.skip import SkipTest
if job_state().json()["state"] == "ok":
message = "Job state switch from running to ok too quickly - the rest of the test requires the job to be in a running state. Skipping test."
raise SkipTest(message)
def _setup_running_two_output_job(self, history_id, sleep_time):
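        # Start a create_2 job that sleeps for sleep_time seconds, wait for it
        # to reach the running state, and return a job-state callable along
        # with the job's two output datasets.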
payload = self.dataset_populator.run_tool_payload(
tool_id="create_2",
inputs=dict(
sleep_time=sleep_time,
),
history_id=history_id,
)
run_response = self._post("tools", data=payload)
run_response.raise_for_status()
run_object = run_response.json()
outputs = run_object["outputs"]
jobs = run_object["jobs"]
assert len(outputs) == 2
assert len(jobs) == 1
def job_state():
jobs_response = self._get(f"jobs/{jobs[0]['id']}")
return jobs_response
# Give job some time to get up and running.
time.sleep(2)
running_state = wait_on_state(job_state, skip_states=["queued", "new"], assert_ok=False, timeout=15)
assert running_state == "running", running_state
return job_state, outputs
def _raw_update_history_item(self, history_id, item_id, data):
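        # PUT directly against the history contents API with requests so that
        # flags such as deleted/purged are sent exactly as given.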
update_url = self._api_url(f"histories/{history_id}/contents/{item_id}", use_key=True)
update_response = requests.put(update_url, json=data)
assert_status_code_is_ok(update_response)
return update_response
    @skip_without_tool("cat_data_and_sleep")
@uses_test_history(require_new=True)
def test_resume_job(self, history_id):
hda1 = self.dataset_populator.new_dataset(history_id, content="samp1\t10.0\nsamp2\t20.0\n")
hda2 = self.dataset_populator.new_dataset(history_id, content="samp1\t30.0\nsamp2\t40.0\n")
# Submit first job
payload = self.dataset_populator.run_tool_payload(
tool_id="cat_data_and_sleep",
inputs={
"sleep_time": 15,
"input1": {"src": "hda", "id": hda2["id"]},
"queries_0|input2": {"src": "hda", "id": hda2["id"]},
},
history_id=history_id,
)
run_response = self._post("tools", data=payload).json()
output = run_response["outputs"][0]
# Submit second job that waits on job1
payload = self.dataset_populator.run_tool_payload(
tool_id="cat1",
inputs={"input1": {"src": "hda", "id": hda1["id"]}, "queries_0|input2": {"src": "hda", "id": output["id"]}},
history_id=history_id,
)
run_response = self._post("tools", data=payload).json()
job_id = run_response["jobs"][0]["id"]
output = run_response["outputs"][0]
# Delete second jobs input while second job is waiting for first job
delete_response = self._delete(f"histories/{history_id}/contents/{hda1['id']}")
self._assert_status_code_is(delete_response, 200)
self.dataset_populator.wait_for_history_jobs(history_id, assert_ok=False)
dataset_details = self._get(f"histories/{history_id}/contents/{output['id']}").json()
assert dataset_details["state"] == "paused"
# Undelete input dataset
undelete_response = self._put(
f"histories/{history_id}/contents/{hda1['id']}", data={"deleted": False}, json=True
)
self._assert_status_code_is(undelete_response, 200)
resume_response = self._put(f"jobs/{job_id}/resume")
self._assert_status_code_is(resume_response, 200)
self.dataset_populator.wait_for_history_jobs(history_id, assert_ok=True)
dataset_details = self._get(f"histories/{history_id}/contents/{output['id']}").json()
assert dataset_details["state"] == "ok"
def _get_history_item_as_admin(self, history_id, item_id):
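        # Fetch the detailed view of a history item as an admin, which may
        # include extra fields such as file_name.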
response = self._get(f"histories/{history_id}/contents/{item_id}?view=detailed", admin=True)
assert_status_code_is_ok(response)
return response.json()
    @uses_test_history(require_new=True)
def test_search(self, history_id):
dataset_id = self.__history_with_ok_dataset(history_id)
        # We first copy the dataset, so that its update time is lower than the job creation time
new_history_id = self.dataset_populator.new_history()
copy_payload = {"content": dataset_id, "source": "hda", "type": "dataset"}
copy_response = self._post(f"histories/{new_history_id}/contents", data=copy_payload, json=True)
self._assert_status_code_is(copy_response, 200)
inputs = json.dumps({"input1": {"src": "hda", "id": dataset_id}})
self._job_search(tool_id="cat1", history_id=history_id, inputs=inputs)
# We test that a job can be found even if the dataset has been copied to another history
new_dataset_id = copy_response.json()["id"]
copied_inputs = json.dumps({"input1": {"src": "hda", "id": new_dataset_id}})
search_payload = self._search_payload(history_id=history_id, tool_id="cat1", inputs=copied_inputs)
self._search(search_payload, expected_search_count=1)
# Now we delete the original input HDA that was used -- we should still be able to find the job
delete_respone = self._delete(f"histories/{history_id}/contents/{dataset_id}")
self._assert_status_code_is(delete_respone, 200)
self._search(search_payload, expected_search_count=1)
# Now we also delete the copy -- we shouldn't find a job
delete_respone = self._delete(f"histories/{new_history_id}/contents/{new_dataset_id}")
self._assert_status_code_is(delete_respone, 200)
self._search(search_payload, expected_search_count=0)
    @uses_test_history(require_new=True)
def test_search_handle_identifiers(self, history_id):
        # Test that the input name and element identifier of a job's output must match for a job to be returned.
dataset_id = self.__history_with_ok_dataset(history_id)
inputs = json.dumps({"input1": {"src": "hda", "id": dataset_id}})
self._job_search(tool_id="identifier_single", history_id=history_id, inputs=inputs)
dataset_details = self._get(f"histories/{history_id}/contents/{dataset_id}").json()
dataset_details["name"] = "Renamed Test Dataset"
dataset_update_response = self._put(
f"histories/{history_id}/contents/{dataset_id}", data=dict(name="Renamed Test Dataset"), json=True
)
self._assert_status_code_is(dataset_update_response, 200)
assert dataset_update_response.json()["name"] == "Renamed Test Dataset"
search_payload = self._search_payload(history_id=history_id, tool_id="identifier_single", inputs=inputs)
self._search(search_payload, expected_search_count=0)
    @uses_test_history(require_new=True)
def test_search_delete_outputs(self, history_id):
dataset_id = self.__history_with_ok_dataset(history_id)
inputs = json.dumps({"input1": {"src": "hda", "id": dataset_id}})
tool_response = self._job_search(tool_id="cat1", history_id=history_id, inputs=inputs)
output_id = tool_response.json()["outputs"][0]["id"]
delete_respone = self._delete(f"histories/{history_id}/contents/{output_id}")
self._assert_status_code_is(delete_respone, 200)
search_payload = self._search_payload(history_id=history_id, tool_id="cat1", inputs=inputs)
self._search(search_payload, expected_search_count=0)
    @uses_test_history(require_new=True)
def test_search_with_hdca_list_input(self, history_id):
list_id_a = self.__history_with_ok_collection(collection_type="list", history_id=history_id)
list_id_b = self.__history_with_ok_collection(collection_type="list", history_id=history_id)
inputs = json.dumps(
{
"f1": {"src": "hdca", "id": list_id_a},
"f2": {"src": "hdca", "id": list_id_b},
}
)
tool_response = self._job_search(tool_id="multi_data_param", history_id=history_id, inputs=inputs)
        # We switch the inputs; this should not return a match
inputs_switched = json.dumps(
{
"f2": {"src": "hdca", "id": list_id_a},
"f1": {"src": "hdca", "id": list_id_b},
}
)
search_payload = self._search_payload(history_id=history_id, tool_id="multi_data_param", inputs=inputs_switched)
self._search(search_payload, expected_search_count=0)
        # We delete the output (this is an HDA, as multi_data_param reduces collections)
        # and use the correct input job definition; the job should not be found
output_id = tool_response.json()["outputs"][0]["id"]
delete_respone = self._delete(f"histories/{history_id}/contents/{output_id}")
self._assert_status_code_is(delete_respone, 200)
search_payload = self._search_payload(history_id=history_id, tool_id="multi_data_param", inputs=inputs)
self._search(search_payload, expected_search_count=0)
    @uses_test_history(require_new=True)
def test_search_delete_hdca_output(self, history_id):
list_id_a = self.__history_with_ok_collection(collection_type="list", history_id=history_id)
inputs = json.dumps(
{
"input1": {"src": "hdca", "id": list_id_a},
}
)
tool_response = self._job_search(tool_id="collection_creates_list", history_id=history_id, inputs=inputs)
output_id = tool_response.json()["outputs"][0]["id"]
        # We delete a single tool output; no job should be returned
        delete_response = self._delete(f"histories/{history_id}/contents/{output_id}")
        self._assert_status_code_is(delete_response, 200)
search_payload = self._search_payload(history_id=history_id, tool_id="collection_creates_list", inputs=inputs)
self._search(search_payload, expected_search_count=0)
tool_response = self._job_search(tool_id="collection_creates_list", history_id=history_id, inputs=inputs)
output_collection_id = tool_response.json()["output_collections"][0]["id"]
        # We delete a collection output; no job should be returned
        delete_response = self._delete(f"histories/{history_id}/contents/dataset_collections/{output_collection_id}")
        self._assert_status_code_is(delete_response, 200)
search_payload = self._search_payload(history_id=history_id, tool_id="collection_creates_list", inputs=inputs)
self._search(search_payload, expected_search_count=0)
    @uses_test_history(require_new=True)
def test_search_with_hdca_pair_input(self, history_id):
list_id_a = self.__history_with_ok_collection(collection_type="pair", history_id=history_id)
inputs = json.dumps(
{
"f1": {"src": "hdca", "id": list_id_a},
"f2": {"src": "hdca", "id": list_id_a},
}
)
self._job_search(tool_id="multi_data_param", history_id=history_id, inputs=inputs)
# We test that a job can be found even if the collection has been copied to another history
new_history_id = self.dataset_populator.new_history()
copy_payload = {"content": list_id_a, "source": "hdca", "type": "dataset_collection"}
copy_response = self._post(f"histories/{new_history_id}/contents", data=copy_payload, json=True)
self._assert_status_code_is(copy_response, 200)
new_list_a = copy_response.json()["id"]
copied_inputs = json.dumps(
{
"f1": {"src": "hdca", "id": new_list_a},
"f2": {"src": "hdca", "id": new_list_a},
}
)
search_payload = self._search_payload(
history_id=new_history_id, tool_id="multi_data_param", inputs=copied_inputs
)
self._search(search_payload, expected_search_count=1)
# Now we delete the original input HDCA that was used -- we should still be able to find the job
delete_respone = self._delete(f"histories/{history_id}/contents/dataset_collections/{list_id_a}")
self._assert_status_code_is(delete_respone, 200)
self._search(search_payload, expected_search_count=1)
# Now we also delete the copy -- we shouldn't find a job
delete_respone = self._delete(f"histories/{history_id}/contents/dataset_collections/{new_list_a}")
self._assert_status_code_is(delete_respone, 200)
self._search(search_payload, expected_search_count=0)
    @uses_test_history(require_new=True)
def test_search_with_hdca_list_pair_input(self, history_id):
list_id_a = self.__history_with_ok_collection(collection_type="list:pair", history_id=history_id)
inputs = json.dumps(
{
"f1": {"src": "hdca", "id": list_id_a},
"f2": {"src": "hdca", "id": list_id_a},
}
)
self._job_search(tool_id="multi_data_param", history_id=history_id, inputs=inputs)
    @uses_test_history(require_new=True)
def test_search_with_hdca_list_pair_collection_mapped_over_pair_input(self, history_id):
list_id_a = self.__history_with_ok_collection(collection_type="list:pair", history_id=history_id)
inputs = json.dumps(
{
"f1": {"batch": True, "values": [{"src": "hdca", "id": list_id_a, "map_over_type": "paired"}]},
}
)
self._job_search(tool_id="collection_paired_test", history_id=history_id, inputs=inputs)
def _get_simple_rerun_params(self, history_id, private=False):
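        # Run collection_paired_test mapped over a list:pair collection and
        # return the build_for_rerun parameters for the resulting job,
        # optionally making the collection's datasets private before returning.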
list_id_a = self.__history_with_ok_collection(collection_type="list:pair", history_id=history_id)
inputs = {"f1": {"batch": True, "values": [{"src": "hdca", "id": list_id_a, "map_over_type": "paired"}]}}
run_response = self._run(
history_id=history_id,
tool_id="collection_paired_test",
inputs=inputs,
wait_for_job=True,
assert_ok=True,
)
rerun_params = self._get(f"jobs/{run_response['jobs'][0]['id']}/build_for_rerun").json()
# Since we call rerun on the first (and only) job we should get the expanded input
# which is a dataset collection element (and not the list:pair hdca that was used as input to the original
# job).
assert rerun_params["state_inputs"]["f1"]["values"][0]["src"] == "dce"
if private:
hdca = self.dataset_populator.get_history_collection_details(history_id=history_id, content_id=list_id_a)
for element in hdca["elements"][0]["object"]["elements"]:
self.dataset_populator.make_private(history_id, element["object"]["id"])
return rerun_params
    @skip_without_tool("collection_paired_test")
@uses_test_history(require_new=False)
def test_job_build_for_rerun(self, history_id):
rerun_params = self._get_simple_rerun_params(history_id)
self._run(
history_id=history_id,
tool_id="collection_paired_test",
inputs=rerun_params["state_inputs"],
wait_for_job=True,
assert_ok=True,
)
    @skip_without_tool("collection_paired_test")
@uses_test_history(require_new=False)
def test_dce_submission_security(self, history_id):
rerun_params = self._get_simple_rerun_params(history_id, private=True)
with self._different_user():
other_history_id = self.dataset_populator.new_history()
response = self._run(
history_id=other_history_id,
tool_id="collection_paired_test",
inputs=rerun_params["state_inputs"],
wait_for_job=False,
assert_ok=False,
)
assert response.status_code == 403
    @skip_without_tool("identifier_collection")
@uses_test_history(require_new=False)
def test_job_build_for_rerun_list_list(self, history_id):
list_id_a = self.__history_with_ok_collection(collection_type="list", history_id=history_id)
list_id_b = self.__history_with_ok_collection(collection_type="list", history_id=history_id)
list_list = self.dataset_collection_populator.create_nested_collection(
history_id=history_id,
collection_type="list:list",
name="list list collection",
collection=[list_id_a, list_id_b],
).json()
list_list_id = list_list["id"]
first_element = list_list["elements"][0]
assert first_element["element_type"] == "dataset_collection"
assert first_element["element_identifier"] == "test0"
assert first_element["model_class"] == "DatasetCollectionElement"
inputs = {"input1": {"batch": True, "values": [{"src": "hdca", "id": list_list_id, "map_over_type": "list"}]}}
run_response = self._run(
history_id=history_id,
tool_id="identifier_collection",
inputs=inputs,
wait_for_job=True,
assert_ok=True,
)
assert len(run_response["jobs"]) == 2
rerun_params = self._get(f"jobs/{run_response['jobs'][0]['id']}/build_for_rerun").json()
        # Since we call rerun on the first job we should get the expanded input
        # which is a dataset collection element (and not the list:list hdca that was used as input to the original
        # job).
assert rerun_params["state_inputs"]["input1"]["values"][0]["src"] == "dce"
rerun_response = self._run(
history_id=history_id,
tool_id="identifier_collection",
inputs=rerun_params["state_inputs"],
wait_for_job=True,
assert_ok=True,
)
assert len(rerun_response["jobs"]) == 1
rerun_content = self.dataset_populator.get_history_dataset_content(
history_id=history_id, dataset=rerun_response["outputs"][0]
)
run_content = self.dataset_populator.get_history_dataset_content(
history_id=history_id, dataset=run_response["outputs"][0]
)
assert rerun_content == run_content
def _job_search(self, tool_id, history_id, inputs):
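        # Assert that no matching job exists yet, run the tool, then assert
        # that jobs/search finds exactly one match for the same payload.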
search_payload = self._search_payload(history_id=history_id, tool_id=tool_id, inputs=inputs)
empty_search_response = self._post("jobs/search", data=search_payload)
self._assert_status_code_is(empty_search_response, 200)
self.assertEqual(len(empty_search_response.json()), 0)
tool_response = self._post("tools", data=search_payload)
self.dataset_populator.wait_for_tool_run(history_id, run_response=tool_response)
self._search(search_payload, expected_search_count=1)
return tool_response
def _search_payload(self, history_id, tool_id, inputs, state="ok"):
search_payload = dict(tool_id=tool_id, inputs=inputs, history_id=history_id, state=state)
return search_payload
def _search(self, payload, expected_search_count=1):
        # In case the job and history aren't updated at exactly the same
        # time, give them a moment to catch up.
for _ in range(5):
search_count = self._search_count(payload)
if search_count == expected_search_count:
break
time.sleep(1)
assert search_count == expected_search_count, "expected to find %d jobs, got %d jobs" % (
expected_search_count,
search_count,
)
return search_count
def _search_count(self, search_payload):
search_response = self._post("jobs/search", data=search_payload)
self._assert_status_code_is(search_response, 200)
search_json = search_response.json()
return len(search_json)
def __uploads_with_state(self, *states):
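        # Return the upload (__DATA_FETCH__) jobs from the jobs index that are
        # in one of the given states.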
jobs_response = self._get("jobs", data=dict(state=states))
self._assert_status_code_is(jobs_response, 200)
jobs = jobs_response.json()
        assert not [j for j in jobs if j["state"] not in states]
return [j for j in jobs if j["tool_id"] == "__DATA_FETCH__"]
def __history_with_new_dataset(self, history_id):
dataset_id = self.dataset_populator.new_dataset(history_id, wait=True)["id"]
return dataset_id
def __history_with_ok_dataset(self, history_id):
dataset_id = self.dataset_populator.new_dataset(history_id, wait=True)["id"]
return dataset_id
def __history_with_ok_collection(self, collection_type="list", history_id=None):
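        # Create a collection of the requested type (list, pair, or list:pair)
        # in the history, wait for it to be fetched, and return the HDCA id.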
if not history_id:
history_id = self.dataset_populator.new_history()
if collection_type == "list":
fetch_response = self.dataset_collection_populator.create_list_in_history(
history_id, direct_upload=True
).json()
elif collection_type == "pair":
fetch_response = self.dataset_collection_populator.create_pair_in_history(
history_id, direct_upload=True
).json()
elif collection_type == "list:pair":
fetch_response = self.dataset_collection_populator.create_list_of_pairs_in_history(history_id).json()
self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
return fetch_response["outputs"][0]["id"]
def __jobs_index(self, **kwds):
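        # GET the jobs index with the given parameters and return the parsed
        # list of job dictionaries.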
jobs_response = self._get("jobs", **kwds)
self._assert_status_code_is(jobs_response, 200)
jobs = jobs_response.json()
assert isinstance(jobs, list)
return jobs