Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tools.test
import logging
import os
import os.path
from typing import (
Iterable,
List,
Tuple,
Union,
)
import galaxy.tools.parameters.basic
import galaxy.tools.parameters.grouping
from galaxy.tool_util.verify.interactor import (
InvalidToolTestDict,
ToolTestDescription,
ValidToolTestDict,
)
from galaxy.util import (
string_as_bool,
string_as_bool_or_none,
unicodify,
)
log = logging.getLogger(__name__)
def parse_tests(tool, tests_source) -> Iterable[ToolTestDescription]:
    """
    Build one ToolTestDescription per raw test definition.

    The raw definitions are read from the ``"tests"`` entry of
    ``tests_source.parse_tests_to_dict()`` (a missing entry means no tests).
    Each definition's position in that list is passed on as its test index.
    """
    raw_test_dicts = tests_source.parse_tests_to_dict().get("tests", [])
    return [
        description_from_tool_object(tool, test_index, raw_test_dict)
        for test_index, raw_test_dict in enumerate(raw_test_dicts)
    ]
def description_from_tool_object(tool, test_index, raw_test_dict) -> ToolTestDescription:
    """
    Build the ToolTestDescription for a single raw test dictionary.

    ``expect_num_outputs`` (when truthy) and ``maxseconds`` (when not None)
    are coerced to ``int`` before anything else happens. The raw inputs are
    then expanded against ``tool.inputs``. If that expansion, or reading the
    mandatory ``outputs`` / ``output_collections`` keys, raises, the failure
    is logged and a description flagged with ``"error": True`` and carrying
    the exception text is returned instead of propagating the exception.
    """
    fetch = raw_test_dict.get

    num_outputs = fetch("expect_num_outputs", None)
    if num_outputs:
        num_outputs = int(num_outputs)
    maxseconds = fetch("maxseconds", None)
    if maxseconds is not None:
        maxseconds = int(maxseconds)

    # Accumulators populated in place while the inputs are being expanded.
    required_files: List[Tuple[str, dict]] = []
    required_data_tables: List[str] = []
    required_loc_files: List[str] = []

    processed_test_dict: Union[ValidToolTestDict, InvalidToolTestDict]
    try:
        processed_inputs = _process_raw_inputs(
            tool,
            tool.inputs,
            raw_test_dict["inputs"],
            required_files,
            required_data_tables,
            required_loc_files,
        )
        processed_test_dict = ValidToolTestDict(
            {
                "inputs": processed_inputs,
                "outputs": raw_test_dict["outputs"],
                "output_collections": raw_test_dict["output_collections"],
                "num_outputs": num_outputs,
                "command_line": fetch("command", None),
                "command_version": fetch("command_version", None),
                "stdout": fetch("stdout", None),
                "stderr": fetch("stderr", None),
                "expect_exit_code": fetch("expect_exit_code", None),
                "expect_failure": fetch("expect_failure", False),
                "expect_test_failure": fetch("expect_test_failure", False),
                "required_files": required_files,
                "required_data_tables": required_data_tables,
                "required_loc_files": required_loc_files,
                "tool_id": tool.id,
                "tool_version": tool.version,
                "test_index": test_index,
                "maxseconds": maxseconds,
                "error": False,
            }
        )
    except Exception as e:
        # A broken test definition must not abort loading; record it as an
        # invalid test so the problem is reported for this test only.
        log.exception("Failed to load tool test number [%d] for %s" % (test_index, tool.id))
        processed_test_dict = InvalidToolTestDict(
            {
                "tool_id": tool.id,
                "tool_version": tool.version,
                "test_index": test_index,
                "inputs": {},
                "error": True,
                "exception": unicodify(e),
                "maxseconds": maxseconds,
            }
        )

    return ToolTestDescription(processed_test_dict)
def _process_raw_inputs(
    tool, tool_inputs, raw_inputs, required_files, required_data_tables, required_loc_files, parent_context=None
):
    """
    Expand the flat list of raw test inputs against the tool's input tree.

    Walks ``tool_inputs`` (recursing into conditionals, sections and
    repeats) and returns a dict keyed by the ``|``-separated state name of
    every parameter for which a raw input was found. Raw inputs that are
    matched are removed from ``raw_inputs`` by ``ParamContext.extract_value``.
    Along the way ``required_files``, ``required_data_tables`` and
    ``required_loc_files`` are extended in place.
    """
    parent_context = parent_context or RootParamContext()

    def expand_nested(nested_inputs, nested_context):
        # Same shared accumulators; only the inputs and the context change.
        return _process_raw_inputs(
            tool,
            nested_inputs,
            raw_inputs,
            required_files,
            required_data_tables,
            required_loc_files,
            parent_context=nested_context,
        )

    expanded_inputs = {}
    for param in tool_inputs.values():
        if isinstance(param, galaxy.tools.parameters.grouping.Conditional):
            cond_context = ParamContext(name=param.name, parent_context=parent_context)
            assert param.test_param
            case_context = ParamContext(name=param.test_param.name, parent_context=cond_context)
            raw_case_input = case_context.extract_value(raw_inputs)
            case_value = raw_case_input["value"] if raw_case_input else None
            case = _matching_case_for_value(tool, param, case_value)
            if not case:
                continue
            for input_name, input_value in case.inputs.items():
                expanded_inputs.update(expand_nested({input_name: input_value}, cond_context))
            if param.type != "text":
                expanded_case_value = _split_if_str(case.value)
            if case_value is not None:
                # Only record the test parameter when the test set it
                # explicitly. Recording an implicit tool default would make
                # an enclosing repeat see a "new" instance on every pass
                # and loop forever.
                expanded_inputs[case_context.for_state()] = _process_simple_value(
                    param.test_param, expanded_case_value, required_data_tables, required_loc_files
                )
        elif isinstance(param, galaxy.tools.parameters.grouping.Section):
            context = ParamContext(name=param.name, parent_context=parent_context)
            assert param.inputs
            for child in param.inputs.values():
                nested = expand_nested({context.for_state(): child}, context)
                if nested:
                    expanded_inputs.update(nested)
        elif isinstance(param, galaxy.tools.parameters.grouping.Repeat):
            # Keep probing successive repeat indices until one of them
            # yields no inputs at all.
            repeat_index = 0
            found_instance = True
            while found_instance:
                context = ParamContext(name=param.name, index=repeat_index, parent_context=parent_context)
                found_instance = False
                assert param.inputs
                for child in param.inputs.values():
                    nested = expand_nested({context.for_state(): child}, context)
                    if nested:
                        expanded_inputs.update(nested)
                        found_instance = True
                repeat_index += 1
        else:
            context = ParamContext(name=param.name, parent_context=parent_context)
            raw_input = context.extract_value(raw_inputs)
            if not raw_input:
                continue
            param_value = raw_input["value"]
            param_extra = raw_input["attributes"]
            location = param_extra.get("location")
            if param.type != "text":
                param_value = _split_if_str(param_value)
            state_key = context.for_state()
            if isinstance(param, galaxy.tools.parameters.basic.DataToolParameter):
                if location and param.multiple:
                    # The location may hold several comma-separated URLs;
                    # each becomes one dataset named after its basename.
                    param_value = []
                    for url in _split_if_str(location):
                        file_name = os.path.basename(url)
                        param_value.append(file_name)
                        # Each upload gets a copy of the attributes that
                        # carries only its own location.
                        per_file_extra = dict(param_extra)
                        per_file_extra["location"] = url
                        _add_uploaded_dataset(state_key, file_name, per_file_extra, param, required_files)
                else:
                    if not isinstance(param_value, list):
                        param_value = [param_value]
                    for file_name in param_value:
                        _add_uploaded_dataset(state_key, file_name, param_extra, param, required_files)
                processed_value = param_value
            elif isinstance(param, galaxy.tools.parameters.basic.DataCollectionToolParameter):
                assert "collection" in param_extra
                collection_def = param_extra["collection"]
                for element in collection_def.collect_inputs():
                    require_file(element["name"], element["value"], element["attributes"], required_files)
                processed_value = collection_def
            else:
                processed_value = _process_simple_value(
                    param, param_value, required_data_tables, required_loc_files
                )
            expanded_inputs[state_key] = processed_value
    return expanded_inputs
def _process_simple_value(param, param_value, required_data_tables, required_loc_files):
    """
    Normalise the test value of a non-dataset parameter.

    * Select parameters: a value given as an option's display text is
      replaced by that option's form value, unless it already equals some
      option's value. For non drill-down selects with ``options``, the
      data table name (or, failing that, the index file) is appended to
      the corresponding accumulator once per processed value.
    * Boolean parameters: delegated to ``_process_bool_param_value``.
    * Anything else: returned unchanged.
    """
    if not isinstance(param, galaxy.tools.parameters.basic.SelectToolParameter):
        if isinstance(param, galaxy.tools.parameters.basic.BooleanToolParameter):
            # Tests may use the tool-defined true/false values or plain
            # true/false.
            return _process_bool_param_value(param, param_value)
        return param_value

    # Tests may give either the raw value or the text shown in the list.
    # The API only accepts the raw value, so text is converted to it here.
    def to_form_value(raw_value):
        matches_value = False
        value_for_text = None
        for text, opt_value, _ in getattr(param, "static_options", []):
            if raw_value == opt_value:
                matches_value = True
            if value_for_text is None and raw_value == text:
                value_for_text = opt_value
        if param.options and not isinstance(param, galaxy.tools.parameters.basic.DrillDownSelectToolParameter):
            if param.options.tool_data_table_name:
                required_data_tables.append(param.options.tool_data_table_name)
            elif param.options.index_file:
                required_loc_files.append(param.options.index_file)
        if matches_value or value_for_text is None:
            return raw_value
        return value_for_text

    # Apply the conversion element-wise for lists, directly otherwise.
    if isinstance(param_value, list):
        return [to_form_value(item) for item in param_value]
    return to_form_value(param_value)
def _matching_case_for_value(tool, cond, declared_value):
    """
    Return the case of conditional ``cond`` selected by ``declared_value``.

    ``declared_value`` is the value the test gave for the conditional's test
    parameter, or ``None`` when the test did not set it. In that case the
    parameter's default is used: ``checked`` for booleans, and for selects
    with static options the value of the first option flagged as selected,
    falling back to the first option.

    The test parameter is expected to be a boolean or select parameter.

    Returns the matching case, or ``None`` (after logging) when no case
    matches. Raises ``Exception`` for a select without an explicit value and
    without static options, since no default can be determined.
    """
    test_param = cond.test_param
    if isinstance(test_param, galaxy.tools.parameters.basic.BooleanToolParameter):
        if declared_value is None:
            # No explicit value for param in test case, determine from default
            query_value = test_param.checked
        else:
            query_value = _process_bool_param_value(test_param, declared_value)

        def matches_declared_value(case_value):
            return _process_bool_param_value(test_param, case_value) == query_value

    elif isinstance(test_param, galaxy.tools.parameters.basic.SelectToolParameter):
        if declared_value is not None:
            # Test case supplied explicit value to check against.
            def matches_declared_value(case_value):
                return case_value == declared_value

        elif test_param.static_options:
            # No explicit value in the test case. Nothing can be done for
            # dynamic options, but static options are (text, value, selected)
            # tuples: use the value of the option marked selected, or fall
            # back on the top-most option like the GUI does. Cases are keyed
            # by option value, so the value (not the text) is compared.
            static_options = test_param.static_options
            default_option = next(
                (opt_value for _, opt_value, selected in static_options if selected),
                static_options[0][1],
            )

            def matches_declared_value(case_value):
                return case_value == default_option

        else:
            # No explicit value for this param and cannot determine a
            # default - give up. Previously this would just result in a key
            # error exception.
            msg = f"Failed to find test parameter value specification required for conditional {cond.name}"
            raise Exception(msg)

    # Check the tool's defined cases against the predicate to determine the
    # selected or default case.
    for case in cond.cases:
        if matches_declared_value(case.value):
            return case

    msg_template = "%s - Failed to find case matching value (%s) for test parameter specification for conditional %s. Remainder of test behavior is unspecified."
    log.info(msg_template, tool.id, declared_value, cond.name)
    return None
def _add_uploaded_dataset(name, value, extra, input_parameter, required_files):
if value is None:
assert input_parameter.optional, f"{name} is not optional. You must provide a valid filename."
return value
return require_file(name, value, extra, required_files)
def _split_if_str(value):
split = isinstance(value, str)
if split:
value = value.split(",")
return value
def _process_bool_param_value(param, param_value):
    """
    Convert a test value for a boolean parameter into a Python boolean.

    The value is first compared with the parameter's ``truevalue`` and
    ``falsevalue``; anything else is parsed with ``string_as_bool_or_none``
    for optional parameters and ``string_as_bool`` otherwise. If the value
    arrives as a list, only its first element is used and the result is
    returned wrapped in a one-element list.
    """
    assert isinstance(param, galaxy.tools.parameters.basic.BooleanToolParameter)
    wrapped = isinstance(param_value, list)
    if wrapped:
        param_value = param_value[0]

    if param.truevalue == param_value:
        as_bool = True
    elif param.falsevalue == param_value:
        as_bool = False
    elif param.optional:
        as_bool = string_as_bool_or_none(param_value)
    else:
        as_bool = string_as_bool(param_value)

    return [as_bool] if wrapped else as_bool
def require_file(name, value, extra, required_files):
    """
    Record ``(value, extra)`` as a file to upload and return its dataset name.

    The pair is appended to ``required_files`` unless an equal pair is
    already present. The returned name is the value of the last
    ``edit_attributes`` entry of type ``"name"`` when there is one.
    Otherwise it is the basename of ``value`` with one trailing ``.zip`` or
    ``.gz`` removed. ``name`` is accepted but not used.
    """
    upload_entry = (value, extra)
    if upload_entry not in required_files:
        required_files.append(upload_entry)  # these files will be uploaded

    renames = [att for att in extra.get("edit_attributes", []) if att.get("type") == "name"]
    if renames:
        # Only the last name change matters; the renamed upload is what a
        # select has to refer to (e.g. for composite datasets).
        return renames[-1].get("value")

    for suffix in (".zip", ".gz"):
        if value.endswith(suffix):
            value = value[: -len(suffix)]
            break
    # The file may live in a sub-directory of test-data.
    return os.path.basename(value)
class ParamContext:
    """
    Position of one parameter (or group) within the nested tool input tree.

    A context has a ``name``, an optional repeat ``index`` and an optional
    ``parent_context``. ``parent_context`` may be any object providing
    ``for_state()`` and ``param_names()``. ``None`` (the constructor
    default) means the context has no parent, i.e. it sits at the top level.
    """

    def __init__(self, name, index=None, parent_context=None):
        self.parent_context = parent_context
        self.name = name
        self.index = None if index is None else int(index)

    def _local_name(self):
        """Return this level's own name, suffixed with ``_<index>`` when indexed."""
        return self.name if self.index is None else f"{self.name}_{self.index}"

    def for_state(self):
        """Return the full ``|``-separated state key for this context."""
        local_name = self._local_name()
        # A missing parent contributes no prefix, exactly like a parent
        # whose own state key is empty.
        parent_for_state = self.parent_context.for_state() if self.parent_context is not None else ""
        if parent_for_state:
            return f"{parent_for_state}|{local_name}"
        return local_name

    def __str__(self):
        return f"Context[for_state={self.for_state()}]"

    def param_names(self):
        """
        Yield every name a raw input may use to address this context.

        Names qualified by each of the parent's names come first, followed by
        the bare local name.
        """
        local_name = self._local_name()
        if self.parent_context is not None:
            for parent_context_param in self.parent_context.param_names():
                yield f"{parent_context_param}|{local_name}"
        yield local_name

    def extract_value(self, raw_inputs):
        """
        Remove and return the raw input addressed to this context.

        Candidate names are tried in ``param_names()`` order; the first one
        with a truthy match wins. Returns ``None`` when nothing matches.
        """
        for param_name in self.param_names():
            value = self.__raw_param_found(param_name, raw_inputs)
            if value:
                return value
        return None

    def __raw_param_found(self, param_name, raw_inputs):
        """Pop and return the last entry of ``raw_inputs`` named ``param_name``, else ``None``."""
        index = None
        # No early exit on purpose: when a name occurs more than once the
        # last occurrence is the one that gets consumed.
        for i, raw_input_dict in enumerate(raw_inputs):
            if raw_input_dict["name"] == param_name:
                index = i
        if index is None:
            return None
        return raw_inputs.pop(index)