Source code for galaxy.tools.parameters.grouping

"""
Constructs for grouping tool parameters
"""
import io
import logging
import os
import unicodedata
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Mapping,
    Optional,
    TYPE_CHECKING,
)

from galaxy.datatypes import data
from galaxy.exceptions import (
    AdminRequiredException,
    ConfigDoesNotAllowException,
)
from galaxy.files.uris import stream_to_file
from galaxy.util import (
    asbool,
    inflector,
    relpath,
    sanitize_for_filename,
)
from galaxy.util.bunch import Bunch
from galaxy.util.dictifiable import Dictifiable
from galaxy.util.expressions import ExpressionContext

if TYPE_CHECKING:
    from galaxy.tools import Tool
    from galaxy.tools.parameter.basic import ToolParameter

log = logging.getLogger(__name__)
URI_PREFIXES = [
    f"{x}://" for x in ["http", "https", "ftp", "file", "gxfiles", "gximport", "gxuserimport", "gxftp", "drs"]
]


[docs]class Group(Dictifiable): dict_collection_visible_keys = ["name", "type"] type: str
[docs] def __init__(self): self.name = None
@property def visible(self): return True
[docs] def value_to_basic(self, value, app, use_security=False): """ Convert value to a (possibly nested) representation using only basic types (dict, list, tuple, string_types, int, long, float, bool, None) """ return value
[docs] def value_from_basic(self, value, app, ignore_errors=False): """ Convert a basic representation as produced by `value_to_basic` back into the preferred value form. """ return value
[docs] def get_initial_value(self, trans, context): """ Return the initial state/value for this group """ raise TypeError("Not implemented")
[docs] def to_dict(self, trans): group_dict = super().to_dict() return group_dict
[docs]class Repeat(Group): dict_collection_visible_keys = ["name", "type", "title", "help", "default", "min", "max"] type = "repeat"
[docs] def __init__(self): Group.__init__(self) self._title = None self.inputs = None self.help = None self.default = 0 self.min = None self.max = None
@property def title(self): return self._title or self.name @title.setter def title(self, value): self._title = value @property def title_plural(self): return inflector.pluralize(self.title) @property def label(self): return f"Repeat ({self.title})"
[docs] def value_to_basic(self, value, app, use_security=False): if self.inputs is None: raise Exception("Must set 'inputs' attribute to use.") rval = [] for d in value: rval_dict = {} # Propogate __index__ if "__index__" in d: rval_dict["__index__"] = d["__index__"] for input in self.inputs.values(): if input.name in d: rval_dict[input.name] = input.value_to_basic(d[input.name], app, use_security) rval.append(rval_dict) return rval
[docs] def value_from_basic(self, value, app, ignore_errors=False): if self.inputs is None: raise Exception("Must set 'inputs' attribute to use.") rval = [] try: for i, d in enumerate(value): rval_dict = {} # If the special __index__ key is not set, create it (for backward # compatibility) rval_dict["__index__"] = d.get("__index__", i) # Restore child inputs for input in self.inputs.values(): if ignore_errors and input.name not in d: # If we do not have a value, and are ignoring errors, we simply # do nothing. There will be no value for the parameter in the # conditional's values dictionary. pass else: rval_dict[input.name] = input.value_from_basic(d[input.name], app, ignore_errors) rval.append(rval_dict) except Exception as e: if not ignore_errors: raise e return rval
[docs] def get_initial_value(self, trans, context): if self.inputs is None: raise Exception("Must set 'inputs' attribute to use.") rval = [] for i in range(self.default): rval_dict = {"__index__": i} for input in self.inputs.values(): rval_dict[input.name] = input.get_initial_value(trans, context) rval.append(rval_dict) return rval
[docs] def to_dict(self, trans): if self.inputs is None: raise Exception("Must set 'inputs' attribute to use.") repeat_dict = super().to_dict(trans) def input_to_dict(input): return input.to_dict(trans) repeat_dict["inputs"] = list(map(input_to_dict, self.inputs.values())) return repeat_dict
[docs]class Section(Group): dict_collection_visible_keys = ["name", "type", "title", "help", "expanded"] type = "section"
[docs] def __init__(self): Group.__init__(self) self.title = None self.inputs = None self.help = None self.expanded = False
@property def title_plural(self): return inflector.pluralize(self.title) @property def label(self): return f"Section ({self.title})"
[docs] def value_to_basic(self, value, app, use_security=False): if self.inputs is None: raise Exception("Must set 'inputs' attribute to use.") rval = {} for input in self.inputs.values(): if input.name in value: # parameter might be absent in unverified workflow rval[input.name] = input.value_to_basic(value[input.name], app, use_security) return rval
[docs] def value_from_basic(self, value, app, ignore_errors=False): if self.inputs is None: raise Exception("Must set 'inputs' attribute to use.") rval = {} try: for input in self.inputs.values(): if not ignore_errors or input.name in value: rval[input.name] = input.value_from_basic(value[input.name], app, ignore_errors) except Exception as e: if not ignore_errors: raise e return rval
[docs] def get_initial_value(self, trans, context): if self.inputs is None: raise Exception("Must set 'inputs' attribute to use.") rval: Dict[str, Any] = {} child_context = ExpressionContext(rval, context) for child_input in self.inputs.values(): rval[child_input.name] = child_input.get_initial_value(trans, child_context) return rval
[docs] def to_dict(self, trans): if self.inputs is None: raise Exception("Must set 'inputs' attribute to use.") section_dict = super().to_dict(trans) def input_to_dict(input): return input.to_dict(trans) section_dict["inputs"] = list(map(input_to_dict, self.inputs.values())) return section_dict
[docs]class Dataset(Bunch): type: str file_type: str dbkey: str datatype: data.Data warnings: List[str] metadata: Dict[str, str] composite_files: Dict[str, Optional[str]] uuid: Optional[str] tag_using_filenames: Optional[str] tags: Optional[str] name: str primary_file: str to_posix_lines: bool auto_decompress: bool ext: str space_to_tab: bool
[docs]class UploadDataset(Group): type = "upload_dataset"
[docs] def __init__(self): Group.__init__(self) self.title = None self.inputs = None self.file_type_name = "file_type" self.default_file_type = "txt" self.file_type_to_ext = {"auto": self.default_file_type} self.metadata_ref = "files_metadata"
[docs] def get_composite_dataset_name(self, context): # FIXME: HACK # Special case of using 'base_name' metadata for use as Dataset name needs to be done in a General Fashion, as defined within a particular Datatype. # We get two different types of contexts here, one straight from submitted parameters, the other after being parsed into tool inputs dataset_name = context.get("files_metadata|base_name", None) if dataset_name is None: dataset_name = context.get("files_metadata", {}).get("base_name", None) if dataset_name is None: filenames = list() for composite_file in context.get("files", []): if not composite_file.get("ftp_files", ""): filenames.append((composite_file.get("file_data") or {}).get("filename", "")) else: filenames.append(composite_file.get("ftp_files", [])[0]) dataset_name = os.path.commonprefix(filenames).rstrip(".") or None if dataset_name is None: dataset_name = f"Uploaded Composite Dataset ({self.get_file_type(context)})" return dataset_name
[docs] def get_file_base_name(self, context): fd = context.get("files_metadata|base_name", "Galaxy_Composite_file") return fd
[docs] def get_file_type(self, context, parent_context=None): file_type = context.get(self.file_type_name, None) if file_type == "": if parent_context: file_type = parent_context.get(self.file_type_name, self.default_file_type) else: file_type = self.default_file_type return file_type
[docs] def get_dbkey(self, context, parent_context=None): dbkey = context.get("dbkey", None) if dbkey == "": if parent_context: dbkey = parent_context.get("dbkey", dbkey) return dbkey
[docs] def get_datatype_ext(self, trans, context, parent_context=None): ext = self.get_file_type(context, parent_context=parent_context) if ext in self.file_type_to_ext: ext = self.file_type_to_ext[ ext ] # when using autodetect, we will use composite info from 'text', i.e. only the main file return ext
[docs] def get_datatype(self, trans, context, parent_context=None): ext = self.get_datatype_ext(trans, context, parent_context=parent_context) return trans.app.datatypes_registry.get_datatype_by_extension(ext)
@property def title_plural(self): return inflector.pluralize(self.title)
[docs] def group_title(self, context): return f"{self.title} ({context.get(self.file_type_name, self.default_file_type)})"
[docs] def title_by_index(self, trans, index, context): d_type = self.get_datatype(trans, context) for i, (composite_name, composite_file) in enumerate(d_type.writable_files.items()): if i == index: rval = composite_name if composite_file.description: rval = f"{rval} ({composite_file.description})" if composite_file.optional: rval = f"{rval} [optional]" return rval if index < self.get_file_count(trans, context): return "Extra primary file" return None
[docs] def value_to_basic(self, value, app, use_security=False): if self.inputs is None: raise Exception("Must set 'inputs' attribute to use.") rval = [] for d in value: rval_dict = {} # Propogate __index__ if "__index__" in d: rval_dict["__index__"] = d["__index__"] for input in self.inputs.values(): if input.name in d: rval_dict[input.name] = input.value_to_basic(d[input.name], app, use_security) rval.append(rval_dict) return rval
[docs] def value_from_basic(self, value, app, ignore_errors=False): if self.inputs is None: raise Exception("Must set 'inputs' attribute to use.") rval = [] for i, d in enumerate(value): try: rval_dict = {} # If the special __index__ key is not set, create it (for backward # compatibility) rval_dict["__index__"] = d.get("__index__", i) # Restore child inputs for input in self.inputs.values(): if ignore_errors and input.name not in d: # this wasn't tested rval_dict[input.name] = input.get_initial_value(None, d) else: rval_dict[input.name] = input.value_from_basic(d[input.name], app, ignore_errors) rval.append(rval_dict) except Exception as e: if not ignore_errors: raise e return rval
[docs] def get_file_count(self, trans, context): file_count = context.get("file_count", "auto") if file_count == "auto": d_type = self.get_datatype(trans, context) return len(d_type.writable_files) if d_type else 1 else: return int(file_count)
[docs] def get_initial_value(self, trans, context): if self.inputs is None: raise Exception("Must set 'inputs' attribute to use.") file_count = self.get_file_count(trans, context) rval = [] for i in range(file_count): rval_dict = {} rval_dict["__index__"] = i # create __index__ for input in self.inputs.values(): rval_dict[input.name] = input.get_initial_value(trans, context) rval.append(rval_dict) return rval
[docs] def get_uploaded_datasets(self, trans, context, override_name=None, override_info=None): def get_data_file_filename(data_file, override_name=None, override_info=None, purge=True): dataset_name = override_name def get_file_name(file_name): file_name = file_name.split("\\")[-1] file_name = file_name.split("/")[-1] return file_name try: # Use the existing file if not dataset_name and "filename" in data_file: dataset_name = get_file_name(data_file["filename"]) return Bunch(type="file", path=data_file["local_filename"], name=dataset_name, purge_source=purge) except Exception: # The uploaded file should've been persisted by the upload tool action return Bunch(type=None, path=None, name=None) def get_url_paste_urls_or_filename(group_incoming, override_name=None, override_info=None): url_paste_file = group_incoming.get("url_paste", None) if url_paste_file is not None: url_paste = open(url_paste_file).read() def start_of_url(content): start_of_url_paste = content.lstrip()[0:10].lower() looks_like_url = False for url_prefix in URI_PREFIXES: if start_of_url_paste.startswith(url_prefix): looks_like_url = True break return looks_like_url if start_of_url(url_paste): for line in url_paste.replace("\r", "").split("\n"): line = line.strip() if line: if not start_of_url(line): continue # non-url line, ignore if "file://" in line: if not trans.user_is_admin: raise AdminRequiredException() elif not trans.app.config.allow_path_paste: raise ConfigDoesNotAllowException() upload_path = line[len("file://") :] dataset_name = os.path.basename(upload_path) else: dataset_name = line if override_name: dataset_name = override_name yield Bunch(type="url", path=line, name=dataset_name) else: dataset_name = "Pasted Entry" # we need to differentiate between various url pastes here if override_name: dataset_name = override_name yield Bunch(type="file", path=url_paste_file, name=dataset_name) def get_one_filename(context): data_file = context["file_data"] url_paste = context["url_paste"] ftp_files = context["ftp_files"] name = context.get("NAME", None) info = context.get("INFO", None) uuid = context.get("uuid", None) or None # Turn '' to None file_type = context.get("file_type", None) dbkey = self.get_dbkey(context) warnings = [] to_posix_lines = False if context.get("to_posix_lines", None) not in ["None", None, False]: to_posix_lines = True auto_decompress = False if context.get("auto_decompress", None) not in ["None", None, False]: auto_decompress = True space_to_tab = False if context.get("space_to_tab", None) not in ["None", None, False]: space_to_tab = True file_bunch = get_data_file_filename(data_file, override_name=name, override_info=info) if file_bunch.path: if url_paste is not None and url_paste.strip(): warnings.append("All file contents specified in the paste box were ignored.") if ftp_files: warnings.append("All FTP uploaded file selections were ignored.") elif url_paste is not None and url_paste.strip(): # we need to use url_paste for file_bunch in get_url_paste_urls_or_filename(context, override_name=name, override_info=info): if file_bunch.path: break if file_bunch.path and ftp_files is not None: warnings.append("All FTP uploaded file selections were ignored.") elif ftp_files is not None and trans.user is not None: # look for files uploaded via FTP user_ftp_dir = trans.user_ftp_dir assert not os.path.islink(user_ftp_dir), "User FTP directory cannot be a symbolic link" for dirpath, _dirnames, filenames in os.walk(user_ftp_dir): for filename in filenames: for ftp_filename in ftp_files: if ftp_filename == filename: path = relpath(os.path.join(dirpath, filename), user_ftp_dir) if not os.path.islink(os.path.join(dirpath, filename)): ftp_data_file = { "local_filename": os.path.abspath(os.path.join(user_ftp_dir, path)), "filename": os.path.basename(path), } purge = getattr(trans.app.config, "ftp_upload_purge", True) file_bunch = get_data_file_filename( ftp_data_file, override_name=name, override_info=info, purge=purge, ) if file_bunch.path: break if file_bunch.path: break if file_bunch.path: break file_bunch.to_posix_lines = to_posix_lines file_bunch.auto_decompress = auto_decompress file_bunch.space_to_tab = space_to_tab file_bunch.uuid = uuid if file_type is not None: file_bunch.file_type = file_type if dbkey is not None: file_bunch.dbkey = dbkey return file_bunch, warnings def get_filenames(context): rval = [] data_file = context["file_data"] ftp_files = context["ftp_files"] uuid = context.get("uuid", None) or None # Turn '' to None name = context.get("NAME", None) info = context.get("INFO", None) file_type = context.get("file_type", None) dbkey = self.get_dbkey(context) to_posix_lines = False if context.get("to_posix_lines", None) not in ["None", None, False]: to_posix_lines = True auto_decompress = False if context.get("auto_decompress", None) not in ["None", None, False]: auto_decompress = True space_to_tab = False if context.get("space_to_tab", None) not in ["None", None, False]: space_to_tab = True file_bunch = get_data_file_filename(data_file, override_name=name, override_info=info) file_bunch.uuid = uuid if file_bunch.path: file_bunch.to_posix_lines = to_posix_lines file_bunch.auto_decompress = auto_decompress file_bunch.space_to_tab = space_to_tab if file_type is not None: file_bunch.file_type = file_type if dbkey is not None: file_bunch.dbkey = dbkey rval.append(file_bunch) for file_bunch in get_url_paste_urls_or_filename(context, override_name=name, override_info=info): if file_bunch.path: file_bunch.uuid = uuid file_bunch.to_posix_lines = to_posix_lines file_bunch.auto_decompress = auto_decompress file_bunch.space_to_tab = space_to_tab if file_type is not None: file_bunch.file_type = file_type if dbkey is not None: file_bunch.dbkey = dbkey rval.append(file_bunch) # look for files uploaded via FTP valid_files = [] if ftp_files is not None: # Normalize input paths to ensure utf-8 encoding is normal form c. # This allows for comparison when the filesystem uses a different encoding than the browser. ftp_files = [unicodedata.normalize("NFC", f) for f in ftp_files if isinstance(f, str)] if trans.user is None: log.warning(f"Anonymous user passed values in ftp_files: {ftp_files}") ftp_files = [] # TODO: warning to the user (could happen if session has become invalid) else: user_ftp_dir = trans.user_ftp_dir assert not os.path.islink(user_ftp_dir), "User FTP directory cannot be a symbolic link" for dirpath, _dirnames, filenames in os.walk(user_ftp_dir): for filename in filenames: path = relpath(os.path.join(dirpath, filename), user_ftp_dir) if not os.path.islink(os.path.join(dirpath, filename)): # Normalize filesystem paths if isinstance(path, str): valid_files.append(unicodedata.normalize("NFC", path)) else: valid_files.append(path) else: ftp_files = [] for ftp_file in ftp_files: if ftp_file not in valid_files: log.warning(f"User passed an invalid file path in ftp_files: {ftp_file}") continue # TODO: warning to the user (could happen if file is already imported) ftp_data_file = { "local_filename": os.path.abspath(os.path.join(user_ftp_dir, ftp_file)), "filename": os.path.basename(ftp_file), } purge = getattr(trans.app.config, "ftp_upload_purge", True) file_bunch = get_data_file_filename(ftp_data_file, override_name=name, override_info=info, purge=purge) if file_bunch.path: file_bunch.to_posix_lines = to_posix_lines file_bunch.auto_decompress = auto_decompress file_bunch.space_to_tab = space_to_tab if file_type is not None: file_bunch.file_type = file_type if dbkey is not None: file_bunch.dbkey = dbkey rval.append(file_bunch) return rval file_type = self.get_file_type(context) file_count = self.get_file_count(trans, context) d_type = self.get_datatype(trans, context) dbkey = self.get_dbkey(context) tag_using_filenames = context.get("tag_using_filenames", False) tags = context.get("tags", False) force_composite = asbool(context.get("force_composite", "False")) writable_files = d_type.writable_files writable_files_offset = 0 groups_incoming = [None for _ in range(file_count)] for group_incoming in context.get(self.name, []): i = int(group_incoming["__index__"]) groups_incoming[i] = group_incoming if d_type.composite_type is not None or force_composite: # handle uploading of composite datatypes # Only one Dataset can be created dataset = Dataset() dataset.type = "composite" dataset.file_type = file_type dataset.dbkey = dbkey dataset.datatype = d_type dataset.warnings = [] dataset.metadata = {} dataset.composite_files = {} dataset.uuid = None dataset.tag_using_filenames = None dataset.tags = None # load metadata files_metadata = context.get(self.metadata_ref, {}) metadata_name_substition_default_dict = { composite_file.substitute_name_with_metadata: d_type.metadata_spec[ composite_file.substitute_name_with_metadata ].default for composite_file in d_type.composite_files.values() if composite_file.substitute_name_with_metadata } for meta_name, meta_spec in d_type.metadata_spec.items(): if meta_spec.set_in_upload: if meta_name in files_metadata: meta_value = files_metadata[meta_name] if meta_name in metadata_name_substition_default_dict: meta_value = sanitize_for_filename( meta_value, default=metadata_name_substition_default_dict[meta_name] ) dataset.metadata[meta_name] = meta_value dataset.name = self.get_composite_dataset_name(context) if dataset.datatype.composite_type == "auto_primary_file": # replace sniff here with just creating an empty file temp_name = stream_to_file( io.StringIO(d_type.generate_primary_file(dataset)), prefix="upload_auto_primary_file" ) dataset.primary_file = temp_name dataset.to_posix_lines = True dataset.auto_decompress = True dataset.space_to_tab = False else: file_bunch, warnings = get_one_filename(groups_incoming[0]) writable_files_offset = 1 dataset.primary_file = file_bunch.path dataset.to_posix_lines = file_bunch.to_posix_lines dataset.auto_decompress = file_bunch.auto_decompress dataset.space_to_tab = file_bunch.space_to_tab if file_bunch.file_type: dataset.file_type = file_type if file_bunch.dbkey: dataset.dbkey = dbkey dataset.warnings.extend(warnings) if dataset.primary_file is None: # remove this before finish, this should create an empty dataset raise Exception("No primary dataset file was available for composite upload") if not force_composite: keys = [value.name for value in writable_files.values()] else: keys = [str(index) for index in range(file_count)] for i, group_incoming in enumerate(groups_incoming[writable_files_offset:]): key = keys[i + writable_files_offset] if ( not force_composite and group_incoming is None and not writable_files[list(writable_files.keys())[keys.index(key)]].optional ): dataset.warnings.append(f"A required composite file ({key}) was not specified.") dataset.composite_files[key] = None else: file_bunch, warnings = get_one_filename(group_incoming) dataset.warnings.extend(warnings) if file_bunch.path: if force_composite: key = group_incoming.get("NAME") or i dataset.composite_files[key] = file_bunch.__dict__ elif not force_composite: dataset.composite_files[key] = None if not writable_files[list(writable_files.keys())[keys.index(key)]].optional: dataset.warnings.append(f"A required composite file ({key}) was not specified.") return [dataset] else: rval = [] for i, file_contexts in enumerate(context[self.name]): datasets = get_filenames(file_contexts) for dataset in datasets: override_file_type = self.get_file_type(context[self.name][i], parent_context=context) d_type = self.get_datatype(trans, context[self.name][i], parent_context=context) dataset.file_type = override_file_type dataset.datatype = d_type dataset.ext = self.get_datatype_ext(trans, context[self.name][i], parent_context=context) dataset.dbkey = self.get_dbkey(context[self.name][i], parent_context=context) dataset.tag_using_filenames = tag_using_filenames dataset.tags = tags rval.append(dataset) return rval
[docs]class Conditional(Group): type = "conditional" value_from: Callable[[ExpressionContext, "Conditional", "Tool"], Mapping[str, str]]
[docs] def __init__(self): Group.__init__(self) self.test_param: Optional[ToolParameter] = None self.cases = [] self.value_ref = None self.value_ref_in_group = True # When our test_param is not part of the conditional Group, this is False
@property def label(self): return f"Conditional ({self.name})"
[docs] def get_current_case(self, value): if self.test_param is None: raise Exception("Must set 'test_param' attribute to use.") # Convert value to user representation str_value = self.test_param.to_param_dict_string(value) # Find the matching case for index, case in enumerate(self.cases): if str_value == case.value: return index raise ValueError("No case matched value:", self.name, str_value)
[docs] def value_to_basic(self, value, app, use_security=False): if self.test_param is None: raise Exception("Must set 'test_param' attribute to use.") rval = dict() rval[self.test_param.name] = self.test_param.value_to_basic(value[self.test_param.name], app) current_case = rval["__current_case__"] = self.get_current_case(value[self.test_param.name]) for input in self.cases[current_case].inputs.values(): if input.name in value: # parameter might be absent in unverified workflow rval[input.name] = input.value_to_basic(value[input.name], app, use_security=use_security) return rval
[docs] def value_from_basic(self, value, app, ignore_errors=False): if self.test_param is None: raise Exception("Must set 'test_param' attribute to use.") rval = dict() try: rval[self.test_param.name] = self.test_param.value_from_basic( value.get(self.test_param.name), app, ignore_errors ) current_case = rval["__current_case__"] = self.get_current_case(rval[self.test_param.name]) # Inputs associated with current case for input in self.cases[current_case].inputs.values(): # If we do not have a value, and are ignoring errors, we simply # do nothing. There will be no value for the parameter in the # conditional's values dictionary. if not ignore_errors or input.name in value: rval[input.name] = input.value_from_basic(value[input.name], app, ignore_errors) except Exception as e: if not ignore_errors: raise e return rval
[docs] def get_initial_value(self, trans, context): if self.test_param is None: raise Exception("Must set 'test_param' attribute to use.") # State for a conditional is a plain dictionary. rval = {} # Get the default value for the 'test element' and use it # to determine the current case test_value = self.test_param.get_initial_value(trans, context) current_case = self.get_current_case(test_value) # Store the current case in a special value rval["__current_case__"] = current_case # Store the value of the test element rval[self.test_param.name] = test_value # Fill in state for selected case child_context = ExpressionContext(rval, context) for child_input in self.cases[current_case].inputs.values(): rval[child_input.name] = child_input.get_initial_value(trans, child_context) return rval
[docs] def to_dict(self, trans): if self.test_param is None: raise Exception("Must set 'test_param' attribute to use.") cond_dict = super().to_dict(trans) def nested_to_dict(input): return input.to_dict(trans) cond_dict["cases"] = list(map(nested_to_dict, self.cases)) cond_dict["test_param"] = nested_to_dict(self.test_param) return cond_dict
[docs]class ConditionalWhen(Dictifiable): dict_collection_visible_keys = ["value"]
[docs] def __init__(self): self.value = None self.inputs = None
[docs] def to_dict(self, trans): if self.inputs is None: raise Exception("Must set 'inputs' attribute to use.") when_dict = super().to_dict() def input_to_dict(input): return input.to_dict(trans) when_dict["inputs"] = list(map(input_to_dict, self.inputs.values())) return when_dict