Warning
This document describes an in-development version of Galaxy. If this page exists in the latest release, view it there instead; otherwise, start from the top of the latest release's documentation.
Source code for galaxy.tool_util.parser.yaml
import json
from collections.abc import MutableMapping
from copy import deepcopy
from typing import (
Any,
cast,
Dict,
List,
Optional,
Tuple,
Union,
)
import packaging.version
from galaxy.tool_util.deps import requirements
from galaxy.tool_util.parameters.convert import _select_which_when
from galaxy.tool_util.parameters.factory import input_models_for_tool_source
from galaxy.tool_util.parameters.state import TestCaseJsonToolState
from galaxy.tool_util.parameters.visitor import validate_explicit_conditional_test_value
from galaxy.tool_util.parser.util import (
DEFAULT_DECOMPRESS,
DEFAULT_DELTA,
DEFAULT_DELTA_FRAC,
DEFAULT_SORT,
)
from galaxy.tool_util_models.parameter_validators import AnyValidatorModel
from galaxy.tool_util_models.parameters import (
DiscriminatorType,
ToolParameterBundle,
ToolParameterBundleModel,
ToolParameterT,
)
from galaxy.tool_util_models.tool_source import (
HelpContent,
JsonTestCollectionDefDict,
XrefDict,
YamlTemplateConfigFile,
)
from galaxy.util import listify
from .interface import (
AssertionDict,
AssertionList,
InputSource,
PageSource,
PagesSource,
ToolSource,
ToolSourceTest,
ToolSourceTestInputs,
ToolSourceTests,
)
from .output_actions import ToolOutputActionApp
from .output_collection_def import dataset_collector_descriptions_from_output_dict
from .output_objects import (
ToolOutput,
ToolOutputCollection,
ToolOutputCollectionStructure,
)
from .parameter_validators import parse_dict_validators
from .stdio import error_on_exit_code
from .util import is_dict
class YamlToolSource(ToolSource):
    """``ToolSource`` implementation backed by a parsed YAML document.

    ``root_dict`` is the raw, already-parsed YAML mapping describing the tool;
    every ``parse_*`` accessor reads directly from that mapping.
    """

    language = "yaml"

    def __init__(self, root_dict: Dict, source_path=None):
        # root_dict: parsed YAML tool definition.
        # source_path: filesystem path the YAML was loaded from, if any.
        self.root_dict = root_dict
        self._source_path = source_path
        self._macro_paths: List[str] = []

    @property
    def source_path(self):
        return self._source_path

    def parse_tool_module(self) -> Optional[Tuple[str, str]]:
        # This should not be settable for user defined tools - placing this here to
        # ensure this. If we want to implement tool modules for YAML tools in the future
        # ensure class is not GalaxyUserTool.
        return None

    def parse_version(self) -> Optional[str]:
        # Coerce to str so YAML numeric versions (e.g. 1.0) round-trip as strings.
        version_raw = self.root_dict.get("version")
        return str(version_raw) if version_raw is not None else None

    def parse_name(self) -> str:
        # Fall back to the tool id when no explicit name is declared.
        rval = self.root_dict.get("name") or self.parse_id()
        assert rval
        return str(rval)

    def parse_icon(self) -> Optional[str]:
        # An explicit ``icon: null`` in the YAML yields None; otherwise read "src".
        icon_elem = self.root_dict.get("icon", {})
        return icon_elem.get("src") if icon_elem is not None else None

    def parse_edam_operations(self) -> List[str]:
        return self.root_dict.get("edam_operations") or []

    def parse_xrefs(self) -> List[XrefDict]:
        # Entries with a falsy "type" are dropped.
        xrefs = self.root_dict.get("xrefs") or []
        return [XrefDict(value=xref["value"], type=xref["type"]) for xref in xrefs if xref["type"]]

    def parse_display_interface(self, default):
        return self.root_dict.get("display_interface", default)

    def parse_base_command(self) -> Optional[List[str]]:
        """Return the base command listified into a list of tokens."""
        return listify(self.root_dict.get("base_command"))

    def parse_template_configfiles(self):
        return [YamlTemplateConfigFile(**config) for config in self.root_dict.get("configfiles") or []]

    def parse_version_command(self):
        return self.root_dict.get("runtime_version", {}).get("command", None)

    def parse_version_command_interpreter(self):
        return self.root_dict.get("runtime_version", {}).get("interpreter", None)

    def parse_requirements(self):
        """Split the mixed "requirements" list by type and normalize containers."""
        mixed_requirements = self.root_dict.get("requirements", [])
        container = self.root_dict.get("container")
        containers = self.root_dict.get("containers")
        if container:
            # The singular "container" key wins over plural "containers"; a bare
            # string is shorthand for an explicit docker container.
            if isinstance(container, str):
                container = {"identifier": container, "type": "docker", "explicit": True}
            containers = [container]
        elif containers:
            containers = containers
        else:
            containers = []
        return requirements.parse_requirements_from_lists(
            software_requirements=[r for r in mixed_requirements if r.get("type") == "package"],
            containers=containers,
            resource_requirements=[r for r in mixed_requirements if r.get("type") == "resource"],
            javascript_requirements=[r for r in mixed_requirements if r.get("type") == "javascript"],
            credentials=self.root_dict.get("credentials", []),
        )

    def parse_input_pages(self) -> PagesSource:
        # All YAML tools have only one page (feature is deprecated)
        page_source = YamlPageSource(self.root_dict.get("inputs", {}))
        return PagesSource([page_source], "cwl")

    def parse_help(self) -> Optional[HelpContent]:
        """Return help content; "help" may be a plain string or a dict with "content"/"format"."""
        help = self.root_dict.get("help")
        format = "markdown"
        if isinstance(help, dict):
            format = help.get("format", "markdown")
        if isinstance(help, str):
            return HelpContent(format=format, content=help)
        elif help and "content" in help:
            return HelpContent(format=format, content=help["content"])
        else:
            # No help declared, or a dict without a "content" key.
            return None

    def parse_outputs(self, app: Optional[ToolOutputActionApp]):
        """Parse output datasets and collections.

        Returns ``(outputs, output_collections)``; collection outputs are also
        registered by name in ``outputs``.
        """
        outputs = deepcopy(self.root_dict.get("outputs", []))
        if isinstance(outputs, MutableMapping):
            # Mapping form: fold each key into its definition as the output name.
            for name, output_dict in outputs.items():
                output_dict["name"] = name
            outputs = outputs.values()
        output_defs = []
        output_collection_defs = []
        for output_dict in outputs:
            output_type = output_dict.get("type", "data")
            name = output_dict["name"]
            if output_type == "data":
                output_defs.append(self._parse_output(app, name, output_dict))
            elif output_type == "collection":
                output_collection_defs.append(self._parse_output_collection(app, name, output_dict))
            else:
                message = f"Unknown output_type [{output_type}] encountered."
                raise Exception(message)
        outputs = {}
        for output in output_defs:
            outputs[output.name] = output
        output_collections = {}
        for output in output_collection_defs:
            output_collections[output.name] = output
            outputs[output.name] = output
        return outputs, output_collections

    def _parse_output(self, app, name, output_dict):
        # Delegate to ToolOutput's dict-based factory.
        output = ToolOutput.from_dict(name, output_dict, app=app)
        return output

    def _parse_output_collection(self, tool, name, output_dict):
        # NOTE(review): the *name* argument is immediately shadowed by the
        # dict's own "name" key — confirm callers always set it on the dict.
        name = output_dict.get("name")
        label = output_dict.get("label")
        default_format = output_dict.get("format", "data")
        collection_type = output_dict.get("collection_type", None)
        collection_type_source = output_dict.get("type_source", None)
        structured_like = output_dict.get("structured_like", None)
        inherit_format = False
        inherit_metadata = False
        if structured_like:
            # inherit_* only meaningful when structured_like another input.
            inherit_format = output_dict.get("inherit_format", None)
            inherit_metadata = output_dict.get("inherit_metadata", None)
        default_format_source = output_dict.get("format_source", None)
        default_metadata_source = output_dict.get("metadata_source", None)
        filters = []  # output filters not populated for YAML tools here
        dataset_collector_descriptions = dataset_collector_descriptions_from_output_dict(output_dict)
        structure = ToolOutputCollectionStructure(
            collection_type=collection_type,
            collection_type_source=collection_type_source,
            structured_like=structured_like,
            dataset_collector_descriptions=dataset_collector_descriptions,
        )
        output_collection = ToolOutputCollection(
            name,
            structure,
            label=label,
            filters=filters,
            default_format=default_format,
            inherit_format=inherit_format,
            inherit_metadata=inherit_metadata,
            default_format_source=default_format_source,
            default_metadata_source=default_metadata_source,
        )
        return output_collection

    def parse_tests_to_dict(self) -> ToolSourceTests:
        """Validate each test's input state, flatten it, and parse the tests."""
        tests: List[ToolSourceTest] = []
        rval: ToolSourceTests = dict(tests=tests)
        raw_tests = deepcopy(self.root_dict.get("tests", []))
        for i, test_dict in enumerate(raw_tests):
            inputs = test_dict.get("inputs", {})
            state = TestCaseJsonToolState(inputs)
            parameters = self._parse_parameters()
            state.validate(parameters, name=f"test case json {i}")
            # Flatten nested test state into "cond|param"-style flat keys.
            flat_inputs: Dict[str, Any] = {}
            self._flatten_parameters(inputs, parameters, flat_inputs=flat_inputs)
            test_dict["inputs"] = flat_inputs
            parsed_test = _parse_test(i, test_dict)
            tests.append(parsed_test)
        return rval

    def _flatten_parameters(
        self, test_dict: Dict[str, Any], input_models: ToolParameterBundle, flat_inputs, prefix=None
    ):
        # Flatten every parameter of the bundle into flat_inputs (mutated in place).
        for parameter in input_models.parameters:
            self._flatten_parameter(test_dict, parameter, flat_inputs, prefix=prefix)

    def _flatten_parameter(self, test_dict: Dict[str, Any], parameter: ToolParameterT, flat_inputs, prefix=None):
        """Recursively flatten one parameter's test value into ``flat_inputs``."""
        name = parameter.name
        if prefix:
            flat_name = f"{prefix}|{name}"
        else:
            flat_name = name
        if parameter.parameter_type == "gx_conditional":
            if name not in test_dict:
                test_dict[name] = {}
            raw_conditional_state = test_dict[name]
            assert isinstance(raw_conditional_state, dict)
            conditional_state = cast(Dict[str, Any], raw_conditional_state)
            test_parameter = parameter.test_parameter
            test_parameter_name = test_parameter.name
            explicit_test_value: Optional[DiscriminatorType] = (
                conditional_state[test_parameter_name] if test_parameter_name in conditional_state else None
            )
            test_value = validate_explicit_conditional_test_value(test_parameter_name, explicit_test_value)
            # Select the matching "when" branch and recurse into it.
            when = _select_which_when(parameter, test_value, conditional_state)
            self._flatten_parameter(conditional_state, test_parameter, flat_inputs, prefix=flat_name)
            self._flatten_parameters(conditional_state, when, flat_inputs, prefix=flat_name)
        elif parameter.parameter_type == "gx_repeat":
            if name not in test_dict:
                test_dict[name] = []
            repeat_instances = cast(List[Dict[str, Any]], test_dict[name])
            if parameter.min:
                # Pad with empty instances up to the repeat's declared minimum.
                while len(repeat_instances) < parameter.min:
                    repeat_instances.append({})
            for i, instance_state in enumerate(repeat_instances):
                if prefix:
                    instance_prefix = f"{prefix}|{name}_{i}"
                else:
                    instance_prefix = f"{name}_{i}"
                self._flatten_parameters(instance_state, parameter, flat_inputs, prefix=instance_prefix)
        else:
            # Leaf parameter: copy the supplied value through unchanged.
            if name in test_dict:
                flat_inputs[flat_name] = test_dict[name]

    def _parse_parameters(self) -> ToolParameterBundleModel:
        parameter_bundle = input_models_for_tool_source(self)
        return parameter_bundle

    def parse_python_template_version(self):
        # Returns a packaging Version, or None when the key is absent.
        python_template_version = self.root_dict.get("python_template_version")
        if python_template_version is not None:
            python_template_version = packaging.version.Version(python_template_version)
        return python_template_version

    def to_string(self):
        # TODO: Unit test for dumping/restoring
        return json.dumps(self.root_dict, ensure_ascii=False, sort_keys=False)
def __parse_test_inputs(i: int, test_inputs: Union[list, dict]) -> ToolSourceTestInputs:
    """Normalize a test's ``inputs`` (mapping or list form) into a list of input dicts.

    Collection-valued inputs (``class: Collection``) additionally get the raw
    definition recorded under ``attributes["collection"]``.
    """
    if isinstance(test_inputs, dict):
        # Mapping form: expand each key/value pair into the list representation.
        normalized: list = [{"name": key, "value": value, "attributes": {}} for key, value in test_inputs.items()]
    elif isinstance(test_inputs, list):
        normalized = test_inputs
    else:
        normalized = []
    for entry in normalized:
        value = entry["value"]
        if is_dict(value) and value.get("class") == "Collection":
            attributes = entry.setdefault("attributes", {})
            attributes["collection"] = cast(JsonTestCollectionDefDict, value)
    return cast(ToolSourceTestInputs, normalized)
def _parse_test(i: int, test_dict: dict) -> ToolSourceTest:
    """Normalize one raw YAML test definition (mutated in place) to ToolSourceTest shape."""
    test_dict["inputs"] = __parse_test_inputs(i, deepcopy(test_dict["inputs"]))
    outputs = test_dict["outputs"]
    new_outputs = []
    if is_dict(outputs):
        # Mapping form: key is the output name, value is a file path or an
        # attributes dict (possibly carrying "file").
        for key, value in outputs.items():
            if is_dict(value):
                attributes = value
                file = attributes.get("file")
            else:
                file = value
                attributes = {}
            new_outputs.append({"name": key, "value": file, "attributes": attributes})
    else:
        # List form: the entire entry doubles as the attributes dict.
        for output in outputs:
            name = output["name"]
            value = output.get("file", None)
            attributes = output
            new_outputs.append({"name": name, "value": value, "attributes": attributes})
    for output in new_outputs:
        # attributes aliases the dict stored above, so these writes stick.
        attributes = output["attributes"]
        defaults = {
            "compare": "diff",
            "lines_diff": 0,
            "delta": DEFAULT_DELTA,
            "delta_frac": DEFAULT_DELTA_FRAC,
            "sort": DEFAULT_SORT,
            "decompress": DEFAULT_DECOMPRESS,
        }
        # TODO
        attributes["extra_files"] = []
        # TODO
        attributes["metadata"] = {}
        assert_list = __to_test_assert_list(attributes.get("asserts", []))
        attributes["assert_list"] = assert_list
        # Fill comparison defaults without clobbering explicit settings.
        _ensure_has(attributes, defaults)
    test_dict["outputs"] = new_outputs
    # TODO: implement output collections for YAML tools.
    test_dict["output_collections"] = []
    test_dict["command"] = __to_test_assert_list(test_dict.get("command", []))
    test_dict["stdout"] = __to_test_assert_list(test_dict.get("stdout", []))
    test_dict["stderr"] = __to_test_assert_list(test_dict.get("stderr", []))
    test_dict["expect_exit_code"] = test_dict.get("expect_exit_code", None)
    test_dict["expect_failure"] = test_dict.get("expect_failure", False)
    test_dict["expect_test_failure"] = test_dict.get("expect_test_failure", False)
    test_dict["value_state_representation"] = "test_case_json"
    return cast(ToolSourceTest, test_dict)
def to_test_assert_list(assertions) -> AssertionList:
    """Convert YAML-style assertion definitions into Galaxy's assertion-dict list.

    Accepts either a mapping of ``that -> attributes`` or a list of assertion
    dicts; returns ``None`` when no assertions result (matching the XML parser).
    """
    if not assertions:
        assertions = []
    if is_dict(assertions):
        # Mapping form: copy each value and record its key as the "that" tag.
        expanded = []
        for that, attrs in assertions.items():
            entry = attrs.copy()
            entry["that"] = that
            expanded.append(entry)
        assertions = expanded
    assert_list: List[AssertionDict] = []
    for assertion in assertions:
        # TODO: not handling nested assertions correctly,
        # not sure these are used though.
        if "that" not in assertion:
            merged = {}
            for that_key, that_attributes in assertion.items():
                merged["that"] = that_key
                merged.update(that_attributes)
            assertion = merged
        # Both spellings of nested assertions are removed from the attributes;
        # "asserts" wins when the two keys are present simultaneously.
        fallback_children = assertion.pop("children", [])
        children = assertion.pop("asserts", fallback_children)
        # if there are no nested assertions then children should be []
        # but to_test_assert_list would return None
        if children:
            children = to_test_assert_list(children)
        assert_list.append(
            dict(
                tag=assertion["that"],
                attributes=assertion,
                children=children,
            )
        )
    return assert_list or None  # XML variant is None if no assertions made


# Planemo depends on this and was never updated unfortunately.
# https://github.com/galaxyproject/planemo/blob/master/planemo/test/_check_output.py
__to_test_assert_list = to_test_assert_list
class YamlInputSource(InputSource):
    """``InputSource`` for a single entry of a YAML tool's ``inputs`` definition."""

    def __init__(self, input_dict, trusted: bool = True):
        self.input_dict = input_dict
        self.trusted = trusted

    def parse_input_type(self):
        declared = self.input_dict["type"]
        return declared if declared in ("repeat", "conditional") else "param"

    def parse_extensions(self):
        declared = self.input_dict.get("extensions")
        if not declared:
            # Fall back to the comma-separated "format" attribute (default "data").
            declared = self.get("format", "data").split(",")
        normalized = []
        for extension in declared:
            normalized.append(extension.strip().lower())
        return normalized

    def parse_nested_inputs_source(self):
        assert self.parse_input_type() == "repeat"
        return YamlPageSource(self.input_dict["blocks"])

    def parse_test_input_source(self):
        test_dict = self.input_dict.get("test_parameter", None)
        assert test_dict is not None, "conditional must contain a `test_parameter` definition"
        # NOTE(review): ``trusted`` is not propagated to the nested source — confirm intended.
        return YamlInputSource(test_dict)

    def parse_when_input_sources(self):
        def discriminator_for(key):
            # casting to string because default value for BooleanToolParameter.legal_values is "true" / "false"
            # Unfortunate, but I guess that's ok for now?
            if key is True:
                return "true"
            if key is False:
                return "false"
            return key

        definition = self.input_dict
        sources = []
        if "when" in definition:
            for key, case_parameters in definition["when"].items():
                sources.append((discriminator_for(key), YamlPageSource(case_parameters)))
        else:
            for when_definition in definition.get("whens", []):
                discriminator = discriminator_for(when_definition.get("discriminator"))
                sources.append((discriminator, YamlPageSource(when_definition["parameters"])))
        return sources

    def parse_validators(self) -> List[AnyValidatorModel]:
        return parse_dict_validators(self.input_dict.get("validators", []), trusted=self.trusted)

    def parse_static_options(self) -> List[Tuple[str, str, bool]]:
        """Return (label, value, selected) triples from the "options" list."""
        options = []
        for option in self.input_dict.get("options", {}):
            value = option.get("value")
            options.append((option.get("label", value), value, option.get("selected", False)))
        return options

    def parse_default(self) -> Optional[Dict[str, Any]]:
        return self.input_dict.get("default", None)
def _ensure_has(dict, defaults):
for key, value in defaults.items():
if key not in dict:
dict[key] = value