Source code for galaxy.tools.parameters.wrapped_json

import json
import logging
from typing import (
    Any,
    Dict,
    List,
    Sequence,
    TYPE_CHECKING,
)

from packaging.version import Version

if TYPE_CHECKING:
    from galaxy.tools.parameters.wrappers import (
        DatasetCollectionWrapper,
        DatasetFilenameWrapper,
    )

log = logging.getLogger(__name__)

SKIP_INPUT = object()


[docs]def json_wrap(inputs, input_values, profile, as_dict=None, handle_files="skip"): if as_dict is None: as_dict = {} for input in inputs.values(): input_name = input.name value_wrapper = input_values[input_name] json_value = _json_wrap_input(input, value_wrapper, profile, handle_files=handle_files) if json_value is SKIP_INPUT: continue as_dict[input_name] = json_value return as_dict
def data_input_to_path(v): path = _cast_if_not_none(v, str) if path == "None": path = None return path def data_collection_input_to_path(v): return v.all_paths def data_collection_input_to_staging_path_and_source_path( v: "DatasetCollectionWrapper", invalid_chars: Sequence[str] = ("/",), include_collection_name: bool = False ) -> List[Dict[str, Any]]: staging_paths = v.get_all_staging_paths( invalid_chars=invalid_chars, include_collection_name=include_collection_name ) if v.element_identifiers_extensions_paths_and_metadata_files: element_identifiers, extensions, source_paths, metadata_files = zip( *v.element_identifiers_extensions_paths_and_metadata_files ) else: element_identifiers, extensions, source_paths, metadata_files = (), (), (), () return [ { "element_identifier": element_identifier, "ext": extension, "staging_path": staging_path, "source_path": source_path, "metadata_files": [ {"staging_path": f"{staging_path}.{mf[0]}", "source_path": mf[1]} for mf in metadata_files ], } for element_identifier, extension, staging_path, source_path, metadata_files in zip( element_identifiers, extensions, staging_paths, source_paths, metadata_files ) ] def data_input_to_staging_path_and_source_path( v: "DatasetFilenameWrapper", invalid_chars: Sequence[str] = ("/",) ) -> Dict[str, Any]: staging_path = v.get_staging_path(invalid_chars=invalid_chars) # note that the element identifier should be always a list return { "element_identifier": [v.element_identifier], "ext": v.file_ext, "staging_path": staging_path, "source_path": data_input_to_path(v), "metadata_files": [ {"staging_path": f"{staging_path}.{mf[0]}", "source_path": mf[1]} for mf in v.all_metadata_files ], } def _json_wrap_input(input, value_wrapper, profile, handle_files="skip"): input_type = input.type if input_type == "repeat": repeat_job_value = [] for d in value_wrapper: repeat_instance_job_value = {} json_wrap(input.inputs, d, profile, repeat_instance_job_value, handle_files=handle_files) repeat_job_value.append(repeat_instance_job_value) json_value = repeat_job_value elif input_type == "conditional": values = value_wrapper current = values["__current_case__"] conditional_job_value = {} json_wrap(input.cases[current].inputs, values, profile, conditional_job_value, handle_files=handle_files) test_param = input.test_param test_param_name = test_param.name test_value = _json_wrap_input(test_param, values[test_param_name], profile, handle_files=handle_files) conditional_job_value[test_param_name] = test_value json_value = conditional_job_value elif input_type == "section": values = value_wrapper section_job_value = {} json_wrap(input.inputs, values, profile, section_job_value, handle_files=handle_files) json_value = section_job_value elif input_type == "data" and input.multiple: if handle_files == "paths": json_value = [data_input_to_path(v) for v in value_wrapper] elif handle_files == "staging_path_and_source_path": json_value = [data_input_to_staging_path_and_source_path(v) for v in value_wrapper] elif handle_files == "skip": return SKIP_INPUT else: raise NotImplementedError() elif input_type == "data": if handle_files == "paths": json_value = data_input_to_path(value_wrapper) elif handle_files == "staging_path_and_source_path": json_value = data_input_to_staging_path_and_source_path(value_wrapper) elif handle_files == "skip": return SKIP_INPUT elif handle_files == "OBJECT": if value_wrapper: if isinstance(value_wrapper, list): value_wrapper = value_wrapper[0] json_value = _hda_to_object(value_wrapper) if input.load_contents: with open(str(value_wrapper), mode="rb") as fh: json_value["contents"] = fh.read(input.load_contents).decode("utf-8", errors="replace") return json_value else: return None else: raise NotImplementedError() elif input_type == "data_collection": if handle_files == "skip": return SKIP_INPUT elif handle_files == "paths": return data_collection_input_to_path(value_wrapper) elif handle_files == "staging_path_and_source_path": return data_collection_input_to_staging_path_and_source_path(value_wrapper) raise NotImplementedError() elif input_type in ["text", "color", "hidden"]: if getattr(input, "optional", False) and value_wrapper is not None and value_wrapper.value is None: json_value = None else: json_value = _cast_if_not_none(value_wrapper, str) elif input_type == "float": json_value = _cast_if_not_none(value_wrapper, float, empty_to_none=True) elif input_type == "integer": json_value = _cast_if_not_none(value_wrapper, int, empty_to_none=True) elif input_type == "boolean": if input.optional and value_wrapper is not None and value_wrapper.value is None: json_value = None else: json_value = _cast_if_not_none(value_wrapper, bool, empty_to_none=input.optional) elif input_type == "select": if Version(str(profile)) < Version("20.05"): json_value = _cast_if_not_none(value_wrapper, str) else: if input.multiple: json_value = [str(_) for _ in _cast_if_not_none(value_wrapper.value, list)] else: json_value = _cast_if_not_none(value_wrapper.value, str) elif input_type == "data_column": # value is a SelectToolParameterWrapper() if input.multiple: json_value = [int(_) for _ in _cast_if_not_none(value_wrapper.value, list)] else: json_value = [_cast_if_not_none(value_wrapper.value, int)] elif input_type == "directory_uri": json_value = _cast_if_not_none(value_wrapper, str) else: raise NotImplementedError(f"input_type [{input_type}] not implemented") return json_value def _hda_to_object(hda): if hda.extension == "expression.json": # We may have a null data value with open(str(hda)) as inp: try: rval = json.loads(inp.read(5)) if rval is None: return rval except Exception: pass hda_dict = hda.to_dict() metadata_dict = {} for key, value in hda_dict.items(): if key.startswith("metadata_"): metadata_dict[key[len("metadata_") :]] = value return { "file_ext": hda_dict["file_ext"], "file_size": hda_dict["file_size"], "name": hda_dict["name"], "metadata": metadata_dict, "src": {"src": "hda", "id": hda.id}, } def _cast_if_not_none(value, cast_to, empty_to_none=False): # log.debug("value [%s], type[%s]" % (value, type(value))) if value is None or (empty_to_none and str(value) == ""): return None else: return cast_to(value) __all__ = ("json_wrap",)