Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tool_util.verify.script

#!/usr/bin/env python

import argparse
import datetime as dt
import json
import logging
import os
import sys
import tempfile
from collections import namedtuple
from concurrent.futures import (
    thread,
    ThreadPoolExecutor,
)

import yaml

from galaxy.tool_util.verify.interactor import (
    DictClientTestConfig,
    GalaxyInteractorApi,
    verify_tool,
)

DESCRIPTION = """Script to quickly run a tool test against a running Galaxy instance."""
DEFAULT_SUITE_NAME = "Galaxy Tool Tests"
ALL_TESTS = -1
ALL_TOOLS = "*"
ALL_VERSION = "*"
LATEST_VERSION = None


TestReference = namedtuple("TestReference", ["tool_id", "tool_version", "test_index"])
TestException = namedtuple("TestException", ["tool_id", "exception", "was_recorded"])


[docs]class Results:
[docs] def __init__(self, default_suitename, test_json, append=False, galaxy_url=None): self.test_json = test_json or "-" self.galaxy_url = galaxy_url test_results = [] test_exceptions = [] suitename = default_suitename if append: assert test_json != "-" with open(test_json) as f: previous_results = json.load(f) test_results = previous_results["tests"] if "suitename" in previous_results: suitename = previous_results["suitename"] self.test_results = test_results self.test_exceptions = test_exceptions self.suitename = suitename
[docs] def register_result(self, result): self.test_results.append(result)
[docs] def register_exception(self, test_exception): self.test_exceptions.append(test_exception)
[docs] def already_successful(self, test_reference): test_data = self._previous_test_data(test_reference) if test_data: if "status" in test_data and test_data["status"] == "success": return True return False
[docs] def already_executed(self, test_reference): test_data = self._previous_test_data(test_reference) if test_data: if "status" in test_data and test_data["status"] != "skipped": return True return False
def _previous_test_data(self, test_reference): test_id = _test_id_for_reference(test_reference) for test_result in self.test_results: if test_result.get("id") != test_id: continue has_data = test_result.get("has_data", False) if has_data: test_data = test_result.get("data", {}) return test_data return None
[docs] def write(self): tests = sorted(self.test_results, key=lambda el: el["id"]) n_passed, n_failures, n_skips = 0, 0, 0 n_errors = len([e for e in self.test_exceptions if not e.was_recorded]) for test in tests: has_data = test.get("has_data", False) if has_data: test_data = test.get("data", {}) if "status" not in test_data: raise Exception(f"Test result data {test_data} doesn't contain a status key.") status = test_data["status"] if status == "success": n_passed += 1 elif status == "error": n_errors += 1 elif status == "skip": n_skips += 1 elif status == "failure": n_failures += 1 report_obj = { "version": "0.1", "suitename": self.suitename, "results": { "total": n_passed + n_failures + n_skips + n_errors, "errors": n_errors, "failures": n_failures, "skips": n_skips, }, "tests": tests, } if self.galaxy_url: report_obj["galaxy_url"] = self.galaxy_url if self.test_json == "-": print(json.dumps(report_obj)) else: with open(self.test_json, "w") as f: json.dump(report_obj, f)
[docs] def info_message(self): messages = [] passed_tests = self._tests_with_status("success") messages.append("Passed tool tests ({}): {}".format(len(passed_tests), [t["id"] for t in passed_tests])) failed_tests = self._tests_with_status("failure") messages.append("Failed tool tests ({}): {}".format(len(failed_tests), [t["id"] for t in failed_tests])) skiped_tests = self._tests_with_status("skip") messages.append("Skipped tool tests ({}): {}".format(len(skiped_tests), [t["id"] for t in skiped_tests])) errored_tests = self._tests_with_status("error") messages.append("Errored tool tests ({}): {}".format(len(errored_tests), [t["id"] for t in errored_tests])) return "\n".join(messages)
@property def success_count(self): self._tests_with_status("success") @property def skip_count(self): self._tests_with_status("skip") @property def error_count(self): return self._tests_with_status("error") + len(self.test_exceptions) @property def failure_count(self): return self._tests_with_status("failure") def _tests_with_status(self, status): return [t for t in self.test_results if t.get("data", {}).get("status") == status]
[docs]def test_tools( galaxy_interactor, test_references, results, log=None, parallel_tests=1, history_per_test_case=False, history_name=None, no_history_reuse=False, no_history_cleanup=False, publish_history=False, retries=0, verify_kwds=None, ): """Run through tool tests and write report. Refactor this into Galaxy in 21.01. """ verify_kwds = (verify_kwds or {}).copy() tool_test_start = dt.datetime.now() history_created = False test_history = None if not history_per_test_case: if not history_name: history_name = f"History for {results.suitename}" if log: log.info(f"History name is '{history_name}'") if not no_history_reuse: history = galaxy_interactor.get_history(history_name=history_name) if history: test_history = history["id"] if log: log.info(f"Using existing history with id '{test_history}', last updated: {history['update_time']}") if not test_history: history_created = True test_history = galaxy_interactor.new_history(history_name=history_name, publish_history=publish_history) if log: log.info(f"History created with id '{test_history}'") verify_kwds.update( { "no_history_cleanup": no_history_cleanup, "test_history": test_history, } ) with ThreadPoolExecutor(max_workers=parallel_tests) as executor: try: for test_reference in test_references: _test_tool( executor=executor, test_reference=test_reference, results=results, galaxy_interactor=galaxy_interactor, log=log, retries=retries, verify_kwds=verify_kwds, publish_history=publish_history, ) finally: # Always write report, even if test was cancelled. try: executor.shutdown(wait=True) except KeyboardInterrupt: executor._threads.clear() thread._threads_queues.clear() results.write() if log: if results.test_json == "-": destination = "standard output" else: destination = os.path.abspath(results.test_json) log.info(f"Report written to '{destination}'") log.info(results.info_message()) log.info(f"Total tool test time: {dt.datetime.now() - tool_test_start}") if history_created and not no_history_cleanup: galaxy_interactor.delete_history(test_history)
def _test_id_for_reference(test_reference): tool_id = test_reference.tool_id tool_version = test_reference.tool_version test_index = test_reference.test_index if tool_version and tool_id.endswith(f"/{tool_version}"): tool_id = tool_id[: -len(f"/{tool_version}")] label_base = tool_id if tool_version: label_base += f"/{str(tool_version)}" test_id = f"{label_base}-{str(test_index)}" return test_id def _test_tool( executor, test_reference, results, galaxy_interactor, log, retries, publish_history, verify_kwds, ): tool_id = test_reference.tool_id tool_version = test_reference.tool_version test_index = test_reference.test_index # If given a tool_id with a version suffix, strip it off so we can treat tool_version # correctly at least in client_test_config. if tool_version and tool_id.endswith(f"/{tool_version}"): tool_id = tool_id[: -len(f"/{tool_version}")] test_id = _test_id_for_reference(test_reference) def run_test(): run_retries = retries job_data = None job_exception = None def register(job_data_): nonlocal job_data job_data = job_data_ try: while run_retries >= 0: job_exception = None try: if log: log.info("Executing test '%s'", test_id) verify_tool( tool_id, galaxy_interactor, test_index=test_index, tool_version=tool_version, register_job_data=register, publish_history=publish_history, **verify_kwds, ) if log: log.info("Test '%s' passed", test_id) break except Exception as e: if log: log.warning("Test '%s' failed", test_id, exc_info=True) job_exception = e run_retries -= 1 finally: if job_data is not None: results.register_result( { "id": test_id, "has_data": True, "data": job_data, } ) if job_exception is not None: was_recorded = job_data is not None test_exception = TestException(tool_id, job_exception, was_recorded) results.register_exception(test_exception) executor.submit(run_test)
[docs]def build_case_references( galaxy_interactor, tool_id=ALL_TOOLS, tool_version=LATEST_VERSION, test_index=ALL_TESTS, page_size=0, page_number=0, test_filters=None, log=None, ): test_references = [] if tool_id == ALL_TOOLS: tests_summary = galaxy_interactor.get_tests_summary() for tool_id, tool_versions_dict in tests_summary.items(): for tool_version, summary in tool_versions_dict.items(): for test_index in range(summary["count"]): test_reference = TestReference(tool_id, tool_version, test_index) test_references.append(test_reference) else: assert tool_id tool_test_dicts = galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version) or {} for i, tool_test_dict in enumerate(tool_test_dicts): this_tool_version = tool_test_dict.get("tool_version", tool_version) this_test_index = i if test_index == ALL_TESTS or i == test_index: test_reference = TestReference(tool_id, this_tool_version, this_test_index) test_references.append(test_reference) if test_filters is not None and len(test_filters) > 0: filtered_test_references = [] for test_reference in test_references: skip_test = False for test_filter in test_filters: if test_filter(test_reference): if log is not None: log.debug(f"Filtering test for {test_reference}, skipping") skip_test = True if not skip_test: filtered_test_references.append(test_reference) log.info(f"Skipping {len(test_references)-len(filtered_test_references)} out of {len(test_references)} tests.") test_references = filtered_test_references if page_size > 0: slice_start = page_size * page_number slice_end = page_size * (page_number + 1) test_references = test_references[slice_start:slice_end] return test_references
[docs]def main(argv=None): if argv is None: argv = sys.argv[1:] args = arg_parser().parse_args(argv) try: run_tests(args) except Exception as exc: print(f"ERROR: {exc}", file=sys.stderr) sys.exit(1)
[docs]def run_tests(args, test_filters=None, log=None): # Split out argument parsing so we can quickly build other scripts - such as a script # to run all tool tests for a workflow by just passing in a custom test_filters. test_filters = test_filters or [] log = log or setup_global_logger(__name__, verbose=args.verbose) client_test_config_path = args.client_test_config if client_test_config_path is not None: log.debug(f"Reading client config path {client_test_config_path}") with open(client_test_config_path) as f: client_test_config = yaml.full_load(f) else: client_test_config = {} def get_option(key): arg_val = getattr(args, key, None) if arg_val is None and key in client_test_config: val = client_test_config.get(key) else: val = arg_val return val output_json_path = get_option("output_json") galaxy_url = get_option("galaxy_url") galaxy_interactor_kwds = { "galaxy_url": galaxy_url, "master_api_key": get_option("admin_key"), "api_key": get_option("key"), "keep_outputs_dir": args.output, "download_attempts": get_option("download_attempts"), "download_sleep": get_option("download_sleep"), "test_data": get_option("test_data"), } tool_id = args.tool_id tool_version = args.tool_version tools_client_test_config = DictClientTestConfig(client_test_config.get("tools")) verbose = args.verbose galaxy_interactor = GalaxyInteractorApi(**galaxy_interactor_kwds) results = Results(args.suite_name, output_json_path, append=args.append, galaxy_url=galaxy_url) skip = args.skip if skip == "executed": test_filters.append(results.already_executed) elif skip == "successful": test_filters.append(results.already_successful) test_references = build_case_references( galaxy_interactor, tool_id=tool_id, tool_version=tool_version, test_index=args.test_index, page_size=args.page_size, page_number=args.page_number, test_filters=test_filters, log=log, ) log.debug(f"Built {len(test_references)} test references to executed.") verify_kwds = dict( client_test_config=tools_client_test_config, force_path_paste=args.force_path_paste, skip_with_reference_data=not args.with_reference_data, quiet=not verbose, ) test_tools( galaxy_interactor, test_references, results, log=log, parallel_tests=args.parallel_tests, history_per_test_case=args.history_per_test_case, history_name=args.history_name, no_history_reuse=args.no_history_reuse, no_history_cleanup=args.no_history_cleanup, publish_history=get_option("publish_history"), verify_kwds=verify_kwds, ) exceptions = results.test_exceptions if exceptions: exception = exceptions[0] if hasattr(exception, "exception"): exception = exception.exception raise exception
[docs]def setup_global_logger(name, log_file=None, verbose=False): formatter = logging.Formatter("%(asctime)s %(levelname)-5s - %(message)s") console = logging.StreamHandler() console.setFormatter(formatter) logger = logging.getLogger(name) logger.setLevel(logging.DEBUG if verbose else logging.INFO) logger.addHandler(console) if not log_file: # delete = false is chosen here because it is always nice to have a log file # ready if you need to debug. Not having the "if only I had set a log file" # moment after the fact. temp = tempfile.NamedTemporaryFile(prefix="ephemeris_", delete=False) log_file = temp.name file_handler = logging.FileHandler(log_file) logger.addHandler(file_handler) logger.info(f"Storing log file in: {log_file}") return logger
[docs]def arg_parser(): parser = argparse.ArgumentParser(description=DESCRIPTION) parser.add_argument("-u", "--galaxy-url", default="http://localhost:8080", help="Galaxy URL") parser.add_argument("-k", "--key", default=None, help="Galaxy User API Key") parser.add_argument("-a", "--admin-key", default=None, help="Galaxy Admin API Key") parser.add_argument( "--force_path_paste", default=False, action="store_true", help='This requires Galaxy-side config option "allow_path_paste" enabled. Allows for fetching test data locally. Only for admins.', ) parser.add_argument("-t", "--tool-id", default=ALL_TOOLS, help="Tool ID") parser.add_argument( "--tool-version", default=None, help="Tool Version (if tool id supplied). Defaults to just latest version, use * to test all versions", ) parser.add_argument( "-i", "--test-index", default=ALL_TESTS, type=int, help="Tool Test Index (starting at 0) - by default all tests will run.", ) parser.add_argument("-o", "--output", default=None, help="directory to dump outputs to") parser.add_argument( "--append", default=False, action="store_true", help="Extend a test record json (created with --output-json) with additional tests.", ) skip_group = parser.add_mutually_exclusive_group() skip_group.add_argument( "--skip-previously-executed", dest="skip", default="no", action="store_const", const="executed", help="When used with --append, skip any test previously executed.", ) skip_group.add_argument( "--skip-previously-successful", dest="skip", default="no", action="store_const", const="successful", help="When used with --append, skip any test previously executed successfully.", ) parser.add_argument("-j", "--output-json", default=None, help="output metadata json") parser.add_argument("--verbose", default=False, action="store_true", help="Verbose logging.") parser.add_argument("-c", "--client-test-config", default=None, help="Test config YAML to help with client testing") parser.add_argument("--suite-name", default=DEFAULT_SUITE_NAME, help="Suite name for tool test output") parser.add_argument("--with-reference-data", dest="with_reference_data", default=False, action="store_true") parser.add_argument( "--skip-with-reference-data", dest="with_reference_data", action="store_false", help="Skip tests the Galaxy server believes use data tables or loc files.", ) history_per_group = parser.add_mutually_exclusive_group() history_per_group.add_argument( "--history-per-suite", dest="history_per_test_case", default=False, action="store_false", help="Create new history per test suite (all tests in same history).", ) history_per_group.add_argument( "--history-per-test-case", dest="history_per_test_case", action="store_true", help="Create new history per test case.", ) history_per_group.add_argument("--history-name", default=None, help="Override default history name") parser.add_argument( "--no-history-reuse", default=False, action="store_true", help="Do not reuse histories if a matching one already exists.", ) parser.add_argument( "--no-history-cleanup", default=False, action="store_true", help="Perserve histories created for testing." ) parser.add_argument( "--publish-history", default=False, action="store_true", help="Publish test history. Useful for CI testing." ) parser.add_argument("--parallel-tests", default=1, type=int, help="Parallel tests.") parser.add_argument("--retries", default=0, type=int, help="Retry failed tests.") parser.add_argument( "--page-size", default=0, type=int, help="If positive, use pagination and just run one 'page' to tool tests." ) parser.add_argument( "--page-number", default=0, type=int, help="If page size is used, run this 'page' of tests - starts with 0." ) parser.add_argument( "--download-attempts", default=1, type=int, help="Galaxy may return a transient 500 status code for download if test results are written but not yet accessible.", ) parser.add_argument( "--download-sleep", default=1, type=int, help="If download attempts is greater than 1, the amount to sleep between download attempts.", ) parser.add_argument("--test-data", action="append", help="Add local test data path to search for missing test data") return parser
if __name__ == "__main__": main()