Source code for galaxy.tool_util.verify.script
#!/usr/bin/env python
import argparse
import concurrent.futures.thread
import datetime as dt
import json
import logging
import os
import sys
import tempfile
from concurrent.futures import (
thread,
ThreadPoolExecutor,
)
from typing import (
Any,
Callable,
Dict,
List,
NamedTuple,
Optional,
)
import yaml
from galaxy.tool_util.verify.interactor import (
DictClientTestConfig,
GalaxyInteractorApi,
ToolTestDictsT,
verify_tool,
)
DESCRIPTION = """Script to quickly run a tool test against a running Galaxy instance."""
DEFAULT_SUITE_NAME = "Galaxy Tool Tests"
ALL_TESTS = -1
ALL_TOOLS = "*"
ALL_VERSION = "*"
LATEST_VERSION = None
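
# Note: TestReference and TestException are used throughout this module but
# their definitions were missing from this listing; they are reconstructed
# below from how the surrounding code uses them (see _test_id_for_reference
# and Results.register_exception).
class TestReference(NamedTuple):
    tool_id: str
    tool_version: Optional[str]
    test_index: int


class TestException(NamedTuple):
    tool_id: str
    exception: Exception
    was_recorded: bool
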
class Results:
test_exceptions: List[TestException]
    def __init__(
self, default_suitename: str, test_json: str, append: bool = False, galaxy_url: Optional[str] = None
) -> None:
self.test_json = test_json or "-"
self.galaxy_url = galaxy_url
test_results = []
suitename = default_suitename
if append and os.path.exists(test_json):
assert test_json != "-"
with open(test_json) as f:
previous_results = json.load(f)
if "tests" in previous_results:
test_results = previous_results["tests"]
if "suitename" in previous_results:
suitename = previous_results["suitename"]
self.test_results = test_results
self.test_exceptions = []
self.suitename = suitename
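
    # Reconstructed: _test_tool() below calls results.register_result(), but the
    # method was missing from this listing. This minimal sketch records a result,
    # replacing any earlier result stored under the same test id.
    def register_result(self, result: Dict[str, Any]) -> None:
        test_results = [r for r in self.test_results if r.get("id") != result["id"]]
        test_results.append(result)
        self.test_results = test_results
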
    def register_exception(self, test_exception: TestException) -> None:
self.test_exceptions.append(test_exception)
    def already_successful(self, test_reference: TestReference) -> bool:
test_data = self._previous_test_data(test_reference)
if test_data:
if "status" in test_data and test_data["status"] == "success":
return True
return False
    def already_executed(self, test_reference: TestReference) -> bool:
test_data = self._previous_test_data(test_reference)
if test_data:
if "status" in test_data and test_data["status"] != "skipped":
return True
return False
def _previous_test_data(self, test_reference: TestReference) -> Optional[Dict[str, Any]]:
test_id = _test_id_for_reference(test_reference)
for test_result in self.test_results:
if test_result.get("id") != test_id:
continue
has_data = test_result.get("has_data", False)
if has_data:
test_data = test_result.get("data", {})
return test_data
return None
    def write(self) -> None:
tests = sorted(self.test_results, key=lambda el: el["id"])
n_passed, n_failures, n_skips = 0, 0, 0
n_errors = len([e for e in self.test_exceptions if not e.was_recorded])
for test in tests:
has_data = test.get("has_data", False)
if has_data:
test_data = test.get("data", {})
if "status" not in test_data:
raise Exception(f"Test result data {test_data} doesn't contain a status key.")
status = test_data["status"]
if status == "success":
n_passed += 1
elif status == "error":
n_errors += 1
elif status == "skip":
n_skips += 1
elif status == "failure":
n_failures += 1
report_obj = {
"version": "0.1",
"suitename": self.suitename,
"results": {
"total": n_passed + n_failures + n_skips + n_errors,
"errors": n_errors,
"failures": n_failures,
"skips": n_skips,
},
"tests": tests,
}
if self.galaxy_url:
report_obj["galaxy_url"] = self.galaxy_url
if self.test_json == "-":
print(json.dumps(report_obj))
else:
with open(self.test_json, "w") as f:
json.dump(report_obj, f)
    def info_message(self) -> str:
messages = []
passed_tests = self._tests_with_status("success")
messages.append("Passed tool tests ({}): {}".format(len(passed_tests), [t["id"] for t in passed_tests]))
failed_tests = self._tests_with_status("failure")
messages.append("Failed tool tests ({}): {}".format(len(failed_tests), [t["id"] for t in failed_tests]))
skipped_tests = self._tests_with_status("skip")
messages.append("Skipped tool tests ({}): {}".format(len(skipped_tests), [t["id"] for t in skipped_tests]))
errored_tests = self._tests_with_status("error")
messages.append("Errored tool tests ({}): {}".format(len(errored_tests), [t["id"] for t in errored_tests]))
return "\n".join(messages)
def _tests_with_status(self, status: str) -> List[Dict[str, Any]]:
return [t for t in self.test_results if t.get("data", {}).get("status") == status]
def test_tools(
galaxy_interactor: GalaxyInteractorApi,
test_references: List[TestReference],
results: Results,
log: Optional[logging.Logger] = None,
parallel_tests: int = 1,
history_per_test_case: bool = False,
history_name: Optional[str] = None,
no_history_reuse: bool = False,
no_history_cleanup: bool = False,
publish_history: bool = False,
retries: int = 0,
verify_kwds: Optional[Dict[str, Any]] = None,
) -> None:
"""Run through tool tests and write report."""
verify_kwds = (verify_kwds or {}).copy()
tool_test_start = dt.datetime.now()
history_created = False
test_history = None
if not history_per_test_case:
if not history_name:
history_name = f"History for {results.suitename}"
if log:
log.info(f"History name is '{history_name}'")
if not no_history_reuse:
history = galaxy_interactor.get_history(history_name=history_name)
if history:
test_history = history["id"]
if log:
log.info(f"Using existing history with id '{test_history}', last updated: {history['update_time']}")
if not test_history:
history_created = True
test_history = galaxy_interactor.new_history(history_name=history_name, publish_history=publish_history)
if log:
log.info(f"History created with id '{test_history}'")
verify_kwds.update(
{
"no_history_cleanup": no_history_cleanup,
"test_history": test_history,
}
)
with ThreadPoolExecutor(max_workers=parallel_tests) as executor:
try:
for test_reference in test_references:
_test_tool(
executor=executor,
test_reference=test_reference,
results=results,
galaxy_interactor=galaxy_interactor,
log=log,
retries=retries,
verify_kwds=verify_kwds,
publish_history=publish_history,
)
finally:
# Always write report, even if test was cancelled.
try:
executor.shutdown(wait=True)
except KeyboardInterrupt:
executor._threads.clear() # type: ignore[attr-defined]
thread._threads_queues.clear() # type: ignore[attr-defined]
results.write()
if log:
if results.test_json == "-":
destination = "standard output"
else:
destination = os.path.abspath(results.test_json)
log.info(f"Report written to '{destination}'")
log.info(results.info_message())
log.info(f"Total tool test time: {dt.datetime.now() - tool_test_start}")
if history_created and not no_history_cleanup:
galaxy_interactor.delete_history(test_history)
def _test_id_for_reference(test_reference: "TestReference") -> str:
tool_id = test_reference.tool_id
tool_version = test_reference.tool_version
test_index = test_reference.test_index
if tool_version and tool_id.endswith(f"/{tool_version}"):
tool_id = tool_id[: -len(f"/{tool_version}")]
label_base = tool_id
if tool_version:
label_base += f"/{str(tool_version)}"
test_id = f"{label_base}-{str(test_index)}"
return test_id
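# For example, a TestReference for test 0 of a (hypothetical) tool "cat1" at
# version "1.0.0" yields the test id "cat1/1.0.0-0".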
def _test_tool(
executor: concurrent.futures.thread.ThreadPoolExecutor,
test_reference: "TestReference",
results: Results,
galaxy_interactor: GalaxyInteractorApi,
log: Optional[logging.Logger],
retries: int,
publish_history: bool,
verify_kwds: Dict[str, Any],
) -> None:
tool_id = test_reference.tool_id
tool_version = test_reference.tool_version
test_index = test_reference.test_index
# If given a tool_id with a version suffix, strip it off so we can treat tool_version
# correctly at least in client_test_config.
if tool_version and tool_id.endswith(f"/{tool_version}"):
tool_id = tool_id[: -len(f"/{tool_version}")]
test_id = _test_id_for_reference(test_reference)
def run_test() -> None:
run_retries = retries
job_data = None
job_exception = None
def register(job_data_):
nonlocal job_data
job_data = job_data_
try:
while run_retries >= 0:
job_exception = None
try:
if log:
log.info("Executing test '%s'", test_id)
verify_tool(
tool_id,
galaxy_interactor,
test_index=test_index,
tool_version=tool_version,
register_job_data=register,
publish_history=publish_history,
**verify_kwds,
)
if log:
log.info("Test '%s' passed", test_id)
break
except Exception as e:
if log:
log.warning("Test '%s' failed", test_id, exc_info=True)
job_exception = e
run_retries -= 1
finally:
if job_data is not None:
results.register_result(
{
"id": test_id,
"has_data": True,
"data": job_data,
}
)
if job_exception is not None:
was_recorded = job_data is not None
test_exception = TestException(tool_id, job_exception, was_recorded)
results.register_exception(test_exception)
executor.submit(run_test)
def build_case_references(
galaxy_interactor: GalaxyInteractorApi,
tool_id: str = ALL_TOOLS,
tool_version: Optional[str] = LATEST_VERSION,
test_index: int = ALL_TESTS,
page_size: int = 0,
page_number: int = 0,
test_filters: Optional[List[Callable[[TestReference], bool]]] = None,
log: Optional[logging.Logger] = None,
) -> List[TestReference]:
test_references: List[TestReference] = []
if tool_id == ALL_TOOLS:
tests_summary = galaxy_interactor.get_tests_summary()
for tool_id, tool_versions_dict in tests_summary.items():
for tool_version, summary in tool_versions_dict.items():
for test_index in range(summary["count"]):
test_reference = TestReference(tool_id, tool_version, test_index)
test_references.append(test_reference)
else:
assert tool_id
tool_test_dicts: ToolTestDictsT = galaxy_interactor.get_tool_tests(tool_id, tool_version=tool_version)
for i, tool_test_dict in enumerate(tool_test_dicts):
this_tool_version = tool_test_dict.get("tool_version", tool_version)
this_test_index = i
if test_index == ALL_TESTS or i == test_index:
test_reference = TestReference(tool_id, this_tool_version, this_test_index)
test_references.append(test_reference)
if test_filters is not None and len(test_filters) > 0:
filtered_test_references: List[TestReference] = []
for test_reference in test_references:
skip_test = False
for test_filter in test_filters:
if test_filter(test_reference):
if log is not None:
log.debug(f"Filtering test for {test_reference}, skipping")
skip_test = True
if not skip_test:
filtered_test_references.append(test_reference)
if log is not None:
log.info(
f"Skipping {len(test_references)-len(filtered_test_references)} out of {len(test_references)} tests."
)
test_references = filtered_test_references
if page_size > 0:
slice_start = page_size * page_number
slice_end = page_size * (page_number + 1)
test_references = test_references[slice_start:slice_end]
return test_references
def main(argv=None) -> None:
if argv is None:
argv = sys.argv[1:]
args = arg_parser().parse_args(argv)
try:
run_tests(args)
except Exception as exc:
print(f"ERROR: {exc}", file=sys.stderr)
sys.exit(1)
def run_tests(
args: argparse.Namespace,
test_filters: Optional[List[Callable[[TestReference], bool]]] = None,
log: Optional[logging.Logger] = None,
) -> None:
# Split out argument parsing so we can quickly build other scripts - such as a script
# to run all tool tests for a workflow by just passing in a custom test_filters.
test_filters = test_filters or []
log = log or setup_global_logger(__name__, verbose=args.verbose)
client_test_config_path = args.client_test_config
if client_test_config_path is not None:
log.debug(f"Reading client config path {client_test_config_path}")
with open(client_test_config_path) as f:
client_test_config = yaml.full_load(f)
else:
client_test_config = {}
def get_option(key):
arg_val = getattr(args, key, None)
if arg_val is None and key in client_test_config:
val = client_test_config.get(key)
else:
val = arg_val
return val
output_json_path = get_option("output_json")
galaxy_url = get_option("galaxy_url")
galaxy_interactor_kwds = {
"galaxy_url": galaxy_url,
"master_api_key": get_option("admin_key"),
"api_key": get_option("key"),
"keep_outputs_dir": args.output,
"download_attempts": get_option("download_attempts"),
"download_sleep": get_option("download_sleep"),
"test_data": get_option("test_data"),
}
tool_id = args.tool_id
tool_version = args.tool_version
tools_client_test_config = DictClientTestConfig(client_test_config.get("tools"))
verbose = args.verbose
galaxy_interactor = GalaxyInteractorApi(**galaxy_interactor_kwds)
results = Results(args.suite_name, output_json_path, append=args.append, galaxy_url=galaxy_url)
skip = args.skip
if skip == "executed":
test_filters.append(results.already_executed)
elif skip == "successful":
test_filters.append(results.already_successful)
test_references = build_case_references(
galaxy_interactor,
tool_id=tool_id,
tool_version=tool_version,
test_index=args.test_index,
page_size=args.page_size,
page_number=args.page_number,
test_filters=test_filters,
log=log,
)
log.debug(f"Built {len(test_references)} test references to executed.")
verify_kwds = dict(
client_test_config=tools_client_test_config,
force_path_paste=args.force_path_paste,
skip_with_reference_data=not args.with_reference_data,
quiet=not verbose,
)
test_tools(
galaxy_interactor,
test_references,
results,
log=log,
parallel_tests=args.parallel_tests,
history_per_test_case=args.history_per_test_case,
history_name=args.history_name,
no_history_reuse=args.no_history_reuse,
no_history_cleanup=args.no_history_cleanup,
publish_history=get_option("publish_history"),
verify_kwds=verify_kwds,
)
exceptions = results.test_exceptions
if exceptions:
exception = exceptions[0]
raise exception.exception
def setup_global_logger(name: str, log_file: Optional[str] = None, verbose: bool = False) -> logging.Logger:
formatter = logging.Formatter("%(asctime)s %(levelname)-5s - %(message)s")
console = logging.StreamHandler()
console.setFormatter(formatter)
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG if verbose else logging.INFO)
logger.addHandler(console)
if not log_file:
        # delete=False is chosen here because it is always nice to have a log file
        # ready if you need to debug, avoiding the "if only I had set a log file"
        # moment after the fact.
temp = tempfile.NamedTemporaryFile(prefix="ephemeris_", delete=False)
log_file = temp.name
file_handler = logging.FileHandler(log_file)
logger.addHandler(file_handler)
logger.info(f"Storing log file in: {log_file}")
return logger
def arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description=DESCRIPTION)
parser.add_argument("-u", "--galaxy-url", default="http://localhost:8080", help="Galaxy URL")
parser.add_argument("-k", "--key", default=None, help="Galaxy User API Key")
parser.add_argument("-a", "--admin-key", default=None, help="Galaxy Admin API Key")
parser.add_argument(
"--force_path_paste",
default=False,
action="store_true",
        help='Requires the Galaxy-side config option "allow_path_paste" to be enabled. Allows fetching test data locally. Admins only.',
)
parser.add_argument("-t", "--tool-id", default=ALL_TOOLS, help="Tool ID")
parser.add_argument(
"--tool-version",
default=None,
help="Tool Version (if tool id supplied). Defaults to just latest version, use * to test all versions",
)
parser.add_argument(
"-i",
"--test-index",
default=ALL_TESTS,
type=int,
help="Tool Test Index (starting at 0) - by default all tests will run.",
)
parser.add_argument("-o", "--output", default=None, help="directory to dump outputs to")
parser.add_argument(
"--append",
default=False,
action="store_true",
help="Extend a test record json (created with --output-json) with additional tests.",
)
skip_group = parser.add_mutually_exclusive_group()
skip_group.add_argument(
"--skip-previously-executed",
dest="skip",
default="no",
action="store_const",
const="executed",
help="When used with --append, skip any test previously executed.",
)
skip_group.add_argument(
"--skip-previously-successful",
dest="skip",
default="no",
action="store_const",
const="successful",
help="When used with --append, skip any test previously executed successfully.",
)
parser.add_argument("-j", "--output-json", default=None, help="output metadata json")
parser.add_argument("--verbose", default=False, action="store_true", help="Verbose logging.")
parser.add_argument("-c", "--client-test-config", default=None, help="Test config YAML to help with client testing")
parser.add_argument("--suite-name", default=DEFAULT_SUITE_NAME, help="Suite name for tool test output")
parser.add_argument("--with-reference-data", dest="with_reference_data", default=False, action="store_true")
parser.add_argument(
"--skip-with-reference-data",
dest="with_reference_data",
action="store_false",
help="Skip tests the Galaxy server believes use data tables or loc files.",
)
history_per_group = parser.add_mutually_exclusive_group()
history_per_group.add_argument(
"--history-per-suite",
dest="history_per_test_case",
default=False,
action="store_false",
help="Create new history per test suite (all tests in same history).",
)
history_per_group.add_argument(
"--history-per-test-case",
dest="history_per_test_case",
action="store_true",
help="Create new history per test case.",
)
history_per_group.add_argument("--history-name", default=None, help="Override default history name")
parser.add_argument(
"--no-history-reuse",
default=False,
action="store_true",
help="Do not reuse histories if a matching one already exists.",
)
parser.add_argument(
"--no-history-cleanup", default=False, action="store_true", help="Perserve histories created for testing."
)
parser.add_argument(
"--publish-history", default=False, action="store_true", help="Publish test history. Useful for CI testing."
)
parser.add_argument("--parallel-tests", default=1, type=int, help="Parallel tests.")
parser.add_argument("--retries", default=0, type=int, help="Retry failed tests.")
parser.add_argument(
"--page-size", default=0, type=int, help="If positive, use pagination and just run one 'page' to tool tests."
)
parser.add_argument(
"--page-number", default=0, type=int, help="If page size is used, run this 'page' of tests - starts with 0."
)
parser.add_argument(
"--download-attempts",
default=1,
type=int,
help="Galaxy may return a transient 500 status code for download if test results are written but not yet accessible.",
)
parser.add_argument(
"--download-sleep",
default=1,
type=int,
help="If download attempts is greater than 1, the amount to sleep between download attempts.",
)
parser.add_argument("--test-data", action="append", help="Add local test data path to search for missing test data")
return parser
if __name__ == "__main__":
main()
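
# Example invocation (hypothetical values; assumes a Galaxy instance running at
# the default URL and a valid user API key):
#
#     python -m galaxy.tool_util.verify.script \
#         -u http://localhost:8080 -k <api-key> \
#         -t cat1 -j tool_test_output.json --parallel-tests 4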