Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tool_util.parser.yaml
import json
from collections.abc import MutableMapping
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Tuple,
)
import packaging.version
from galaxy.tool_util.deps import requirements
from galaxy.tool_util.parser.util import (
    DEFAULT_DECOMPRESS,
    DEFAULT_DELTA,
    DEFAULT_DELTA_FRAC,
    DEFAULT_SORT,
)
from galaxy.tool_util_models.parameter_validators import AnyValidatorModel
from galaxy.tool_util_models.tool_source import (
    HelpContent,
    XrefDict,
    YamlTemplateConfigFile,
)
from galaxy.util import listify
from .interface import (
    AssertionDict,
    AssertionList,
    InputSource,
    PageSource,
    PagesSource,
    ToolSource,
    ToolSourceTest,
    ToolSourceTests,
)
from .output_actions import ToolOutputActionApp
from .output_collection_def import dataset_collector_descriptions_from_output_dict
from .output_objects import (
    ToolOutput,
    ToolOutputCollection,
    ToolOutputCollectionStructure,
)
from .parameter_validators import parse_dict_validators
from .stdio import error_on_exit_code
from .util import is_dict
[docs]
class YamlToolSource(ToolSource):
    """Tool source that answers ``ToolSource`` queries from an already-parsed
    tool document held as a dictionary in ``root_dict``.
    """

    language = "yaml"
[docs]
    def __init__(self, root_dict: Dict, source_path=None):
        self.root_dict = root_dict
        self._source_path = source_path
        self._macro_paths: List[str] = []
    @property
    def source_path(self):
        """Value given to the constructor as ``source_path`` (``None`` when not supplied)."""
        return self._source_path
[docs]
    def parse_version(self) -> Optional[str]:
        version_raw = self.root_dict.get("version")
        return str(version_raw) if version_raw is not None else None
[docs]
    def parse_name(self) -> str:
        rval = self.root_dict.get("name") or self.parse_id()
        assert rval
        return str(rval)
[docs]
    def parse_icon(self) -> Optional[str]:
        icon_elem = self.root_dict.get("icon", {})
        return icon_elem.get("src") if icon_elem is not None else None
[docs]
    def parse_edam_operations(self) -> List[str]:
        return self.root_dict.get("edam_operations") or []
[docs]
    def parse_xrefs(self) -> List[XrefDict]:
        xrefs = self.root_dict.get("xrefs") or []
        return [XrefDict(value=xref["value"], type=xref["type"]) for xref in xrefs if xref["type"]]
[docs]
    def parse_display_interface(self, default):
        """Return the ``display_interface`` entry, or ``default`` when the key is absent."""
        return self.root_dict.get("display_interface", default)
[docs]
    def parse_base_command(self) -> Optional[List[str]]:
        """Return the ``base_command`` entry passed through ``galaxy.util.listify``.

        NOTE(review): the result is therefore presumably a list rather than a
        single string - confirm against ``listify``'s handling of ``None``.
        """
        return listify(self.root_dict.get("base_command"))
[docs]
    def parse_template_configfiles(self):
        return [YamlTemplateConfigFile(**config) for config in self.root_dict.get("configfiles") or []]
[docs]
    def parse_version_command(self):
        return self.root_dict.get("runtime_version", {}).get("command", None)
[docs]
    def parse_version_command_interpreter(self):
        return self.root_dict.get("runtime_version", {}).get("interpreter", None)
[docs]
    def parse_requirements(self):
        """Collect requirement declarations and hand them to
        ``requirements.parse_requirements_from_lists``.

        Entries of ``requirements`` are split by their ``type`` value
        (``package``, ``resource``, ``javascript``). A truthy ``container``
        entry takes precedence over ``containers``; when it is a plain string
        it is expanded into an explicit docker container description.
        ``credentials`` is passed through unchanged (default: empty list).
        """
        tool = self.root_dict
        mixed_requirements = tool.get("requirements", [])

        single_container = tool.get("container")
        containers = tool.get("containers")
        if single_container:
            if isinstance(single_container, str):
                single_container = {"identifier": single_container, "type": "docker", "explicit": True}
            containers = [single_container]
        elif not containers:
            containers = []

        def of_type(kind):
            # Select the requirement entries declared with the given ``type``.
            return [r for r in mixed_requirements if r.get("type") == kind]

        return requirements.parse_requirements_from_lists(
            software_requirements=of_type("package"),
            containers=containers,
            resource_requirements=of_type("resource"),
            javascript_requirements=of_type("javascript"),
            credentials=tool.get("credentials", []),
        )
[docs]
    def parse_input_pages(self) -> PagesSource:
        """Return a ``PagesSource`` holding a single page built from ``inputs``.

        A missing ``inputs`` entry is treated as an empty mapping.
        """
        # All YAML tools have only one page (feature is deprecated)
        inputs = self.root_dict.get("inputs", {})
        single_page = YamlPageSource(inputs)
        return PagesSource([single_page])
[docs]
    def parse_help(self) -> Optional[HelpContent]:
        help = self.root_dict.get("help")
        format = "markdown"
        if isinstance(help, dict):
            format = help.get("format", "markdown")
        if isinstance(help, str):
            return HelpContent(format=format, content=help)
        elif help and "content" in help:
            return HelpContent(format=format, content=help["content"])
        else:
            return None
[docs]
    def parse_outputs(self, app: Optional[ToolOutputActionApp]):
        outputs = self.root_dict.get("outputs", [])
        if isinstance(outputs, MutableMapping):
            for name, output_dict in outputs.items():
                output_dict["name"] = name
            outputs = outputs.values()
        output_defs = []
        output_collection_defs = []
        for output_dict in outputs:
            output_type = output_dict.get("type", "data")
            name = output_dict["name"]
            if output_type == "data":
                output_defs.append(self._parse_output(app, name, output_dict))
            elif output_type == "collection":
                output_collection_defs.append(self._parse_output_collection(app, name, output_dict))
            else:
                message = f"Unknown output_type [{output_type}] encountered."
                raise Exception(message)
        outputs = {}
        for output in output_defs:
            outputs[output.name] = output
        output_collections = {}
        for output in output_collection_defs:
            output_collections[output.name] = output
        return outputs, output_collections
    def _parse_output(self, app, name, output_dict):
        """Build a ``ToolOutput`` for one data output via ``ToolOutput.from_dict``."""
        return ToolOutput.from_dict(name, output_dict, app=app)
    def _parse_output_collection(self, tool, name, output_dict):
        """Build a ``ToolOutputCollection`` from one collection output dict.

        The collection name is taken from ``output_dict["name"]`` (the
        ``name`` argument is overridden) and ``tool`` is not used.
        ``inherit_format`` / ``inherit_metadata`` are only read from the dict
        when ``structured_like`` is set; otherwise both are ``False``.
        """
        read = output_dict.get

        structured_like = read("structured_like", None)
        if structured_like:
            inherit_format = read("inherit_format", None)
            inherit_metadata = read("inherit_metadata", None)
        else:
            inherit_format = False
            inherit_metadata = False

        # NOTE(review): ``parse_outputs`` only dispatches here when ``type`` is
        # "collection", so ``collection_type`` receives that same value on that
        # path - confirm this is intended.
        structure = ToolOutputCollectionStructure(
            collection_type=read("type", None),
            collection_type_source=read("type_source", None),
            structured_like=structured_like,
            dataset_collector_descriptions=dataset_collector_descriptions_from_output_dict(output_dict),
        )
        return ToolOutputCollection(
            read("name"),
            structure,
            label=read("label"),
            filters=[],
            default_format=read("format", "data"),
            inherit_format=inherit_format,
            inherit_metadata=inherit_metadata,
            default_format_source=read("format_source", None),
            default_metadata_source=read("metadata_source", None),
        )
[docs]
    def parse_tests_to_dict(self) -> ToolSourceTests:
        tests: List[ToolSourceTest] = []
        rval: ToolSourceTests = dict(tests=tests)
        for i, test_dict in enumerate(self.root_dict.get("tests", [])):
            tests.append(_parse_test(i, test_dict))
        return rval
[docs]
    def parse_python_template_version(self):
        python_template_version = self.root_dict.get("python_template_version")
        if python_template_version is not None:
            python_template_version = packaging.version.Version(python_template_version)
        return python_template_version
[docs]
    def to_string(self):
        # TODO: Unit test for dumping/restoring
        return json.dumps(self.root_dict, ensure_ascii=False, sort_keys=False)
def _parse_test(i, test_dict) -> ToolSourceTest:
    """Normalize one raw test definition in place and return it.

    * ``inputs`` given as a mapping becomes a list of
      ``{"name", "value", "attributes"}`` dicts; any other form is left as is.
    * ``outputs`` (mapping or list) becomes a list of
      ``{"name", "value", "attributes"}`` dicts, where ``value`` is the
      expected file. Each ``attributes`` dict gets ``extra_files``,
      ``metadata`` and ``assert_list`` set, plus comparison defaults for any
      key it does not already define.
    * ``command``, ``stdout`` and ``stderr`` are converted to assertion lists,
      and the ``expect_*`` keys are filled with their defaults when missing.

    ``i`` is accepted but not used.

    :raises KeyError: if ``inputs`` or ``outputs`` is missing from ``test_dict``.
    """
    raw_inputs = test_dict["inputs"]
    if is_dict(raw_inputs):
        test_dict["inputs"] = [
            {"name": input_name, "value": input_value, "attributes": {}}
            for input_name, input_value in raw_inputs.items()
        ]

    raw_outputs = test_dict["outputs"]
    normalized_outputs = []
    if is_dict(raw_outputs):
        for output_name, spec in raw_outputs.items():
            if is_dict(spec):
                # Full form: the mapping itself carries the attributes.
                entry = {"name": output_name, "value": spec.get("file"), "attributes": spec}
            else:
                # Short form: the value is the expected file.
                entry = {"name": output_name, "value": spec, "attributes": {}}
            normalized_outputs.append(entry)
    else:
        for spec in raw_outputs:
            normalized_outputs.append({"name": spec["name"], "value": spec.get("file", None), "attributes": spec})

    output_defaults = {
        "compare": "diff",
        "lines_diff": 0,
        "delta": DEFAULT_DELTA,
        "delta_frac": DEFAULT_DELTA_FRAC,
        "sort": DEFAULT_SORT,
        "decompress": DEFAULT_DECOMPRESS,
    }
    for entry in normalized_outputs:
        attrs = entry["attributes"]
        # TODO
        attrs["extra_files"] = []
        # TODO
        attrs["metadata"] = {}
        attrs["assert_list"] = __to_test_assert_list(attrs.get("asserts", []))
        _ensure_has(attrs, output_defaults)

    test_dict["outputs"] = normalized_outputs
    # TODO: implement output collections for YAML tools.
    test_dict["output_collections"] = []
    for stream in ("command", "stdout", "stderr"):
        test_dict[stream] = __to_test_assert_list(test_dict.get(stream, []))
    for flag, fallback in (
        ("expect_exit_code", None),
        ("expect_failure", False),
        ("expect_test_failure", False),
    ):
        test_dict[flag] = test_dict.get(flag, fallback)
    return test_dict
[docs]
def to_test_assert_list(assertions) -> AssertionList:
    """Convert assertion declarations into ``tag`` / ``attributes`` / ``children`` dicts.

    ``assertions`` may be falsy, a mapping of ``{assertion_name: parameters}``,
    or an iterable of mappings. Each iterable item either names its assertion
    under the ``that`` key or is itself a ``{assertion_name: parameters}``
    mapping. ``parameters`` may be ``None`` for an assertion declared without
    any. Nested assertions are read from ``asserts`` or, failing that,
    ``children``, and are converted recursively.

    The caller's data is never modified: every assertion is processed on a
    copy, so parsing the same declarations twice gives the same result.

    :returns: a list of assertion dicts, or ``None`` when there are none.
    """
    assertions = assertions or []

    def expand_dict_form(item):
        key, value = item
        # ``value`` is None when the assertion is declared without parameters.
        new_value = dict(value or {})
        new_value["that"] = key
        return new_value

    if is_dict(assertions):
        assertions = map(expand_dict_form, assertions.items())

    assert_list: List[AssertionDict] = []
    for assertion in assertions:
        # TODO: not handling nested assertions correctly,
        # not sure these are used though.
        if "that" not in assertion:
            new_assertion = {}
            for assertion_key, assertion_value in assertion.items():
                new_assertion["that"] = assertion_key
                new_assertion.update(assertion_value or {})
            assertion = new_assertion
        else:
            # Copy before popping below so nested assertions are not stripped
            # from the dict the caller passed in.
            assertion = dict(assertion)
        # Both keys are always removed from the attributes; ``asserts`` wins
        # when both are present.
        children = assertion.pop("asserts", assertion.pop("children", []))
        # if there are no nested assertions then children should be []
        # but to_test_assert_list would return None
        if children:
            children = to_test_assert_list(children)
        assert_dict: AssertionDict = dict(
            tag=assertion["that"],
            attributes=assertion,
            children=children,
        )
        assert_list.append(assert_dict)

    return assert_list or None  # XML variant is None if no assertions made
# Alias kept under the old double-underscore name: Planemo depends on it and
# was never updated unfortunately. `_parse_test` in this module uses it too.
# https://github.com/galaxyproject/planemo/blob/master/planemo/test/_check_output.py
__to_test_assert_list = to_test_assert_list
[docs]
class YamlInputSource(InputSource):
    """Input source that answers ``InputSource`` queries from a single input
    definition held as a dictionary in ``input_dict``.
    """
[docs]
    def __init__(self, input_dict, trusted: bool = True):
        self.input_dict = input_dict
        self.trusted = trusted
[docs]
    def parse_input_type(self):
        input_type = self.input_dict["type"]
        if input_type == "repeat":
            return "repeat"
        elif input_type == "conditional":
            return "conditional"
        else:
            return "param"
[docs]
    def parse_extensions(self):
        extensions = self.input_dict.get("extensions")
        if not extensions:
            extensions = self.get("format", "data").split(",")
        return [ext.strip().lower() for ext in extensions]
[docs]
    def parse_nested_inputs_source(self):
        assert self.parse_input_type() == "repeat"
        return YamlPageSource(self.input_dict["blocks"])
[docs]
    def parse_test_input_source(self):
        test_dict = self.input_dict.get("test_parameter", None)
        assert test_dict is not None, "conditional must contain a `test_parameter` definition"
        return YamlInputSource(test_dict)
[docs]
    def parse_when_input_sources(self):
        input_dict = self.input_dict
        sources = []
        if "when" in input_dict:
            for key, value in input_dict["when"].items():
                # casting to string because default value for BooleanToolParameter.legal_values is "true" / "false"
                # Unfortunate, but I guess that's ok for now?
                discriminator = "true" if key is True else "false" if key is False else key
                case_page_source = YamlPageSource(value)
                sources.append((discriminator, case_page_source))
        else:
            for value in input_dict.get("whens", []):
                key = value.get("discriminator")
                discriminator = "true" if key is True else "false" if key is False else key
                case_page_source = YamlPageSource(value["parameters"])
                sources.append((discriminator, case_page_source))
        return sources
[docs]
    def parse_validators(self) -> List[AnyValidatorModel]:
        """Parse the ``validators`` entry (default: empty list) with
        ``parse_dict_validators``, forwarding this source's ``trusted`` flag.
        """
        raw_validators = self.input_dict.get("validators", [])
        return parse_dict_validators(raw_validators, trusted=self.trusted)
[docs]
    def parse_static_options(self) -> List[Tuple[str, str, bool]]:
        static_options = []
        input_dict = self.input_dict
        for option in input_dict.get("options", {}):
            value = option.get("value")
            label = option.get("label", value)
            selected = option.get("selected", False)
            static_options.append((label, value, selected))
        return static_options
[docs]
    def parse_default(self) -> Optional[Dict[str, Any]]:
        input_dict = self.input_dict
        default_def = input_dict.get("default", None)
        return default_def
def _ensure_has(dict, defaults):
    for key, value in defaults.items():
        if key not in dict:
            dict[key] = value