Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tool_util.linters.tests

"""This module contains a linting functions for tool tests."""

from io import StringIO
from typing import (
    Iterator,
    List,
    Tuple,
    TYPE_CHECKING,
)

from galaxy.tool_util.lint import Linter
from galaxy.tool_util.parameters import validate_test_cases_for_tool_source
from galaxy.tool_util.verify.assertion_models import assertion_list
from galaxy.util import asbool
from ._util import is_datasource

if TYPE_CHECKING:
    from galaxy.tool_util.lint import LintContext
    from galaxy.tool_util.parser.interface import ToolSource
    from galaxy.util.etree import Element

lint_tool_types = ["default", "data_source", "manage_data"]


[docs]class TestsMissing(Linter):
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return tests = tool_xml.findall("./tests/test") root = tool_xml.find("./tests") if root is None: root = tool_xml.getroot() if len(tests) == 0 and not is_datasource(tool_xml): lint_ctx.warn("No tests found, most tools should define test cases.", linter=cls.name(), node=root)
[docs]class TestsMissingDatasource(Linter):
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return tests = tool_xml.findall("./tests/test") root = tool_xml.find("./tests") if root is None: root = tool_xml.getroot() if len(tests) == 0 and is_datasource(tool_xml): lint_ctx.info("No tests found, that should be OK for data_sources.", linter=cls.name(), node=root)
[docs]class TestsAssertsMultiple(Linter):
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): # TODO same would be nice also for assert_contents for ta in ("assert_stdout", "assert_stderr", "assert_command"): if len(test.findall(ta)) > 1: lint_ctx.error( f"Test {test_idx}: More than one {ta} found. Only the first is considered.", linter=cls.name(), node=test, )
[docs]class TestsAssertsHasNQuant(Linter):
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): for a in test.xpath( ".//*[self::assert_contents or self::assert_stdout or self::assert_stderr or self::assert_command]//*" ): if a.tag not in ["has_n_lines", "has_n_columns"]: continue if not (set(a.attrib) & {"n", "min", "max"}): lint_ctx.error( f"Test {test_idx}: '{a.tag}' needs to specify 'n', 'min', or 'max'", linter=cls.name(), node=a )
[docs]class TestsAssertsHasSizeQuant(Linter): """ has_size needs at least one of size (or value), min, max """
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): for a in test.xpath( ".//*[self::assert_contents or self::assert_stdout or self::assert_stderr or self::assert_command]//*" ): if a.tag != "has_size": continue if len(set(a.attrib) & {"value", "size", "min", "max"}) == 0: lint_ctx.error( f"Test {test_idx}: '{a.tag}' needs to specify 'size', 'min', or 'max'", linter=cls.name(), node=a, )
[docs]class TestsAssertsHasSizeOrValueQuant(Linter): """ has_size should not have size and value """
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): for a in test.xpath( ".//*[self::assert_contents or self::assert_stdout or self::assert_stderr or self::assert_command]//*" ): if a.tag != "has_size": continue if "value" in a.attrib and "size" in a.attrib: lint_ctx.error( f"Test {test_idx}: '{a.tag}' must not specify 'value' and 'size'", linter=cls.name(), node=a, )
[docs]class TestsAssertionValidation(Linter):
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): try: raw_tests_dict = tool_source.parse_tests_to_dict() except Exception: lint_ctx.warn("Failed to parse test dictionaries from tool - cannot lint assertions") return assert "tests" in raw_tests_dict for test_idx, test in enumerate(raw_tests_dict["tests"], start=1): # TODO: validate command, command_version, element tests. What about children? for output in test["outputs"]: asserts_raw = output.get("attributes", {}).get("assert_list") or [] to_yaml_assertions = [] for raw_assert in asserts_raw: to_yaml_assertions.append({"that": raw_assert["tag"], **raw_assert.get("attributes", {})}) try: assertion_list.model_validate(to_yaml_assertions) except Exception as e: error_str = _cleanup_pydantic_error(e) lint_ctx.warn( f"Test {test_idx}: failed to validate assertions. Validation errors are [{error_str}]" )
[docs]class TestsCaseValidation(Linter):
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): try: validation_results = validate_test_cases_for_tool_source(tool_source, use_latest_profile=True) except Exception as e: lint_ctx.warn( f"Serious problem parsing tool source or tests - cannot validate test cases. The exception is [{e}]", linter=cls.name(), ) return for test_idx, validation_result in enumerate(validation_results, start=1): error = validation_result.validation_error if error: error_str = _cleanup_pydantic_error(error) lint_ctx.warn( f"Test {test_idx}: failed to validate test parameters against inputs - tests won't run on a modern Galaxy tool profile version. Validation errors are [{error_str}]", linter=cls.name(), )
def _cleanup_pydantic_error(error) -> str: full_validation_error = f"{error}" new_error = StringIO("") for line in full_validation_error.splitlines(): # this repeated over and over isn't useful in the context of how we're building the dynamic models, # tool authors should not be looking up pydantic docs on models they cannot even really inspect if line.strip().startswith("For further information visit https://errors.pydantic"): continue else: new_error.write(f"{line}\n") return new_error.getvalue().strip()
[docs]class TestsExpectNumOutputs(Linter):
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): # check if expect_num_outputs is set if there are outputs with filters # (except for tests with expect_failure .. which can't have test outputs) has_no_filter = ( tool_xml.find("./outputs/data/filter") is None and tool_xml.find("./outputs/collection/filter") is None ) if not ( has_no_filter or "expect_num_outputs" in test.attrib or asbool(test.attrib.get("expect_failure", False)) ): lint_ctx.warn( f"Test {test_idx}: should specify 'expect_num_outputs' if outputs have filters", linter=cls.name(), node=test, )
[docs]class TestsParamInInputs(Linter): """ really simple linter that test parameters are also present in the inputs """
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): for param in test.findall("param"): name = param.attrib.get("name", None) if not name: continue name = name.split("|")[-1] xpaths = [f"@name='{name}'", f"@argument='{name}'", f"@argument='-{name}'", f"@argument='--{name}'"] if "_" in name: xpaths += [f"@argument='-{name.replace('_', '-')}'", f"@argument='--{name.replace('_', '-')}'"] found = False for xp in xpaths: inxpath = f".//inputs//param[{xp}]" inparam = tool_xml.findall(inxpath) if len(inparam) > 0: found = True break if not found: lint_ctx.error( f"Test {test_idx}: Test param {name} not found in the inputs", linter=cls.name(), node=param )
[docs]class TestsOutputName(Linter):
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): # note output_collections are covered by xsd, but output is not required to have one by xsd for output in test.findall("output"): if not output.attrib.get("name", None): lint_ctx.error( f"Test {test_idx}: Found {output.tag} tag without a name defined.", linter=cls.name(), node=output, )
[docs]class TestsOutputDefined(Linter):
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return output_data_or_collection = _collect_output_names(tool_xml) tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): for output in test.findall("output") + test.findall("output_collection"): name = output.attrib.get("name", None) if not name: continue if name not in output_data_or_collection: lint_ctx.error( f"Test {test_idx}: Found {output.tag} tag with unknown name [{name}], valid names {list(output_data_or_collection)}", linter=cls.name(), node=output, )
[docs]class TestsOutputCorresponding(Linter): """ Linter checking if test/output corresponds to outputs/data """
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return output_data_or_collection = _collect_output_names(tool_xml) tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): for output in test.findall("output") + test.findall("output_collection"): name = output.attrib.get("name", None) if not name: continue if name not in output_data_or_collection: continue # - test/collection to outputs/output_collection corresponding_output = output_data_or_collection[name] if output.tag == "output" and corresponding_output.tag != "data": lint_ctx.error( f"Test {test_idx}: test output {name} does not correspond to a 'data' output, but a '{corresponding_output.tag}'", linter=cls.name(), node=output, )
[docs]class TestsOutputCollectionCorresponding(Linter): """ Linter checking if test/collection corresponds to outputs/output_collection """
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return output_data_or_collection = _collect_output_names(tool_xml) tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): for output in test.findall("output") + test.findall("output_collection"): name = output.attrib.get("name", None) if not name: continue if name not in output_data_or_collection: continue # - test/collection to outputs/output_collection corresponding_output = output_data_or_collection[name] if output.tag == "output_collection" and corresponding_output.tag != "collection": lint_ctx.error( f"Test {test_idx}: test collection output '{name}' does not correspond to a 'output_collection' output, but a '{corresponding_output.tag}'", linter=cls.name(), node=output, )
[docs]class TestsOutputCompareAttrib(Linter): """ Linter checking compatibility of output attributes with the value of the compare attribute """
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return tests = tool_xml.findall("./tests/test") COMPARE_COMPATIBILITY = { "sort": ["diff", "re_match", "re_match_multiline"], "lines_diff": ["diff", "re_match", "contains"], "decompress": ["diff"], "delta": ["sim_size"], "delta_frac": ["sim_size"], "metric": ["image_diff"], "eps": ["image_diff"], } for test_idx, test in enumerate(tests, start=1): for output in test.xpath(".//*[self::output or self::element or self::discovered_dataset]"): compare = output.get("compare", "diff") for attrib in COMPARE_COMPATIBILITY: if attrib in output.attrib and compare not in COMPARE_COMPATIBILITY[attrib]: lint_ctx.error( f'Test {test_idx}: Attribute {attrib} is incompatible with compare="{compare}".', linter=cls.name(), node=output, )
[docs]class TestsOutputCheckDiscovered(Linter): """ Linter checking that discovered elements of outputs are tested with a count attribute or listing some discovered_dataset """
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return output_data_or_collection = _collect_output_names(tool_xml) tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): for output in test.findall("output"): name = output.attrib.get("name", None) if not name: continue if name not in output_data_or_collection: continue # - test/collection to outputs/output_collection corresponding_output = output_data_or_collection[name] discover_datasets = corresponding_output.find(".//discover_datasets") if discover_datasets is None: continue if "count" not in output.attrib and output.find("./discovered_dataset") is None: lint_ctx.error( f"Test {test_idx}: test output '{name}' must have a 'count' attribute and/or 'discovered_dataset' children", linter=cls.name(), node=output, )
[docs]class TestsOutputCollectionCheckDiscovered(Linter): """ Linter checking that discovered elements of output collections are tested with a count attribute or listing some elements """
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return output_data_or_collection = _collect_output_names(tool_xml) tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): for output in test.findall("output_collection"): name = output.attrib.get("name", None) if not name: continue if name not in output_data_or_collection: continue # - test/collection to outputs/output_collection corresponding_output = output_data_or_collection[name] discover_datasets = corresponding_output.find(".//discover_datasets") if discover_datasets is None: continue if "count" not in output.attrib and output.find("./element") is None: lint_ctx.error( f"Test {test_idx}: test collection '{name}' must have a 'count' attribute or 'element' children", linter=cls.name(), node=output, )
[docs]class TestsOutputCollectionCheckDiscoveredNested(Linter): """ """
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return output_data_or_collection = _collect_output_names(tool_xml) tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): for output in test.findall("output_collection"): name = output.attrib.get("name", None) if not name: continue if name not in output_data_or_collection: continue # - test/collection to outputs/output_collection corresponding_output = output_data_or_collection[name] if corresponding_output.find(".//discover_datasets") is None: continue if corresponding_output.get("type", "") in ["list:list", "list:paired"]: nested_elements = output.find("./element/element") element_with_count = output.find("./element[@count]") if nested_elements is None and element_with_count is None: lint_ctx.error( f"Test {test_idx}: test collection '{name}' must contain nested 'element' tags and/or element children with a 'count' attribute", linter=cls.name(), node=output, )
[docs]class TestsOutputFailing(Linter): """ """
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): if not asbool(test.attrib.get("expect_failure", False)): continue if test.find("output") is not None or test.find("output_collection") is not None: lint_ctx.error( f"Test {test_idx}: Cannot specify outputs in a test expecting failure.", linter=cls.name(), node=test, )
[docs]class TestsExpectNumOutputsFailing(Linter): """ """
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return tests = tool_xml.findall("./tests/test") for test_idx, test in enumerate(tests, start=1): if not asbool(test.attrib.get("expect_failure", False)): continue if test.find("output") is not None or test.find("output_collection") is not None: continue if "expect_num_outputs" in test.attrib: lint_ctx.error( f"Test {test_idx}: Cannot make assumptions on the number of outputs in a test expecting failure.", linter=cls.name(), node=test, )
[docs]class TestsHasExpectations(Linter): """ """
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return tests = tool_xml.findall("./tests/test") for test_idx, test in _iter_tests(tests, valid=False): lint_ctx.warn( f"Test {test_idx}: No outputs or expectations defined for tests, this test is likely invalid.", linter=cls.name(), node=test, )
[docs]class TestsNoValid(Linter): """ """
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return general_node = tool_xml.find("./tests") if general_node is None: general_node = tool_xml.getroot() tests = tool_xml.findall("./tests/test") if not tests: return num_valid_tests = len(list(_iter_tests(tests, valid=True))) if num_valid_tests or is_datasource(tool_xml): lint_ctx.valid(f"{num_valid_tests} test(s) found.", linter=cls.name(), node=general_node)
[docs]class TestsValid(Linter): """ """
[docs] @classmethod def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"): tool_xml = getattr(tool_source, "xml_tree", None) if not tool_xml: return general_node = tool_xml.find("./tests") if general_node is None: general_node = tool_xml.getroot() tests = tool_xml.findall("./tests/test") if not tests: return num_valid_tests = len(list(_iter_tests(tests, valid=True))) if not (num_valid_tests or is_datasource(tool_xml)): lint_ctx.warn("No valid test(s) found.", linter=cls.name(), node=general_node)
def _iter_tests(tests: List["Element"], valid: bool) -> Iterator[Tuple[int, "Element"]]: for test_idx, test in enumerate(tests, start=1): is_valid = False is_valid |= bool(set(test.attrib) & {"expect_failure", "expect_exit_code", "expect_num_outputs"}) for ta in ("assert_stdout", "assert_stderr", "assert_command"): if test.find(ta) is not None: is_valid = True found_output_test = test.find("output") is not None or test.find("output_collection") is not None if asbool(test.attrib.get("expect_failure", False)): if found_output_test or "expect_num_outputs" in test.attrib: continue is_valid |= found_output_test if is_valid == valid: yield (test_idx, test) def _collect_output_names(tool_xml): """ determine dict mapping the names of data and collection outputs to the corresponding nodes """ output_data_or_collection = {} outputs = tool_xml.findall("./outputs") if len(outputs) == 1: for output in list(outputs[0]): name = output.attrib.get("name", None) if not name: continue output_data_or_collection[name] = output return output_data_or_collection