Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy_test.api.test_jobs

import datetime
import json
import os
import time
import urllib.parse
from operator import itemgetter
from unittest import SkipTest

import pytest
import requests
from dateutil.parser import isoparse

from galaxy_test.api.test_tools import TestsTools
from galaxy_test.base.api_asserts import assert_status_code_is_ok
from galaxy_test.base.populators import (
    DatasetCollectionPopulator,
    DatasetPopulator,
    skip_without_tool,
    wait_on,
    wait_on_state,
    WorkflowPopulator,
)
from ._framework import ApiTestCase


[docs]class TestJobsApi(ApiTestCase, TestsTools): dataset_populator: DatasetPopulator
def setUp(self):
    """Run the parent ``setUp`` and attach the populators used by these tests.

    Each populator is built from ``self.galaxy_interactor`` and stored on the
    test case as ``workflow_populator``, ``dataset_populator`` and
    ``dataset_collection_populator``.
    """
    super().setUp()
    populator_classes = (
        ("workflow_populator", WorkflowPopulator),
        ("dataset_populator", DatasetPopulator),
        ("dataset_collection_populator", DatasetCollectionPopulator),
    )
    for attribute_name, populator_class in populator_classes:
        setattr(self, attribute_name, populator_class(self.galaxy_interactor))
@pytest.mark.require_new_history
def test_index(self, history_id):
    """Check that the jobs index contains a ``__DATA_FETCH__`` job."""
    # Create an HDA first so that at least one job is guaranteed to exist.
    self.__history_with_new_dataset(history_id)
    listed_jobs = self.__jobs_index()
    assert any(entry["tool_id"] == "__DATA_FETCH__" for entry in listed_jobs)
@pytest.mark.require_new_history
def test_system_details_admin_only(self, history_id):
    """Check that system details of a job are only exposed in the admin listing.

    The non-admin index must report ``external_id`` as ``None``; the admin
    index must carry truthy ``command_line`` and ``external_id`` values.
    """
    self.__history_with_new_dataset(history_id)

    user_view_job = self.__jobs_index(admin=False)[0]
    assert user_view_job["external_id"] is None

    admin_view_job = self.__jobs_index(admin=True)[0]
    assert admin_view_job["command_line"]
    assert admin_view_job["external_id"]
@pytest.mark.require_new_history
def test_admin_job_list(self, history_id):
    """Check that the ``admin_job_list`` view is rejected for non-admins and populated for admins."""
    self.__history_with_new_dataset(history_id)
    endpoint = "jobs?view=admin_job_list"

    # Non-admin request: expect a 403 with the dedicated error message.
    denied_response = self._get(endpoint, admin=False)
    assert denied_response.status_code == 403
    assert denied_response.json()["err_msg"] == "Only admins can use the admin_job_list view"

    # Admin request: the first job must expose the admin-only keys.
    admin_jobs = self._get(endpoint, admin=True).json()
    self._assert_has_keys(admin_jobs[0], "command_line", "external_id", "handler")
@pytest.mark.require_new_history
def test_job_list_collection_view(self, history_id):
    """Check that ``view=collection`` returns jobs carrying ``id``, ``tool_id`` and ``state``."""
    self.__history_with_new_dataset(history_id)
    response = self._get("jobs?view=collection")
    self._assert_status_code_is_ok(response)
    first_job = response.json()[0]
    self._assert_has_keys(first_job, "id", "tool_id", "state")
@pytest.mark.require_new_history
def test_job_list_default_view(self, history_id):
    """Check that the default view, filtered by history, returns ``id``, ``tool_id`` and ``state``."""
    self.__history_with_new_dataset(history_id)
    response = self._get(f"jobs?history_id={history_id}")
    self._assert_status_code_is_ok(response)
    first_job = response.json()[0]
    self._assert_has_keys(first_job, "id", "tool_id", "state")
@pytest.mark.require_new_history
def test_index_state_filter(self, history_id):
    """Check that the number of uploads in state ``ok`` grows after a new upload.

    The count is polled up to 10 times, 0.1 s apart; an ``AssertionError`` is
    raised if it never exceeds the initial count.
    """
    # Number of ok jobs before the upload.
    baseline_count = len(self.__uploads_with_state("ok"))
    # Run through a dataset upload so the number of uploads grows by at least one.
    self.__history_with_ok_dataset(history_id)
    # Poll until the number of ok jobs is actually greater.
    for _ in range(10):
        current_count = len(self.__uploads_with_state("ok"))
        if baseline_count < current_count:
            break
        time.sleep(0.1)
    else:
        # Every attempt was exhausted without seeing an increase.
        raise AssertionError(
            "Jobs in ok state did not increase (was %d, now %d)" % (baseline_count, current_count)
        )
@pytest.mark.require_new_history
def test_index_date_filter(self, history_id):
    """Check the ``date_range_min`` / ``date_range_max`` filters with dates and datetimes."""

    def utc_iso(days_offset=0):
        # ISO string of the current naive UTC time shifted by ``days_offset`` days.
        return (datetime.datetime.utcnow() + datetime.timedelta(days_offset)).isoformat()

    two_weeks_ago = utc_iso(-14)
    last_week = utc_iso(-7)
    before = utc_iso()
    today = before[:10]  # date part only
    tomorrow = utc_iso(1)[:10]
    self.__history_with_new_dataset(history_id)
    after = utc_iso()

    # Test using dates
    dated_jobs = self.__jobs_index(data={"date_range_min": today, "date_range_max": tomorrow})
    assert len(dated_jobs) > 0
    today_job = dated_jobs[0]
    today_job_id = today_job["id"]

    # Test using datetimes
    bracketed_jobs = self.__jobs_index(data={"date_range_min": before, "date_range_max": after})
    job_found = any(entry["id"] == today_job_id for entry in bracketed_jobs)
    assert job_found, f"before: {before}, after: {after}, job: {today_job}"

    # A window that ended a week ago must not contain the job.
    stale_jobs = self.__jobs_index(data={"date_range_min": two_weeks_ago, "date_range_max": last_week})
    assert all(entry["id"] != today_job_id for entry in stale_jobs)
@pytest.mark.require_new_history
def test_index_history(self, history_id):
    """Check that the ``history_id`` filter only returns jobs of that history."""
    self.__history_with_new_dataset(history_id)
    own_jobs = self.__jobs_index(data={"history_id": history_id})
    assert len(own_jobs) > 0

    # A fresh history has no jobs yet.
    with self.dataset_populator.test_history() as other_history_id:
        other_jobs = self.__jobs_index(data={"history_id": other_history_id})
        assert len(other_jobs) == 0
@pytest.mark.require_new_history
@skip_without_tool("cat1")
def test_index_workflow_and_invocation_filter(self, history_id):
    """Check that ``workflow_id`` and ``invocation_id`` filters return the same single job."""
    workflow_simple = """
class: GalaxyWorkflow
name: Simple Workflow
inputs:
  input1: data
outputs:
  wf_output_1:
    outputSource: first_cat/out_file1
steps:
  first_cat:
    tool_id: cat1
    in:
      input1: input1
"""
    run_summary = self.workflow_populator.run_workflow(
        workflow_simple, history_id=history_id, test_data={"input1": "hello world"}
    )
    invocation_id = run_summary.invocation_id
    invocation = self._get(f"invocations/{invocation_id}").json()
    workflow_id = invocation["workflow_id"]
    self.workflow_populator.wait_for_invocation(workflow_id, invocation_id)

    by_workflow = self.__jobs_index(data={"workflow_id": workflow_id})
    assert len(by_workflow) == 1
    by_invocation = self.__jobs_index(data={"invocation_id": invocation_id})
    assert len(by_invocation) == 1
    assert by_workflow == by_invocation
@pytest.mark.require_new_history
@skip_without_tool("multi_data_optional")
def test_index_workflow_filter_implicit_jobs(self, history_id):
    """Check the workflow/invocation filters across two invocations of one workflow.

    After the first invocation both filters return one job; after the second,
    the workflow filter returns two jobs and the filter on the second
    invocation returns one.
    """
    workflow_yaml = """
class: GalaxyWorkflow
inputs:
  input_datasets: collection
steps:
  multi_data_optional:
    tool_id: multi_data_optional
    in:
      input1: input_datasets
"""
    workflow_id = self.workflow_populator.upload_yaml_workflow(workflow_yaml)
    hdca_id = self.dataset_collection_populator.create_list_of_list_in_history(history_id).json()
    self.dataset_populator.wait_for_history(history_id, assert_ok=True)
    inputs = {"0": self.dataset_populator.ds_entry(hdca_id)}

    def invoke_and_wait():
        # Invoke the uploaded workflow and return the invocation id.
        response = self.workflow_populator.invoke_workflow_and_wait(
            workflow_id, history_id=history_id, inputs=inputs
        )
        return response.json()["id"]

    invocation_id = invoke_and_wait()
    jobs1 = self.__jobs_index(data={"workflow_id": workflow_id})
    jobs2 = self.__jobs_index(data={"invocation_id": invocation_id})
    assert len(jobs1) == len(jobs2) == 1

    second_invocation_id = invoke_and_wait()
    workflow_jobs = self.__jobs_index(data={"workflow_id": workflow_id})
    second_invocation_jobs = self.__jobs_index(data={"invocation_id": second_invocation_id})
    assert len(workflow_jobs) == 2
    assert len(second_invocation_jobs) == 1
@pytest.mark.require_new_history
def test_index_limit_and_offset_filter(self, history_id):
    """Check the ``offset`` and ``limit`` parameters of the jobs index, including an invalid limit."""
    # create 2 datasets
    for _ in range(2):
        self.__history_with_new_dataset(history_id)

    all_jobs = self.__jobs_index(data={"history_id": history_id})
    assert len(all_jobs) > 0
    total = len(all_jobs)

    skipped_one = self.__jobs_index(data={"history_id": history_id, "offset": 1})
    assert len(skipped_one) == total - 1

    limited = self.__jobs_index(data={"history_id": history_id, "limit": 1})
    assert len(limited) == 1

    # A negative limit is rejected as a bad request.
    bad_limit_response = self._get("jobs", data={"history_id": history_id, "limit": -1})
    assert bad_limit_response.status_code == 400
    expected_message = "Input should be greater than or equal to 1 in ('query', 'limit')"
    assert bad_limit_response.json()["err_msg"] == expected_message
@pytest.mark.require_new_history
def test_index_search_filter_tool_id(self, history_id):
    """Check free-text and ``tool:`` tagged searches of the jobs index.

    The free-text term ``FETCH`` returns every job of the history, while
    ``emptyresult`` and the tagged term ``tool:'FETCH'`` return none.
    """
    self.__history_with_new_dataset(history_id)

    def search(**extra):
        # Jobs of this history, optionally narrowed by extra query parameters.
        return self.__jobs_index(data={"history_id": history_id, **extra})

    unfiltered = search()
    assert len(unfiltered) > 0
    total = len(unfiltered)

    assert len(search(search="emptyresult")) == 0
    assert len(search(search="FETCH")) == total
    assert len(search(search="tool:'FETCH'")) == 0
@pytest.mark.require_new_history
def test_index_search_filter_email(self, history_id):
    """Check that searching jobs by user email only matches for admins with ``user_details`` on."""
    self.__history_with_new_dataset(history_id)
    self.__jobs_index(data={"history_id": history_id, "search": "FETCH"})
    user_email = self.dataset_populator.user_email()

    # A plain (non-admin) search on the email finds nothing.
    plain_matches = self.__jobs_index(data={"history_id": history_id, "search": user_email})
    assert len(plain_matches) == 0

    # we can search on email...
    with_details = self.__jobs_index(
        data={"history_id": history_id, "search": user_email, "user_details": True}, admin=True
    )
    assert len(with_details) == 1

    # but only if user details are joined in.
    without_details = self.__jobs_index(
        data={"history_id": history_id, "search": user_email, "user_details": False}, admin=True
    )
    assert len(without_details) == 0
def test_index_user_filter(self):
    """Check who may use the ``user_id`` filter of the jobs index.

    A user may list their own jobs, an admin may list another user's jobs,
    and a regular user asking for another user's jobs gets a 403.
    """
    test_user_email = "user_for_jobs_index_test@bx.psu.edu"
    user = self._setup_user(test_user_email)
    user_filter = {"user_id": user["id"]}

    with self._different_user(email=test_user_email):
        # A user should be able to list jobs for their own ID.
        own_jobs = self.__jobs_index(data=user_filter)
        assert own_jobs == []

    # Admin should be able to see jobs of another user.
    admin_view = self.__jobs_index(data=user_filter, admin=True)
    assert admin_view == []

    # Normal user should not be able to see jobs of another user.
    forbidden_response = self._get("jobs", data=user_filter)
    self._assert_status_code_is(forbidden_response, 403)
    expected_error = {"err_msg": "Only admins can index the jobs of others", "err_code": 403006}
    assert forbidden_response.json() == expected_error
@pytest.mark.require_new_history
def test_index_handler_runner_filters(self, history_id):
    """Check free-text and tagged ``handler:`` / ``runner:`` searches in the admin job list."""
    self.__history_with_new_dataset(history_id)
    base_query = f"jobs?view=admin_job_list&history_id={history_id}"

    def admin_search(term):
        # Admin job list of this history narrowed by the raw ``search`` term.
        return self._get(f"{base_query}&search={term}", admin=True).json()

    first_job = self._get(base_query, admin=True).json()[0]
    handler = first_job["handler"]
    assert handler
    runner = first_job["job_runner_name"]
    assert runner

    # Free text search includes handler and runner for admin list view.
    assert admin_search(handler)
    assert not admin_search(f"{handler}suffixnotfound")
    assert admin_search(runner)
    assert not admin_search(f"{runner}suffixnotfound")

    # Test tags for runner and handler specifically.
    assert runner != handler
    assert admin_search(f"handler:%27{handler}%27")
    assert not admin_search(f"runner:%27{handler}%27")
    assert admin_search(f"runner:%27{runner}%27")
    assert not admin_search(f"handler:%27{runner}%27")
@pytest.mark.require_new_history
def test_index_multiple_states_filter(self, history_id):
    """Check that filtering uploads on several states at once sees a newly finished upload."""
    # Number of uploads in either state before the new upload.
    count_before = len(self.__uploads_with_state("ok", "new"))
    # Run through a dataset upload so the number of uploads grows by at least one.
    self.__history_with_ok_dataset(history_id)
    # The count must now be strictly greater.
    count_after = len(self.__uploads_with_state("new", "ok"))
    assert count_before < count_after, count_after
@pytest.mark.require_new_history
def test_show(self, history_id):
    """Check the keys and output streams reported when showing a ``job_properties`` job."""
    summary_keys = ("id", "state", "exit_code", "update_time", "create_time")
    tool_run = self.dataset_populator.run_tool(
        tool_id="job_properties",
        inputs={},
        history_id=history_id,
    )
    self._assert_has_key(self.__jobs_index()[0], *summary_keys)

    # Default detail view of the job that was just started.
    job_id = tool_run["jobs"][0]["id"]
    brief_response = self.dataset_populator.get_job_details(job_id)
    self._assert_status_code_is(brief_response, 200)
    self._assert_has_key(brief_response.json(), *summary_keys)

    # Full detail view additionally carries the message and stream keys.
    full_response = self.dataset_populator.get_job_details(job_id, full=True)
    self._assert_status_code_is(full_response, 200)
    self._assert_has_key(
        full_response.json(),
        "create_time",
        "exit_code",
        "id",
        "job_messages",
        "job_stderr",
        "job_stdout",
        "state",
        "stderr",
        "stdout",
        "tool_stderr",
        "tool_stdout",
        "update_time",
    )

    # Once the job has finished, the tool output shows up in the tool and
    # combined streams but not in the job streams.
    self.dataset_populator.wait_for_job(job_id, assert_ok=True)
    finished = self.dataset_populator.get_job_details(job_id, full=True).json()
    stdout_text = "The bool is not true\n"
    stderr_text = "The bool is very not true\n"
    assert stdout_text not in finished["job_stdout"]
    assert stderr_text not in finished["job_stderr"]
    assert finished["tool_stdout"] == stdout_text
    assert finished["tool_stderr"] == stderr_text
    assert stdout_text in finished["stdout"]
    assert stderr_text in finished["stderr"]
@pytest.mark.require_new_history
def test_show_security(self, history_id):
    """Check that ``external_id`` and ``command_line`` are only shown to admins on job show."""
    self.__history_with_new_dataset(history_id)
    history_jobs = self._get("jobs", data={"history_id": history_id}).json()
    job_id = history_jobs[0]["id"]

    # The job lock must be reported as inactive.
    lock_response = self._get("job_lock", admin=True)
    lock_response.raise_for_status()
    assert not lock_response.json()["active"]

    user_view = self._get(f"jobs/{job_id}", admin=False).json()
    assert user_view["external_id"] is None

    # TODO: Re-activate test case when API accepts privacy settings
    # with self._different_user():
    #    show_jobs_response = self._get( "jobs/%s" % job_id, admin=False )
    #    self._assert_status_code_is( show_jobs_response, 200 )

    admin_response = self._get(f"jobs/{job_id}", admin=True)
    assert admin_response.json()["external_id"] is not None
    assert admin_response.json()["command_line"] is not None
def _run_detect_errors(self, history_id, inputs):
    """POST a ``detect_errors_aggressive`` run with ``inputs`` and return the decoded JSON response."""
    tool_request = self.dataset_populator.run_tool_payload(
        tool_id="detect_errors_aggressive",
        inputs=inputs,
        history_id=history_id,
    )
    response = self._post("tools", data=tool_request)
    return response.json()
@skip_without_tool("detect_errors_aggressive")
def test_unhide_on_error(self):
    """Check that the output of a directly run, failing job is visible."""
    with self.dataset_populator.test_history() as history_id:
        run_response = self._run_detect_errors(history_id=history_id, inputs={"error_bool": "true"})
        job_id = run_response["jobs"][0]["id"]
        self.dataset_populator.wait_for_job(job_id)

        job_details = self.dataset_populator.get_job_details(job_id).json()
        assert job_details["state"] == "error"

        output_details = self.dataset_populator.get_history_dataset_details(
            history_id=history_id, dataset_id=run_response["outputs"][0]["id"], assert_ok=False
        )
        assert output_details["visible"]
def _run_map_over_error(self, history_id):
    """Run ``detect_errors_aggressive`` in error mode, batched over a one-element list.

    Returns the decoded JSON response of the tool request.
    """
    creation_response = self.dataset_collection_populator.create_list_in_history(
        history_id, contents=[("sample1-1", "1 2 3")]
    )
    collection = self.dataset_collection_populator.wait_for_fetched_collection(creation_response.json())
    batched_input = {
        "batch": True,
        "values": [{"src": "hdca", "id": collection["id"]}],
    }
    tool_inputs = {"error_bool": "true", "dataset": batched_input}
    return self._run_detect_errors(history_id=history_id, inputs=tool_inputs)
@skip_without_tool("detect_errors_aggressive")
def test_no_unhide_on_error_if_mapped_over(self):
    """Check that the output of a failing mapped-over job is not visible."""
    with self.dataset_populator.test_history() as history_id:
        run_response = self._run_map_over_error(history_id)
        job_id = run_response["jobs"][0]["id"]
        self.dataset_populator.wait_for_job(job_id)

        job_details = self.dataset_populator.get_job_details(job_id).json()
        assert job_details["state"] == "error"

        output_details = self.dataset_populator.get_history_dataset_details(
            history_id=history_id, dataset_id=run_response["outputs"][0]["id"], assert_ok=False
        )
        assert not output_details["visible"]
@skip_without_tool("detect_errors_aggressive")
def test_no_hide_on_rerun(self):
    """Check that a failed implicit collection stays visible after its job is rerun with remap.

    The mapped-over ``detect_errors_aggressive`` job is run, then rerun with
    ``rerun_remap_job_id`` pointing at the failed job.  Afterwards the
    original implicit collection must still be visible and its
    ``update_time`` must be later than before the rerun.

    The test is skipped when ``detect_errors_aggressive`` is unavailable,
    because both the initial run and the rerun go through that tool.
    """
    with self.dataset_populator.test_history() as history_id:
        run_response = self._run_map_over_error(history_id)
        job_id = run_response["jobs"][0]["id"]
        implicit_collection_id = run_response["implicit_collections"][0]["id"]
        self.dataset_populator.wait_for_job(job_id)

        failed_hdca = self.dataset_populator.get_history_collection_details(
            history_id=history_id,
            content_id=implicit_collection_id,
            assert_ok=False,
        )
        first_update_time = failed_hdca["update_time"]
        assert failed_hdca["visible"]

        # Rebuild the original inputs and ask for the failed job to be remapped.
        rerun_params = self._get(f"jobs/{job_id}/build_for_rerun").json()
        inputs = rerun_params["state_inputs"]
        inputs["rerun_remap_job_id"] = job_id
        rerun_response = self._run_detect_errors(history_id=history_id, inputs=inputs)
        rerun_job_id = rerun_response["jobs"][0]["id"]
        self.dataset_populator.wait_for_job(rerun_job_id)

        # Verify source hdca is still visible
        hdca = self.dataset_populator.get_history_collection_details(
            history_id=history_id,
            content_id=implicit_collection_id,
            assert_ok=False,
        )
        assert hdca["visible"]
        assert isoparse(hdca["update_time"]) > (isoparse(first_update_time))
@skip_without_tool("detect_errors_aggressive")
def test_rerun_exception_handling(self):
    """Check that a rerun remapped onto a job of another tool is rejected cleanly.

    ``rerun_remap_job_id`` is pointed at an unrelated ``job_properties`` job.
    The request must fail with a "does not match rerun tool id" error and
    leave the number of history items unchanged.

    The test is skipped when ``detect_errors_aggressive`` is unavailable,
    because both the failing run and the rerun attempt go through that tool.
    """
    with self.dataset_populator.test_history() as history_id:
        other_run_response = self.dataset_populator.run_tool(
            tool_id="job_properties",
            inputs={},
            history_id=history_id,
        )
        unrelated_job_id = other_run_response["jobs"][0]["id"]

        run_response = self._run_map_over_error(history_id)
        job_id = run_response["jobs"][0]["id"]
        self.dataset_populator.wait_for_job(job_id)
        failed_hdca = self.dataset_populator.get_history_collection_details(
            history_id=history_id,
            content_id=run_response["implicit_collections"][0]["id"],
            assert_ok=False,
        )
        assert failed_hdca["visible"]

        # Build rerun inputs for the failed job, but remap onto the unrelated job.
        rerun_params = self._get(f"jobs/{job_id}/build_for_rerun").json()
        inputs = rerun_params["state_inputs"]
        inputs["rerun_remap_job_id"] = unrelated_job_id

        before_rerun_items = self.dataset_populator.get_history_contents(history_id)
        rerun_response = self._run_detect_errors(history_id=history_id, inputs=inputs)
        assert "does not match rerun tool id" in rerun_response["err_msg"]
        # The rejected request must not have created any history items.
        after_rerun_items = self.dataset_populator.get_history_contents(history_id)
        assert len(before_rerun_items) == len(after_rerun_items)
@skip_without_tool("empty_output")
def test_common_problems(self):
    """Check the ``common_problems`` report for empty and duplicated job inputs.

    The ``empty_output`` job has neither problem; a ``cat1`` job fed the same
    empty dataset twice reports both.
    """
    with self.dataset_populator.test_history() as history_id:
        empty_run = self.dataset_populator.run_tool(
            tool_id="empty_output",
            inputs={},
            history_id=history_id,
        )
        empty_input = {"src": "hda", "id": empty_run["outputs"][0]["id"]}
        # Feed the same empty dataset to both inputs of cat1.
        cat_run = self.dataset_populator.run_tool(
            tool_id="cat1",
            inputs={
                "input1": dict(empty_input),
                "queries_0|input2": dict(empty_input),
            },
            history_id=history_id,
        )
        empty_job_id = empty_run["jobs"][0]["id"]
        cat_job_id = cat_run["jobs"][0]["id"]

        empty_problems = self._get(f"jobs/{empty_job_id}/common_problems").json()
        cat_problems = self._get(f"jobs/{cat_job_id}/common_problems").json()
        self._assert_has_keys(empty_problems, "has_empty_inputs", "has_duplicate_inputs")
        self._assert_has_keys(cat_problems, "has_empty_inputs", "has_duplicate_inputs")

        assert not empty_problems["has_empty_inputs"]
        assert cat_problems["has_empty_inputs"]
        assert not empty_problems["has_duplicate_inputs"]
        assert cat_problems["has_duplicate_inputs"]
@skip_without_tool("detect_errors_aggressive")
def test_report_error(self):
    """Run the error-report scenario inside a temporary test history."""
    history_context = self.dataset_populator.test_history()
    with history_context as history_id:
        self._run_error_report(history_id)
@skip_without_tool("detect_errors_aggressive")
def test_report_error_anon(self):
    """Run the error-report scenario as an anonymous user on their current history."""
    with self._different_user(anon=True):
        # The current history id is read from ``history/current_history_json``.
        current_history_url = urllib.parse.urljoin(self.url, "history/current_history_json")
        current_history = self._get(current_history_url).json()
        history_id = current_history["id"]
        self._run_error_report(history_id)
def _run_error_report(self, history_id):
    """Run a failing ``detect_errors_aggressive`` job and assert that reporting its error returns 200."""
    tool_request = self.dataset_populator.run_tool_payload(
        tool_id="detect_errors_aggressive",
        inputs={"error_bool": "true"},
        history_id=history_id,
    )
    run_result = self._post("tools", data=tool_request).json()
    job_id = run_result["jobs"][0]["id"]
    self.dataset_populator.wait_for_job(job_id)

    # Report the error against the first output dataset of the job.
    report_body = {"dataset_id": run_result["outputs"][0]["id"]}
    report_response = self._post(f"jobs/{job_id}/error", data=report_body, json=True)
    assert report_response.status_code == 200, report_response.text
@skip_without_tool("detect_errors_aggressive")
def test_report_error_bootstrap_admin(self):
    """Check that running the tool with the master API key is answered with a 400."""
    with self.dataset_populator.test_history() as history_id:
        tool_request = self.dataset_populator.run_tool_payload(
            tool_id="detect_errors_aggressive",
            inputs={"error_bool": "true"},
            history_id=history_id,
        )
        response = self._post("tools", data=tool_request, key=self.master_api_key)
        self._assert_status_code_is(response, 400)
@pytest.mark.require_new_history
@skip_without_tool("create_2")
def test_deleting_output_keep_running_until_all_deleted(self, history_id):
    """Check that a two-output job keeps running until both outputs are deleted.

    The test skips itself whenever the job is found in state ``ok`` before
    the second output is deleted.
    """
    job_state, outputs = self._setup_running_two_output_job(history_id, 120)
    self._hack_to_skip_test_if_state_ok(job_state)

    # Delete one of the two outputs and make sure the job is still running.
    self._raw_update_history_item(history_id, outputs[0]["id"], {"deleted": True})
    self._hack_to_skip_test_if_state_ok(job_state)
    time.sleep(1)
    self._hack_to_skip_test_if_state_ok(job_state)
    current_state = job_state().json()["state"]
    assert current_state == "running", current_state

    # Delete the second output and make sure the job is cancelled.
    self._raw_update_history_item(history_id, outputs[1]["id"], {"deleted": True})
    final_state = wait_on_state(job_state, assert_ok=False, timeout=15)
    assert final_state in ("deleting", "deleted"), final_state
@pytest.mark.require_new_history
@skip_without_tool("create_2")
def test_purging_output_keep_running_until_all_purged(self, history_id):
    """Check that a two-output job keeps running until both outputs are purged.

    When the output files are reachable from the test process, the test also
    waits for both files to disappear after the job was cancelled.
    """
    job_state, outputs = self._setup_running_two_output_job(history_id, 120)

    # Pretty much right away after the job is running, these paths should be populated -
    # if they are grab them and make sure they are deleted at the end of the job.
    first_details = self._get_history_item_as_admin(history_id, outputs[0]["id"])
    second_details = self._get_history_item_as_admin(history_id, outputs[1]["id"])
    output_dataset_paths = []
    output_dataset_paths_exist = False
    if "file_name" in first_details:
        output_dataset_paths = [first_details["file_name"], second_details["file_name"]]
        # This may or may not exist depending on if the test is local or not.
        output_dataset_paths_exist = os.path.exists(output_dataset_paths[0])

    self._hack_to_skip_test_if_state_ok(job_state)
    state_now = job_state().json()["state"]
    assert state_now == "running", state_now

    # Purge one of the two outputs and make sure the job is still running.
    self._raw_update_history_item(history_id, outputs[0]["id"], {"purged": True})
    time.sleep(1)
    self._hack_to_skip_test_if_state_ok(job_state)
    state_now = job_state().json()["state"]
    assert state_now == "running", state_now

    # Purge the second output and make sure the job is cancelled.
    self._raw_update_history_item(history_id, outputs[1]["id"], {"purged": True})
    final_state = wait_on_state(job_state, assert_ok=False, timeout=15)
    assert final_state in ("deleting", "deleted"), final_state

    def paths_deleted():
        # ``True`` once neither file exists any more, ``None`` while one remains.
        if os.path.exists(output_dataset_paths[0]) or os.path.exists(output_dataset_paths[1]):
            return None
        return True

    if output_dataset_paths_exist:
        wait_on(paths_deleted, "path deletion")
def test_submission_on_collection_with_deleted_element(self, history_id):
    """Check that mapping a tool over a collection with a deleted element is rejected with a 400."""
    hdca = self.dataset_collection_populator.create_list_of_list_in_history(history_id=history_id, wait=True).json()
    # Delete the first dataset of the first inner list.
    inner_list = hdca["elements"][0]["object"]
    hda_id = inner_list["elements"][0]["object"]["id"]
    self.dataset_populator.delete_dataset(history_id=history_id, content_id=hda_id)

    batch_value = {"src": "hdca", "id": hdca["id"], "map_over_type": "list"}
    response = self.dataset_populator.run_tool_raw(
        "is_of_type",
        inputs={
            "collection": {"batch": True, "values": [batch_value]},
        },
        history_id=history_id,
    )
    assert response.status_code == 400
    expected_message = (
        "Parameter 'collection': the previously selected dataset collection has elements that are deleted."
    )
    assert response.json()["err_msg"] == expected_message
@pytest.mark.require_new_history
@skip_without_tool("create_2")
def test_purging_output_cleaned_after_ok_run(self, history_id):
    """Check that only the purged output file is gone after the job finishes ok.

    Returns early without asserting anything when the output files cannot be
    seen from the test process.
    """
    job_state, outputs = self._setup_running_two_output_job(history_id, 10)

    # Pretty much right away after the job is running, these paths should be populated -
    # if they are grab them and make sure they are deleted at the end of the job.
    first_details = self._get_history_item_as_admin(history_id, outputs[0]["id"])
    second_details = self._get_history_item_as_admin(history_id, outputs[1]["id"])
    output_dataset_paths = []
    output_dataset_paths_exist = False
    if "file_name" in first_details:
        output_dataset_paths = [first_details["file_name"], second_details["file_name"]]
        # This may or may not exist depending on if the test is local or not.
        output_dataset_paths_exist = os.path.exists(output_dataset_paths[0])

    if not output_dataset_paths_exist:
        # Given this Galaxy configuration - there is nothing more to be tested here.
        # Consider throwing a skip instead.
        return

    # Purge one of the two outputs and wait for the job to complete.
    self._raw_update_history_item(history_id, outputs[0]["id"], {"purged": True})
    wait_on_state(job_state, assert_ok=True)

    # The paths are known to exist at this point (see the early return above).
    time.sleep(0.5)
    # Make sure the non-purged dataset is on disk and the purged one is not.
    purged_path, kept_path = output_dataset_paths
    assert os.path.exists(kept_path)
    assert not os.path.exists(purged_path)
def _hack_to_skip_test_if_state_ok(self, job_state):
    """Raise ``SkipTest`` when the job polled through ``job_state`` is already in state ``ok``."""
    if job_state().json()["state"] != "ok":
        return
    raise SkipTest(
        "Job state switch from running to ok too quickly - the rest of the test requires the job to be in a running state. Skipping test."
    )


def _setup_running_two_output_job(self, history_id, sleep_time):
    """Start a ``create_2`` job and wait until it is reported as running.

    Returns a ``(job_state, outputs)`` tuple: ``job_state`` is a callable that
    fetches the job's show response, ``outputs`` is the list of the two
    outputs from the tool response.
    """
    tool_request = self.dataset_populator.run_tool_payload(
        tool_id="create_2",
        inputs={"sleep_time": sleep_time},
        history_id=history_id,
    )
    run_response = self._post("tools", data=tool_request)
    run_response.raise_for_status()
    run_result = run_response.json()
    outputs = run_result["outputs"]
    jobs = run_result["jobs"]
    assert len(outputs) == 2
    assert len(jobs) == 1

    def job_state():
        # Fresh show response of the single job started above.
        return self._get(f"jobs/{jobs[0]['id']}")

    # Give job some time to get up and running.
    time.sleep(2)
    running_state = wait_on_state(job_state, skip_states=["queued", "new"], assert_ok=False, timeout=15)
    assert running_state == "running", running_state

    return job_state, outputs


def _raw_update_history_item(self, history_id, item_id, data):
    """PUT ``data`` as JSON to the history item, assert an ok status and return the response."""
    item_url = self._api_url(f"histories/{history_id}/contents/{item_id}", use_key=True)
    response = requests.put(item_url, json=data)
    assert_status_code_is_ok(response)
    return response
@pytest.mark.require_new_history
@skip_without_tool("cat_data_and_sleep")
def test_resume_job(self, history_id):
    """Check that a job paused by a deleted input can be resumed once the input is restored."""
    hda1 = self.dataset_populator.new_dataset(history_id, content="samp1\t10.0\nsamp2\t20.0\n")
    hda2 = self.dataset_populator.new_dataset(history_id, content="samp1\t30.0\nsamp2\t40.0\n")
    hda1_url = f"histories/{history_id}/contents/{hda1['id']}"

    # Submit first job
    first_request = self.dataset_populator.run_tool_payload(
        tool_id="cat_data_and_sleep",
        inputs={
            "sleep_time": 15,
            "input1": {"src": "hda", "id": hda2["id"]},
            "queries_0|input2": {"src": "hda", "id": hda2["id"]},
        },
        history_id=history_id,
    )
    first_run = self._post("tools", data=first_request).json()
    first_output = first_run["outputs"][0]

    # Submit second job that waits on job1
    second_request = self.dataset_populator.run_tool_payload(
        tool_id="cat1",
        inputs={
            "input1": {"src": "hda", "id": hda1["id"]},
            "queries_0|input2": {"src": "hda", "id": first_output["id"]},
        },
        history_id=history_id,
    )
    second_run = self._post("tools", data=second_request).json()
    job_id = second_run["jobs"][0]["id"]
    output_url = f"histories/{history_id}/contents/{second_run['outputs'][0]['id']}"

    # Delete second jobs input while second job is waiting for first job
    delete_response = self._delete(hda1_url)
    self._assert_status_code_is(delete_response, 200)
    self.dataset_populator.wait_for_history_jobs(history_id, assert_ok=False)
    assert self._get(output_url).json()["state"] == "paused"

    # Undelete input dataset
    undelete_response = self._put(hda1_url, data={"deleted": False}, json=True)
    self._assert_status_code_is(undelete_response, 200)

    # Resume the paused job and expect its output to end up ok.
    resume_response = self._put(f"jobs/{job_id}/resume")
    self._assert_status_code_is(resume_response, 200)
    self.dataset_populator.wait_for_history_jobs(history_id, assert_ok=True)
    assert self._get(output_url).json()["state"] == "ok"
def _get_history_item_as_admin(self, history_id, item_id):
    """Fetch the detailed view of a history item as admin and return the decoded JSON."""
    detailed_url = f"histories/{history_id}/contents/{item_id}?view=detailed"
    item_response = self._get(detailed_url, admin=True)
    assert_status_code_is_ok(item_response)
    return item_response.json()
@pytest.mark.require_new_history
def test_search_handle_identifiers(self, history_id):
    """Check that renaming a job's input dataset stops job search from matching the job.

    The input name and element identifier of a job's output must match for a
    job to be returned, so once the input has been renamed the same search
    is expected to find nothing.
    """
    dataset_id = self.__history_with_ok_dataset(history_id)
    inputs = json.dumps({"input1": {"src": "hda", "id": dataset_id}})
    self._job_search(tool_id="identifier_single", history_id=history_id, inputs=inputs)

    # Rename the input dataset. Only the new name is sent, so there is no
    # need to fetch the current dataset details first.
    new_name = "Renamed Test Dataset"
    dataset_update_response = self._put(
        f"histories/{history_id}/contents/{dataset_id}", data=dict(name=new_name), json=True
    )
    self._assert_status_code_is(dataset_update_response, 200)
    assert dataset_update_response.json()["name"] == new_name

    search_payload = self._search_payload(history_id=history_id, tool_id="identifier_single", inputs=inputs)
    self._search(search_payload, expected_search_count=0)
@pytest.mark.require_new_history
def test_search_delete_outputs(self, history_id):
    """Check that a job is no longer found by search after its output was deleted."""
    dataset_id = self.__history_with_ok_dataset(history_id)
    inputs = json.dumps({"input1": {"src": "hda", "id": dataset_id}})
    tool_response = self._job_search(tool_id="cat1", history_id=history_id, inputs=inputs)

    # Delete the first output of the job.
    output_id = tool_response.json()["outputs"][0]["id"]
    delete_response = self._delete(f"histories/{history_id}/contents/{output_id}")
    self._assert_status_code_is(delete_response, 200)

    search_payload = self._search_payload(history_id=history_id, tool_id="cat1", inputs=inputs)
    self._search(search_payload, expected_search_count=0)
@skip_without_tool("detect_errors_aggressive")
def test_implicit_collection_jobs(self, history_id):
    """Check that the jobs index can be filtered by ``implicit_collection_jobs_id``.

    A failing mapped-over ``detect_errors_aggressive`` run is started; the
    index filtered by the collection's ``implicit_collection_jobs_id`` must
    return exactly that job, in state ``error``.

    The test is skipped when ``detect_errors_aggressive`` is unavailable,
    because the mapped-over run depends on that tool.
    """
    run_response = self._run_map_over_error(history_id)
    implicit_collection_id = run_response["implicit_collections"][0]["id"]
    failed_hdca = self.dataset_populator.get_history_collection_details(
        history_id=history_id,
        content_id=implicit_collection_id,
        assert_ok=False,
    )
    job_id = run_response["jobs"][0]["id"]
    icj_id = failed_hdca["implicit_collection_jobs_id"]
    assert icj_id

    index = self.__jobs_index(data=dict(implicit_collection_jobs_id=icj_id))
    assert len(index) == 1
    assert index[0]["id"] == job_id
    assert index[0]["state"] == "error", index
@pytest.mark.require_new_history
def test_search_with_hdca_list_input(self, history_id):
    """Check job search with two list collections as inputs of ``multi_data_param``.

    Neither swapping the two inputs nor searching after the output was
    deleted may return a match.
    """
    tool_id = "multi_data_param"
    list_id_a = self.__history_with_ok_collection(collection_type="list", history_id=history_id)
    list_id_b = self.__history_with_ok_collection(collection_type="list", history_id=history_id)
    inputs = json.dumps(
        {
            "f1": {"src": "hdca", "id": list_id_a},
            "f2": {"src": "hdca", "id": list_id_b},
        }
    )
    tool_response = self._job_search(tool_id=tool_id, history_id=history_id, inputs=inputs)

    # We switch the inputs, this should not return a match
    inputs_switched = json.dumps(
        {
            "f2": {"src": "hdca", "id": list_id_a},
            "f1": {"src": "hdca", "id": list_id_b},
        }
    )
    switched_payload = self._search_payload(history_id=history_id, tool_id=tool_id, inputs=inputs_switched)
    self._search(switched_payload, expected_search_count=0)

    # We delete the output (this is a HDA, as multi_data_param reduces collections)
    # and use the correct input job definition, the job should not be found
    output_id = tool_response.json()["outputs"][0]["id"]
    delete_response = self._delete(f"histories/{history_id}/contents/{output_id}")
    self._assert_status_code_is(delete_response, 200)
    original_payload = self._search_payload(history_id=history_id, tool_id=tool_id, inputs=inputs)
    self._search(original_payload, expected_search_count=0)
@pytest.mark.require_new_history
def test_search_delete_hdca_output(self, history_id):
    """Deleting either a dataset output or a collection output hides the job from search."""
    tool_id = "collection_creates_list"
    input_list_id = self.__history_with_ok_collection(collection_type="list", history_id=history_id)
    inputs = json.dumps({"input1": {"src": "hdca", "id": input_list_id}})

    # Case 1: delete a single dataset output of the first run.
    first_run = self._job_search(tool_id=tool_id, history_id=history_id, inputs=inputs)
    dataset_output_id = first_run.json()["outputs"][0]["id"]
    deletion = self._delete(f"histories/{history_id}/contents/{dataset_output_id}")
    self._assert_status_code_is(deletion, 200)
    payload = self._search_payload(history_id=history_id, tool_id=tool_id, inputs=inputs)
    self._search(payload, expected_search_count=0)

    # Case 2: run again and delete the output collection itself.
    second_run = self._job_search(tool_id=tool_id, history_id=history_id, inputs=inputs)
    collection_output_id = second_run.json()["output_collections"][0]["id"]
    deletion = self._delete(f"histories/{history_id}/contents/dataset_collections/{collection_output_id}")
    self._assert_status_code_is(deletion, 200)
    payload = self._search_payload(history_id=history_id, tool_id=tool_id, inputs=inputs)
    self._search(payload, expected_search_count=0)
@pytest.mark.require_new_history
def test_search_with_hdca_pair_input(self, history_id):
    """Job search follows a pair HDCA across a copy into another history.

    The job stays searchable through the copy while either the original or
    the copy of the input collection exists, and stops being found once both
    have been deleted.
    """
    list_id_a = self.__history_with_ok_collection(collection_type="pair", history_id=history_id)
    inputs = json.dumps(
        {
            "f1": {"src": "hdca", "id": list_id_a},
            "f2": {"src": "hdca", "id": list_id_a},
        }
    )
    self._job_search(tool_id="multi_data_param", history_id=history_id, inputs=inputs)
    # We test that a job can be found even if the collection has been copied to another history
    new_history_id = self.dataset_populator.new_history()
    copy_payload = {"content": list_id_a, "source": "hdca", "type": "dataset_collection"}
    copy_response = self._post(f"histories/{new_history_id}/contents", data=copy_payload, json=True)
    self._assert_status_code_is(copy_response, 200)
    new_list_a = copy_response.json()["id"]
    copied_inputs = json.dumps(
        {
            "f1": {"src": "hdca", "id": new_list_a},
            "f2": {"src": "hdca", "id": new_list_a},
        }
    )
    search_payload = self._search_payload(
        history_id=new_history_id, tool_id="multi_data_param", inputs=copied_inputs
    )
    self._search(search_payload, expected_search_count=1)
    # Now we delete the original input HDCA that was used -- we should still be able to find the job
    delete_response = self._delete(f"histories/{history_id}/contents/dataset_collections/{list_id_a}")
    self._assert_status_code_is(delete_response, 200)
    self._search(search_payload, expected_search_count=1)
    # Now we also delete the copy -- we shouldn't find a job.
    # The copy was created in new_history_id, so address it through that history
    # rather than through the original one.
    delete_response = self._delete(f"histories/{new_history_id}/contents/dataset_collections/{new_list_a}")
    self._assert_status_code_is(delete_response, 200)
    self._search(search_payload, expected_search_count=0)
@pytest.mark.require_new_history
def test_search_with_hdca_list_pair_input(self, history_id):
    """A ``multi_data_param`` job fed the same list:pair HDCA twice becomes searchable."""
    hdca_id = self.__history_with_ok_collection(collection_type="list:pair", history_id=history_id)
    hdca_ref = {"src": "hdca", "id": hdca_id}
    # Both parameters point at the same collection.
    serialized_inputs = json.dumps({"f1": hdca_ref, "f2": hdca_ref})
    self._job_search(tool_id="multi_data_param", history_id=history_id, inputs=serialized_inputs)
@pytest.mark.require_new_history
def test_search_with_hdca_list_pair_collection_mapped_over_pair_input(self, history_id):
    """A list:pair HDCA submitted as a batch with ``map_over_type`` "paired" yields a searchable job."""
    hdca_id = self.__history_with_ok_collection(collection_type="list:pair", history_id=history_id)
    batch_value = {"src": "hdca", "id": hdca_id, "map_over_type": "paired"}
    serialized_inputs = json.dumps({"f1": {"batch": True, "values": [batch_value]}})
    self._job_search(tool_id="collection_paired_test", history_id=history_id, inputs=serialized_inputs)
def _get_simple_rerun_params(self, history_id, private=False):
    """Run ``collection_paired_test`` over a list:pair HDCA and return its rerun parameters.

    :param history_id: history in which the collection is created and the tool is run
    :param private: when true, every dataset of the first pair is made private
        after the rerun parameters have been fetched
    :returns: the JSON body of ``jobs/<id>/build_for_rerun`` for the first job
    """
    hdca_id = self.__history_with_ok_collection(collection_type="list:pair", history_id=history_id)
    batch_value = {"src": "hdca", "id": hdca_id, "map_over_type": "paired"}
    run_response = self._run(
        history_id=history_id,
        tool_id="collection_paired_test",
        inputs={"f1": {"batch": True, "values": [batch_value]}},
        wait_for_job=True,
        assert_ok=True,
    )
    first_job_id = run_response["jobs"][0]["id"]
    rerun_params = self._get(f"jobs/{first_job_id}/build_for_rerun").json()
    # The rerun state of an individual job refers to the expanded input, i.e. a
    # dataset collection element, not the list:pair HDCA the run was submitted with.
    first_value = rerun_params["state_inputs"]["f1"]["values"][0]
    assert first_value["src"] == "dce"
    if not private:
        return rerun_params
    hdca = self.dataset_populator.get_history_collection_details(history_id=history_id, content_id=hdca_id)
    first_pair = hdca["elements"][0]["object"]
    for pair_member in first_pair["elements"]:
        self.dataset_populator.make_private(history_id, pair_member["object"]["id"])
    return rerun_params
@skip_without_tool("collection_paired_test")
def test_job_build_for_rerun(self, history_id):
    """The ``state_inputs`` returned by build_for_rerun can be resubmitted successfully."""
    state_inputs = self._get_simple_rerun_params(history_id)["state_inputs"]
    run_kwargs = dict(
        history_id=history_id,
        tool_id="collection_paired_test",
        inputs=state_inputs,
        wait_for_job=True,
        assert_ok=True,
    )
    self._run(**run_kwargs)
@skip_without_tool("collection_paired_test")
def test_dce_submission_security(self, history_id):
    """Submitting rerun inputs built from private datasets as a different user returns 403."""
    state_inputs = self._get_simple_rerun_params(history_id, private=True)["state_inputs"]
    with self._different_user():
        # The second user submits into a history of their own.
        foreign_history_id = self.dataset_populator.new_history()
        submission = self._run(
            history_id=foreign_history_id,
            tool_id="collection_paired_test",
            inputs=state_inputs,
            wait_for_job=False,
            assert_ok=False,
        )
        assert submission.status_code == 403
@skip_without_tool("identifier_collection")
def test_job_build_for_rerun_list_list(self, history_id):
    """Rerunning one job of a list:list map-over reproduces that job's output."""
    inner_list_ids = [
        self.__history_with_ok_collection(collection_type="list", history_id=history_id) for _ in range(2)
    ]
    nested = self.dataset_collection_populator.create_nested_collection(
        history_id=history_id,
        collection_type="list:list",
        name="list list collection",
        collection=inner_list_ids,
    ).json()
    nested_id = nested["id"]
    outer_first = nested["elements"][0]
    assert outer_first["element_type"] == "dataset_collection"
    assert outer_first["element_identifier"] == "test0"
    assert outer_first["model_class"] == "DatasetCollectionElement"

    batch_value = {"src": "hdca", "id": nested_id, "map_over_type": "list"}
    run_response = self._run(
        history_id=history_id,
        tool_id="identifier_collection",
        inputs={"input1": {"batch": True, "values": [batch_value]}},
        wait_for_job=True,
        assert_ok=True,
    )
    assert len(run_response["jobs"]) == 2

    first_job_id = run_response["jobs"][0]["id"]
    rerun_params = self._get(f"jobs/{first_job_id}/build_for_rerun").json()
    # Rerun parameters of the first job should refer to the expanded input, i.e. a
    # dataset collection element, not the list:list HDCA the run was submitted with.
    state_inputs = rerun_params["state_inputs"]
    assert state_inputs["input1"]["values"][0]["src"] == "dce"

    rerun_response = self._run(
        history_id=history_id,
        tool_id="identifier_collection",
        inputs=state_inputs,
        wait_for_job=True,
        assert_ok=True,
    )
    assert len(rerun_response["jobs"]) == 1
    # The rerun's output content must equal the first output of the original run.
    rerun_content = self.dataset_populator.get_history_dataset_content(
        history_id=history_id, dataset=rerun_response["outputs"][0]
    )
    run_content = self.dataset_populator.get_history_dataset_content(
        history_id=history_id, dataset=run_response["outputs"][0]
    )
    assert rerun_content == run_content
@pytest.mark.require_new_history
def test_get_inputs_and_outputs(self, history_id):
    """``jobs/<id>/inputs`` and ``jobs/<id>/outputs`` agree with the job search result."""
    hda_id = self.__history_with_ok_dataset(history_id)
    serialized_inputs = json.dumps({"input1": {"src": "hda", "id": hda_id}})
    search_response = self._create_and_search_job(history_id, serialized_inputs, tool_id="cat1")
    found_job = search_response.json()[0]
    job_id = found_job["id"]
    expected_output_name, expected_output = list(found_job["outputs"].items())[0]

    # Inputs: the first entry must be the HDA that was submitted as input1.
    inputs_response = self._get(f"jobs/{job_id}/inputs")
    self._assert_status_code_is(inputs_response, 200)
    first_input = inputs_response.json()[0]
    assert first_input.get("name") == "input1"
    assert first_input.get("dataset") == {"src": "hda", "id": hda_id}

    # Outputs: name, id and src of the first entry must match the search result.
    outputs_response = self._get(f"jobs/{job_id}/outputs")
    self._assert_status_code_is(outputs_response, 200)
    first_output = outputs_response.json()[0]
    assert first_output.get("name") == expected_output_name
    assert first_output.get("dataset").get("id") == expected_output.get("id")
    assert first_output.get("dataset").get("src") == expected_output.get("src")
@pytest.mark.require_new_history
def test_delete_job(self, history_id):
    """A job deleted through ``DELETE jobs/<id>`` no longer shows up in job search."""
    hda_id = self.__history_with_ok_dataset(history_id)
    serialized_inputs = json.dumps({"input1": {"src": "hda", "id": hda_id}})
    payload = self._search_payload(history_id=history_id, tool_id="cat1", inputs=serialized_inputs)

    # Submit the job; the same payload doubles as the tool request.
    submit_response = self._post("tools", data=payload)
    job_id = submit_response.json()["jobs"][0]["id"]

    # Delete it without supplying a message.
    deletion = self._delete(f"jobs/{job_id}")
    self._assert_status_code_is(deletion, 200)
    assert deletion.json() is True

    # A fresh search for the same job must now be empty.
    payload = self._search_payload(history_id=history_id, tool_id="cat1", inputs=serialized_inputs)
    search_response = self._post("jobs/search", data=payload, json=True)
    self._assert_status_code_is(search_response, 200)
    assert len(search_response.json()) == 0
@pytest.mark.require_new_history
def test_destination_params(self, history_id):
    """An admin request to ``/api/jobs/<id>/destination_params`` succeeds with 200."""
    hda_id = self.__history_with_ok_dataset(history_id)
    serialized_inputs = json.dumps({"input1": {"src": "hda", "id": hda_id}})
    found_jobs = self._create_and_search_job(history_id, serialized_inputs, tool_id="cat1").json()
    job_id = found_jobs[0]["id"]
    response = self._get(f"/api/jobs/{job_id}/destination_params", admin=True)
    self._assert_status_code_is(response, 200)
@pytest.mark.require_new_history
def test_job_metrics(self, history_id):
    """The metrics endpoints respond 200 when addressed by job id and by dataset id."""
    hda_id = self.__history_with_ok_dataset(history_id)
    serialized_inputs = json.dumps({"input1": {"src": "hda", "id": hda_id}})
    found_jobs = self._create_and_search_job(history_id, serialized_inputs, tool_id="cat1").json()
    job_id = found_jobs[0]["id"]

    by_job = self._get(f"/api/jobs/{job_id}/metrics", data={"hda_ldda": "hda"})
    self._assert_status_code_is(by_job, 200)

    by_dataset = self._get(f"/api/datasets/{hda_id}/metrics", data={"hda_ldda": "hda"})
    self._assert_status_code_is(by_dataset, 200)
@pytest.mark.require_new_history
def test_parameters_display(self, history_id):
    """The parameters_display endpoints respond 200 when addressed by job id and by dataset id."""
    hda_id = self.__history_with_ok_dataset(history_id)
    serialized_inputs = json.dumps({"input1": {"src": "hda", "id": hda_id}})
    found_jobs = self._create_and_search_job(history_id, serialized_inputs, tool_id="cat1").json()
    job_id = found_jobs[0]["id"]

    by_job = self._get(f"/api/jobs/{job_id}/parameters_display", data={"hda_ldda": "hda"})
    self._assert_status_code_is(by_job, 200)

    by_dataset = self._get(f"/api/datasets/{hda_id}/parameters_display", data={"hda_ldda": "hda"})
    self._assert_status_code_is(by_dataset, 200)
def _create_and_search_job(self, history_id, inputs, tool_id):
    """Run ``tool_id`` with ``inputs``, wait for it, and return the ``jobs/search`` response.

    The same payload is used both to submit the tool and to search for the job.
    The search response is asserted to have status 200 before it is returned.
    """
    # create a job
    search_payload = self._search_payload(history_id=history_id, tool_id=tool_id, inputs=inputs)
    tool_response = self._post("tools", data=search_payload)
    self.dataset_populator.wait_for_tool_run(history_id, run_response=tool_response)
    # search for the job and get the corresponding values
    search_response = self._post("jobs/search", data=search_payload, json=True)
    self._assert_status_code_is(search_response, 200)
    return search_response

def _job_search(self, tool_id, history_id, inputs):
    """Assert no matching job exists, run the tool, then assert exactly one job is found.

    :returns: the response of the ``tools`` POST that created the job
    """
    search_payload = self._search_payload(history_id=history_id, tool_id=tool_id, inputs=inputs)
    # Before running the tool the search must come back empty.
    empty_search_response = self._post("jobs/search", data=search_payload, json=True)
    self._assert_status_code_is(empty_search_response, 200)
    assert len(empty_search_response.json()) == 0
    tool_response = self._post("tools", data=search_payload)
    self.dataset_populator.wait_for_tool_run(history_id, run_response=tool_response)
    self._search(search_payload, expected_search_count=1)
    return tool_response

def _search_payload(self, history_id, tool_id, inputs, state="ok"):
    """Build the payload dict shared by tool submission and ``jobs/search`` requests."""
    return dict(tool_id=tool_id, inputs=inputs, history_id=history_id, state=state)

def _search(self, payload, expected_search_count=1):
    """Poll ``jobs/search`` until it returns ``expected_search_count`` jobs.

    Up to five attempts are made, one second apart, and the final count is
    asserted to equal the expected one.

    :returns: the number of jobs found by the last search
    """
    # in case job and history aren't updated at exactly the same
    # time give time to wait
    attempts = 5
    for attempt in range(attempts):
        search_count = self._search_count(payload)
        if search_count == expected_search_count:
            break
        # Only wait when another attempt follows; sleeping after the last
        # attempt would just delay the failure.
        if attempt < attempts - 1:
            time.sleep(1)
    assert (
        search_count == expected_search_count
    ), f"expected to find {expected_search_count} jobs, got {search_count} jobs"
    return search_count

def _search_count(self, search_payload):
    """Return the number of jobs a single ``jobs/search`` request finds for ``search_payload``."""
    search_response = self._post("jobs/search", data=search_payload, json=True)
    self._assert_status_code_is(search_response, 200)
    return len(search_response.json())

def __uploads_with_state(self, *states):
    """Return the ``__DATA_FETCH__`` jobs from a jobs index filtered by ``states``.

    Also asserts that every returned job is in one of the requested states.
    """
    jobs_response = self._get("jobs", data=dict(state=states))
    self._assert_status_code_is(jobs_response, 200)
    jobs = jobs_response.json()
    assert all(j["state"] in states for j in jobs)
    return [j for j in jobs if j["tool_id"] == "__DATA_FETCH__"]

def __history_with_new_dataset(self, history_id):
    """Create a dataset in ``history_id`` (with ``wait=True``) and return its id."""
    return self.dataset_populator.new_dataset(history_id, wait=True)["id"]

def __history_with_ok_dataset(self, history_id):
    """Create a dataset in ``history_id`` (with ``wait=True``) and return its id."""
    return self.dataset_populator.new_dataset(history_id, wait=True)["id"]

def __history_with_ok_collection(self, collection_type="list", history_id=None):
    """Create a collection of ``collection_type`` and return the id of the first fetch output.

    :param collection_type: one of ``"list"``, ``"pair"`` or ``"list:pair"``
    :param history_id: target history; a new history is created when falsy
    :raises ValueError: if ``collection_type`` is not one of the supported types
    """
    supported_types = ("list", "pair", "list:pair")
    # Reject unknown types up front, before any history or collection is created;
    # otherwise the code below would fail on an unbound ``fetch_response``.
    if collection_type not in supported_types:
        raise ValueError(
            f"Unsupported collection_type {collection_type!r}, expected one of {supported_types}"
        )
    if not history_id:
        history_id = self.dataset_populator.new_history()
    if collection_type == "list":
        fetch_response = self.dataset_collection_populator.create_list_in_history(
            history_id, direct_upload=True
        ).json()
    elif collection_type == "pair":
        fetch_response = self.dataset_collection_populator.create_pair_in_history(
            history_id, direct_upload=True
        ).json()
    else:  # "list:pair"
        fetch_response = self.dataset_collection_populator.create_list_of_pairs_in_history(history_id).json()
    self.dataset_collection_populator.wait_for_fetched_collection(fetch_response)
    return fetch_response["outputs"][0]["id"]

def __jobs_index(self, **kwds):
    """GET the ``jobs`` index (forwarding ``kwds`` to ``_get``) and return it as a list."""
    jobs_response = self._get("jobs", **kwds)
    self._assert_status_code_is(jobs_response, 200)
    jobs = jobs_response.json()
    assert isinstance(jobs, list)
    return jobs