Warning

This document is for an old release of Galaxy. Alternatively, you can view this page in the latest release (if it exists there) or go to the top of the latest release's documentation.

Source code for galaxy.tool_util.parser.yaml

import json

import packaging.version

from galaxy.tool_util.deps import requirements
from galaxy.tool_util.parser.util import (
    DEFAULT_DELTA,
    DEFAULT_DELTA_FRAC,
)
from .interface import (
    InputSource,
    PageSource,
    PagesSource,
    ToolSource,
)
from .output_collection_def import dataset_collector_descriptions_from_output_dict
from .output_objects import (
    ToolOutput,
    ToolOutputCollection,
    ToolOutputCollectionStructure,
)
from .stdio import error_on_exit_code
from .util import is_dict


class YamlToolSource(ToolSource):
    """Tool source backed by an already-parsed tool dictionary.

    Every ``parse_*`` method reads its answer from ``root_dict``; keys that
    are absent fall back to the defaults spelled out in each method.
    """

    language = "yaml"

    def __init__(self, root_dict, source_path=None):
        """Keep the tool dictionary and the (optional) path it was read from."""
        self.root_dict = root_dict
        self._source_path = source_path
        self._macro_paths = []

    @property
    def source_path(self):
        """Path supplied at construction time, or ``None``."""
        return self._source_path

    def parse_tool_type(self):
        """Return the ``tool_type`` entry, or ``None`` when absent."""
        return self.root_dict.get("tool_type")

    def parse_id(self):
        """Return the ``id`` entry, or ``None`` when absent."""
        return self.root_dict.get("id")

    def parse_version(self):
        """Return the ``version`` entry converted to ``str``."""
        # NOTE(review): a missing ``version`` key yields the literal string
        # "None" because ``str(None)`` is returned - confirm callers expect it.
        return str(self.root_dict.get("version"))

    def parse_name(self):
        """Return the ``name`` entry, or ``None`` when absent."""
        return self.root_dict.get("name")

    def parse_description(self):
        """Return the ``description`` entry, defaulting to an empty string."""
        return self.root_dict.get("description", "")

    def parse_edam_operations(self):
        """Return the ``edam_operations`` entry, defaulting to an empty list."""
        return self.root_dict.get("edam_operations", [])

    def parse_edam_topics(self):
        """Return the ``edam_topics`` entry, defaulting to an empty list."""
        return self.root_dict.get("edam_topics", [])

    def parse_xrefs(self):
        """Return ``xrefs`` entries as ``{"value": ..., "reftype": ...}`` dicts.

        Entries whose ``type`` is falsy are dropped.
        """
        xrefs = self.root_dict.get("xrefs", [])
        return [dict(value=xref["value"], reftype=xref["type"]) for xref in xrefs if xref["type"]]

    def parse_sanitize(self):
        """Return the ``sanitize`` entry, defaulting to ``True``."""
        return self.root_dict.get("sanitize", True)

    def parse_display_interface(self, default):
        """Return the ``display_interface`` entry, or ``default`` when absent."""
        return self.root_dict.get("display_interface", default)

    def parse_require_login(self, default):
        """Return the ``require_login`` entry, or ``default`` when absent."""
        return self.root_dict.get("require_login", default)

    def parse_command(self):
        """Return the ``command`` entry, or ``None`` when absent."""
        return self.root_dict.get("command")

    def parse_expression(self):
        """Return the ``expression`` entry, or ``None`` when absent."""
        return self.root_dict.get("expression")

    def parse_environment_variables(self):
        """Return an empty list; nothing is read from the tool dictionary."""
        return []

    def parse_interpreter(self):
        """Return the ``interpreter`` entry, or ``None`` when absent."""
        return self.root_dict.get("interpreter")

    def parse_version_command(self):
        """Return ``runtime_version.command``, or ``None`` when absent."""
        return self.root_dict.get("runtime_version", {}).get("command", None)

    def parse_version_command_interpreter(self):
        """Return ``runtime_version.interpreter``, or ``None`` when absent."""
        return self.root_dict.get("runtime_version", {}).get("interpreter", None)

    def parse_requirements_and_containers(self):
        """Delegate to ``requirements.parse_requirements_from_dict``."""
        return requirements.parse_requirements_from_dict(self.root_dict)

    def parse_input_pages(self):
        """Wrap the ``inputs`` entry in a single-page ``PagesSource``."""
        # All YAML tools have only one page (feature is deprecated)
        page_source = YamlPageSource(self.root_dict.get("inputs", {}))
        return PagesSource([page_source])

    def parse_strict_shell(self):
        """Always return ``True``."""
        # TODO: Add ability to disable this.
        return True

    def parse_stdio(self):
        """Return whatever ``error_on_exit_code()`` produces."""
        return error_on_exit_code()

    def parse_help(self):
        """Return the ``help`` entry, or ``None`` when absent."""
        return self.root_dict.get("help", None)

    def parse_outputs(self, tool):
        """Build the output and output-collection mappings.

        Each entry of the ``outputs`` mapping is dispatched on its ``type``
        (defaulting to ``"data"``).

        :returns: ``(outputs, output_collections)``, two dicts keyed by the
            ``name`` attribute of the parsed output objects.
        :raises Exception: if an entry declares a type other than ``"data"``
            or ``"collection"``.
        """
        output_defs = []
        output_collection_defs = []
        for name, output_dict in self.root_dict.get("outputs", {}).items():
            output_type = output_dict.get("type", "data")
            if output_type == "data":
                output_defs.append(self._parse_output(tool, name, output_dict))
            elif output_type == "collection":
                output_collection_defs.append(self._parse_output_collection(tool, name, output_dict))
            else:
                message = f"Unknown output_type [{output_type}] encountered."
                raise Exception(message)
        outputs = {output.name: output for output in output_defs}
        output_collections = {output.name: output for output in output_collection_defs}
        return outputs, output_collections

    def _parse_output(self, tool, name, output_dict):
        """Build a ``ToolOutput`` for the entry stored under ``name``."""
        return ToolOutput.from_dict(name, output_dict, tool=tool)

    def _parse_output_collection(self, tool, name, output_dict):
        """Build a ``ToolOutputCollection`` for the entry stored under ``name``.

        ``tool`` is accepted for symmetry with ``_parse_output`` but is not
        used here.
        """
        # An explicit ``name`` entry wins, otherwise fall back to the key the
        # entry was stored under (as ``_parse_output`` does). Previously the
        # key was discarded, leaving the collection named ``None``.
        name = output_dict.get("name", name)
        label = output_dict.get("label")
        default_format = output_dict.get("format", "data")
        # NOTE(review): ``parse_outputs`` only routes entries here when their
        # ``type`` is "collection", so this value is "collection" on that
        # path - confirm whether a dedicated key was intended.
        collection_type = output_dict.get("type", None)
        collection_type_source = output_dict.get("type_source", None)
        structured_like = output_dict.get("structured_like", None)
        inherit_format = False
        inherit_metadata = False
        if structured_like:
            inherit_format = output_dict.get("inherit_format", None)
            inherit_metadata = output_dict.get("inherit_metadata", None)
        default_format_source = output_dict.get("format_source", None)
        default_metadata_source = output_dict.get("metadata_source", "")
        filters = []
        dataset_collector_descriptions = dataset_collector_descriptions_from_output_dict(output_dict)

        structure = ToolOutputCollectionStructure(
            collection_type=collection_type,
            collection_type_source=collection_type_source,
            structured_like=structured_like,
            dataset_collector_descriptions=dataset_collector_descriptions,
        )
        return ToolOutputCollection(
            name,
            structure,
            label=label,
            filters=filters,
            default_format=default_format,
            inherit_format=inherit_format,
            inherit_metadata=inherit_metadata,
            default_format_source=default_format_source,
            default_metadata_source=default_metadata_source,
        )

    def parse_tests_to_dict(self):
        """Return ``{"tests": [...]}`` with every ``tests`` entry normalized."""
        tests = [_parse_test(i, test_dict) for i, test_dict in enumerate(self.root_dict.get("tests", []))]
        return dict(tests=tests)

    def parse_profile(self):
        """Return the ``profile`` entry, defaulting to ``"16.04"``."""
        return self.root_dict.get("profile", "16.04")

    def parse_license(self):
        """Return the ``license`` entry, or ``None`` when absent."""
        return self.root_dict.get("license")

    def parse_interactivetool(self):
        """Return the ``entry_points`` entry, defaulting to an empty list."""
        return self.root_dict.get("entry_points", [])

    def parse_python_template_version(self):
        """Return ``python_template_version`` parsed by ``packaging``, or ``None``."""
        python_template_version = self.root_dict.get("python_template_version", None)
        if python_template_version is not None:
            python_template_version = packaging.version.parse(python_template_version)
        return python_template_version

    def to_string(self):
        """Serialize the tool dictionary as a JSON string."""
        # TODO: Unit test for dumping/restoring
        return json.dumps(self.root_dict)
def _parse_test(i, test_dict):
    """Normalize one test definition in place and return it.

    ``inputs`` given as a mapping become a list of
    ``{"name", "value", "attributes"}`` dicts. ``outputs`` (mapping or list)
    become a list of dicts of the same shape, whose ``attributes`` are filled
    with comparison defaults and a parsed ``assert_list``. The ``command``,
    ``stdout`` and ``stderr`` assertions and the ``expect_*`` flags are
    normalized as well.

    :param i: index of the test within the tool's test list (currently unused).
    :param test_dict: the raw test definition; it is mutated.
    :returns: the same ``test_dict`` object.
    :raises KeyError: if ``inputs`` or ``outputs`` is missing.
    """
    inputs = test_dict["inputs"]
    if is_dict(inputs):
        test_dict["inputs"] = [{"name": key, "value": value, "attributes": {}} for key, value in inputs.items()]

    outputs = test_dict["outputs"]
    new_outputs = []
    if is_dict(outputs):
        for key, value in outputs.items():
            if is_dict(value):
                attributes = value
                file = attributes.get("file")
            else:
                file = value
                attributes = {}
            new_outputs.append({"name": key, "value": file, "attributes": attributes})
    else:
        for output in outputs:
            # Use the same dict shape as the mapping branch: the loop below
            # indexes ``output["attributes"]``, which failed on the tuples
            # that used to be appended here.
            new_outputs.append({"name": output["name"], "value": output.get("file", None), "attributes": output})

    defaults = {
        "compare": "diff",
        "lines_diff": 0,
        "delta": DEFAULT_DELTA,
        "delta_frac": DEFAULT_DELTA_FRAC,
        "sort": False,
    }
    for output in new_outputs:
        attributes = output["attributes"]
        # TODO
        attributes["extra_files"] = []
        # TODO
        attributes["metadata"] = {}
        attributes["assert_list"] = __to_test_assert_list(attributes.get("asserts", []))
        _ensure_has(attributes, defaults)

    test_dict["outputs"] = new_outputs
    # TODO: implement output collections for YAML tools.
    test_dict["output_collections"] = []
    test_dict["command"] = __to_test_assert_list(test_dict.get("command", []))
    test_dict["stdout"] = __to_test_assert_list(test_dict.get("stdout", []))
    test_dict["stderr"] = __to_test_assert_list(test_dict.get("stderr", []))
    test_dict["expect_exit_code"] = test_dict.get("expect_exit_code", None)
    test_dict["expect_failure"] = test_dict.get("expect_failure", False)
    test_dict["expect_test_failure"] = test_dict.get("expect_test_failure", False)
    return test_dict


def __to_test_assert_list(assertions):
    """Convert assertions into a list of ``{tag, attributes, children}`` dicts.

    ``assertions`` is either a list of dicts that each carry a ``that`` key, or
    a mapping from assertion name to its attribute dict; the mapping form is
    expanded so each (copied) attribute dict gains ``that``.

    In the list form the caller's dicts are reused as ``attributes`` and have
    their ``children`` key removed.

    :returns: the list of assertion dicts, or ``None`` when there are none.
    """

    def expand_dict_form(item):
        key, value = item
        new_value = value.copy()
        new_value["that"] = key
        return new_value

    if is_dict(assertions):
        assertions = map(expand_dict_form, assertions.items())

    assert_list = []
    for assertion in assertions:
        # TODO: not handling nested assertions correctly,
        # not sure these are used though.
        children = assertion.pop("children", [])
        assert_list.append(
            dict(
                tag=assertion["that"],
                attributes=assertion,
                children=children,
            )
        )

    return assert_list or None  # XML variant is None if no assertions made
class YamlPageSource(PageSource):
    """Page source wrapping an iterable of input definitions."""

    def __init__(self, inputs_list):
        """Remember the input definitions that make up this page."""
        self.inputs_list = inputs_list

    def parse_input_sources(self):
        """Return one ``YamlInputSource`` per stored input definition."""
        return [YamlInputSource(input_definition) for input_definition in self.inputs_list]
class YamlInputSource(InputSource):
    """Input source wrapping the dictionary that defines a single input."""

    def __init__(self, input_dict):
        """Remember the dictionary describing this input."""
        self.input_dict = input_dict

    def get(self, key, default=None):
        """Look up ``key`` in the input dictionary, falling back to ``default``."""
        return self.input_dict.get(key, default)

    def get_bool(self, key, default):
        """Look up ``key``; the stored value is returned as-is, without coercion."""
        return self.input_dict.get(key, default)

    def parse_input_type(self):
        """Classify the input as ``"repeat"``, ``"conditional"`` or ``"param"``."""
        declared = self.input_dict["type"]
        for structural in ("repeat", "conditional"):
            if declared == structural:
                return structural
        # Every other declared type is treated as a plain parameter.
        return "param"

    def parse_nested_inputs_source(self):
        """Return a page source over the ``blocks`` of a repeat input."""
        assert self.parse_input_type() == "repeat"
        blocks = self.input_dict["blocks"]
        return YamlPageSource(blocks)

    def parse_test_input_source(self):
        """Return the input source for the ``test`` entry of a conditional."""
        test_dict = self.input_dict.get("test")
        assert test_dict is not None, "conditional must contain a `test` definition"
        return YamlInputSource(test_dict)

    def parse_when_input_sources(self):
        """Return ``(value, page_source)`` pairs for each ``when`` case.

        Case keys are turned into strings; the booleans become ``"true"`` and
        ``"false"``. A case body that is not a list is wrapped in one.
        """
        cases = []
        for raw_key, raw_block in self.input_dict.get("when", {}).items():
            if raw_key is True:
                case_value = "true"
            elif raw_key is False:
                case_value = "false"
            else:
                # str here to lose type information like XML, needed?
                case_value = str(raw_key)
            case_inputs = raw_block if isinstance(raw_block, list) else [raw_block]
            cases.append((case_value, YamlPageSource(case_inputs)))
        return cases

    def parse_static_options(self):
        """Return ``(label, value, selected)`` tuples for each ``options`` entry.

        ``label`` defaults to the option's value and ``selected`` to ``False``.
        """
        collected = []
        for option in self.input_dict.get("options", {}):
            value = option.get("value")
            collected.append((option.get("label", value), value, option.get("selected", False)))
        return collected
def _ensure_has(dict, defaults): for key, value in defaults.items(): if key not in dict: dict[key] = value