Warning

This document describes an in-development version of Galaxy. Alternatively, you can view this page in the latest release (if it exists there) or start from the top of the latest release's documentation.

Source code for galaxy.tool_util.parser.yaml

import json
from collections.abc import MutableMapping
from copy import deepcopy
from typing import (
    Any,
    cast,
    Dict,
    List,
    Optional,
    Tuple,
    Union,
)

import packaging.version

from galaxy.tool_util.deps import requirements
from galaxy.tool_util.parameters.convert import _select_which_when
from galaxy.tool_util.parameters.factory import input_models_for_tool_source
from galaxy.tool_util.parameters.state import TestCaseJsonToolState
from galaxy.tool_util.parameters.visitor import validate_explicit_conditional_test_value
from galaxy.tool_util.parser.util import (
    DEFAULT_DECOMPRESS,
    DEFAULT_DELTA,
    DEFAULT_DELTA_FRAC,
    DEFAULT_SORT,
)
from galaxy.tool_util_models.parameter_validators import AnyValidatorModel
from galaxy.tool_util_models.parameters import (
    DiscriminatorType,
    ToolParameterBundle,
    ToolParameterBundleModel,
    ToolParameterT,
)
from galaxy.tool_util_models.tool_source import (
    HelpContent,
    JsonTestCollectionDefDict,
    XrefDict,
    YamlTemplateConfigFile,
)
from galaxy.util import listify
from .interface import (
    AssertionDict,
    AssertionList,
    InputSource,
    PageSource,
    PagesSource,
    ToolSource,
    ToolSourceTest,
    ToolSourceTestInputs,
    ToolSourceTests,
)
from .output_actions import ToolOutputActionApp
from .output_collection_def import dataset_collector_descriptions_from_output_dict
from .output_objects import (
    ToolOutput,
    ToolOutputCollection,
    ToolOutputCollectionStructure,
)
from .parameter_validators import parse_dict_validators
from .stdio import error_on_exit_code
from .util import is_dict


[docs] class YamlToolSource(ToolSource): language = "yaml"
[docs] def __init__(self, root_dict: Dict, source_path=None): self.root_dict = root_dict self._source_path = source_path self._macro_paths: List[str] = []
@property def source_path(self): return self._source_path
[docs] def parse_class(self): return self.root_dict.get("class")
[docs] def parse_tool_type(self): return self.root_dict.get("tool_type")
[docs] def parse_tool_module(self) -> Optional[Tuple[str, str]]: # This should not be settable for user defined tools - placing this here to # ensure this. If we want to implement tool modules for YAML tools in the future # ensure class is not GalaxyUserTool. return None
[docs] def parse_id(self): return self.root_dict.get("id")
[docs] def parse_version(self) -> Optional[str]: version_raw = self.root_dict.get("version") return str(version_raw) if version_raw is not None else None
[docs] def parse_name(self) -> str: rval = self.root_dict.get("name") or self.parse_id() assert rval return str(rval)
[docs] def parse_description(self) -> str: return self.root_dict.get("description") or ""
[docs] def parse_icon(self) -> Optional[str]: icon_elem = self.root_dict.get("icon", {}) return icon_elem.get("src") if icon_elem is not None else None
[docs] def parse_edam_operations(self) -> List[str]: return self.root_dict.get("edam_operations") or []
[docs] def parse_edam_topics(self) -> List[str]: return self.root_dict.get("edam_topics") or []
[docs] def parse_xrefs(self) -> List[XrefDict]: xrefs = self.root_dict.get("xrefs") or [] return [XrefDict(value=xref["value"], type=xref["type"]) for xref in xrefs if xref["type"]]
[docs] def parse_sanitize(self): return self.root_dict.get("sanitize", True)
[docs] def parse_display_interface(self, default): return self.root_dict.get("display_interface", default)
[docs] def parse_require_login(self, default): return self.root_dict.get("require_login", default)
[docs] def parse_command(self): return self.root_dict.get("command")
[docs] def parse_expression(self): return self.root_dict.get("expression")
[docs] def parse_shell_command(self) -> Optional[str]: return self.root_dict.get("shell_command")
[docs] def parse_base_command(self) -> Optional[List[str]]: """Return string containing script entrypoint.""" return listify(self.root_dict.get("base_command"))
[docs] def parse_arguments(self) -> Optional[List[str]]: return self.root_dict.get("arguments")
[docs] def parse_environment_variables(self): return []
[docs] def parse_template_configfiles(self): return [YamlTemplateConfigFile(**config) for config in self.root_dict.get("configfiles") or []]
[docs] def parse_interpreter(self): return self.root_dict.get("interpreter")
[docs] def parse_version_command(self): return self.root_dict.get("runtime_version", {}).get("command", None)
[docs] def parse_version_command_interpreter(self): return self.root_dict.get("runtime_version", {}).get("interpreter", None)
[docs] def parse_requirements(self): mixed_requirements = self.root_dict.get("requirements", []) container = self.root_dict.get("container") containers = self.root_dict.get("containers") if container: if isinstance(container, str): container = {"identifier": container, "type": "docker", "explicit": True} containers = [container] elif containers: containers = containers else: containers = [] return requirements.parse_requirements_from_lists( software_requirements=[r for r in mixed_requirements if r.get("type") == "package"], containers=containers, resource_requirements=[r for r in mixed_requirements if r.get("type") == "resource"], javascript_requirements=[r for r in mixed_requirements if r.get("type") == "javascript"], credentials=self.root_dict.get("credentials", []), )
[docs] def parse_input_pages(self) -> PagesSource: # All YAML tools have only one page (feature is deprecated) page_source = YamlPageSource(self.root_dict.get("inputs", {})) return PagesSource([page_source], "cwl")
[docs] def parse_strict_shell(self): # TODO: Add ability to disable this. return True
[docs] def parse_stdio(self): return error_on_exit_code()
[docs] def parse_help(self) -> Optional[HelpContent]: help = self.root_dict.get("help") format = "markdown" if isinstance(help, dict): format = help.get("format", "markdown") if isinstance(help, str): return HelpContent(format=format, content=help) elif help and "content" in help: return HelpContent(format=format, content=help["content"]) else: return None
[docs] def parse_outputs(self, app: Optional[ToolOutputActionApp]): outputs = deepcopy(self.root_dict.get("outputs", [])) if isinstance(outputs, MutableMapping): for name, output_dict in outputs.items(): output_dict["name"] = name outputs = outputs.values() output_defs = [] output_collection_defs = [] for output_dict in outputs: output_type = output_dict.get("type", "data") name = output_dict["name"] if output_type == "data": output_defs.append(self._parse_output(app, name, output_dict)) elif output_type == "collection": output_collection_defs.append(self._parse_output_collection(app, name, output_dict)) else: message = f"Unknown output_type [{output_type}] encountered." raise Exception(message) outputs = {} for output in output_defs: outputs[output.name] = output output_collections = {} for output in output_collection_defs: output_collections[output.name] = output outputs[output.name] = output return outputs, output_collections
def _parse_output(self, app, name, output_dict): output = ToolOutput.from_dict(name, output_dict, app=app) return output def _parse_output_collection(self, tool, name, output_dict): name = output_dict.get("name") label = output_dict.get("label") default_format = output_dict.get("format", "data") collection_type = output_dict.get("collection_type", None) collection_type_source = output_dict.get("type_source", None) structured_like = output_dict.get("structured_like", None) inherit_format = False inherit_metadata = False if structured_like: inherit_format = output_dict.get("inherit_format", None) inherit_metadata = output_dict.get("inherit_metadata", None) default_format_source = output_dict.get("format_source", None) default_metadata_source = output_dict.get("metadata_source", None) filters = [] dataset_collector_descriptions = dataset_collector_descriptions_from_output_dict(output_dict) structure = ToolOutputCollectionStructure( collection_type=collection_type, collection_type_source=collection_type_source, structured_like=structured_like, dataset_collector_descriptions=dataset_collector_descriptions, ) output_collection = ToolOutputCollection( name, structure, label=label, filters=filters, default_format=default_format, inherit_format=inherit_format, inherit_metadata=inherit_metadata, default_format_source=default_format_source, default_metadata_source=default_metadata_source, ) return output_collection
[docs] def parse_tests_to_dict(self) -> ToolSourceTests: tests: List[ToolSourceTest] = [] rval: ToolSourceTests = dict(tests=tests) raw_tests = deepcopy(self.root_dict.get("tests", [])) for i, test_dict in enumerate(raw_tests): inputs = test_dict.get("inputs", {}) state = TestCaseJsonToolState(inputs) parameters = self._parse_parameters() state.validate(parameters, name=f"test case json {i}") flat_inputs: Dict[str, Any] = {} self._flatten_parameters(inputs, parameters, flat_inputs=flat_inputs) test_dict["inputs"] = flat_inputs parsed_test = _parse_test(i, test_dict) tests.append(parsed_test) return rval
def _flatten_parameters( self, test_dict: Dict[str, Any], input_models: ToolParameterBundle, flat_inputs, prefix=None ): for parameter in input_models.parameters: self._flatten_parameter(test_dict, parameter, flat_inputs, prefix=prefix) def _flatten_parameter(self, test_dict: Dict[str, Any], parameter: ToolParameterT, flat_inputs, prefix=None): name = parameter.name if prefix: flat_name = f"{prefix}|{name}" else: flat_name = name if parameter.parameter_type == "gx_conditional": if name not in test_dict: test_dict[name] = {} raw_conditional_state = test_dict[name] assert isinstance(raw_conditional_state, dict) conditional_state = cast(Dict[str, Any], raw_conditional_state) test_parameter = parameter.test_parameter test_parameter_name = test_parameter.name explicit_test_value: Optional[DiscriminatorType] = ( conditional_state[test_parameter_name] if test_parameter_name in conditional_state else None ) test_value = validate_explicit_conditional_test_value(test_parameter_name, explicit_test_value) when = _select_which_when(parameter, test_value, conditional_state) self._flatten_parameter(conditional_state, test_parameter, flat_inputs, prefix=flat_name) self._flatten_parameters(conditional_state, when, flat_inputs, prefix=flat_name) elif parameter.parameter_type == "gx_repeat": if name not in test_dict: test_dict[name] = [] repeat_instances = cast(List[Dict[str, Any]], test_dict[name]) if parameter.min: while len(repeat_instances) < parameter.min: repeat_instances.append({}) for i, instance_state in enumerate(repeat_instances): if prefix: instance_prefix = f"{prefix}|{name}_{i}" else: instance_prefix = f"{name}_{i}" self._flatten_parameters(instance_state, parameter, flat_inputs, prefix=instance_prefix) else: if name in test_dict: flat_inputs[flat_name] = test_dict[name] def _parse_parameters(self) -> ToolParameterBundleModel: parameter_bundle = input_models_for_tool_source(self) return parameter_bundle
[docs] def parse_profile(self) -> str: return self.root_dict.get("profile") or "24.2"
[docs] def parse_license(self) -> Optional[str]: return self.root_dict.get("license")
[docs] def parse_interactivetool(self): return self.root_dict.get("entry_points", [])
[docs] def parse_python_template_version(self): python_template_version = self.root_dict.get("python_template_version") if python_template_version is not None: python_template_version = packaging.version.Version(python_template_version) return python_template_version
[docs] def to_string(self): # TODO: Unit test for dumping/restoring return json.dumps(self.root_dict, ensure_ascii=False, sort_keys=False)
def __parse_test_inputs(i: int, test_inputs: Union[list, dict]) -> ToolSourceTestInputs:
    """Normalize test ``inputs`` (dict or list form) into a list of input dicts."""
    inputs: list = test_inputs if isinstance(test_inputs, list) else []
    if isinstance(test_inputs, dict):
        # Mapping form: key is the input name, value is the raw test value.
        for input_name, raw_value in test_inputs.items():
            inputs.append({"name": input_name, "value": raw_value, "attributes": {}})
    for input in inputs:
        raw_value = input["value"]
        # Dict values declaring class Collection become collection attributes.
        if is_dict(raw_value) and raw_value.get("class") == "Collection":
            collection_def = cast(JsonTestCollectionDefDict, raw_value)
            input.setdefault("attributes", {})["collection"] = collection_def
    return cast(ToolSourceTestInputs, inputs)


def _parse_test(i: int, test_dict: dict) -> ToolSourceTest:
    """Convert one raw YAML test dict (in place) into Galaxy's test-dict format."""
    test_dict["inputs"] = __parse_test_inputs(i, deepcopy(test_dict["inputs"]))
    raw_outputs = test_dict["outputs"]
    new_outputs = []
    if is_dict(raw_outputs):
        # Mapping form: key is the output name; value is a file name or an attribute dict.
        for output_name, raw in raw_outputs.items():
            if is_dict(raw):
                attributes = raw
                expected_file = attributes.get("file")
            else:
                expected_file = raw
                attributes = {}
            new_outputs.append({"name": output_name, "value": expected_file, "attributes": attributes})
    else:
        # List form: each entry supplies its own name / file keys.
        for output in raw_outputs:
            new_outputs.append(
                {"name": output["name"], "value": output.get("file", None), "attributes": output}
            )
    # Comparison defaults are read-only; shared safely across all outputs.
    defaults = {
        "compare": "diff",
        "lines_diff": 0,
        "delta": DEFAULT_DELTA,
        "delta_frac": DEFAULT_DELTA_FRAC,
        "sort": DEFAULT_SORT,
        "decompress": DEFAULT_DECOMPRESS,
    }
    for output in new_outputs:
        attributes = output["attributes"]
        # TODO
        attributes["extra_files"] = []
        # TODO
        attributes["metadata"] = {}
        attributes["assert_list"] = __to_test_assert_list(attributes.get("asserts", []))
        _ensure_has(attributes, defaults)
    test_dict["outputs"] = new_outputs
    # TODO: implement output collections for YAML tools.
    test_dict["output_collections"] = []
    for assertion_key in ("command", "stdout", "stderr"):
        test_dict[assertion_key] = __to_test_assert_list(test_dict.get(assertion_key, []))
    test_dict["expect_exit_code"] = test_dict.get("expect_exit_code", None)
    test_dict["expect_failure"] = test_dict.get("expect_failure", False)
    test_dict["expect_test_failure"] = test_dict.get("expect_test_failure", False)
    test_dict["value_state_representation"] = "test_case_json"
    return cast(ToolSourceTest, test_dict)
def to_test_assert_list(assertions) -> AssertionList:
    """Convert YAML-style assertion definitions (list or dict form) into an AssertionList.

    Mirrors the XML parser's convention of returning None when no assertions are made.
    """
    assertions = assertions or []

    def _expanded(item):
        # ("has_text", {...attrs...}) -> {...attrs..., "that": "has_text"}
        that, attrs = item
        expanded = attrs.copy()
        expanded["that"] = that
        return expanded

    if is_dict(assertions):
        assertions = [_expanded(item) for item in assertions.items()]

    assert_list: List[AssertionDict] = []
    for assertion in assertions:
        # TODO: not handling nested assertions correctly,
        # not sure these are used though.
        if "that" not in assertion:
            new_assertion = {}
            for assertion_key, assertion_value in assertion.items():
                new_assertion["that"] = assertion_key
                new_assertion.update(assertion_value)
            assertion = new_assertion
        # Both keys are always removed from the attribute dict; "asserts" wins
        # when both are present (matches original eager-pop evaluation order).
        children = assertion.pop("children", [])
        children = assertion.pop("asserts", children)
        # if there are no nested assertions then children should be []
        # but to_test_assert_list would return None
        if children:
            children = to_test_assert_list(children)
        assert_dict: AssertionDict = dict(
            tag=assertion["that"],
            attributes=assertion,
            children=children,
        )
        assert_list.append(assert_dict)
    return assert_list or None  # XML variant is None if no assertions made


# Planemo depends on this and was never updated unfortunately.
# https://github.com/galaxyproject/planemo/blob/master/planemo/test/_check_output.py
__to_test_assert_list = to_test_assert_list
class YamlPageSource(PageSource):
    """PageSource over the flat input list of a YAML tool (YAML tools have one page)."""

    def __init__(self, inputs_list):
        self.inputs_list = inputs_list

    def parse_input_sources(self):
        # Wrap each raw input dict in a YamlInputSource.
        return [YamlInputSource(input_dict) for input_dict in self.inputs_list]
class YamlInputSource(InputSource):
    """InputSource wrapper around a single YAML input dictionary."""

    def __init__(self, input_dict, trusted: bool = True):
        self.input_dict = input_dict
        # trusted gates which validators may be parsed from the definition.
        self.trusted = trusted

    def get(self, key, default=None):
        return self.input_dict.get(key, default)

    def get_bool(self, key, default):
        return self.input_dict.get(key, default)

    def get_bool_or_none(self, key, default):
        return self.input_dict.get(key, default)

    def parse_input_type(self):
        # Anything that is not a repeat or conditional is a plain parameter.
        declared_type = self.input_dict["type"]
        return declared_type if declared_type in ("repeat", "conditional") else "param"

    def parse_extensions(self):
        declared = self.input_dict.get("extensions")
        extensions = declared if declared else self.get("format", "data").split(",")
        return [ext.strip().lower() for ext in extensions]

    def parse_nested_inputs_source(self):
        assert self.parse_input_type() == "repeat"
        return YamlPageSource(self.input_dict["blocks"])

    def parse_test_input_source(self):
        test_dict = self.input_dict.get("test_parameter", None)
        assert test_dict is not None, "conditional must contain a `test_parameter` definition"
        return YamlInputSource(test_dict)

    def parse_when_input_sources(self):
        """Return (discriminator, PageSource) pairs for a conditional's cases."""

        def _normalized(discriminator):
            # casting to string because default value for BooleanToolParameter.legal_values is "true" / "false"
            # Unfortunate, but I guess that's ok for now?
            if discriminator is True:
                return "true"
            if discriminator is False:
                return "false"
            return discriminator

        input_dict = self.input_dict
        sources = []
        if "when" in input_dict:
            # Mapping form: discriminator -> list of case parameters.
            for key, case_inputs in input_dict["when"].items():
                sources.append((_normalized(key), YamlPageSource(case_inputs)))
        else:
            # List form: explicit discriminator/parameters entries.
            for when in input_dict.get("whens", []):
                sources.append((_normalized(when.get("discriminator")), YamlPageSource(when["parameters"])))
        return sources

    def parse_validators(self) -> List[AnyValidatorModel]:
        return parse_dict_validators(self.input_dict.get("validators", []), trusted=self.trusted)

    def parse_static_options(self) -> List[Tuple[str, str, bool]]:
        static_options = []
        for option in self.input_dict.get("options", {}):
            option_value = option.get("value")
            static_options.append(
                (option.get("label", option_value), option_value, option.get("selected", False))
            )
        return static_options

    def parse_default(self) -> Optional[Dict[str, Any]]:
        return self.input_dict.get("default", None)
def _ensure_has(target, defaults):
    """Fill in any keys missing from *target* using *defaults* (mutates target in place).

    Existing keys are never overwritten. The first parameter was previously named
    ``dict``, shadowing the builtin; renamed - all call sites pass it positionally.
    """
    for key, value in defaults.items():
        target.setdefault(key, value)