Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.parameters.wrapped_json

import logging

import packaging.version

log = logging.getLogger(__name__)

SKIP_INPUT = object()


[docs]def json_wrap(inputs, input_values, profile, as_dict=None, handle_files="skip"): if as_dict is None: as_dict = {} for input in inputs.values(): input_name = input.name value_wrapper = input_values[input_name] json_value = _json_wrap_input(input, value_wrapper, profile, handle_files=handle_files) if json_value is SKIP_INPUT: continue as_dict[input_name] = json_value return as_dict
def _json_wrap_input(input, value_wrapper, profile, handle_files="skip"): input_type = input.type def _data_input_to_path(v): path = _cast_if_not_none(v, str) if path == "None": path = None return path if input_type == "repeat": repeat_job_value = [] for d in value_wrapper: repeat_instance_job_value = {} json_wrap(input.inputs, d, profile, repeat_instance_job_value, handle_files=handle_files) repeat_job_value.append(repeat_instance_job_value) json_value = repeat_job_value elif input_type == "conditional": values = value_wrapper current = values["__current_case__"] conditional_job_value = {} json_wrap(input.cases[current].inputs, values, profile, conditional_job_value, handle_files=handle_files) test_param = input.test_param test_param_name = test_param.name test_value = _json_wrap_input(test_param, values[test_param_name], profile, handle_files=handle_files) conditional_job_value[test_param_name] = test_value json_value = conditional_job_value elif input_type == "section": values = value_wrapper section_job_value = {} json_wrap(input.inputs, values, profile, section_job_value, handle_files=handle_files) json_value = section_job_value elif input_type == "data" and input.multiple: if handle_files == "paths": json_value = list(map(_data_input_to_path, value_wrapper)) elif handle_files == "skip": return SKIP_INPUT else: raise NotImplementedError() elif input_type == "data": if handle_files == "paths": json_value = _data_input_to_path(value_wrapper) elif handle_files == "skip": return SKIP_INPUT elif handle_files == "OBJECT": if value_wrapper: if isinstance(value_wrapper, list): value_wrapper = value_wrapper[0] json_value = _hda_to_object(value_wrapper) if input.load_contents: with open(str(value_wrapper), mode='rb') as fh: json_value['contents'] = fh.read(input.load_contents).decode('utf-8', errors='replace') return json_value else: return None else: raise NotImplementedError() elif input_type == "data_collection": if handle_files == "skip": return SKIP_INPUT raise NotImplementedError() elif input_type in ["text", "color", "hidden"]: if getattr(input, "optional", False) and value_wrapper is not None and value_wrapper.value is None: json_value = None else: json_value = _cast_if_not_none(value_wrapper, str) elif input_type == "float": json_value = _cast_if_not_none(value_wrapper, float, empty_to_none=True) elif input_type == "integer": json_value = _cast_if_not_none(value_wrapper, int, empty_to_none=True) elif input_type == "boolean": if input.optional and value_wrapper is not None and value_wrapper.value is None: json_value = None else: json_value = _cast_if_not_none(value_wrapper, bool, empty_to_none=input.optional) elif input_type == "select": if packaging.version.parse(str(profile)) < packaging.version.parse('20.05'): json_value = _cast_if_not_none(value_wrapper, str) else: if input.multiple: json_value = [str(_) for _ in _cast_if_not_none(value_wrapper.value, list)] else: json_value = _cast_if_not_none(value_wrapper.value, str) elif input_type == "data_column": # value is a SelectToolParameterWrapper() if input.multiple: json_value = [int(_) for _ in _cast_if_not_none(value_wrapper.value, list)] else: json_value = [_cast_if_not_none(value_wrapper.value, int)] else: raise NotImplementedError("input_type [%s] not implemented" % input_type) return json_value def _hda_to_object(hda): hda_dict = hda.to_dict() metadata_dict = {} for key, value in hda_dict.items(): if key.startswith("metadata_"): metadata_dict[key[len("metadata_"):]] = value return { 'file_ext': hda_dict['file_ext'], 'file_size': hda_dict['file_size'], 'name': hda_dict['name'], 'metadata': metadata_dict, 'src': {'src': 'hda', 'id': hda.id}, } def _cast_if_not_none(value, cast_to, empty_to_none=False): # log.debug("value [%s], type[%s]" % (value, type(value))) if value is None or (empty_to_none and str(value) == ''): return None else: return cast_to(value) __all__ = ('json_wrap', )