This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.parameters.wrapped_json

import logging

import packaging.version

log = logging.getLogger(__name__)

SKIP_INPUT = object()

def json_wrap(inputs, input_values, profile, as_dict=None, handle_files="skip"):
    """Build a dictionary of JSON-ready values for a set of tool inputs.

    Every parameter in ``inputs.values()`` is looked up by its ``name`` in
    ``input_values`` and converted with ``_json_wrap_input``. Parameters for
    which the conversion yields the ``SKIP_INPUT`` sentinel are left out of
    the result.

    :param inputs: mapping whose values are parameter objects exposing ``name``.
    :param input_values: mapping from parameter name to its wrapped value.
    :param profile: forwarded unchanged to ``_json_wrap_input``.
    :param as_dict: optional dictionary to fill in place; a new one is
        created when omitted.
    :param handle_files: forwarded unchanged to ``_json_wrap_input``.
    :returns: the populated dictionary (``as_dict`` itself when supplied).
    """
    result = {} if as_dict is None else as_dict
    for param in inputs.values():
        name = param.name
        wrapped = _json_wrap_input(param, input_values[name], profile, handle_files=handle_files)
        if wrapped is not SKIP_INPUT:
            result[name] = wrapped
    return result
def data_input_to_path(v):
    """Return the string form of a single data input, or ``None``.

    ``None`` is returned both when ``v`` is ``None`` and when ``str(v)`` is
    the literal text ``"None"``.
    """
    path = _cast_if_not_none(v, str)
    if path == "None":
        path = None
    return path


def data_collection_input_to_path(v):
    """Return ``v.all_paths`` for a data collection input."""
    return v.all_paths


def data_collection_input_to_staging_path_and_source_path(v):
    """Describe every element of a data collection input.

    ``v.all_staging_paths``, ``v.all_paths`` and ``v.all_metadata_files`` are
    walked in lockstep. For each element a dictionary with the keys
    ``staging_path``, ``source_path`` and ``metadata_files`` is produced;
    each metadata entry's staging path is the element staging path suffixed
    with ``.<mf[0]>`` and its source path is ``mf[1]``.
    """
    staging_paths = v.all_staging_paths
    source_paths = v.all_paths
    metadata_files = v.all_metadata_files
    return [
        {
            'staging_path': staging_path,
            'source_path': source_path,
            'metadata_files': [
                {'staging_path': f"{staging_path}.{mf[0]}", 'source_path': mf[1]}
                for mf in element_metadata_files
            ],
        }
        for staging_path, source_path, element_metadata_files
        in zip(staging_paths, source_paths, metadata_files)
    ]


def data_input_to_staging_path_and_source_path(v):
    """Describe a single data input by staging path, source path and metadata files.

    The source path comes from ``data_input_to_path``; metadata entries are
    built from ``v.all_metadata_files`` in the same shape used for collection
    elements.
    """
    return {
        'staging_path': v.staging_path,
        'source_path': data_input_to_path(v),
        'metadata_files': [
            {'staging_path': f"{v.staging_path}.{mf[0]}", 'source_path': mf[1]}
            for mf in v.all_metadata_files
        ],
    }


def _json_wrap_input(input, value_wrapper, profile, handle_files="skip"):
    """Convert one wrapped parameter value into a JSON-serializable value.

    Dispatches on ``input.type``. Grouping types (``repeat``, ``conditional``,
    ``section``) recurse through ``json_wrap``; scalar types are cast with
    ``_cast_if_not_none``.

    :param input: parameter object; ``type`` is always read, and other
        attributes (``inputs``, ``cases``, ``test_param``, ``multiple``,
        ``optional``, ``load_contents``) are read depending on the type.
    :param value_wrapper: wrapped value for ``input``.
    :param profile: version compared against ``20.05`` for ``select``
        parameters; older profiles stringify the wrapper itself instead of
        its ``value``.
    :param handle_files: how ``data`` / ``data_collection`` inputs are
        emitted: ``"skip"``, ``"paths"``, ``"staging_path_and_source_path"``
        or, for single ``data`` inputs only, ``"OBJECT"``.
    :returns: the converted value, or ``SKIP_INPUT`` when a file input is
        skipped.
    :raises NotImplementedError: for an unknown ``input.type`` or an
        unsupported ``handle_files`` mode for the given file input type.
    """
    input_type = input.type
    if input_type == "repeat":
        repeat_job_value = []
        for d in value_wrapper:
            repeat_instance_job_value = {}
            json_wrap(input.inputs, d, profile, repeat_instance_job_value, handle_files=handle_files)
            repeat_job_value.append(repeat_instance_job_value)
        json_value = repeat_job_value
    elif input_type == "conditional":
        values = value_wrapper
        current = values["__current_case__"]
        conditional_job_value = {}
        json_wrap(input.cases[current].inputs, values, profile, conditional_job_value, handle_files=handle_files)
        # The test parameter is not part of the selected case's inputs, so
        # it is wrapped separately and stored alongside them.
        test_param = input.test_param
        test_param_name = test_param.name
        test_value = _json_wrap_input(test_param, values[test_param_name], profile, handle_files=handle_files)
        conditional_job_value[test_param_name] = test_value
        json_value = conditional_job_value
    elif input_type == "section":
        values = value_wrapper
        section_job_value = {}
        json_wrap(input.inputs, values, profile, section_job_value, handle_files=handle_files)
        json_value = section_job_value
    elif input_type == "data" and input.multiple:
        if handle_files == "paths":
            json_value = [data_input_to_path(v) for v in value_wrapper]
        elif handle_files == 'staging_path_and_source_path':
            json_value = [data_input_to_staging_path_and_source_path(v) for v in value_wrapper]
        elif handle_files == "skip":
            return SKIP_INPUT
        else:
            raise NotImplementedError()
    elif input_type == "data":
        if handle_files == "paths":
            json_value = data_input_to_path(value_wrapper)
        elif handle_files == 'staging_path_and_source_path':
            json_value = data_input_to_staging_path_and_source_path(value_wrapper)
        elif handle_files == "skip":
            return SKIP_INPUT
        elif handle_files == "OBJECT":
            if value_wrapper:
                if isinstance(value_wrapper, list):
                    value_wrapper = value_wrapper[0]
                json_value = _hda_to_object(value_wrapper)
                if input.load_contents:
                    # Read at most ``load_contents`` bytes; undecodable bytes
                    # are replaced rather than raising.
                    with open(str(value_wrapper), mode='rb') as fh:
                        json_value['contents'] = fh.read(input.load_contents).decode('utf-8', errors='replace')
                return json_value
            else:
                return None
        else:
            raise NotImplementedError()
    elif input_type == "data_collection":
        if handle_files == "skip":
            return SKIP_INPUT
        elif handle_files == "paths":
            return data_collection_input_to_path(value_wrapper)
        elif handle_files == "staging_path_and_source_path":
            return data_collection_input_to_staging_path_and_source_path(value_wrapper)
        raise NotImplementedError()
    elif input_type in ["text", "color", "hidden"]:
        if getattr(input, "optional", False) and value_wrapper is not None and value_wrapper.value is None:
            json_value = None
        else:
            json_value = _cast_if_not_none(value_wrapper, str)
    elif input_type == "float":
        json_value = _cast_if_not_none(value_wrapper, float, empty_to_none=True)
    elif input_type == "integer":
        json_value = _cast_if_not_none(value_wrapper, int, empty_to_none=True)
    elif input_type == "boolean":
        if input.optional and value_wrapper is not None and value_wrapper.value is None:
            json_value = None
        else:
            json_value = _cast_if_not_none(value_wrapper, bool, empty_to_none=input.optional)
    elif input_type == "select":
        if packaging.version.parse(str(profile)) < packaging.version.parse('20.05'):
            json_value = _cast_if_not_none(value_wrapper, str)
        elif input.multiple:
            # An unset multiple select has a ``None`` value; emit ``None``
            # (as the single-valued branch does) instead of iterating it.
            selected = _cast_if_not_none(value_wrapper.value, list)
            json_value = None if selected is None else [str(_) for _ in selected]
        else:
            json_value = _cast_if_not_none(value_wrapper.value, str)
    elif input_type == "data_column":
        # value is a SelectToolParameterWrapper()
        if input.multiple:
            # Same guard as for multiple selects: ``None`` is not iterable.
            columns = _cast_if_not_none(value_wrapper.value, list)
            json_value = None if columns is None else [int(_) for _ in columns]
        else:
            json_value = [_cast_if_not_none(value_wrapper.value, int)]
    elif input_type == "directory_uri":
        json_value = _cast_if_not_none(value_wrapper, str)
    else:
        raise NotImplementedError(f"input_type [{input_type}] not implemented")

    return json_value


def _hda_to_object(hda):
    """Summarize a dataset as a plain dictionary.

    Uses ``hda.to_dict()`` for ``file_ext``, ``file_size`` and ``name``, and
    collects every key starting with ``metadata_`` (prefix stripped) under
    ``metadata``. ``src`` records ``hda.id`` with source type ``'hda'``.
    """
    hda_dict = hda.to_dict()
    metadata_dict = {}

    for key, value in hda_dict.items():
        if key.startswith("metadata_"):
            metadata_dict[key[len("metadata_"):]] = value

    return {
        'file_ext': hda_dict['file_ext'],
        'file_size': hda_dict['file_size'],
        'name': hda_dict['name'],
        'metadata': metadata_dict,
        'src': {'src': 'hda', 'id': hda.id},
    }


def _cast_if_not_none(value, cast_to, empty_to_none=False):
    """Return ``cast_to(value)``, or ``None`` when there is nothing to cast.

    ``None`` is returned when ``value`` is ``None``, or when
    ``empty_to_none`` is true and ``str(value)`` is the empty string.
    """
    if value is None or (empty_to_none and str(value) == ''):
        return None
    else:
        return cast_to(value)


__all__ = ('json_wrap', )