Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tool_util.parser.yaml
from collections import OrderedDict
import packaging.version
from galaxy.tool_util.deps import requirements
from galaxy.tool_util.parser.util import (
DEFAULT_DELTA,
DEFAULT_DELTA_FRAC
)
from .interface import (
InputSource,
PageSource,
PagesSource,
ToolSource,
)
from .output_collection_def import dataset_collector_descriptions_from_output_dict
from .output_objects import (
ToolOutput,
ToolOutputCollection,
ToolOutputCollectionStructure,
)
from .stdio import error_on_exit_code
from .util import is_dict
class YamlToolSource(ToolSource):
    """Tool source that answers parser queries from a YAML-derived dictionary.

    Every ``parse_*`` method reads its answer out of ``self.root_dict``.
    """

    def __init__(self, root_dict, source_path=None):
        """Remember the tool dictionary and, optionally, where it came from.

        :param root_dict: dictionary describing the tool.
        :param source_path: optional path associated with ``root_dict``.
        """
        self.root_dict = root_dict
        self._source_path = source_path
        self._macro_paths = []

    @property
    def source_path(self):
        """The ``source_path`` given to the constructor (may be ``None``)."""
        return self._source_path

    def parse_xrefs(self):
        """Return the ``xrefs`` entries as ``dict(value=..., reftype=...)``.

        Entries whose ``type`` is falsy are dropped.
        """
        return [
            dict(value=xref["value"], reftype=xref["type"])
            for xref in self.root_dict.get("xrefs", [])
            if xref["type"]
        ]

    def parse_is_multi_byte(self):
        """Return ``is_multi_byte``, defaulting to ``self.default_is_multi_byte``."""
        return self.root_dict.get("is_multi_byte", self.default_is_multi_byte)

    def parse_display_interface(self, default):
        """Return ``display_interface`` or ``default`` when the key is absent."""
        return self.root_dict.get('display_interface', default)

    def parse_version_command(self):
        """Return ``runtime_version.command`` or ``None`` when not declared."""
        return self.root_dict.get("runtime_version", {}).get("command", None)

    def parse_version_command_interpreter(self):
        """Return ``runtime_version.interpreter`` or ``None`` when not declared."""
        return self.root_dict.get("runtime_version", {}).get("interpreter", None)

    def parse_requirements_and_containers(self):
        """Delegate to ``requirements.parse_requirements_from_dict``."""
        return requirements.parse_requirements_from_dict(self.root_dict)

    def parse_input_pages(self):
        """Wrap the ``inputs`` entry in a single-page ``PagesSource``."""
        # All YAML tools have only one page (feature is deprecated)
        page_source = YamlPageSource(self.root_dict.get("inputs", {}))
        return PagesSource([page_source])

    def parse_outputs(self, tool):
        """Build the tool's output and output-collection objects.

        :param tool: passed through to the per-output parsing helpers.
        :returns: ``(outputs, output_collections)``, two ``OrderedDict``
            instances keyed by each parsed object's ``name`` attribute.
        :raises Exception: if an output declares a ``type`` other than
            ``"data"`` (the default) or ``"collection"``.
        """
        outputs = OrderedDict()
        output_collections = OrderedDict()
        for name, output_dict in self.root_dict.get("outputs", {}).items():
            output_type = output_dict.get("type", "data")
            if output_type == "data":
                output = self._parse_output(tool, name, output_dict)
                outputs[output.name] = output
            elif output_type == "collection":
                collection = self._parse_output_collection(tool, name, output_dict)
                output_collections[collection.name] = collection
            else:
                message = "Unknown output_type [%s] encountered." % output_type
                raise Exception(message)
        return outputs, output_collections

    def _parse_output(self, tool, name, output_dict):
        """Build a ``ToolOutput`` for ``name`` via ``ToolOutput.from_dict``."""
        return ToolOutput.from_dict(name, output_dict, tool=tool)

    def _parse_output_collection(self, tool, name, output_dict):
        """Build a ``ToolOutputCollection`` from one ``outputs`` entry.

        :param tool: accepted for symmetry with ``_parse_output``; not used here.
        :param name: name of the output (the key in the ``outputs`` mapping).
        :param output_dict: dictionary describing the collection.
        :returns: the constructed ``ToolOutputCollection``.
        """
        # An explicit "name" inside the description still wins, but fall back
        # to the name we were called with instead of discarding it (the old
        # code produced a collection named None when the key was absent).
        explicit_name = output_dict.get("name")
        if explicit_name is not None:
            name = explicit_name
        label = output_dict.get("label")
        default_format = output_dict.get("format", "data")
        # NOTE(review): when reached through parse_outputs, "type" is the key
        # used for dispatch and therefore always "collection" here -- confirm
        # whether a different key was intended for the collection type.
        collection_type = output_dict.get("type", None)
        collection_type_source = output_dict.get("type_source", None)
        structured_like = output_dict.get("structured_like", None)
        inherit_format = False
        inherit_metadata = False
        if structured_like:
            # Only consulted when the structure is copied from another input.
            inherit_format = output_dict.get("inherit_format", None)
            inherit_metadata = output_dict.get("inherit_metadata", None)
        default_format_source = output_dict.get("format_source", None)
        default_metadata_source = output_dict.get("metadata_source", "")
        filters = []
        dataset_collector_descriptions = dataset_collector_descriptions_from_output_dict(output_dict)

        structure = ToolOutputCollectionStructure(
            collection_type=collection_type,
            collection_type_source=collection_type_source,
            structured_like=structured_like,
            dataset_collector_descriptions=dataset_collector_descriptions,
        )
        return ToolOutputCollection(
            name,
            structure,
            label=label,
            filters=filters,
            default_format=default_format,
            inherit_format=inherit_format,
            inherit_metadata=inherit_metadata,
            default_format_source=default_format_source,
            default_metadata_source=default_metadata_source,
        )

    def parse_tests_to_dict(self):
        """Return ``{"tests": [...]}`` with every test run through ``_parse_test``."""
        tests = [
            _parse_test(i, test_dict)
            for i, test_dict in enumerate(self.root_dict.get("tests", []))
        ]
        return dict(tests=tests)

    def parse_python_template_version(self):
        """Return ``python_template_version`` parsed by ``packaging.version.parse``.

        Returns ``None`` when the key is absent or explicitly ``None``.
        """
        python_template_version = self.root_dict.get("python_template_version", None)
        if python_template_version is not None:
            python_template_version = packaging.version.parse(python_template_version)
        return python_template_version
def _parse_test(i, test_dict):
    """Normalize one YAML test description in place and return it.

    ``inputs`` given as a mapping are rewritten to a list of
    ``{"name", "value", "attributes"}`` dicts.  ``outputs`` may be a mapping
    (name -> file or name -> attribute dict) or a list of attribute dicts that
    each carry a ``name``; both forms are rewritten to a list of
    ``{"name", "value", "attributes"}`` dicts whose attributes are completed
    with comparison defaults.

    :param i: index of the test; currently unused.
    :param test_dict: test description; must contain ``inputs`` and
        ``outputs`` (a missing key raises ``KeyError``).
    :returns: the same ``test_dict`` object, updated.
    """
    inputs = test_dict["inputs"]
    if is_dict(inputs):
        test_dict["inputs"] = [
            {"name": key, "value": value, "attributes": {}}
            for key, value in inputs.items()
        ]

    outputs = test_dict["outputs"]

    new_outputs = []
    if is_dict(outputs):
        for key, value in outputs.items():
            if is_dict(value):
                attributes = value
                expected_file = attributes.get("file")
            else:
                # Short form: the value itself is the expected file.
                expected_file = value
                attributes = {}
            new_outputs.append({
                "name": key,
                "value": expected_file,
                "attributes": attributes
            })
    else:
        for output in outputs:
            # Use the same dict shape as the mapping form: the loop below
            # indexes each entry by "attributes", which the tuples appended
            # here previously could not support (TypeError).
            new_outputs.append({
                "name": output["name"],
                "value": output.get("file", None),
                "attributes": output
            })

    # Values are only ever copied into attribute dicts, never mutated, so one
    # shared defaults mapping is enough for every output.
    defaults = {
        'compare': 'diff',
        'lines_diff': 0,
        'delta': DEFAULT_DELTA,
        'delta_frac': DEFAULT_DELTA_FRAC,
        'sort': False,
    }
    for output in new_outputs:
        attributes = output["attributes"]
        # TODO
        attributes["extra_files"] = []
        # TODO
        attributes["metadata"] = {}
        # TODO
        attributes["assert_list"] = __to_test_assert_list(attributes.get("asserts", []))
        _ensure_has(attributes, defaults)

    test_dict["outputs"] = new_outputs
    # TODO: implement output collections for YAML tools.
    test_dict["output_collections"] = []
    test_dict["command"] = __to_test_assert_list(test_dict.get("command", []))
    test_dict["stdout"] = __to_test_assert_list(test_dict.get("stdout", []))
    test_dict["stderr"] = __to_test_assert_list(test_dict.get("stderr", []))
    test_dict["expect_exit_code"] = test_dict.get("expect_exit_code", None)
    # Read the flag from its own key; it used to be copied from
    # "expect_exit_code", so an explicit "expect_failure" was ignored.
    test_dict["expect_failure"] = test_dict.get("expect_failure", False)
    return test_dict
def __to_test_assert_list(assertions):
    """Convert YAML assertions into a list of tag/attributes/children dicts.

    ``assertions`` is either a mapping of assertion name -> attribute dict or
    an iterable of attribute dicts that already carry the name under
    ``"that"``.  Returns ``None`` instead of an empty list.
    """
    if is_dict(assertions):
        # Mapping form: copy each attribute dict and record its key as "that".
        expanded = []
        for tag, attrs in assertions.items():
            entry = attrs.copy()
            entry["that"] = tag
            expanded.append(entry)
        assertions = expanded

    converted = []
    for assertion in assertions:
        # TODO: not handling nested assertions correctly,
        # not sure these are used though.
        # "children" is removed from the assertion dict itself, so it does
        # not show up again among the attributes.
        nested = assertion.pop("children", [])
        converted.append(dict(
            tag=assertion["that"],
            attributes=assertion,
            children=nested,
        ))

    if not converted:
        return None  # XML variant is None if no assertions made
    return converted
class YamlInputSource(InputSource):
    """Input source answering queries from ``self.input_dict``.

    ``self.input_dict`` is read by every method below but is not assigned in
    the code visible here -- presumably set by a constructor defined
    elsewhere; verify.
    """

    def parse_input_type(self):
        """Return ``"repeat"``, ``"conditional"`` or, for anything else, ``"param"``."""
        declared_type = self.input_dict["type"]
        for structural_type in ("repeat", "conditional"):
            if declared_type == structural_type:
                return structural_type
        return "param"

    def parse_nested_inputs_source(self):
        """Return a page source over the ``blocks`` of a repeat input."""
        assert self.parse_input_type() == "repeat"
        blocks = self.input_dict["blocks"]
        return YamlPageSource(blocks)

    def parse_test_input_source(self):
        """Return an input source for the ``test`` entry of a conditional."""
        test_dict = self.input_dict.get("test", None)
        assert test_dict is not None, "conditional must contain a `test` definition"
        return YamlInputSource(test_dict)

    def parse_when_input_sources(self):
        """Return ``(value, page_source)`` pairs for each ``when`` entry.

        Keys are converted to strings; the booleans ``True`` and ``False``
        become ``"true"`` and ``"false"``.
        """
        cases = []
        for raw_value, block in self.input_dict.get("when", {}).items():
            if raw_value is True:
                case_value = "true"
            elif raw_value is False:
                case_value = "false"
            else:
                # str here to lose type information like XML, needed?
                case_value = str(raw_value)
            # A single block is treated as a one-element list.
            block_list = block if isinstance(block, list) else [block]
            cases.append((case_value, YamlPageSource(block_list)))
        return cases

    def parse_static_options(self):
        """Return ``(label, value, selected)`` tuples for each ``options`` entry.

        ``label`` defaults to the option's value and ``selected`` to ``False``.
        """
        static_options = []
        for option in self.input_dict.get("options", {}):
            value = option.get("value")
            label = option.get("label", value)
            static_options.append((label, value, option.get("selected", False)))
        return static_options
def _ensure_has(dict, defaults):
    """Copy into ``dict`` every entry of ``defaults`` whose key it lacks.

    Keys already present keep their current value; ``dict`` is modified in
    place and nothing is returned.
    """
    for name in defaults:
        if name in dict:
            continue
        dict[name] = defaults[name]