Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tool_util.parser.yaml
import json
from collections.abc import MutableMapping
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
)
import packaging.version
from galaxy.tool_util.deps import requirements
from galaxy.tool_util.parser.util import (
DEFAULT_DECOMPRESS,
DEFAULT_DELTA,
DEFAULT_DELTA_FRAC,
DEFAULT_SORT,
)
from galaxy.tool_util_models.parameter_validators import AnyValidatorModel
from galaxy.tool_util_models.tool_source import (
HelpContent,
XrefDict,
)
from galaxy.util import listify
from .interface import (
AssertionDict,
AssertionList,
InputSource,
PageSource,
PagesSource,
ToolSource,
ToolSourceTest,
ToolSourceTests,
)
from .output_actions import ToolOutputActionApp
from .output_collection_def import dataset_collector_descriptions_from_output_dict
from .output_objects import (
ToolOutput,
ToolOutputCollection,
ToolOutputCollectionStructure,
)
from .parameter_validators import parse_dict_validators
from .stdio import error_on_exit_code
from .util import is_dict
[docs]
class YamlToolSource(ToolSource):
language = "yaml"
[docs]
def __init__(self, root_dict: Dict, source_path=None):
self.root_dict = root_dict
self._source_path = source_path
self._macro_paths: List[str] = []
@property
def source_path(self):
return self._source_path
[docs]
def parse_version(self) -> Optional[str]:
version_raw = self.root_dict.get("version")
return str(version_raw) if version_raw is not None else None
[docs]
def parse_name(self) -> str:
rval = self.root_dict.get("name") or self.parse_id()
assert rval
return str(rval)
[docs]
def parse_icon(self) -> Optional[str]:
icon_elem = self.root_dict.get("icon", {})
return icon_elem.get("src") if icon_elem is not None else None
[docs]
def parse_edam_operations(self) -> List[str]:
return self.root_dict.get("edam_operations") or []
[docs]
def parse_xrefs(self) -> List[XrefDict]:
xrefs = self.root_dict.get("xrefs") or []
return [XrefDict(value=xref["value"], type=xref["type"]) for xref in xrefs if xref["type"]]
[docs]
def parse_display_interface(self, default):
return self.root_dict.get("display_interface", default)
[docs]
def parse_base_command(self) -> Optional[List[str]]:
"""Return string containing script entrypoint."""
return listify(self.root_dict.get("base_command"))
[docs]
def parse_version_command(self):
return self.root_dict.get("runtime_version", {}).get("command", None)
[docs]
def parse_version_command_interpreter(self):
return self.root_dict.get("runtime_version", {}).get("interpreter", None)
[docs]
def parse_requirements_and_containers(self):
mixed_requirements = self.root_dict.get("requirements", [])
container = self.root_dict.get("container")
containers = self.root_dict.get("containers")
if container:
if isinstance(container, str):
container = {"identifier": container, "type": "docker", "explicit": True}
containers = [container]
elif containers:
containers = containers
else:
containers = []
return requirements.parse_requirements_from_lists(
software_requirements=[r for r in mixed_requirements if r.get("type") == "package"],
containers=containers,
resource_requirements=[r for r in mixed_requirements if r.get("type") == "resource"],
javascript_requirements=[r for r in mixed_requirements if r.get("type") == "javascript"],
)
[docs]
def parse_input_pages(self) -> PagesSource:
# All YAML tools have only one page (feature is deprecated)
page_source = YamlPageSource(self.root_dict.get("inputs", {}))
return PagesSource([page_source])
[docs]
def parse_help(self) -> Optional[HelpContent]:
help = self.root_dict.get("help")
format = "markdown"
if isinstance(help, dict):
format = help.get("format", "markdown")
if isinstance(help, str):
return HelpContent(format=format, content=help)
elif help and "content" in help:
return HelpContent(format=format, content=help["content"])
else:
return None
[docs]
def parse_outputs(self, app: Optional[ToolOutputActionApp]):
outputs = self.root_dict.get("outputs", [])
if isinstance(outputs, MutableMapping):
for name, output_dict in outputs.items():
output_dict["name"] = name
outputs = outputs.values()
output_defs = []
output_collection_defs = []
for output_dict in outputs:
output_type = output_dict.get("type", "data")
name = output_dict["name"]
if output_type == "data":
output_defs.append(self._parse_output(app, name, output_dict))
elif output_type == "collection":
output_collection_defs.append(self._parse_output_collection(app, name, output_dict))
else:
message = f"Unknown output_type [{output_type}] encountered."
raise Exception(message)
outputs = {}
for output in output_defs:
outputs[output.name] = output
output_collections = {}
for output in output_collection_defs:
output_collections[output.name] = output
return outputs, output_collections
def _parse_output(self, app, name, output_dict):
output = ToolOutput.from_dict(name, output_dict, app=app)
return output
def _parse_output_collection(self, tool, name, output_dict):
name = output_dict.get("name")
label = output_dict.get("label")
default_format = output_dict.get("format", "data")
collection_type = output_dict.get("type", None)
collection_type_source = output_dict.get("type_source", None)
structured_like = output_dict.get("structured_like", None)
inherit_format = False
inherit_metadata = False
if structured_like:
inherit_format = output_dict.get("inherit_format", None)
inherit_metadata = output_dict.get("inherit_metadata", None)
default_format_source = output_dict.get("format_source", None)
default_metadata_source = output_dict.get("metadata_source", None)
filters = []
dataset_collector_descriptions = dataset_collector_descriptions_from_output_dict(output_dict)
structure = ToolOutputCollectionStructure(
collection_type=collection_type,
collection_type_source=collection_type_source,
structured_like=structured_like,
dataset_collector_descriptions=dataset_collector_descriptions,
)
output_collection = ToolOutputCollection(
name,
structure,
label=label,
filters=filters,
default_format=default_format,
inherit_format=inherit_format,
inherit_metadata=inherit_metadata,
default_format_source=default_format_source,
default_metadata_source=default_metadata_source,
)
return output_collection
[docs]
def parse_tests_to_dict(self) -> ToolSourceTests:
tests: List[ToolSourceTest] = []
rval: ToolSourceTests = dict(tests=tests)
for i, test_dict in enumerate(self.root_dict.get("tests", [])):
tests.append(_parse_test(i, test_dict))
return rval
[docs]
def parse_python_template_version(self):
python_template_version = self.root_dict.get("python_template_version")
if python_template_version is not None:
python_template_version = packaging.version.Version(python_template_version)
return python_template_version
[docs]
def to_string(self):
# TODO: Unit test for dumping/restoring
return json.dumps(self.root_dict, ensure_ascii=False, sort_keys=False)
def _parse_test(i, test_dict) -> ToolSourceTest:
inputs = test_dict["inputs"]
if is_dict(inputs):
new_inputs = []
for key, value in inputs.items():
new_inputs.append({"name": key, "value": value, "attributes": {}})
test_dict["inputs"] = new_inputs
outputs = test_dict["outputs"]
new_outputs = []
if is_dict(outputs):
for key, value in outputs.items():
if is_dict(value):
attributes = value
file = attributes.get("file")
else:
file = value
attributes = {}
new_outputs.append({"name": key, "value": file, "attributes": attributes})
else:
for output in outputs:
name = output["name"]
value = output.get("file", None)
attributes = output
new_outputs.append({"name": name, "value": value, "attributes": attributes})
for output in new_outputs:
attributes = output["attributes"]
defaults = {
"compare": "diff",
"lines_diff": 0,
"delta": DEFAULT_DELTA,
"delta_frac": DEFAULT_DELTA_FRAC,
"sort": DEFAULT_SORT,
"decompress": DEFAULT_DECOMPRESS,
}
# TODO
attributes["extra_files"] = []
# TODO
attributes["metadata"] = {}
assert_list = __to_test_assert_list(attributes.get("asserts", []))
attributes["assert_list"] = assert_list
_ensure_has(attributes, defaults)
test_dict["outputs"] = new_outputs
# TODO: implement output collections for YAML tools.
test_dict["output_collections"] = []
test_dict["command"] = __to_test_assert_list(test_dict.get("command", []))
test_dict["stdout"] = __to_test_assert_list(test_dict.get("stdout", []))
test_dict["stderr"] = __to_test_assert_list(test_dict.get("stderr", []))
test_dict["expect_exit_code"] = test_dict.get("expect_exit_code", None)
test_dict["expect_failure"] = test_dict.get("expect_failure", False)
test_dict["expect_test_failure"] = test_dict.get("expect_test_failure", False)
return test_dict
[docs]
def to_test_assert_list(assertions) -> AssertionList:
def expand_dict_form(item):
key, value = item
new_value = value.copy()
new_value["that"] = key
return new_value
if is_dict(assertions):
assertions = map(expand_dict_form, assertions.items())
assert_list: List[AssertionDict] = []
for assertion in assertions:
# TODO: not handling nested assertions correctly,
# not sure these are used though.
if "that" not in assertion:
new_assertion = {}
for assertion_key, assertion_value in assertion.items():
new_assertion["that"] = assertion_key
new_assertion.update(assertion_value)
assertion = new_assertion
children = assertion.pop("asserts", assertion.pop("children", []))
# if there are no nested assertions then children should be []
# but to_test_assert_list would return None
if children:
children = to_test_assert_list(children)
assert_dict: AssertionDict = dict(
tag=assertion["that"],
attributes=assertion,
children=children,
)
assert_list.append(assert_dict)
return assert_list or None # XML variant is None if no assertions made
# Planemo depends on this and was never updated unfortunately.
# https://github.com/galaxyproject/planemo/blob/master/planemo/test/_check_output.py
__to_test_assert_list = to_test_assert_list
[docs]
class YamlInputSource(InputSource):
[docs]
def __init__(self, input_dict, trusted: bool = True):
self.input_dict = input_dict
self.trusted = trusted
[docs]
def parse_input_type(self):
input_type = self.input_dict["type"]
if input_type == "repeat":
return "repeat"
elif input_type == "conditional":
return "conditional"
else:
return "param"
[docs]
def parse_extensions(self):
extensions = self.input_dict.get("extensions")
if not extensions:
extensions = self.get("format", "data").split(",")
return [ext.strip().lower() for ext in extensions]
[docs]
def parse_nested_inputs_source(self):
assert self.parse_input_type() == "repeat"
return YamlPageSource(self.input_dict["blocks"])
[docs]
def parse_test_input_source(self):
test_dict = self.input_dict.get("test_parameter", None)
assert test_dict is not None, "conditional must contain a `test_parameter` definition"
return YamlInputSource(test_dict)
[docs]
def parse_when_input_sources(self):
input_dict = self.input_dict
sources = []
if "when" in input_dict:
for key, value in input_dict["when"].items():
# casting to string because default value for BooleanToolParameter.legal_values is "true" / "false"
# Unfortunate, but I guess that's ok for now?
discriminator = "true" if key is True else "false" if key is False else key
case_page_source = YamlPageSource(value)
sources.append((discriminator, case_page_source))
else:
for value in input_dict.get("whens", []):
key = value.get("discriminator")
discriminator = "true" if key is True else "false" if key is False else key
case_page_source = YamlPageSource(value["parameters"])
sources.append((discriminator, case_page_source))
return sources
[docs]
def parse_validators(self) -> List[AnyValidatorModel]:
return parse_dict_validators(self.input_dict.get("validators", []), trusted=self.trusted)
[docs]
def parse_static_options(self) -> List[Tuple[str, str, bool]]:
static_options = []
input_dict = self.input_dict
for option in input_dict.get("options", {}):
value = option.get("value")
label = option.get("label", value)
selected = option.get("selected", False)
static_options.append((label, value, selected))
return static_options
[docs]
def parse_default(self) -> Optional[Dict[str, Any]]:
input_dict = self.input_dict
default_def = input_dict.get("default", None)
return default_def
def _ensure_has(dict, defaults):
for key, value in defaults.items():
if key not in dict:
dict[key] = value