# Source code for galaxy.tool_util.parser.xml
import json
import logging
import math
import os
import re
import uuid
from typing import (
Any,
cast,
Dict,
Iterable,
List,
Optional,
)
from packaging.version import Version
from galaxy.tool_util.deps import requirements
from galaxy.tool_util.parser.util import (
DEFAULT_DELTA,
DEFAULT_DELTA_FRAC,
DEFAULT_EPS,
DEFAULT_METRIC,
DEFAULT_PIN_LABELS,
)
from galaxy.util import (
Element,
ElementTree,
string_as_bool,
xml_text,
xml_to_string,
)
from .interface import (
AssertionList,
InputSource,
PageSource,
PagesSource,
RequiredFiles,
TestCollectionDef,
TestCollectionOutputDef,
ToolSource,
ToolSourceTest,
ToolSourceTests,
)
from .output_actions import ToolOutputActionGroup
from .output_collection_def import dataset_collector_descriptions_from_elem
from .output_objects import (
ChangeFormatModel,
ToolExpressionOutput,
ToolOutput,
ToolOutputCollection,
ToolOutputCollectionStructure,
)
from .stdio import (
aggressive_error_checks,
error_on_exit_code,
StdioErrorLevel,
ToolStdioExitCode,
ToolStdioRegex,
)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def inject_validates(inject):
    """Return whether ``inject`` names a recognised injection target.

    Accepted values are ``"api_key"``, ``"entry_point_path_for_label"`` and
    any string of the form ``oidc_<id|access|refresh>_token_<suffix>``.
    """
    if inject in ("api_key", "entry_point_path_for_label"):
        return True
    oidc_pattern = re.compile("^oidc_(id|access|refresh)_token_(.*)$")
    return oidc_pattern.match(inject) is not None
def destroy_tree(tree):
    """Detach every element of ``tree`` from its parent, deepest elements first.

    Relies on ``iterdescendants()`` and ``getparent()`` being available on the
    elements of ``tree``.
    """
    root = tree.getroot()
    # element -> (depth, parent); the root sits at depth 0 and has no parent.
    depth_and_parent = {root: (0, None)}
    for node in root.iterdescendants():
        parent = node.getparent()
        depth_and_parent[node] = (depth_and_parent[parent][0] + 1, parent)
    entries = [(depth, parent, node) for node, (depth, parent) in depth_and_parent.items()]
    entries.sort(key=lambda entry: entry[0], reverse=True)
    for _depth, parent, node in entries:
        if parent is None:
            # Only the root has no parent, and being at depth 0 it sorts last.
            break
        parent.remove(node)
    del tree
def parse_change_format(change_format: Iterable[Element]) -> List[ChangeFormatModel]:
    """Build one ``ChangeFormatModel`` per ``<when>`` child of the given elements.

    When a ``<when>`` carries an ``input`` attribute it is prefixed with ``$``
    unless it already contains one; otherwise the ``input_dataset`` and
    ``attribute`` attributes are read instead.
    """
    models: List[ChangeFormatModel] = []
    for change_elem in change_format:
        for raw_when in change_elem.findall("when"):
            when = cast(Element, raw_when)
            input_ref: Optional[str] = when.get("input", None)
            input_dataset: Optional[str] = None
            check_attribute: Optional[str] = None
            if input_ref is None:
                input_dataset = when.get("input_dataset", None)
                check_attribute = when.get("attribute", None)
            elif "$" not in input_ref:
                input_ref = f"${input_ref}"
            models.append(
                ChangeFormatModel(
                    value=when.get("value", None),
                    format=when.get("format", None),
                    input=input_ref,
                    input_dataset=input_dataset,
                    check_attribute=check_attribute,
                )
            )
    return models
[docs]class XmlToolSource(ToolSource):
"""Tool source that parses a tool from the classic Galaxy XML representation."""
# Identifier of the source language handled by this class.
language = "xml"
def __init__(self, xml_tree: ElementTree, source_path=None, macro_paths=None):
    """Wrap a parsed tool XML tree.

    :param xml_tree: parsed tree; its root element is kept as ``self.root``.
    :param source_path: stored as-is and exposed through ``source_path``.
    :param macro_paths: stored and exposed through ``macro_paths``; a falsy
        value is replaced by an empty list.
    """
    self.xml_tree = xml_tree
    root = xml_tree.getroot()
    self.root = root
    self._source_path = source_path
    self._macro_paths = macro_paths or []
    # parse_profile() reads self.root, so it must run after root is assigned.
    profile = Version(self.parse_profile())
    self.legacy_defaults = profile == Version("16.01")
    self._string = xml_to_string(root)
def parse_tool_module(self):
    """Return ``(module, class)`` from the ``<type>`` element, or ``None`` if absent.

    ``module`` falls back to ``"galaxy.tools"`` when the attribute is missing.
    """
    type_elem = self.root.find("type")
    if type_elem is None:
        return None
    return type_elem.get("module", "galaxy.tools"), type_elem.get("class")
def parse_action_module(self):
    """Return ``(module, class)`` from the ``<action>`` element, or ``None`` if absent."""
    action_elem = self.root.find("action")
    if action_elem is None:
        return None
    return action_elem.get("module"), action_elem.get("class")
def parse_tool_type(self):
    """Return the root element's ``tool_type`` attribute, or ``None`` if unset."""
    return self.root.get("tool_type", None)
def parse_edam_operations(self):
    """Return the text of every ``<edam_operation>`` under ``<edam_operations>``.

    An empty list is returned when the container element is absent.
    """
    container = self.root.find("edam_operations")
    if container is not None:
        return [operation.text for operation in container.findall("edam_operation")]
    return []
def parse_edam_topics(self):
    """Return the text of every ``<edam_topic>`` under ``<edam_topics>``.

    An empty list is returned when the container element is absent.
    """
    container = self.root.find("edam_topics")
    if container is not None:
        return [topic.text for topic in container.findall("edam_topic")]
    return []
def parse_xrefs(self):
    """Return the tool's cross references as ``{"value", "reftype"}`` dicts.

    Only ``<xref>`` children of ``<xrefs>`` that carry a non-empty ``type``
    attribute are returned. ``value`` is the stripped element text; an
    element without any text yields an empty string instead of raising.
    Returns an empty list when there is no ``<xrefs>`` element.
    """
    xrefs = self.root.find("xrefs")
    if xrefs is None:
        return []
    return [
        # ``text`` is None for an empty element such as ``<xref type="..."/>``.
        dict(value=(xref.text or "").strip(), reftype=xref.attrib["type"])
        for xref in xrefs.findall("xref")
        if xref.get("type")
    ]
def parse_display_interface(self, default):
    """Return the root ``display_interface`` attribute as a boolean, or ``default``-derived value if unset."""
    return self._get_attribute_as_bool(attribute="display_interface", default=default)
def parse_require_login(self, default):
    """Return the root ``require_login`` attribute as a boolean, or ``default``-derived value if unset."""
    return self._get_attribute_as_bool(attribute="require_login", default=default)
def parse_request_param_translation_elem(self):
    """Return the ``<request_param_translation>`` element, or ``None`` if absent."""
    translation_elem = self.root.find("request_param_translation")
    return translation_elem
def parse_command(self):
    """Return the text of the ``<command>`` element.

    ``None`` is returned when the element is missing or its text is empty.
    """
    command_el = self._command_el
    if command_el is None:
        return None
    return command_el.text or None
def parse_expression(self):
    """Return the text of the ``<expression>`` element, or ``None`` if absent.

    :raises Exception: if the element's ``type`` is anything but ``ecma5.1``.
    """
    expression_el = self.root.find("expression")
    if expression_el is None:
        return None
    expression_type = expression_el.get("type")
    if expression_type != "ecma5.1":
        raise Exception(f"Unknown expression type [{expression_type}] encountered")
    return expression_el.text
def parse_environment_variables(self):
    """Return one definition dict per ``<environment_variable>`` element.

    Each dict has the keys ``name``, ``template`` (the element text),
    ``inject`` and ``strip``. An empty list is returned when there is no
    ``<environment_variables>`` element.

    Assertions enforce that ``inject`` is a recognised value, that
    ``entry_point_path_for_label`` comes with element text, and that any
    other ``inject`` value does not.
    """
    container = self.root.find("environment_variables")
    if container is None:
        return []
    definitions = []
    for env_el in container.findall("environment_variable"):
        template = env_el.text
        inject = env_el.get("inject")
        if inject:
            assert inject_validates(inject)
        if inject == "entry_point_path_for_label":
            assert (
                template
            ), 'Environment variable value must contain entry point label when inject="entry_point_path_for_label".'
        else:
            assert not (template and inject), "Cannot specify inject and environment variable template."
        definitions.append(
            {
                "name": env_el.get("name"),
                "template": template,
                "inject": inject,
                "strip": string_as_bool(env_el.get("strip", False)),
            }
        )
    return definitions
def parse_home_target(self):
    """Return which directory is used as the job's home: ``"job_home"`` or ``"shared_home"``.

    The default depends on the tool profile (``"job_home"`` from 18.01 on,
    ``"shared_home"`` before). An explicit ``use_shared_home`` attribute on
    the ``<command>`` element overrides that default.
    """
    target = "job_home" if Version(self.parse_profile()) >= Version("18.01") else "shared_home"
    command_el = self._command_el
    # Only an actual attribute value may override the profile default. The
    # previous ``(command_el is not None) and ...`` form produced ``False``
    # when there was no <command>, which passed the ``is not None`` check
    # below and silently forced "job_home".
    command_legacy = command_el.get("use_shared_home", None) if command_el is not None else None
    if command_legacy is not None:
        target = "shared_home" if string_as_bool(command_legacy) else "job_home"
    return target
def parse_tmp_target(self):
    """Return the temp-directory policy, always ``"job_tmp_if_explicit"``.

    The default is to leave TMPDIR et al. untouched unless job_tmp is set in
    job_conf. This conservative choice should not break or modify any
    configuration by default.
    """
    tmp_target = "job_tmp_if_explicit"
    return tmp_target
def parse_interpreter(self):
    """Return the ``interpreter`` attribute of ``<command>`` for legacy tools.

    When ``legacy_defaults`` is false, a non-empty value is ignored: a
    deprecation warning is logged and ``None`` is returned.
    """
    command_el = self._command_el
    interpreter = command_el.get("interpreter", None) if command_el is not None else None
    if not interpreter or self.legacy_defaults:
        return interpreter
    log.warning("Deprecated interpreter attribute on command element is now ignored.")
    return None
def parse_version_command(self):
    """Return the text of the ``<version_command>`` element, or ``None`` if absent."""
    version_cmd = self.root.find("version_command")
    return None if version_cmd is None else version_cmd.text
def parse_version_command_interpreter(self):
    """Return the ``interpreter`` attribute of ``<version_command>``.

    ``None`` is returned when ``parse_version_command()`` yields ``None`` or
    when the attribute is missing or empty.
    """
    if self.parse_version_command() is None:
        return None
    interpreter = self.root.find("version_command").get("interpreter", None)
    return interpreter or None
def parse_parallelism(self):
    """Return a ``ParallelismInfo`` for the ``<parallelism>`` element.

    ``None`` is returned when the element is missing or has no (non-empty)
    ``method`` attribute.
    """
    parallelism = self.root.find("parallelism")
    if parallelism is None or not parallelism.get("method"):
        return None
    return ParallelismInfo(parallelism)
def parse_interactivetool(self):
    """Return one dict per ``<entry_point>`` under ``<entry_points>``.

    Each dict holds ``port``, ``url``, ``name``, ``label``,
    ``requires_domain``, ``requires_path_in_url`` and
    ``requires_path_in_header_named``. String values are stripped. An empty
    list is returned when there is no ``<entry_points>`` element. A missing
    ``<port>`` child fails an assertion.
    """
    entry_points = []
    entry_points_el = self.root.find("entry_points")
    if entry_points_el is None:
        return entry_points
    for ep_el in entry_points_el.findall("entry_point"):
        port_el = ep_el.find("port")
        assert port_el is not None, ValueError("A port is required for InteractiveTools")
        url_el = ep_el.find("url")
        name = ep_el.get("name", None)
        label = ep_el.get("label", None)
        header_name = ep_el.get("requires_path_in_header_named", None)
        entry_points.append(
            dict(
                port=port_el.text.strip(),
                url=url_el.text.strip() if url_el is not None else None,
                name=name.strip() if name else name,
                label=label.strip() if label else label,
                requires_domain=string_as_bool(ep_el.attrib.get("requires_domain", False)),
                requires_path_in_url=string_as_bool(ep_el.attrib.get("requires_path_in_url", False)),
                requires_path_in_header_named=header_name.strip() if header_name else header_name,
            )
        )
    return entry_points
def _get_option_value(self, key, default):
    """Return attribute ``key`` of the first ``<options>`` element defining it, as a boolean.

    ``default`` is returned unchanged when no ``<options>`` element has the attribute.
    """
    matching = next((elem for elem in self.root.findall("options") if key in elem.attrib), None)
    if matching is None:
        return default
    return string_as_bool(matching.get(key))
@property
def _command_el(self):
    """The tool's ``<command>`` element, or ``None`` if it has none."""
    command_el = self.root.find("command")
    return command_el
def _get_attribute_as_bool(self, attribute, default, elem=None):
    """Return ``attribute`` of ``elem`` (the root element when omitted) as a boolean.

    ``default`` is used in place of a missing attribute before conversion.
    """
    target = self.root if elem is None else elem
    return string_as_bool(target.get(attribute, default))
def parse_required_files(self) -> Optional[RequiredFiles]:
    """Build a ``RequiredFiles`` from the ``<required_files>`` element.

    Returns ``None`` when the element is absent. Every ``<include>`` and
    ``<exclude>`` child must define ``path`` (enforced by assertion); its
    ``type`` defaults to ``"literal"``.
    """
    required_files = self.root.find("required_files")
    if required_files is None:
        return None

    def collect_refs(tag_name):
        refs = []
        for ref in required_files.findall(tag_name):
            path = ref.get("path")
            assert path is not None, f'"path" must be specified in {tag_name}'
            refs.append({"path": path, "path_type": ref.get("type", "literal")})
        return refs

    return RequiredFiles.from_dict(
        {
            "extend_default_excludes": self._get_attribute_as_bool(
                "extend_default_excludes", True, elem=required_files
            ),
            "includes": collect_refs("include"),
            "excludes": collect_refs("exclude"),
        }
    )
def parse_requirements_and_containers(self):
    """Return what ``requirements.parse_requirements_from_xml`` yields for the tool root, with ``parse_resources`` enabled."""
    root = self.root
    return requirements.parse_requirements_from_xml(root, parse_resources=True)
def parse_provided_metadata_style(self):
    """Return the provided-metadata style, ``"legacy"`` or ``"default"``.

    An explicit ``provided_metadata_style`` attribute on ``<outputs>`` wins.
    Otherwise the style is ``"legacy"`` for profiles before 17.09 and
    ``"default"`` from then on. Any other value fails an assertion.
    """
    out_elem = self.root.find("outputs")
    style = out_elem.attrib.get("provided_metadata_style") if out_elem is not None else None
    if style is None:
        is_legacy_profile = Version(self.parse_profile()) < Version("17.09")
        style = "legacy" if is_legacy_profile else "default"
    assert style in ["legacy", "default"]
    return style
def parse_provided_metadata_file(self):
    """Return the ``provided_metadata_file`` attribute of ``<outputs>``, defaulting to ``"galaxy.json"``."""
    out_elem = self.root.find("outputs")
    if out_elem is None:
        return "galaxy.json"
    return out_elem.attrib.get("provided_metadata_file", "galaxy.json")
def parse_outputs(self, tool):
    """Parse the ``<outputs>`` element.

    :returns: ``(outputs, output_collections)``, both dicts keyed by name.
        ``outputs`` holds the collection outputs first (in document order),
        followed by every non-collection output. ``output_collections``
        holds only the collections. Both are empty when there is no
        ``<outputs>`` element.
    """
    outputs = {}
    output_collections = {}
    out_elem = self.root.find("outputs")
    if out_elem is None:
        return outputs, output_collections

    # Outputs not (or not yet) owned by a collection; merged into ``outputs`` last.
    data_dict = {}

    def _parse(data_elem, **kwds):
        output_def = self._parse_output(data_elem, tool, **kwds)
        data_dict[output_def.name] = output_def
        return output_def

    def _parse_expression(output_elem, **kwds):
        output_def = self._parse_expression_output(output_elem, tool, **kwds)
        output_def.filters = output_elem.findall("filter")
        data_dict[output_def.name] = output_def
        return output_def

    def _parse_collection(collection_elem):
        attr = collection_elem.get
        name = attr("name")
        label = xml_text(collection_elem, "label")
        default_format = attr("format", "data")
        structured_like = attr("structured_like", None)
        inherit_format = False
        inherit_metadata = False
        if structured_like:
            inherit_format = string_as_bool(attr("inherit_format", None))
            inherit_metadata = string_as_bool(attr("inherit_metadata", None))
        default_format_source = attr("format_source", None)
        default_metadata_source = attr("metadata_source", "")
        filters = collection_elem.findall("filter")

        dataset_collector_descriptions = None
        if collection_elem.find("discover_datasets") is not None:
            dataset_collector_descriptions = dataset_collector_descriptions_from_elem(collection_elem, legacy=False)
        structure = ToolOutputCollectionStructure(
            collection_type=attr("type", None),
            collection_type_source=attr("type_source", None),
            collection_type_from_rules=attr("type_from_rules", None),
            structured_like=structured_like,
            dataset_collector_descriptions=dataset_collector_descriptions,
        )
        output_collection = ToolOutputCollection(
            name,
            structure,
            label=label,
            filters=filters,
            default_format=default_format,
            inherit_format=inherit_format,
            inherit_metadata=inherit_metadata,
            default_format_source=default_format_source,
            default_metadata_source=default_metadata_source,
        )
        outputs[output_collection.name] = output_collection

        # Parse every nested <data> first, then move each one from the flat
        # dict into the collection. The two passes are kept separate on purpose.
        for data_elem in collection_elem.findall("data"):
            _parse(
                data_elem,
                default_format=default_format,
                default_format_source=default_format_source,
                default_metadata_source=default_metadata_source,
            )
        for data_elem in collection_elem.findall("data"):
            output_name = data_elem.get("name")
            data = data_dict[output_name]
            assert data
            del data_dict[output_name]
            output_collection.outputs[output_name] = data
        output_collections[name] = output_collection

    # Register all direct <data> children up front; they are parsed again in
    # document order by the loop below.
    for data_elem in out_elem.findall("data"):
        _parse(data_elem)

    for out_child in out_elem:
        tag = out_child.tag
        if tag == "data":
            _parse(out_child)
        elif tag == "collection":
            _parse_collection(out_child)
        elif tag == "output":
            output_type = out_child.get("type")
            if output_type == "data":
                _parse(out_child)
            elif output_type == "collection":
                # Map the generic <output> attributes onto the names _parse_collection reads.
                out_child.attrib["type"] = out_child.get("collection_type")
                out_child.attrib["type_source"] = out_child.get("collection_type_source")
                _parse_collection(out_child)
            else:
                _parse_expression(out_child)
        else:
            log.warning(f"Unknown output tag encountered [{out_child.tag}]")

    for output_def in data_dict.values():
        outputs[output_def.name] = output_def
    return outputs, output_collections
def _parse_output(
    self,
    data_elem,
    tool,
    default_format="data",
    default_format_source=None,
    default_metadata_source="",
    expression_type=None,
):
    """Build a ``ToolOutput`` from a single output element.

    The ``default_*`` arguments supply the values used when the element
    lacks the matching ``format``, ``format_source`` or ``metadata_source``
    attribute. ``expression_type`` is accepted but not used here.

    :raises ValueError: if ``auto_format`` is enabled together with a format
        other than ``"data"``.
    """
    attr = data_elem.get
    from_expression = attr("from")
    output = ToolOutput(attr("name"), from_expression=from_expression)

    output_format = attr("format", default_format)
    if string_as_bool(attr("auto_format", "false")):
        if output_format != "data":
            raise ValueError("Setting format and auto_format is not supported at this time.")
        output_format = "_sniff_"
    output.format = output_format

    output.change_format = parse_change_format(data_elem.findall("change_format"))
    output.format_source = attr("format_source", default_format_source)
    output.default_identifier_source = attr("default_identifier_source", "None")
    output.metadata_source = attr("metadata_source", default_metadata_source)
    output.parent = attr("parent", None)
    output.label = xml_text(data_elem, "label")
    output.count = int(attr("count", 1))
    output.filters = data_elem.findall("filter")
    output.tool = tool

    from_work_dir = attr("from_work_dir", None)
    if from_work_dir and Version(str(getattr(tool, "profile", 0))) < Version("21.09"):
        # from_work_dir outputs are quoted from 21.09 on. Before that,
        # trailing spaces had no effect, so strip them to keep old tools working.
        from_work_dir = from_work_dir.strip()
    output.from_work_dir = from_work_dir

    output.hidden = string_as_bool(attr("hidden", ""))
    output.actions = ToolOutputActionGroup(output, data_elem.find("actions"))
    output.dataset_collector_descriptions = dataset_collector_descriptions_from_elem(
        data_elem, legacy=self.legacy_defaults
    )
    return output
def _parse_expression_output(self, output_elem, tool, **kwds):
    """Build a ``ToolExpressionOutput`` from an ``<output>`` element.

    ``tool`` and ``**kwds`` are accepted but not used here.
    """
    attr = output_elem.get
    output_type = attr("type")
    from_expression = attr("from")
    output = ToolExpressionOutput(attr("name"), output_type, from_expression)
    output.path = attr("value")
    output.label = xml_text(output_elem, "label")
    output.hidden = string_as_bool(attr("hidden", ""))
    output.actions = ToolOutputActionGroup(output, output_elem.find("actions"))
    output.dataset_collector_descriptions = []
    return output
def parse_stdio(self):
    """Parse error handling from the ``<command>`` and ``<stdio>`` elements.

    :returns: ``(exit_codes, regexes)``.

    The ``detect_errors`` attribute of ``<command>`` selects the base checks:

    - ``exit_code``: ``error_on_exit_code()``, passing on ``oom_exit_code``
      when that attribute is given
    - ``aggressive``: ``aggressive_error_checks()``
    - unset or ``default``: ``error_on_exit_code()`` when the tool has no
      ``<stdio>`` and is not using legacy defaults, otherwise nothing

    Exit codes and regexes from ``<stdio>`` are prepended to the base lists,
    i.e. they come before the checks derived from the command element.

    :raises ValueError: for an unknown ``detect_errors`` value, or an
        ``oom_exit_code`` that is not an integer.
    """
    command_el = self._command_el
    detect_errors = command_el.get("detect_errors") if command_el is not None else None
    stdio_elems = self.root.findall("stdio")

    if not detect_errors or detect_errors == "default":
        if not stdio_elems and not self.legacy_defaults:
            exit_codes, regexes = error_on_exit_code()
        else:
            exit_codes, regexes = [], []
    elif detect_errors == "exit_code":
        # command_el cannot be None here: detect_errors was read from it.
        oom_exit_code = command_el.get("oom_exit_code", None)
        if oom_exit_code is not None:
            # Validation only; the raw attribute value is what gets passed on.
            int(oom_exit_code)
        exit_codes, regexes = error_on_exit_code(out_of_memory_exit_code=oom_exit_code)
    elif detect_errors == "aggressive":
        exit_codes, regexes = aggressive_error_checks()
    else:
        raise ValueError(f"Unknown detect_errors value encountered [{detect_errors}]")

    if stdio_elems:
        parser = StdioParser(self.root)
        exit_codes = parser.stdio_exit_codes + exit_codes
        regexes = parser.stdio_regexes + regexes
    return exit_codes, regexes
def parse_strict_shell(self):
    """Return the ``strict`` attribute of ``<command>`` as a boolean.

    When the attribute (or the element) is missing, the default is false
    for profiles before 20.09 and true from then on.
    """
    command_el = self._command_el
    default = "False" if Version(self.parse_profile()) < Version("20.09") else "True"
    strict = default if command_el is None else command_el.get("strict", default)
    return string_as_bool(strict)
def parse_help(self):
    """Return the text of the ``<help>`` element, or ``None`` if absent."""
    help_elem = self.root.find("help")
    if help_elem is None:
        return None
    return help_elem.text
@property
def macro_paths(self):
    """The macro paths stored on this tool source."""
    paths = self._macro_paths
    return paths
@property
def source_path(self):
    """The source path stored on this tool source."""
    path = self._source_path
    return path
[docs] def parse_tests_to_dict(self) -> ToolSourceTests:
tests_elem = self.root.find("tests")
tests: List[ToolSourceTest] = []
rval: ToolSourceTests = dict(tests=tests)
if tests_elem is not None:
for i, test_elem in enumerate(tests_elem.findall("test")):
profile = self.parse_profile()
tests.append(_test_elem_to_dict(test_elem, i, profile))
return rval
def parse_profile(self) -> str:
    """Return the root ``profile`` attribute, defaulting to ``"16.01"``.

    Pre-16.04 or default XML defaults:

    - Use standard error for error detection.
    - Don't run shells with -e
    - Auto-check for implicit multiple outputs.
    - Auto-check for $param_file.
    - Enable buggy interpreter attribute.
    """
    profile = self.root.get("profile", "16.01")
    return profile
def parse_python_template_version(self):
    """Return the root ``python_template_version`` attribute as a ``Version``, or ``None`` if unset."""
    raw_version = self.root.get("python_template_version")
    if raw_version is None:
        return None
    return Version(raw_version)
def parse_creator(self):
    """Return the children of ``<creator>`` as a list of dicts.

    ``<person>`` and ``<organization>`` children each become a dict that
    starts with ``"class"`` (``"Person"`` / ``"Organization"``) and is then
    updated with the element's attributes; other children are skipped.
    Returns ``None`` when there is no ``<creator>`` element.
    """
    creators_el = self.root.find("creator")
    if creators_el is None:
        return None
    class_by_tag = {"person": "Person", "organization": "Organization"}
    creators = []
    for creator_el in creators_el:
        clazz = class_by_tag.get(creator_el.tag)
        if clazz is None:
            continue
        creator_as_dict = {"class": clazz}
        creator_as_dict.update(creator_el.attrib)
        creators.append(creator_as_dict)
    return creators
def _test_elem_to_dict(test_elem, i, profile=None) -> ToolSourceTest:
rval: ToolSourceTest = dict(
outputs=__parse_output_elems(test_elem),
output_collections=__parse_output_collection_elems(test_elem, profile=profile),
inputs=__parse_input_elems(test_elem, i),
expect_num_outputs=test_elem.get("expect_num_outputs"),
command=__parse_assert_list_from_elem(test_elem.find("assert_command")),
command_version=__parse_assert_list_from_elem(test_elem.find("assert_command_version")),
stdout=__parse_assert_list_from_elem(test_elem.find("assert_stdout")),
stderr=__parse_assert_list_from_elem(test_elem.find("assert_stderr")),
expect_exit_code=test_elem.get("expect_exit_code"),
expect_failure=string_as_bool(test_elem.get("expect_failure", False)),
expect_test_failure=string_as_bool(test_elem.get("expect_test_failure", False)),
maxseconds=test_elem.get("maxseconds", None),
)
_copy_to_dict_if_present(test_elem, rval, ["num_outputs"])
return rval
def __parse_input_elems(test_elem, i):
    """Expand the nested input elements of ``test_elem`` in place, then parse its ``<param>`` children."""
    __expand_input_elems(test_elem)
    raw_inputs = __parse_inputs_elems(test_elem, i)
    return raw_inputs
def __parse_output_elems(test_elem):
outputs = []
for output_elem in test_elem.findall("output"):
name, file, attributes = __parse_output_elem(output_elem)
outputs.append({"name": name, "value": file, "attributes": attributes})
return outputs
def __parse_output_elem(output_elem):
    """Return ``(name, file, attributes)`` for a test ``<output>`` element.

    :raises Exception: if the element has no ``name`` attribute.
    """
    remaining = dict(output_elem.attrib)
    name = remaining.pop("name", None)
    if name is None:
        raise Exception("Test output does not have a 'name'")
    file, attributes = __parse_test_attributes(output_elem, remaining, parse_discovered_datasets=True)
    return name, file, attributes
def __parse_command_elem(test_elem):
    """Return the assertion list of the ``<command>`` child of ``test_elem``."""
    return __parse_assert_list_from_elem(test_elem.find("command"))
def __parse_output_collection_elems(test_elem, profile=None):
output_collections = []
for output_collection_elem in test_elem.findall("output_collection"):
output_collection_def = __parse_output_collection_elem(output_collection_elem, profile=profile)
output_collections.append(output_collection_def)
return output_collections
def __parse_output_collection_elem(output_collection_elem, profile=None):
    """Return the dict form of the ``TestCollectionOutputDef`` for an ``<output_collection>``.

    :raises Exception: if the element has no ``name`` attribute.
    """
    remaining = dict(output_collection_elem.attrib)
    name = remaining.pop("name", None)
    if name is None:
        raise Exception("Test output collection does not have a 'name'")
    element_tests = __parse_element_tests(output_collection_elem, profile=profile)
    collection_def = TestCollectionOutputDef(name, remaining, element_tests)
    return collection_def.to_dict()
def __parse_element_tests(parent_element, profile=None):
    """Parse the ``<element>`` children of ``parent_element``.

    :returns: dict mapping each element's ``name`` to the
        ``(file, attributes)`` pair produced for it. For profiles from 20.09
        on, ``attributes["expected_sort_order"]`` records the element's
        position among its siblings.
    :raises Exception: if an ``<element>`` has no ``name`` attribute.
    """
    element_tests = {}
    for idx, element in enumerate(parent_element.findall("element")):
        element_attrib = dict(element.attrib)
        identifier = element_attrib.pop("name", None)
        if identifier is None:
            # The identifier is read from ``name``, so the message names that
            # attribute (it used to mention a non-existent 'identifier').
            raise Exception("Test collection element does not have a 'name'")
        element_tests[identifier] = __parse_test_attributes(
            element, element_attrib, parse_elements=True, profile=profile
        )
        if profile and Version(profile) >= Version("20.09"):
            element_tests[identifier][1]["expected_sort_order"] = idx
    return element_tests
def __parse_test_attributes(output_elem, attrib, parse_elements=False, parse_discovered_datasets=False, profile=None):
    """Parse what a test should check for a single output-like element.

    :param output_elem: element whose children (assertions, extra files,
        metadata, nested elements, discovered datasets) are inspected.
    :param attrib: dict of the element's attributes; consumed keys are
        popped from it.
    :param parse_elements: also parse nested ``<element>`` children.
    :param parse_discovered_datasets: also parse ``<discovered_dataset>`` children.
    :param profile: tool profile, passed on to the nested element parsing.
    :returns: ``(file, attributes)``.
    :raises Exception: if nothing to check is defined, or if a
        ``<discovered_dataset>`` has no ``designation``.
    """
    assert_list = __parse_assert_list(output_elem)

    # Either ``file`` or ``value`` may name the file to compare against
    # (``file`` traditionally for outputs, ``value`` for extra files).
    # ``value`` is popped even when ``file`` is present.
    fallback_value = attrib.pop("value", None)
    file = attrib.pop("file", fallback_value)

    # A file is no longer required if a list of assertions is present.
    attributes = {}
    if "value_json" in attrib:
        attributes["object"] = json.loads(attrib.pop("value_json"))

    # Method of comparison.
    attributes["compare"] = attrib.pop("compare", "diff").lower()
    # Number of lines allowed to vary in logs (for dates, etc).
    attributes["lines_diff"] = int(attrib.pop("lines_diff", "0"))
    # Allowed file size variation for the sim_size comparison.
    attributes["delta"] = int(attrib.pop("delta", DEFAULT_DELTA))
    if "delta_frac" in attrib:
        attributes["delta_frac"] = float(attrib["delta_frac"])
    else:
        attributes["delta_frac"] = DEFAULT_DELTA_FRAC
    attributes["sort"] = string_as_bool(attrib.pop("sort", False))
    attributes["decompress"] = string_as_bool(attrib.pop("decompress", False))

    # Parameters for the "image_diff" comparison.
    attributes["metric"] = attrib.pop("metric", DEFAULT_METRIC)
    attributes["eps"] = float(attrib.pop("eps", DEFAULT_EPS))
    attributes["pin_labels"] = attrib.pop("pin_labels", DEFAULT_PIN_LABELS)

    # ``location`` may hold a URL from which ``file`` is downloaded if it is
    # not already on disk. Without an explicit file, use the URL's last component.
    location = attrib.get("location")
    if location and file is None:
        file = os.path.basename(location)
    attributes["location"] = location

    attributes["count"] = int(attrib.pop("count")) if "count" in attrib else None
    if "ftype" in attrib:
        attributes["ftype"] = attrib["ftype"]

    extra_files = [__parse_extra_files_elem(extra) for extra in output_elem.findall("extra_files")]
    metadata = {}
    for metadata_elem in output_elem.findall("metadata"):
        metadata[metadata_elem.get("name")] = metadata_elem.get("value")
    md5sum = attrib.get("md5", None)
    checksum = attrib.get("checksum", None)

    element_tests = {}
    if parse_elements:
        element_tests = __parse_element_tests(output_elem, profile=profile)

    primary_datasets = {}
    if parse_discovered_datasets:
        for primary_elem in output_elem.findall("discovered_dataset") or []:
            primary_attrib = dict(primary_elem.attrib)
            designation = primary_attrib.pop("designation", None)
            if designation is None:
                raise Exception("Test primary dataset does not have a 'designation'")
            primary_datasets[designation] = __parse_test_attributes(primary_elem, primary_attrib)

    has_checksum = md5sum or checksum
    has_nested_tests = extra_files or element_tests or primary_datasets
    has_object = "object" in attributes
    if not (assert_list or file or metadata or has_checksum or has_nested_tests or has_object):
        raise Exception(
            "Test output defines nothing to check (e.g. must have a 'file' check against, assertions to check, metadata or checksum tests, etc...)"
        )

    attributes.update(
        assert_list=assert_list,
        extra_files=extra_files,
        metadata=metadata,
        md5=md5sum,
        checksum=checksum,
        elements=element_tests,
        primary_datasets=primary_datasets,
    )
    return file, attributes
def __parse_assert_list(output_elem):
    """Return the assertion list of the ``<assert_contents>`` child of ``output_elem``."""
    return __parse_assert_list_from_elem(output_elem.find("assert_contents"))
def __parse_assert_list_from_elem(assert_elem) -> AssertionList:
    """Convert the children of ``assert_elem`` into assertion dictionaries.

    Each child becomes ``{"tag", "attributes", "children"}``, with
    ``children`` converted recursively. Returns ``None`` when ``assert_elem``
    is ``None``.
    """
    if assert_elem is None:
        return None

    def convert_elem(elem):
        """Convert an XML element to the dictionary format used by the assertion checking code."""
        return {
            "tag": elem.tag,
            "attributes": dict(elem.attrib),
            "children": [convert_elem(child_elem) for child_elem in elem],
        }

    return [convert_elem(assert_child) for assert_child in list(assert_elem)]
def __parse_extra_files_elem(extra):
    """Parse an ``<extra_files>`` element into a ``{"value", "name", "type", "attributes"}`` dict.

    ``type`` defaults to ``"file"``. A ``name`` is required unless the type
    is ``"directory"`` (enforced by assertion).
    """
    remaining = dict(extra.attrib)
    extra_type = remaining.pop("type", "file")
    extra_name = remaining.pop("name", None)
    name_missing = extra_type != "directory" and extra_name is None
    assert not name_missing, f"extra_files type ({extra_type}) requires a name attribute"
    extra_value, extra_attributes = __parse_test_attributes(extra, remaining)
    return dict(value=extra_value, name=extra_name, type=extra_type, attributes=extra_attributes)
def __expand_input_elems(root_elem, prefix=""):
    """Flatten nested test inputs into prefixed ``<param>`` children of ``root_elem``.

    Params are renamed with ``prefix``. Then every ``<repeat>``,
    ``<conditional>`` and ``<section>`` child is expanded recursively and
    its params are appended to ``root_elem``. Repeats sharing a name are
    numbered from 0 in document order.
    """
    __append_prefix_to_params(root_elem, prefix)

    next_index = {}
    for repeat_elem in root_elem.findall("repeat"):
        name = repeat_elem.get("name")
        index = next_index.get(name, 0)
        next_index[name] = index + 1
        __expand_input_elems(repeat_elem, __prefix_join(prefix, name, index=index))
        __pull_up_params(root_elem, repeat_elem)

    # Conditionals are all handled before sections; both only extend the prefix.
    for container_tag in ("conditional", "section"):
        for container_elem in root_elem.findall(container_tag):
            nested_prefix = __prefix_join(prefix, container_elem.get("name"))
            __expand_input_elems(container_elem, nested_prefix)
            __pull_up_params(root_elem, container_elem)
def __append_prefix_to_params(elem, prefix):
for param_elem in elem.findall("param"):
param_elem.set("name", __prefix_join(prefix, param_elem.get("name")))
def __pull_up_params(parent_elem, child_elem):
for param_elem in child_elem.findall("param"):
parent_elem.append(param_elem)
def __prefix_join(prefix, name, index=None):
    """Return ``name`` (suffixed with ``_<index>`` if given), joined to ``prefix`` with ``|``.

    A falsy ``prefix`` is omitted, together with the separator.
    """
    if index is not None:
        name = "%s_%d" % (name, index)
    if prefix:
        return f"{prefix}|{name}"
    return name
def _copy_to_dict_if_present(elem, rval, attributes):
    """Copy those of ``attributes`` that ``elem`` defines into ``rval``, and return ``rval``."""
    present = {attribute: elem.get(attribute) for attribute in attributes if attribute in elem.attrib}
    rval.update(present)
    return rval
def __parse_inputs_elems(test_elem, i):
raw_inputs = []
for param_elem in test_elem.findall("param"):
raw_inputs.append(__parse_param_elem(param_elem, i))
return raw_inputs
def __parse_param_elem(param_elem, i=0):
    """Turn one test ``param`` element into a raw input description.

    The value comes from the first attribute present among ``values``
    (split on commas), ``value`` and ``value_json`` (JSON-decoded).  If that
    yields ``None`` and a ``location`` attribute exists, its basename is used.

    Child elements are folded into the attribute dict: ``metadata`` children
    become a name -> value mapping, ``composite_data`` values are listed,
    ``edit_attributes`` children are collected as elements and a
    ``collection`` child is parsed via ``TestCollectionDef.from_xml``.

    Returns a dict with the keys ``name``, ``value`` and ``attributes``;
    ``name`` is removed from ``attributes``.  *i* is embedded in the name
    generated for composite datasets.
    """
    attributes = dict(param_elem.attrib)

    value = None
    if "values" in attributes:
        value = attributes["values"].split(",")
    elif "value" in attributes:
        value = attributes["value"]
    elif "value_json" in attributes:
        value = json.loads(attributes["value_json"])

    location = attributes.get("location")
    if value is None and location is not None:
        value = os.path.basename(location)

    # At this time, we can assume having children only occurs on
    # DataToolParameter test items, but this could change and would then
    # require parsing that depends on the kind of child.
    metadata = attributes["metadata"] = {}
    composite_data = attributes["composite_data"] = []
    edit_attributes = attributes["edit_attributes"] = []

    # Composite datasets need to be renamed uniquely.
    rename_to = None
    for child in param_elem:
        tag = child.tag
        if tag == "composite_data":
            composite_data.append(child.get("value"))
            if rename_to is None:
                # Generated once per param; each test uses a fresh history.
                rename_to = "_COMPOSITE_RENAMED_t%d_%s" % (i, uuid.uuid1().hex)
        elif tag == "metadata":
            metadata[child.get("name")] = child.get("value")
        elif tag == "edit_attributes":
            edit_attributes.append(child)
        elif tag == "collection":
            attributes["collection"] = TestCollectionDef.from_xml(child, __parse_param_elem)

    if rename_to:
        # The implicit rename goes first so explicit declarations, which
        # follow it in the list, take precedence.
        edit_attributes.insert(0, {"type": "name", "value": rename_to})

    return {"name": attributes.pop("name"), "value": value, "attributes": attributes}
class StdioParser:
    """Collect exit-code and regex rules from a tool's ``<stdio>`` element(s).

    After construction ``stdio_exit_codes`` holds one ``ToolStdioExitCode``
    per accepted ``<exit_code>`` child and ``stdio_regexes`` one
    ``ToolStdioRegex`` per accepted ``<regex>`` child, in document order.
    Parsing is best effort: malformed entries are logged and skipped, and
    unexpected errors are logged rather than raised.
    """

    def __init__(self, root):
        """Parse every ``<stdio>`` child of *root*."""
        self.stdio_exit_codes = []
        self.stdio_regexes = []
        try:
            # We should have a single <stdio> element, but handle the case
            # for multiples by accumulating the rules of all of them.
            for stdio_elem in root.findall("stdio"):
                self.parse_stdio_exit_codes(stdio_elem)
                self.parse_stdio_regexes(stdio_elem)
        except Exception:
            log.exception("Exception in parse_stdio!")

    def _tool_id(self):
        """Return an identifier for log messages.

        This class does not set ``id`` itself, so fall back to a placeholder
        instead of raising AttributeError in the middle of parsing.
        """
        return getattr(self, "id", "unknown")

    def parse_stdio_exit_codes(self, stdio_elem):
        """
        Parse the tool's <stdio> element's <exit_code> subelements.
        This will add all of those elements, if any, to self.stdio_exit_codes.

        Each element needs a ``range`` or ``value`` attribute; ``range`` wins
        when both are given.  Accepted forms are ``X``, ``X:``, ``:Y`` and
        ``X:Y``, where a missing bound means minus/plus infinity.  Elements
        with a missing or malformed range are logged and skipped.
        """
        try:
            for exit_code_elem in stdio_elem.findall("exit_code"):
                exit_code = ToolStdioExitCode()
                # Each exit code has an optional description that can be
                # part of the "desc" or "description" attributes:
                exit_code.desc = exit_code_elem.get("desc")
                if exit_code.desc is None:
                    exit_code.desc = exit_code_elem.get("description")
                # Parse the error level:
                exit_code.error_level = self.parse_error_level(exit_code_elem.get("level"))
                # Exit-code ranges have precedence over a single exit code.
                code_range = exit_code_elem.get("range")
                if code_range is None:
                    code_range = exit_code_elem.get("value")
                if code_range is None:
                    log.warning("Tool stdio exit codes must have a range or value")
                    continue
                # Whitespace is insignificant anywhere in the range.
                code_range = re.sub(r"\s", "", code_range)
                code_ranges = code_range.split(":")
                if len(code_ranges) > 2:
                    # More than one colon: ignore the exit code.
                    log.warning(f"Invalid tool exit_code range {code_range} - ignored")
                    continue
                if len(code_ranges) == 2:
                    range_start, range_end = code_ranges
                    try:
                        exit_code.range_start = int(range_start) if range_start else -math.inf
                        exit_code.range_end = int(range_end) if range_end else math.inf
                    except ValueError:
                        # A bad bound only invalidates this element; keep
                        # processing the remaining exit codes.
                        log.warning(f"Invalid tool exit_code range {code_range} - ignored")
                        continue
                else:
                    # A singular value. If it's not an integer, log a
                    # message and skip this exit_code.
                    try:
                        exit_code.range_start = int(code_range)
                    except ValueError:
                        log.warning(f"Invalid range start for tool's exit_code {code_range}: exit_code ignored")
                        continue
                    exit_code.range_end = exit_code.range_start
                # TODO: Check if we got ">", ">=", "<", or "<=":
                # If both bounds are infinite the rule matches every exit
                # code, so at least warn about this situation:
                if math.isinf(exit_code.range_start) and math.isinf(exit_code.range_end):
                    log.warning(f"Tool exit_code range {code_range} will match on all exit codes")
                self.stdio_exit_codes.append(exit_code)
        except Exception:
            log.exception("Exception in parse_stdio_exit_codes!")

    def parse_stdio_regexes(self, stdio_elem):
        """
        Look in the tool's <stdio> elem for all <regex> subelements
        that define how to look for warnings and fatal errors in
        stdout and stderr. This will add all such regex elements
        to self.stdio_regexes.

        A ``match`` attribute is required; elements without one are logged
        and skipped.  The streams to scan come from the first of the ``src``,
        ``source`` and ``sources`` attributes that is present and default to
        both stdout and stderr.
        """
        try:
            for regex_elem in stdio_elem.findall("regex"):
                # TODO: Fill in ToolStdioRegex
                regex = ToolStdioRegex()
                # Each regex has an optional description that can be
                # part of the "desc" or "description" attributes:
                regex.desc = regex_elem.get("desc")
                if regex.desc is None:
                    regex.desc = regex_elem.get("description")
                # Parse the error level
                regex.error_level = self.parse_error_level(regex_elem.get("level"))
                regex.match = regex_elem.get("match")
                if regex.match is None:
                    log.warning(
                        f"Ignoring tool's stdio regex element with attributes {regex_elem.attrib} - "
                        "the 'match' attribute must exist"
                    )
                    continue
                # Parse the output sources. We look for the "src", "source",
                # and "sources" attributes, in that order. If there is no
                # such source, then the source defaults to stderr & stdout.
                output_srcs = regex_elem.get("src")
                if output_srcs is None:
                    output_srcs = regex_elem.get("source")
                if output_srcs is None:
                    output_srcs = regex_elem.get("sources")
                if output_srcs is None:
                    output_srcs = "output,error"
                output_srcs = re.sub(r"\s", "", output_srcs)
                # Anything containing "out" ("stdout", "output", ...) selects
                # stdout, anything containing "err" selects stderr, and
                # "both" selects the two of them.
                for src in output_srcs.split(","):
                    if re.search("both", src, re.IGNORECASE):
                        regex.stdout_match = True
                        regex.stderr_match = True
                    if re.search("out", src, re.IGNORECASE):
                        regex.stdout_match = True
                    if re.search("err", src, re.IGNORECASE):
                        regex.stderr_match = True
                if not regex.stdout_match and not regex.stderr_match:
                    # Nothing recognisable was specified: warn and scan both.
                    log.warning(
                        "Tool id %s: unable to determine if tool "
                        "stream source scanning is output, error, "
                        "or both. Defaulting to use both.",
                        self._tool_id(),
                    )
                    regex.stdout_match = True
                    regex.stderr_match = True
                self.stdio_regexes.append(regex)
        except Exception:
            log.exception("Exception in parse_stdio_regexes!")

    # TODO: This method doesn't have to be part of this class.
    def parse_error_level(self, err_level):
        """
        Parses error level and returns error level enumeration. If
        missing or unparsable, returns 'fatal'.

        The level is found by case-insensitive substring search, tried in
        the order log, qc, warning, fatal_oom, fatal.
        """
        return_level = StdioErrorLevel.FATAL
        try:
            if err_level:
                if re.search("log", err_level, re.IGNORECASE):
                    return_level = StdioErrorLevel.LOG
                elif re.search("qc", err_level, re.IGNORECASE):
                    return_level = StdioErrorLevel.QC
                elif re.search("warning", err_level, re.IGNORECASE):
                    return_level = StdioErrorLevel.WARNING
                elif re.search("fatal_oom", err_level, re.IGNORECASE):
                    return_level = StdioErrorLevel.FATAL_OOM
                elif re.search("fatal", err_level, re.IGNORECASE):
                    return_level = StdioErrorLevel.FATAL
                else:
                    log.debug(f"Tool {self._tool_id()}: error level {err_level} did not match log/warning/fatal")
        except Exception:
            log.exception("Exception in parse_error_level")
        return return_level
class XmlPagesSource(PagesSource):
    """Pages source built from the ``inputs`` child of a tool's root element."""

    def __init__(self, root):
        """Wrap each ``page`` of ``<inputs>`` in an ``XmlPageSource``.

        When ``<inputs>`` has no ``page`` children it is itself treated as
        the only page; when ``<inputs>`` is missing there are no pages.
        """
        inputs = root.find("inputs")
        self.input_elem = inputs
        if inputs is None:
            pages = []
        else:
            page_elems = inputs.findall("page") or [inputs]
            pages = [XmlPageSource(page_elem) for page_elem in page_elems]
        super().__init__(pages)

    @property
    def inputs_defined(self):
        """Whether the root element had an ``inputs`` child."""
        return self.input_elem is not None
class XmlPageSource(PageSource):
    """Page source backed by an XML element stored in ``self.parent_elem``."""

    def parse_display(self):
        """Return the ``display`` child serialised to a string, or None if there is none."""
        display_elem = self.parent_elem.find("display")
        return None if display_elem is None else xml_to_string(display_elem)
class XmlInputSource(InputSource):
    """Input source backed by a single XML element of a tool's inputs."""

    def __init__(self, input_elem):
        """Remember *input_elem* and use its tag as the input type."""
        self.input_elem = input_elem
        self.input_type = self.input_elem.tag

    def parse_dynamic_options_elem(self):
        """Return the ``<options>`` child element, or None if there is none."""
        return self.input_elem.find("options")

    def parse_static_options(self):
        """Return ``(text, value, selected)`` tuples for the ``<option>`` children.

        Options are de-duplicated by value: a later duplicate replaces the
        earlier entry but keeps its position.

        >>> from galaxy.util import parse_xml_string_to_etree
        >>> xml = '<param><option value="a">A</option><option value="b">B</option></param>'
        >>> xis = XmlInputSource(parse_xml_string_to_etree(xml).getroot())
        >>> xis.parse_static_options()
        [('A', 'a', False), ('B', 'b', False)]
        >>> xml = '<param><option value="a"/><option value="b"/><option value="a" selected="true"/></param>'
        >>> xis = XmlInputSource(parse_xml_string_to_etree(xml).getroot())
        >>> xis.parse_static_options()
        [('a', 'a', True), ('b', 'b', False)]
        """
        deduplicated_static_options = {}
        for option in self.input_elem.findall("option"):
            value = option.get("value")
            # Fall back to the value when the option has no text.
            text = option.text or value
            selected = string_as_bool(option.get("selected", False))
            deduplicated_static_options[value] = (text, value, selected)
        return list(deduplicated_static_options.values())

    def parse_optional(self, default=None):
        """Return boolean indicating whether parameter is optional."""
        elem = self.input_elem
        if self.get("type") == "data_column":
            # Allow specifying force_select for backward compat., but probably
            # should use optional going forward for consistency with other
            # parameters.
            if "force_select" in elem.attrib:
                force_select = string_as_bool(elem.get("force_select"))
            else:
                force_select = not string_as_bool(elem.get("optional", False))
            return not force_select

        if default is None:
            default = self.default_optional
        return self.get_bool("optional", default)

    def parse_conversion_tuples(self):
        """Return a ``(name, type)`` tuple for every ``<conversion>`` child."""
        # name: name for commandline substitution
        # type: target datatype extension
        return [(conv_elem.get("name"), conv_elem.get("type")) for conv_elem in self.input_elem.findall("conversion")]

    def parse_test_input_source(self):
        """Return the input source of the first ``<param>`` child.

        Raises AssertionError when there is no such child.
        """
        input_elem = self.input_elem.find("param")
        assert input_elem is not None, "<conditional> must have a child <param>"
        return XmlInputSource(input_elem)

    def parse_when_input_sources(self):
        """Return ``(value, XmlPageSource)`` pairs, one per ``<when>`` child."""
        return [(case_elem.get("value"), XmlPageSource(case_elem)) for case_elem in self.input_elem.findall("when")]

    def parse_default(self) -> Optional[Dict[str, Any]]:
        """Describe the ``<default>`` child as a File or Collection dict.

        Returns None when there is no ``<default>`` child.  For inputs of
        type ``data`` the result is a File description, otherwise it is a
        Collection description whose elements are read recursively.
        """

        def file_default_from_elem(elem):
            # TODO: hashes, created_from_basename, etc...
            return {"class": "File", "location": elem.get("location")}

        def read_elements(collection_elem):
            element_dicts = []
            for element in collection_elem.findall("element"):
                identifier = element.get("name")
                subcollection_elem = element.find("collection")
                # Compare against None explicitly: the truth value of an
                # element reflects its number of children, so an empty
                # <collection> would otherwise be mistaken for a file.
                if subcollection_elem is not None:
                    element_dicts.append(
                        {
                            "class": "Collection",
                            "identifier": identifier,
                            "collection_type": subcollection_elem.get("collection_type"),
                            "elements": read_elements(subcollection_elem),
                        }
                    )
                else:
                    element_dict = file_default_from_elem(element)
                    element_dict["identifier"] = identifier
                    element_dicts.append(element_dict)
            return element_dicts

        elem = self.input_elem
        default_elem = elem.find("default")
        if default_elem is None:
            return None
        if elem.get("type") == "data":
            return file_default_from_elem(default_elem)
        return {
            "class": "Collection",
            # The collection is named after the input unless stated otherwise.
            "name": default_elem.get("name", elem.get("name")),
            "collection_type": default_elem.get("collection_type"),
            "elements": read_elements(default_elem),
        }
class ParallelismInfo:
    """
    Describes how (if at all) several instances of the tool are run in
    parallel on the same set of inputs.
    """

    def __init__(self, tag):
        """Read the settings from *tag*, a dict or an element with ``attrib``.

        ``method`` is stored separately; every other entry goes into
        ``attributes``.  When nothing else is given, legacy defaults are used.
        """
        self.method = tag.get("method")
        settings = tag if isinstance(tag, dict) else tag.attrib
        self.attributes = {key: val for key, val in settings.items() if key != "method"}
        if not self.attributes:
            # legacy basic mode - provide compatible defaults
            self.attributes.update(split_size=20, split_mode="number_of_parts")