Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.parameters.basic

"""
Basic tool parameters.
"""

import contextlib
import json
import logging
import os
import os.path
import re
import typing
import urllib.parse
from collections.abc import (
    MutableMapping,
    Sequence,
)
from typing import (
    Any,
    cast,
    Optional,
    TYPE_CHECKING,
)

from packaging.version import Version
from webob.compat import cgi_FieldStorage

from galaxy import util
from galaxy.files import ProvidesFileSourcesUserContext
from galaxy.managers.dbkeys import read_dbnames
from galaxy.model import (
    cached_id,
    Dataset,
    DatasetCollection,
    DatasetCollectionElement,
    DatasetHash,
    DatasetInstance,
    DatasetSource,
    HistoryDatasetAssociation,
    HistoryDatasetCollectionAssociation,
    LibraryDatasetDatasetAssociation,
)
from galaxy.model.dataset_collections import (
    builder,
    query,
)
from galaxy.model.dataset_collections.adapters import (
    CollectionAdapter,
    recover_adapter,
    TransientCollectionAdapterDatasetInstanceElement,
    validate_collection_adapter_src_dict,
)
from galaxy.model.dataset_collections.types.sample_sheet_util import column_definitions_compatible
from galaxy.schema.fetch_data import FilesPayload
from galaxy.tool_util.parameters.factory import get_color_value
from galaxy.tool_util.parser import get_input_source as ensure_input_source
from galaxy.tool_util.parser.util import (
    boolean_is_checked,
    boolean_true_and_false_values,
    multiple_select_value_split,
    ParameterParseException,
    text_input_is_optional,
)
from galaxy.tool_util_models.tool_source import DrillDownOptionsDict
from galaxy.tools.parameters.options import ParameterOption
from galaxy.tools.parameters.pagination import (
    DataOptionsBuilder,
    make_dce_entry,
    make_hda_entry,
    make_hdca_entry,
    make_ldda_entry,
    MAX_OPTIONS_PAGE_SIZE,
    ParameterPaginationT,
)
from galaxy.tools.parameters.workflow_utils import (
    NO_REPLACEMENT,
    workflow_building_modes,
)
from galaxy.util import (
    sanitize_param,
    string_as_bool,
    string_as_bool_or_none,
    unicodify,
)
from galaxy.util.dictifiable import UsesDictVisibleKeys
from galaxy.util.expressions import ExpressionContext
from galaxy.util.hash_util import HASH_NAMES
from galaxy.util.rules_dsl import RuleSet
from . import (
    dynamic_options,
    validation,
)
from .dataset_matcher import get_dataset_matcher_factory
from .sanitize import ToolParameterSanitizer
from .workflow_utils import (
    ConnectedValue,
    is_runtime_value,
    runtime_to_json,
    runtime_to_object,
    RuntimeValue,
)

if TYPE_CHECKING:
    from sqlalchemy.orm import Session

    from galaxy.managers.context import ProvidesHistoryContext
    from galaxy.model import (
        History,
        HistoryItem,
    )
    from galaxy.security.idencoding import IdEncodingHelper
    from galaxy.structured_app import MinimalApp
    from galaxy.tools import Tool

log = logging.getLogger(__name__)


WORKFLOW_PARAMETER_REGULAR_EXPRESSION = re.compile(r"\$\{.+?\}")


[docs] class ImplicitConversionRequired(Exception): pass
[docs] def contains_workflow_parameter(value, search=False): if not isinstance(value, str): return False if search and WORKFLOW_PARAMETER_REGULAR_EXPRESSION.search(value): return True if not search and WORKFLOW_PARAMETER_REGULAR_EXPRESSION.match(value): return True return False
[docs] def is_runtime_context(trans, other_values): if trans.workflow_building_mode: return True for context_value in other_values.values(): if is_runtime_value(context_value): return True for v in util.listify(context_value): if isinstance(v, HistoryDatasetAssociation) and ( (hasattr(v, "state") and v.state != Dataset.states.OK) or hasattr(v, "implicit_conversion") ): return True return False
[docs] def parse_dynamic_options(param, input_source): dynamic_options_config = input_source.parse_dynamic_options() if not dynamic_options_config: return None options_elem = dynamic_options_config.elem() if options_elem is None: return None return dynamic_options.DynamicOptions(options_elem, param)
[docs] def serialize_options(security: "IdEncodingHelper", options: Sequence[ParameterOption]): return [option.serialize(security) for option in options]
# Describe a parameter value error where there is no actual supplied # parameter - e.g. just a specification issue. NO_PARAMETER_VALUE = object()
[docs] @contextlib.contextmanager def assert_throws_param_value_error(message): exception_thrown = False try: yield except ParameterValueError as e: exception_thrown = True assert str(e) == message assert exception_thrown
[docs] class ParameterValueError(ValueError):
[docs] def __init__(self, message_suffix, parameter_name, parameter_value=NO_PARAMETER_VALUE, is_dynamic=None): message = f"Parameter '{parameter_name}': {message_suffix}" super().__init__(message) self.message_suffix = message_suffix self.parameter_name = parameter_name self.parameter_value = parameter_value self.is_dynamic = is_dynamic
[docs] def to_dict(self): as_dict = {"message": unicodify(self)} as_dict["message_suffix"] = self.message_suffix as_dict["parameter_name"] = self.parameter_name if self.parameter_value is not NO_PARAMETER_VALUE: as_dict["parameter_value"] = self.parameter_value if self.is_dynamic is not None: as_dict["is_dynamic"] = self.is_dynamic return as_dict
[docs] class ToolParameter(UsesDictVisibleKeys): """ Describes a parameter accepted by a tool. This is just a simple stub at the moment but in the future should encapsulate more complex parameters (lists of valid choices, validation logic, ...) >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, security=lambda x: x) >>> p = ToolParameter(None, XML('<param argument="--parameter-name" type="text" value="default" />')) >>> assert p.name == 'parameter_name' >>> assert sorted(p.to_dict(trans).items()) == [('argument', '--parameter-name'), ('help', ''), ('help_format', 'html'), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'ToolParameter'), ('name', 'parameter_name'), ('optional', False), ('refresh_on_change', False), ('type', 'text'), ('value', None)] """ name: str dict_collection_visible_keys = ["name", "argument", "type", "label", "help", "help_format", "refresh_on_change"]
[docs] def __init__(self, tool: Optional["Tool"], input_source, context=None): input_source = ensure_input_source(input_source) self.tool = tool self.argument = input_source.get("argument") self.name = self.__class__.parse_name(input_source) self.type = input_source.get("type") self.hidden = input_source.get_bool("hidden", False) self.refresh_on_change = input_source.get_bool("refresh_on_change", False) self.optional = input_source.parse_optional() self.is_dynamic = False self.label = input_source.parse_label() self.help = input_source.parse_help() self.help_format = input_source.get("help_format") or "html" if (sanitizer_elem := input_source.parse_sanitizer_elem()) is not None: self.sanitizer = ToolParameterSanitizer.from_element(sanitizer_elem) else: self.sanitizer = None self.validators = validation.to_validators(tool.app if tool else None, input_source.parse_validators())
@property def visible(self) -> bool: """Return true if the parameter should be rendered on the form""" return True
[docs] def get_label(self): """Return user friendly name for the parameter""" return self.label if self.label else self.name
[docs] def from_json(self, value, trans, other_values=None): """ Convert a value from an HTML POST into the parameters preferred value format. """ return value
[docs] def get_initial_value(self, trans, other_values): """ Return the starting value of the parameter """ return None
[docs] def get_required_enctype(self): """ If this parameter needs the form to have a specific encoding return it, otherwise return None (indicating compatibility with any encoding) """ return None
[docs] def get_dependencies(self): """ Return the names of any other parameters this parameter depends on """ return []
[docs] def to_json(self, value, app, use_security) -> str: """Convert a value to a string representation suitable for persisting""" return unicodify(value)
[docs] def to_python(self, value, app): """Convert a value created with to_json back to an object representation""" return value
[docs] def value_to_basic(self, value, app, use_security=False): if is_runtime_value(value): return runtime_to_json(value) elif value == NO_REPLACEMENT: return {"__class__": "NoReplacement"} return self.to_json(value, app, use_security)
[docs] def value_from_basic(self, value, app, ignore_errors=False): # Handle Runtime and Unvalidated values if is_runtime_value(value): if isinstance(self, HiddenToolParameter): raise ParameterValueError(message_suffix="Runtime Parameter not valid", parameter_name=self.name) return runtime_to_object(value) elif isinstance(value, MutableMapping): if value.get("__class__") == "UnvalidatedValue": return value["value"] elif value.get("__class__") == "NoReplacement": return NO_REPLACEMENT # Delegate to the 'to_python' method try: return self.to_python(value, app) except Exception: if not ignore_errors: raise return value
[docs] def value_to_display_text(self, value) -> str: if is_runtime_value(value): return "Not available." return self.to_text(value)
[docs] def to_text(self, value) -> str: """ Convert a value to a text representation suitable for displaying to the user >>> from galaxy.util import XML >>> p = ToolParameter(None, XML('<param name="_name" />')) >>> print(p.to_text(None)) Not available. >>> print(p.to_text('')) Empty. >>> print(p.to_text('text')) text >>> print(p.to_text(True)) True >>> print(p.to_text(False)) False >>> print(p.to_text(0)) 0 """ if value is not None: str_value = unicodify(value) if not str_value: return "Empty." return str_value return "Not available."
[docs] def to_param_dict_string(self, value, other_values=None) -> str: """Called via __str__ when used in the Cheetah template""" if value is None: value = "" elif not isinstance(value, str): value = str(value) if self.tool is None or self.tool.options.sanitize: if self.sanitizer: value = self.sanitizer.sanitize_param(value) else: value = sanitize_param(value) return value
[docs] def validate(self, value, trans=None) -> None: if value in ["", None] and self.optional: return for validator in self.validators: try: validator.validate(value, trans) except ValueError as e: raise ParameterValueError(str(e), self.name, value) from None
[docs] def to_dict(self, trans, other_values=None): """to_dict tool parameter. This can be overridden by subclasses.""" other_values = other_values or {} tool_dict = self._dictify_view_keys() tool_dict["model_class"] = self.__class__.__name__ tool_dict["optional"] = self.optional tool_dict["hidden"] = self.hidden tool_dict["is_dynamic"] = self.is_dynamic tool_dict["value"] = self.value_to_basic( self.get_initial_value(trans, other_values), trans.app, use_security=True ) return tool_dict
[docs] @classmethod def build(cls, tool: "Tool", input_source) -> "ToolParameter": """Factory method to create parameter of correct type""" input_source = ensure_input_source(input_source) param_name = cls.parse_name(input_source) param_type = input_source.get("type") if not param_type: raise ValueError(f"parameter '{param_name}' requires a 'type'") elif param_type not in parameter_types: raise ValueError(f"parameter '{param_name}' uses an unknown type '{param_type}'") else: return parameter_types[param_type](tool, input_source)
[docs] @staticmethod def parse_name(input_source): return input_source.parse_name()
[docs] class SimpleTextToolParameter(ToolParameter):
[docs] def __init__(self, tool: Optional["Tool"], input_source): input_source = ensure_input_source(input_source) super().__init__(tool, input_source) optional = input_source.get("optional", None) if optional is not None: optional = string_as_bool(optional) else: # Optionality not explicitly defined, default to False optional = False self.optional = optional if self.optional: self.value = None else: self.value = ""
[docs] def get_initial_value(self, trans, other_values): return self.value
[docs] class TextToolParameter(SimpleTextToolParameter): """ Parameter that can take on any text value. >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None) >>> p = TextToolParameter(None, XML('<param name="_name" type="text" value="default" />')) >>> print(p.name) _name >>> sorted(p.to_dict(trans).items()) [('area', False), ('argument', None), ('datalist', []), ('help', ''), ('help_format', 'html'), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'TextToolParameter'), ('name', '_name'), ('optional', True), ('refresh_on_change', False), ('type', 'text'), ('value', 'default')] """
[docs] def __init__(self, tool: Optional["Tool"], input_source): input_source = ensure_input_source(input_source) super().__init__(tool, input_source) self.profile = tool.profile if tool else None self.datalist = [] for title, value, _ in input_source.parse_static_options(): self.datalist.append({"label": title, "value": value}) # why does Integer and Float subclass this :_( if self.type == "text": self.optional, self.optionality_inferred = text_input_is_optional(input_source) else: self.optionality_inferred = False self.value = input_source.get("value") self.area = input_source.get_bool("area", False)
[docs] def validate(self, value, trans=None): search = self.type == "text" if not ( trans and trans.workflow_building_mode is workflow_building_modes.ENABLED and contains_workflow_parameter(value, search=search) ): return super().validate(value, trans)
@property def wrapper_default(self) -> str | None: """Handle change in default handling pre and post 23.0 profiles.""" profile = self.profile legacy_behavior = profile is None or Version(str(profile)) < Version("23.0") default_value = None if self.optional and self.optionality_inferred and legacy_behavior: default_value = "" return default_value
[docs] def to_dict(self, trans, other_values=None): d = super().to_dict(trans) other_values = other_values or {} d["area"] = self.area d["datalist"] = self.datalist d["optional"] = self.optional return d
[docs] class IntegerToolParameter(TextToolParameter): """ Parameter that takes an integer value. >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, history=Bunch(), workflow_building_mode=True) >>> p = IntegerToolParameter(None, XML('<param name="_name" type="integer" value="10" />')) >>> print(p.name) _name >>> assert sorted(p.to_dict(trans).items()) == [('area', False), ('argument', None), ('datalist', []), ('help', ''), ('help_format', 'html'), ('hidden', False), ('is_dynamic', False), ('label', ''), ('max', None), ('min', None), ('model_class', 'IntegerToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('type', 'integer'), ('value', u'10')] >>> assert type(p.from_json("10", trans)) == int >>> with assert_throws_param_value_error("Parameter '_name': an integer or workflow parameter is required"): ... p.from_json("_string", trans) """ dict_collection_visible_keys = ToolParameter.dict_collection_visible_keys + ["min", "max"]
[docs] def __init__(self, tool: Optional["Tool"], input_source): super().__init__(tool, input_source) if self.value: try: int(self.value) except ValueError: raise ParameterValueError("the attribute 'value' must be an integer", self.name) self.min = input_source.get("min") self.max = input_source.get("max") if self.min: try: self.min = int(self.min) except ValueError: raise ParameterValueError("attribute 'min' must be an integer", self.name, self.min) if self.max: try: self.max = int(self.max) except ValueError: raise ParameterValueError("attribute 'max' must be an integer", self.name, self.max) if self.min is not None or self.max is not None: self.validators.append(validation.InRangeValidator.simple_range_validator(self.min, self.max))
[docs] def from_json(self, value, trans, other_values=None): other_values = other_values or {} try: return int(value) except (TypeError, ValueError): if contains_workflow_parameter(value) and trans.workflow_building_mode is workflow_building_modes.ENABLED: return value if not value and self.optional: return "" if trans.workflow_building_mode is workflow_building_modes.ENABLED: raise ParameterValueError("an integer or workflow parameter is required", self.name, value) else: raise ParameterValueError( "the attribute 'value' must be set for non optional parameters", self.name, value )
[docs] def to_python(self, value, app): try: return int(value) except (TypeError, ValueError): if contains_workflow_parameter(value): return value if not value and self.optional: return None raise ParameterValueError("an integer is required", self.name, value)
[docs] def get_initial_value(self, trans, other_values): if self.value is not None and self.value != "": return int(self.value) else: return None
[docs] class FloatToolParameter(TextToolParameter): """ Parameter that takes a real number value. >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, history=Bunch(), workflow_building_mode=True) >>> p = FloatToolParameter(None, XML('<param name="_name" type="float" value="3.141592" />')) >>> print(p.name) _name >>> assert sorted(p.to_dict(trans).items()) == [('area', False), ('argument', None), ('datalist', []), ('help', ''), ('help_format', 'html'), ('hidden', False), ('is_dynamic', False), ('label', ''), ('max', None), ('min', None), ('model_class', 'FloatToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('type', 'float'), ('value', u'3.141592')] >>> assert type(p.from_json("36.1", trans)) == float >>> with assert_throws_param_value_error("Parameter '_name': an integer or workflow parameter is required"): ... p.from_json("_string", trans) """ dict_collection_visible_keys = ToolParameter.dict_collection_visible_keys + ["min", "max"]
[docs] def __init__(self, tool: Optional["Tool"], input_source): super().__init__(tool, input_source) self.min = input_source.get("min") self.max = input_source.get("max") if self.value: try: float(self.value) except ValueError: raise ParameterValueError("the attribute 'value' must be a real number", self.name, self.value) if self.min: try: self.min = float(self.min) except ValueError: raise ParameterValueError("attribute 'min' must be a real number", self.name, self.min) if self.max: try: self.max = float(self.max) except ValueError: raise ParameterValueError("attribute 'max' must be a real number", self.name, self.max) if self.min is not None or self.max is not None: self.validators.append(validation.InRangeValidator.simple_range_validator(self.min, self.max))
[docs] def from_json(self, value, trans, other_values=None): other_values = other_values or {} try: return float(value) except (TypeError, ValueError): if contains_workflow_parameter(value) and trans.workflow_building_mode is workflow_building_modes.ENABLED: return value if not value and self.optional: return "" if trans.workflow_building_mode is workflow_building_modes.ENABLED: raise ParameterValueError("an integer or workflow parameter is required", self.name, value) else: raise ParameterValueError( "the attribute 'value' must be set for non optional parameters", self.name, value )
[docs] def to_python(self, value, app): try: return float(value) except (TypeError, ValueError): if contains_workflow_parameter(value): return value if not value and self.optional: return None raise ParameterValueError("a float is required", self.name, value)
[docs] def get_initial_value(self, trans, other_values): if self.value is None: return None try: return float(self.value) except Exception: return None
[docs] class BooleanToolParameter(ToolParameter): """ Parameter that takes one of two values. >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, history=Bunch()) >>> p = BooleanToolParameter(None, XML('<param name="_name" type="boolean" checked="yes" truevalue="_truevalue" falsevalue="_falsevalue" />')) >>> print(p.name) _name >>> assert sorted(p.to_dict(trans).items()) == [('argument', None), ('falsevalue', '_falsevalue'), ('help', ''), ('help_format', 'html'), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'BooleanToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('truevalue', '_truevalue'), ('type', 'boolean'), ('value', True)] >>> print(p.from_json('true', trans)) True >>> print(p.to_param_dict_string(True)) _truevalue >>> print(p.from_json('false', trans)) False >>> print(p.to_param_dict_string(False)) _falsevalue >>> value = p.to_json('false', trans.app, use_security=False) >>> assert isinstance(value, bool) >>> assert value == False >>> value = p.to_json(True, trans.app, use_security=False) >>> assert isinstance(value, bool) >>> assert value == True """
[docs] def __init__(self, tool: Optional["Tool"], input_source): input_source = ensure_input_source(input_source) super().__init__(tool, input_source) try: profile = tool.profile if tool else None truevalue, falsevalue = boolean_true_and_false_values(input_source, profile) except ParameterParseException as ppe: raise ParameterValueError(ppe.message, self.name) self.truevalue = truevalue self.falsevalue = falsevalue self.optional = input_source.get_bool("optional", False) self.checked = boolean_is_checked(input_source)
[docs] def from_json(self, value, trans, other_values=None): return self.to_python(value)
[docs] def to_python(self, value, app=None): if not self.optional: ret_val = string_as_bool(value) else: ret_val = string_as_bool_or_none(value) return ret_val
[docs] def to_json(self, value, app, use_security): return self.to_python(value, app)
[docs] def get_initial_value(self, trans, other_values): return self.checked
[docs] def to_param_dict_string(self, value, other_values=None): if self.to_python(value): return self.truevalue else: return self.falsevalue
[docs] def to_dict(self, trans, other_values=None): d = super().to_dict(trans) d["truevalue"] = self.truevalue d["falsevalue"] = self.falsevalue d["optional"] = self.optional return d
@property def legal_values(self): return [self.truevalue, self.falsevalue]
[docs] class FileToolParameter(ToolParameter): """ Parameter that takes an uploaded file as a value. >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, history=Bunch()) >>> p = FileToolParameter(None, XML('<param name="_name" type="file"/>')) >>> print(p.name) _name >>> sorted(p.to_dict(trans).items()) [('argument', None), ('help', ''), ('help_format', 'html'), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'FileToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('type', 'file'), ('value', None)] """
[docs] def from_json(self, value, trans, other_values=None): # Middleware or proxies may encode files in special ways (TODO: this # should be pluggable) if isinstance(value, FilesPayload): # multi-part upload handled and persisted in service layer return value.model_dump() elif isinstance(value, MutableMapping): if "session_id" in value: # handle api upload session_id = value["session_id"] upload_store = trans.app.config.tus_upload_store or trans.app.config.new_file_path if re.match(r"^[\w-]+$", session_id) is None: raise ValueError("Invalid session id format.") local_filename = os.path.abspath(os.path.join(upload_store, session_id)) else: # handle nginx upload upload_store = trans.app.config.nginx_upload_store assert ( upload_store ), "Request appears to have been processed by nginx_upload_module but Galaxy is not configured to recognize it." local_filename = os.path.abspath(value["path"]) assert local_filename.startswith( upload_store ), f"Filename provided by nginx ({local_filename}) is not in correct directory ({upload_store})." value = dict(filename=value["name"], local_filename=local_filename) return value
[docs] def get_required_enctype(self): """ File upload elements require the multipart/form-data encoding """ return "multipart/form-data"
[docs] def to_json(self, value, app, use_security): if value in [None, ""]: return None elif isinstance(value, str): return value elif isinstance(value, MutableMapping): # or should we jsonify? try: return value["local_filename"] except KeyError: return None elif isinstance(value, cgi_FieldStorage): return value.file.name raise Exception("FileToolParameter cannot be persisted")
[docs] def to_python(self, value, app): if value is None: return None elif isinstance(value, str): return value else: raise Exception("FileToolParameter cannot be persisted")
[docs] class FTPFileToolParameter(ToolParameter): """ Parameter that takes a file uploaded via FTP as a value. >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, history=Bunch(), user=None) >>> p = FTPFileToolParameter(None, XML('<param name="_name" type="ftpfile"/>')) >>> print(p.name) _name >>> sorted(p.to_dict(trans).items()) [('argument', None), ('help', ''), ('help_format', 'html'), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'FTPFileToolParameter'), ('multiple', True), ('name', '_name'), ('optional', True), ('refresh_on_change', False), ('type', 'ftpfile'), ('value', None)] """
[docs] def __init__(self, tool: Optional["Tool"], input_source): input_source = ensure_input_source(input_source) super().__init__(tool, input_source) self.multiple = input_source.get_bool("multiple", True) self.optional = input_source.parse_optional(True) self.user_ftp_dir = ""
[docs] def get_initial_value(self, trans, other_values): if trans is not None: if trans.user is not None: self.user_ftp_dir = f"{trans.user_ftp_dir}/" return None
@property def visible(self): assert self.tool if self.tool.app.config.ftp_upload_dir is None or self.tool.app.config.ftp_upload_site is None: return False return True
[docs] def to_param_dict_string(self, value, other_values=None): if value == "": return "None" lst = [f"{self.user_ftp_dir}{dataset}" for dataset in value] if self.multiple: return lst else: return lst[0]
[docs] def from_json(self, value, trans, other_values=None): return self.to_python(value, trans.app, validate=True)
[docs] def to_json(self, value, app, use_security): return self.to_python(value, app)
[docs] def to_python(self, value, app, validate=False): if not isinstance(value, list): value = [value] lst: list[str] = [] for val in value: if val in [None, ""]: lst = [] break if isinstance(val, MutableMapping): lst.append(val["name"]) else: lst.append(val) if len(lst) == 0: if not self.optional and validate: raise ValueError("Please select a valid FTP file.") return None if validate: assert self.tool if self.tool.app.config.ftp_upload_dir is None: raise ValueError("The FTP directory is not configured.") return lst
[docs] def to_dict(self, trans, other_values=None): d = super().to_dict(trans) d["multiple"] = self.multiple return d
[docs] class HiddenToolParameter(ToolParameter): """ Parameter that takes one of two values. >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, history=Bunch()) >>> p = HiddenToolParameter(None, XML('<param name="_name" type="hidden" value="_value"/>')) >>> print(p.name) _name >>> assert sorted(p.to_dict(trans).items()) == [('argument', None), ('help', ''), ('help_format', 'html'), ('hidden', True), ('is_dynamic', False), ('label', ''), ('model_class', 'HiddenToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('type', 'hidden'), ('value', u'_value')] """
[docs] def __init__(self, tool: Optional["Tool"], input_source): super().__init__(tool, input_source) self.value = input_source.get("value") self.hidden = True
[docs] def get_initial_value(self, trans, other_values): return self.value
[docs] def get_label(self): return None
[docs] class ColorToolParameter(ToolParameter): """ Parameter that stores a color. >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, history=Bunch()) >>> p = ColorToolParameter(None, XML('<param name="_name" type="color" value="#ffffff"/>')) >>> print(p.name) _name >>> print(p.to_param_dict_string("#ffffff")) #ffffff >>> assert sorted(p.to_dict(trans).items()) == [('argument', None), ('help', ''), ('help_format', 'html'), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'ColorToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('type', 'color'), ('value', u'#ffffff')] >>> p = ColorToolParameter(None, XML('<param name="_name" type="color"/>')) >>> print(p.get_initial_value(trans, {})) #000000 >>> p = ColorToolParameter(None, XML('<param name="_name" type="color" value="#ffffff" rgb="True"/>')) >>> print(p.to_param_dict_string("#ffffff")) (255, 255, 255) >>> with assert_throws_param_value_error("Parameter '_name': Failed to convert 'None' to RGB."): ... p.to_param_dict_string(None) """
[docs] def __init__(self, tool: Optional["Tool"], input_source): input_source = ensure_input_source(input_source) super().__init__(tool, input_source) self.value = get_color_value(input_source) self.rgb = input_source.get_bool("rgb", False)
[docs] def get_initial_value(self, trans, other_values): if self.value is not None: return self.value.lower()
[docs] def to_param_dict_string(self, value, other_values=None): if self.rgb: try: return str(tuple(int(value.lstrip("#")[i : i + 2], 16) for i in (0, 2, 4))) except Exception: raise ParameterValueError(f"Failed to convert '{value}' to RGB.", self.name) return str(value)
[docs] class BaseURLToolParameter(HiddenToolParameter): """ Returns a parameter that contains its value prepended by the current server base url. Used in all redirects. >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, history=Bunch()) >>> p = BaseURLToolParameter(None, XML('<param name="_name" type="base_url" value="_value"/>')) >>> print(p.name) _name >>> assert sorted(p.to_dict(trans).items()) == [('argument', None), ('help', ''), ('help_format', 'html'), ('hidden', True), ('is_dynamic', False), ('label', ''), ('model_class', 'BaseURLToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('type', 'base_url'), ('value', u'_value')] """
[docs] def __init__(self, tool: Optional["Tool"], input_source): super().__init__(tool, input_source) self.value = input_source.get("value", "")
[docs] def get_initial_value(self, trans, other_values): return self._get_value(trans)
[docs] def from_json(self, value, trans, other_values=None): return self._get_value(trans)
def _get_value(self, trans): try: if not self.value.startswith("/"): raise Exception("baseurl value must start with a /") return trans.url_builder(self.value, qualified=True) except Exception as e: log.debug('Url creation failed for "%s": %s', self.name, unicodify(e)) return self.value
[docs] def to_dict(self, trans, other_values=None): d = super().to_dict(trans) return d
[docs] def iter_to_string(iterable: typing.Iterable[typing.Any]) -> typing.Generator[str, None, None]: for item in iterable: yield str(item)
[docs] class SelectToolParameter(ToolParameter): """ Parameter that takes on one (or many) or a specific set of values. >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, history=Bunch(), workflow_building_mode=False, security=lambda x: x) >>> p = SelectToolParameter(None, XML( ... ''' ... <param name="_name" type="select"> ... <option value="x">x_label</option> ... <option value="y" selected="true">y_label</option> ... <option value="z">z_label</option> ... </param> ... ''')) >>> print(p.name) _name >>> sorted(p.to_dict(trans).items()) [('argument', None), ('display', None), ('help', ''), ('help_format', 'html'), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'SelectToolParameter'), ('multiple', False), ('name', '_name'), ('optional', False), ('options', [('x_label', 'x', False), ('y_label', 'y', True), ('z_label', 'z', False)]), ('refresh_on_change', False), ('textable', False), ('type', 'select'), ('value', 'y')] >>> p = SelectToolParameter(None, XML( ... ''' ... <param name="_name" type="select" multiple="true"> ... <option value="x">x_label</option> ... <option value="y" selected="true">y_label</option> ... <option value="z" selected="true">z_label</option> ... </param> ... ''')) >>> print(p.name) _name >>> sorted(p.to_dict(trans).items()) [('argument', None), ('display', None), ('help', ''), ('help_format', 'html'), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'SelectToolParameter'), ('multiple', True), ('name', '_name'), ('optional', True), ('options', [('x_label', 'x', False), ('y_label', 'y', True), ('z_label', 'z', True)]), ('refresh_on_change', False), ('textable', False), ('type', 'select'), ('value', ['y', 'z'])] >>> print(p.to_param_dict_string(["y", "z"])) y,z """ value_label: str
[docs] def __init__(self, tool: Optional["Tool"], input_source, context=None): input_source = ensure_input_source(input_source) super().__init__(tool, input_source) self.multiple = input_source.get_bool("multiple", False) # Multiple selects are optional by default, single selection is the inverse. self.optional = input_source.parse_optional(self.multiple) self.display = input_source.get("display", None) self.separator = input_source.get("separator", ",") self.legal_values = set() self.dynamic_options = input_source.get("dynamic_options", None) self.options = parse_dynamic_options(self, input_source) if self.options is not None: for validator in self.options.validators: self.validators.append(validator) if self.dynamic_options is None and self.options is None: self.static_options = input_source.parse_static_options() for _, value, _ in self.static_options: self.legal_values.add(value) self.is_dynamic = (self.dynamic_options is not None) or (self.options is not None)
def _get_dynamic_options_call_other_values(self, trans, other_values): call_other_values = ExpressionContext({"__trans__": trans}) if other_values: call_other_values.parent = other_values.parent call_other_values.update(other_values.dict) return call_other_values
[docs] def get_options(self, trans, other_values) -> Sequence[ParameterOption | DrillDownOptionsDict]: if self.options: return self.options.get_options(trans, other_values) elif self.dynamic_options: call_other_values = self._get_dynamic_options_call_other_values(trans, other_values) assert self.tool try: return [ ParameterOption(*o) for o in eval(self.dynamic_options, self.tool.code_namespace, call_other_values) ] except Exception as e: log.debug( "Error determining dynamic options for parameter '%s' in tool '%s':", self.name, self.tool.id, exc_info=e, ) return [] else: return [ParameterOption(*o) for o in self.static_options]
[docs] def from_json(self, value, trans, other_values=None): return self._select_from_json(value, trans, other_values=other_values, require_legal_value=True)
def _select_from_json(self, value, trans, other_values=None, require_legal_value=True): other_values = other_values or {} try: legal_values = self.get_legal_values(trans, other_values, value) except ImplicitConversionRequired: return value # if the given value is not found in the set of values of the legal # options we fall back to check if the value is in the set of names of # the legal options. this is done with the fallback_values dict which # allows to determine the corresponding legal values fallback_values = self.get_legal_names(trans, other_values) if (not legal_values or not require_legal_value) and is_runtime_context(trans, other_values): if self.multiple: # While it is generally allowed that a select value can be '', # we do not allow this to be the case in a dynamically # generated multiple select list being set in workflow building # mode we instead treat '' as 'No option Selected' (None) if value == "": value = None else: if isinstance(value, str): # Split on all whitespace. This not only provides flexibility # in interpreting values but also is needed because many browsers # use \r\n to separate lines. value = value.split() return value elif value is None: if self.optional: return None raise ParameterValueError( "an invalid option (None) was selected, please verify", self.name, None, is_dynamic=self.is_dynamic ) elif not legal_values: assert self.tool if self.optional and Version(str(self.tool.profile)) < Version("18.09"): # Covers optional parameters with default values that reference other optional parameters. # These will have a value but no legal_values. # See https://github.com/galaxyproject/tools-iuc/pull/1842#issuecomment-394083768 for context. return None raise ParameterValueError( "requires a value, but no legal values defined", self.name, is_dynamic=self.is_dynamic ) if isinstance(value, list): if not self.multiple: raise ParameterValueError( "multiple values provided but parameter is not expecting multiple values", self.name, is_dynamic=self.is_dynamic, ) if set(value).issubset(legal_values): return value elif set(value).issubset(set(fallback_values.keys())): return [fallback_values[v] for v in value] else: invalid_options = iter_to_string(set(value) - set(legal_values)) raise ParameterValueError( f"invalid options ({','.join(invalid_options)!r}) were selected (valid options: {','.join(iter_to_string(legal_values))})", self.name, is_dynamic=self.is_dynamic, ) else: value_is_none = value == "None" and "None" not in legal_values if value_is_none or not value: if self.multiple: if self.optional: return [] else: raise ParameterValueError( "no option was selected for non optional parameter", self.name, is_dynamic=self.is_dynamic ) if is_runtime_value(value): return None if value in legal_values: return value elif value in fallback_values: return fallback_values[value] elif not require_legal_value: return value else: raise ParameterValueError( f"an invalid option ({value!r}) was selected (valid options: {','.join(iter_to_string(legal_values))})", self.name, value, is_dynamic=self.is_dynamic, )
[docs] def to_param_dict_string(self, value, other_values=None): if value in (None, []): return "None" if isinstance(value, list): if not self.multiple: raise ParameterValueError( "multiple values provided but parameter is not expecting multiple values", self.name, is_dynamic=self.is_dynamic, ) value = list(map(str, value)) else: value = str(value) if self.tool is None or self.tool.options.sanitize: if self.sanitizer: value = self.sanitizer.sanitize_param(value) else: value = sanitize_param(value) if isinstance(value, list): value = self.separator.join(value) return value
[docs] def to_json(self, value, app, use_security): if isinstance(value, HistoryDatasetAssociation): return history_item_to_json(value, app, use_security=use_security) return value
[docs] def to_python(self, value, app): if isinstance(value, MutableMapping) and "src" in value: return history_item_dict_to_python(value, app, self.name) return super().to_python(value, app)
[docs] def get_initial_value(self, trans, other_values): try: options = cast(list[ParameterOption], self.get_options(trans, other_values)) except ImplicitConversionRequired: return None if not options: return None value = [option.value for option in options if option.selected] if len(value) == 0: if not self.optional and not self.multiple and options: # Nothing selected, but not optional and not a multiple select, with some values, # so we have to default to something (the HTML form will anyway) value2: str | list[str] | None = options[0].value else: value2 = None elif len(value) == 1 or not self.multiple: value2 = value[0] else: value2 = value return value2
[docs] def to_text(self, value): if not isinstance(value, list): value = [value] # FIXME: Currently only translating values back to labels if they # are not dynamic if self.is_dynamic: rval = [str(_) for _ in value] else: options = list(self.static_options) rval = [] for t, v, _ in options: if v in value: rval.append(t) if rval: return "\n".join(rval) return "Nothing selected."
[docs] def get_dependencies(self): """ Get the *names* of the other params this param depends on. """ if self.options: return self.options.get_dependency_names() else: return []
[docs] def to_dict(self, trans, other_values=None): other_values = other_values or {} d = super().to_dict(trans, other_values) # Get options, value. options = cast(list[ParameterOption], self.get_options(trans, other_values)) d["options"] = serialize_options(trans.security, options) d["display"] = self.display d["multiple"] = self.multiple d["textable"] = is_runtime_context(trans, other_values) return d
[docs] def validate(self, value, trans=None): if not value: super().validate(value, trans) if self.multiple: if not isinstance(value, list): value = [value] else: value = [value] for v in value: super().validate(v, trans)
[docs] class GenomeBuildParameter(SelectToolParameter): """ Select list that sets the last used genome build for the current history as "selected". >>> # Create a mock transaction with 'hg17' as the current build >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> trans = Bunch(app=None, history=Bunch(genome_build='hg17'), db_builds=read_dbnames(None), security=lambda x:x) >>> p = GenomeBuildParameter(None, XML('<param name="_name" type="genomebuild" value="hg17" />')) >>> print(p.name) _name >>> d = p.to_dict(trans) >>> o = d['options'] >>> [i for i in o if i[2] == True] [('Human May 2004 (NCBI35/hg17) (hg17)', 'hg17', True)] >>> [i for i in o if i[1] == 'hg18'] [('Human Mar. 2006 (NCBI36/hg18) (hg18)', 'hg18', False)] >>> p.is_dynamic True """
[docs] def __init__(self, *args, **kwds): super().__init__(*args, **kwds) if self.tool: self.static_options = [(value, key, False) for key, value in self._get_dbkey_names()] self.is_dynamic = True
[docs] def get_options(self, trans, other_values) -> Sequence[ParameterOption]: last_used_build = object() if trans.history: last_used_build = trans.history.genome_build return [ ParameterOption(build_name, dbkey, (dbkey == last_used_build)) for dbkey, build_name in self._get_dbkey_names(trans=trans) ]
[docs] def to_dict(self, trans, other_values=None): # skip SelectToolParameter (the immediate parent) bc we need to get options in a different way here d = ToolParameter.to_dict(self, trans) # Get options, value - options is a generator here, so compile to list options = self.get_options(trans, {}) value = options[0].value for option in options: if option.selected: # Found selected option. value = option.value d.update( { "options": serialize_options(trans, options), "value": value, "display": self.display, "multiple": self.multiple, } ) return d
def _get_dbkey_names(self, trans=None): if not self.tool: # Hack for unit tests, since we have no tool return read_dbnames(None) return self.tool.app.genome_builds.get_genome_build_names(trans=trans)
[docs] class SelectTagParameter(SelectToolParameter): """ Select set that is composed of a set of tags available for an input. """
[docs] def __init__(self, tool: Optional["Tool"], input_source): input_source = ensure_input_source(input_source) super().__init__(tool, input_source) self.tag_key = input_source.get("group", False) self.optional = input_source.get("optional", False) self.multiple = input_source.get("multiple", False) self.accept_default = input_source.get_bool("accept_default", False) if self.accept_default: self.optional = True self.data_ref = input_source.get("data_ref", None) self.ref_input = None # Legacy style default value specification... self.default_value = input_source.get("default_value", None) if self.default_value is None: # Newer style... more in line with other parameters. self.default_value = input_source.get("value", None) self.is_dynamic = True
[docs] def from_json(self, value, trans, other_values=None): other_values = other_values or {} if self.multiple: tag_list = [] # split on newline and , if isinstance(value, list) or isinstance(value, str): tag_list = multiple_select_value_split(value) if len(tag_list) == 0: value = None else: value = tag_list else: if not value: value = None # We skip requiring legal values -- this is similar to optional, but allows only subset of datasets to be positive # TODO: May not actually be required for (nested) collection input ? return super()._select_from_json(value, trans, other_values, require_legal_value=False)
[docs] def get_tag_list(self, other_values): """ Generate a select list containing the tags of the associated dataset (if found). """ # Get the value of the associated data reference (a dataset) history_items = other_values.get(self.data_ref, None) # Check if a dataset is selected if is_runtime_value(history_items): return [] if not history_items: return [] tags = set() for history_item in util.listify(history_items): if hasattr(history_item, "dataset_instances"): for dataset in history_item.dataset_instances: for tag in dataset.tags: if tag.user_tname == "group": tags.add(tag.user_value) else: for tag in history_item.tags: if tag.user_tname == "group": tags.add(tag.user_value) return list(tags)
[docs] def get_options(self, trans, other_values) -> Sequence[ParameterOption]: """ Show tags """ options = [] for tag in self.get_tag_list(other_values): options.append(ParameterOption(f"Tags: {tag}", tag, False)) return options
[docs] def get_initial_value(self, trans, other_values): if self.default_value is not None: return self.default_value return super().get_initial_value(trans, other_values)
[docs] def get_dependencies(self): return [self.data_ref]
[docs] def to_dict(self, trans, other_values=None): other_values = other_values or {} d = super().to_dict(trans, other_values=other_values) d["data_ref"] = self.data_ref return d
[docs] class ColumnListParameter(SelectToolParameter): """ Select list that consists of either the total number of columns or only those columns that contain numerical values in the associated DataToolParameter. # TODO: we need better testing here, but not sure how to associate a DatatoolParameter with a ColumnListParameter >>> # Mock up a history (not connected to database) >>> from galaxy.model import History, HistoryDatasetAssociation >>> from galaxy.util.bunch import Bunch >>> from galaxy.util import XML >>> from galaxy.model.mapping import init >>> sa_session = init("/tmp", "sqlite:///:memory:", create_tables=True).session >>> hist = History() >>> with sa_session.begin(): ... sa_session.add(hist) >>> hda = hist.add_dataset(HistoryDatasetAssociation(id=1, extension='interval', create_dataset=True, sa_session=sa_session)) >>> dtp = DataToolParameter(None, XML('<param name="blah" type="data" format="interval"/>')) >>> print(dtp.name) blah >>> clp = ColumnListParameter(None, XML('<param name="numerical_column" type="data_column" data_ref="blah" numerical="true"/>')) >>> print(clp.name) numerical_column """
[docs] def __init__(self, tool: Optional["Tool"], input_source): input_source = ensure_input_source(input_source) super().__init__(tool, input_source) self.numerical = input_source.get_bool("numerical", False) self.optional = input_source.parse_optional(False) self.accept_default = input_source.get_bool("accept_default", False) if self.accept_default: self.optional = True self.data_ref = input_source.get("data_ref", None) profile = tool.profile if tool else None assert ( profile is None or Version(str(profile)) < Version("24.2") or self.data_ref is not None ), f"data_column parameter {self.name} requires a valid data_ref attribute" self.ref_input = None # Legacy style default value specification... self.default_value = input_source.get("default_value", None) if self.default_value is None: # Newer style... more in line with other parameters. self.default_value = input_source.get("value", None) if self.default_value is not None: self.default_value = ColumnListParameter._strip_c(self.default_value) self.is_dynamic = True self.usecolnames = input_source.get_bool("use_header_names", False)
[docs] def to_json(self, value, app, use_security): if isinstance(value, str): return value.strip() return value
[docs] def from_json(self, value, trans, other_values=None): """ Label convention prepends column number with a 'c', but tool uses the integer. This removes the 'c' when entered into a workflow. """ other_values = other_values or {} if self.multiple: # split on newline and , if isinstance(value, list) or isinstance(value, str): column_list = multiple_select_value_split(value) if len(column_list) == 0: value = None else: value = list(map(ColumnListParameter._strip_c, column_list)) else: value = None else: if value: value = ColumnListParameter._strip_c(value) else: value = None if not value and self.accept_default: value = self.default_value or "1" return [value] if self.multiple else value return super().from_json(value, trans, other_values)
@staticmethod def _strip_c(column): column = str(column).strip() if column.startswith("c") and len(column) > 1 and all(c.isdigit() for c in column[1:]): column = column.lower()[1:] return column
[docs] def get_column_list(self, trans, other_values): """ Generate a select list containing the columns of the associated dataset (if found). """ # Get the value of the associated data reference (one or more datasets) datasets = other_values.get(self.data_ref) # Check if a dataset is selected if not datasets: return [] column_list = None for dataset in util.listify(datasets): # Use representative dataset if a dataset collection is parsed if isinstance(dataset, HistoryDatasetCollectionAssociation): dataset = dataset.to_hda_representative() if isinstance(dataset, DatasetCollectionElement): dataset = dataset.first_dataset_instance() if isinstance(dataset, HistoryDatasetAssociation) and self.ref_input and self.ref_input.formats: direct_match, target_ext, converted_dataset = dataset.find_conversion_destination( self.ref_input.formats ) if not direct_match and target_ext: if not converted_dataset: raise ImplicitConversionRequired else: dataset = converted_dataset # Columns can only be identified if the dataset is ready and metadata is available if not hasattr(dataset, "metadata") or not dataset.metadata.get_if_set("columns"): return [] # Build up possible columns for this dataset this_column_list = [] if self.numerical: # If numerical was requested, filter columns based on metadata for i, col in enumerate(dataset.metadata.get_if_set("column_types", [])): if col == "int" or col == "float": this_column_list.append(str(i + 1)) else: this_column_list = [str(i) for i in range(1, dataset.metadata.columns + 1)] # Take the intersection of these columns with the other columns. if column_list is None: column_list = this_column_list else: column_list = [c for c in column_list if c in this_column_list] return column_list
[docs] def get_options(self, trans, other_values) -> Sequence[ParameterOption]: """ Show column labels rather than c1..cn if use_header_names=True """ options: Sequence[ParameterOption] = [] try: column_list = self.get_column_list(trans, other_values) except ImplicitConversionRequired: return options if not column_list: return options # if available use column_names metadata for option names # otherwise read first row - assume is a header with tab separated names if self.usecolnames: dataset = other_values.get(self.data_ref, None) if isinstance(dataset, HistoryDatasetCollectionAssociation): dataset = dataset.to_hda_representative() if isinstance(dataset, DatasetCollectionElement): dataset = dataset.first_dataset_instance() column_names = getattr(dataset, "metadata", None) and dataset.metadata.get_if_set("column_names") if column_names: try: options = [ParameterOption(f"c{c}: {column_names[int(c) - 1]}", c, False) for c in column_list] except IndexError: # ignore and rely on fallback pass else: try: with open(dataset.get_file_name()) as f: head = f.readline() cnames = head.rstrip("\n\r ").split("\t") options = [ParameterOption(f"c{c}: {cnames[int(c) - 1]}", c, False) for c in column_list] except Exception: # ignore and rely on fallback pass if not options: # fallback if no options list could be built so far options = [ParameterOption(f"Column: {col}", col, False) for col in column_list] return options
[docs] def get_initial_value(self, trans, other_values): if self.default_value is not None: return self.default_value return super().get_initial_value(trans, other_values)
[docs] def is_file_empty(self, trans, other_values): for dataset in util.listify(other_values.get(self.data_ref)): # Use representative dataset if a dataset collection is parsed if isinstance(dataset, HistoryDatasetCollectionAssociation): if dataset.populated: dataset = dataset.to_hda_representative() else: # That's fine, we'll check again on execution return True if isinstance(dataset, DatasetCollectionElement): dataset = dataset.first_dataset_instance() if isinstance(dataset, DatasetInstance): return not dataset.has_data() if is_runtime_value(dataset): return True else: msg = f"Dataset '{dataset}' for data_ref attribute '{self.data_ref}' of parameter '{self.name}' is not a DatasetInstance" log.debug(msg, exc_info=True) raise ParameterValueError(msg, self.name) return False
[docs] def get_dependencies(self): return [self.data_ref]
[docs] def to_dict(self, trans, other_values=None): other_values = other_values or {} d = super().to_dict(trans, other_values=other_values) d["data_ref"] = self.data_ref d["numerical"] = self.numerical return d
[docs] class DrillDownSelectToolParameter(SelectToolParameter): """ Parameter that takes on one (or many) of a specific set of values. Creating a hierarchical select menu, which allows users to 'drill down' a tree-like set of options. >>> from galaxy.util import XML >>> from galaxy.util.bunch import Bunch >>> app = Bunch(config=Bunch(tool_data_path=None)) >>> tool = Bunch(app=app) >>> trans = Bunch(app=app, history=Bunch(genome_build='hg17'), db_builds=read_dbnames(None), security=lambda x: x) >>> p = DrillDownSelectToolParameter(tool, XML( ... ''' ... <param name="_name" type="drill_down" display="checkbox" hierarchy="recurse" multiple="true"> ... <options> ... <option name="Heading 1" value="heading1"> ... <option name="Option 1" value="option1"/> ... <option name="Option 2" value="option2"/> ... <option name="Heading 2" value="heading2"> ... <option name="Option 3" value="option3"/> ... <option name="Option 4" value="option4"/> ... </option> ... </option> ... <option name="Option 5" value="option5"/> ... </options> ... </param> ... ''')) >>> print(p.name) _name >>> d = p.to_dict(trans) >>> assert d['multiple'] == True >>> assert d['display'] == 'checkbox' >>> assert d['options'][0]['name'] == 'Heading 1' >>> assert d['options'][0]['value'] == 'heading1' >>> assert d['options'][0]['options'][0]['name'] == 'Option 1' >>> assert d['options'][0]['options'][0]['value'] == 'option1' >>> assert d['options'][0]['options'][1]['name'] == 'Option 2' >>> assert d['options'][0]['options'][1]['value'] == 'option2' >>> assert d['options'][0]['options'][2]['name'] == 'Heading 2' >>> assert d['options'][0]['options'][2]['value'] == 'heading2' >>> assert d['options'][0]['options'][2]['options'][0]['name'] == 'Option 3' >>> assert d['options'][0]['options'][2]['options'][0]['value'] == 'option3' >>> assert d['options'][0]['options'][2]['options'][1]['name'] == 'Option 4' >>> assert d['options'][0]['options'][2]['options'][1]['value'] == 'option4' >>> assert d['options'][1]['name'] == 'Option 5' >>> assert d['options'][1]['value'] == 'option5' """
[docs] def __init__(self, tool: Optional["Tool"], input_source, context=None): input_source = ensure_input_source(input_source) ToolParameter.__init__(self, tool, input_source) self.multiple = input_source.get_bool("multiple", False) self.display = input_source.get("display", None) self.hierarchy = input_source.get("hierarchy", "exact") # exact or recurse self.separator = input_source.get("separator", ",") tool_data_path = self.tool.app.config.tool_data_path if self.tool else None drill_down_dynamic_options = input_source.parse_drill_down_dynamic_options(tool_data_path) if drill_down_dynamic_options is not None: self.is_dynamic = True self.dynamic_options = drill_down_dynamic_options.from_code_block() self.options = [] else: self.is_dynamic = False self.dynamic_options = None self.options = input_source.parse_drill_down_static_options(tool_data_path)
def _get_options_from_code(self, trans=None, other_values=None): assert self.dynamic_options, Exception("dynamic_options was not specifed") call_other_values = ExpressionContext({"__trans__": trans, "__value__": None}) if other_values: call_other_values.parent = other_values.parent call_other_values.update(other_values.dict) try: assert self.tool return eval(self.dynamic_options, self.tool.code_namespace, call_other_values) except Exception: return []
[docs] def get_options(self, trans=None, other_values=None) -> list[DrillDownOptionsDict]: other_values = other_values or {} if self.is_dynamic: if self.dynamic_options: return self._get_options_from_code(trans=trans, other_values=other_values) return [] return self.options
[docs] def from_json(self, value, trans, other_values=None): other_values = other_values or {} legal_values = self.get_legal_values(trans, other_values, value) if not legal_values and trans.workflow_building_mode: if self.multiple: if value == "": # No option selected value = None else: value = value.split("\n") return value elif value is None: if self.optional: return None raise ParameterValueError(f"an invalid option ({value!r}) was selected", self.name, value) elif not legal_values: raise ParameterValueError("requires a value, but no legal values defined", self.name) if not isinstance(value, list): value = [value] if len(value) > 1 and not self.multiple: raise ParameterValueError( "multiple values provided but parameter is not expecting multiple values", self.name ) rval = [] for val in value: if val not in legal_values: raise ParameterValueError( f"an invalid option ({val!r}) was selected (valid options: {','.join(legal_values)})", self.name, val, ) rval.append(val) return rval
[docs] def to_param_dict_string(self, value, other_values=None): other_values = other_values or {} def get_options_list(value): def get_base_option(value, options: list[DrillDownOptionsDict]): for option in options: if value == option["value"]: return option rval = get_base_option(value, option["options"] or []) if rval: return rval return None # not found def recurse_option(option_list, option: DrillDownOptionsDict): if not option["options"]: option_list.append(option["value"]) else: for opt in option["options"]: recurse_option(option_list, opt) rval: list[str] = [] options = self.get_options(other_values=other_values) if base_option := get_base_option(value, options): recurse_option(rval, base_option) return rval or [value] if value is None: return "None" rval = [] if self.hierarchy == "exact": rval = value else: for val in value: options = get_options_list(val) rval.extend(options) rval = list(dict.fromkeys(rval)) if len(rval) > 1 and not self.multiple: raise ParameterValueError( "multiple values provided but parameter is not expecting multiple values", self.name ) rval = self.separator.join(rval) if self.tool is None or self.tool.options.sanitize: if self.sanitizer: rval = self.sanitizer.sanitize_param(rval) else: rval = sanitize_param(rval) return rval
[docs] def get_initial_value(self, trans, other_values): def recurse_options(initial_values, options: list[DrillDownOptionsDict]): for option in options: if option["selected"]: initial_values.append(option["value"]) recurse_options(initial_values, option["options"] or []) # More working around dynamic options for workflow options = self.get_options(trans=trans, other_values=other_values) if not options: return None initial_values: list[str] = [] recurse_options(initial_values, options) if len(initial_values) == 0: return None return initial_values
[docs] def to_text(self, value): def get_option_display(value, options: list[DrillDownOptionsDict]): for option in options: if value == option["value"]: return option["name"] rval = get_option_display(value, option["options"] or []) if rval: return rval return None # not found if not value: value = [] elif not isinstance(value, list): value = [value] # FIXME: Currently only translating values back to labels if they # are not dynamic if self.is_dynamic: if value: if isinstance(value, list): rval = value else: rval = [value] else: rval = [] else: rval = [] for val in value: rval.append(get_option_display(val, self.options or val)) if rval: return "\n".join(map(str, rval)) return "Nothing selected."
[docs] def get_dependencies(self): return []
[docs] def to_dict(self, trans, other_values=None): other_values = other_values or {} # skip SelectToolParameter (the immediate parent) bc we need to get options in a different way here d = ToolParameter.to_dict(self, trans) d["options"] = self.get_options(trans=trans, other_values=other_values) d["display"] = self.display d["multiple"] = self.multiple return d
def _carried_state_label(value) -> str: """Reason a rerun input cannot be naturally selected in this history. Stable string surfaced to the client as the entry name prefix. ``"not in current history"`` is the default for anything not deleted or hidden, including the edge case of an in-history HDA whose matcher failed (e.g. format mismatch) — clients only render the prefix, so the slight imprecision there is harmless. """ if value.deleted: return "deleted" if not value.visible: return "hidden" return "not in current history" def _is_connected_value(value) -> bool: """True iff ``value`` is a workflow ``ConnectedValue`` (an input fed by an upstream step's output rather than chosen at runtime).""" return is_runtime_value(value) and isinstance(runtime_to_object(value), ConnectedValue) def _paginated_visible_datasets( trans: "ProvidesHistoryContext", history: "History", *, extensions: set[str] | None, valid_states: tuple[str, ...] | None, search: str | None = None, offset: int = 0, limit: int = 50, ) -> tuple[list[HistoryDatasetAssociation], int]: """``history.paginated_active_visible_datasets`` memoized on the request. Building one form with many ``data`` parameters (most notably the workflow Run form, which renders every step in a single request) otherwise re-issues the same paginated SQL against the same, unchanging history once per parameter -- O(parameters) round-trips, slow even on an empty history (issue #22927). Results are memoized by signature on the request context's short-term cache (see ``ProvidesUserContext.get_or_set_cache_value``); the cache is shared across every step's proxy work context for the request and never outlives it, so the history cannot change underneath it. """ key = ( "data_param_hda_page", history.id, frozenset(extensions) if extensions is not None else None, tuple(valid_states) if valid_states is not None else None, search or None, offset, limit, ) return trans.get_or_set_cache_value( key, lambda: history.paginated_active_visible_datasets( extensions=extensions, valid_states=valid_states, search=search, offset=offset, limit=limit ), ) def _paginated_dataset_collections( trans: "ProvidesHistoryContext", history: "History", *, visible_only: bool, search: str | None = None, offset: int = 0, limit: int = 50, ) -> tuple[list[HistoryDatasetCollectionAssociation], int]: """``history.paginated_active_dataset_collections`` memoized on the request context's short-term cache (see :func:`_paginated_visible_datasets`).""" key = ("data_param_hdca_page", history.id, bool(visible_only), search or None, offset, limit) return trans.get_or_set_cache_value( key, lambda: history.paginated_active_dataset_collections( visible_only=visible_only, search=search, offset=offset, limit=limit ), )
[docs] class BaseDataToolParameter(ToolParameter): multiple: bool # Sentinel distinguishing "cache not yet populated" from "cache populated # with None" (which is a legitimate return for parameters with no formats). _ACCEPTABLE_EXTENSIONS_UNSET: Any = object()
[docs] def __init__(self, tool: Optional["Tool"], input_source, trans): super().__init__(tool, input_source) self.min = input_source.get("min") self.max = input_source.get("max") if self.min: try: self.min = int(self.min) except ValueError: raise ParameterValueError("attribute 'min' must be an integer", self.name) if self.max: try: self.max = int(self.max) except ValueError: raise ParameterValueError("attribute 'max' must be an integer", self.name) self.refresh_on_change = True # Find datatypes_registry if self.tool is None: if trans: # Must account for "Input Dataset" types, which while not a tool still need access to the real registry. # A handle to the transaction (and thus app) will be given by the module. self.datatypes_registry = trans.app.datatypes_registry else: # This occurs for things such as unit tests import galaxy.datatypes.registry self.datatypes_registry = galaxy.datatypes.registry.Registry() self.datatypes_registry.load_datatypes() else: self.datatypes_registry = ( self.tool.app.datatypes_registry ) # can be None if self.tool.app is a ValidationContext
def _parse_formats(self, trans, input_source): """ Build list of classes for supported data formats """ self.extensions = input_source.parse_extensions() formats = [] if self.datatypes_registry: # This may be None when self.tool.app is a ValidationContext for extension in self.extensions: datatype = self.datatypes_registry.get_datatype_by_extension(extension) if datatype is not None: formats.append(datatype) else: log.warning( f"Datatype class not found for extension '{extension}', which is used in the 'format' attribute of parameter '{self.name}'" ) self.formats = formats def _parse_options(self, input_source): # TODO: Enhance dynamic options for DataToolParameters. Currently, # only the special case key='build' of type='data_meta' is # a valid filter self.options_filter_attribute = None self.options = parse_dynamic_options(self, input_source) if self.options: # TODO: Abstract away XML handling here. options_elem = input_source.elem().find("options") self.options_filter_attribute = options_elem.get("options_filter_attribute", None) self.is_dynamic = self.options is not None def _acceptable_extensions(self) -> set[str] | None: """Return a set of HDA extensions that match this parameter's formats directly or via implicit conversion. ``None`` means no extension filter (the parameter accepts all formats).""" cached = getattr(self, "_acceptable_extensions_cache", self._ACCEPTABLE_EXTENSIONS_UNSET) if cached is not self._ACCEPTABLE_EXTENSIONS_UNSET: return cached formats = getattr(self, "formats", None) if not formats: self._acceptable_extensions_cache: set[str] | None = None return None accepted: set[str] = set(getattr(self, "extensions", [])) if (registry := self.datatypes_registry) is not None: try: all_exts = list(registry.datatypes_by_extension.keys()) except AttributeError: all_exts = [] for ext in all_exts: if ext in accepted: continue direct, converted, _ = registry.find_conversion_destination_for_dataset_by_extensions(ext, formats) if direct or converted: accepted.add(ext) self._acceptable_extensions_cache = accepted return accepted def _uses_python_options_filter(self) -> bool: """True iff matching depends on per-row Python state that cannot be pushed to SQL (dynamic options with ``options_filter_attribute``, or a ``data_destination`` tool that requires public-role checks).""" if self.options is not None: return True if self.tool is not None and getattr(self.tool, "tool_type", None) == "data_destination": return True return False
[docs] def get_initial_value(self, trans, other_values): if trans.workflow_building_mode is workflow_building_modes.ENABLED or trans.app.name == "tool_shed": return RuntimeValue() if self.optional: return None if _is_connected_value((other_values or {}).get(self.name)): # Connected inputs are supplied by an upstream step; there is no # default to pick from the history, so skip the scan (issue #22927). return None if (history := trans.history) is not None: dataset_matcher_factory = get_dataset_matcher_factory(trans) dataset_matcher = dataset_matcher_factory.dataset_matcher(self, other_values) if isinstance(self, DataToolParameter): # Walk the history newest-first in chunks via paginated SQL until # we find a matching HDA. Avoids loading the whole history. chunk_size = MAX_OPTIONS_PAGE_SIZE db_offset = 0 while True: rows, total = _paginated_visible_datasets( trans, history, extensions=self._acceptable_extensions(), valid_states=dataset_matcher_factory.valid_input_states, offset=db_offset, limit=chunk_size, ) if not rows: return None for hda in rows: match = dataset_matcher.hda_match(hda) if match: return match.hda db_offset += len(rows) if db_offset >= total: return None else: dataset_collection_matcher = dataset_matcher_factory.dataset_collection_matcher(dataset_matcher) chunk_size = MAX_OPTIONS_PAGE_SIZE db_offset = 0 while True: collection_rows, total = _paginated_dataset_collections( trans, history, visible_only=True, offset=db_offset, limit=chunk_size, ) if not collection_rows: return None for hdca in collection_rows: if dataset_collection_matcher.hdca_match(hdca): return hdca db_offset += len(collection_rows) if db_offset >= total: return None
[docs] def to_json(self, value, app, use_security): if value not in [None, "", "None"]: if isinstance(value, list) and len(value) > 0: values = [history_item_to_json(v, app, use_security) for v in value] else: values = [history_item_to_json(value, app, use_security)] return {"values": values} return None
[docs] def to_python(self, value, app): if isinstance(value, MutableMapping) and "values" in value: if hasattr(self, "multiple") and self.multiple is True: history_items = [history_item_dict_to_python(v, app, self.name) for v in value["values"]] if len(history_items) == 1 and isinstance(history_items[0], HistoryDatasetCollectionAssociation): return history_items[0] return history_items elif len(value["values"]) > 0: return history_item_dict_to_python(value["values"][0], app, self.name) # Handle legacy string values potentially stored in databases none_values = [None, "", "None"] if value in none_values: return None if isinstance(value, str) and value.find(",") > -1: return [ app.model.context.get(HistoryDatasetAssociation, int(v)) for v in value.split(",") if v not in none_values ] elif str(value).startswith("__collection_reduce__|"): decoded_id = str(value)[len("__collection_reduce__|") :] if not decoded_id.isdigit(): decoded_id = app.security.decode_id(decoded_id) return app.model.context.get(HistoryDatasetCollectionAssociation, int(decoded_id)) elif str(value).startswith("dce:"): return app.model.context.get(DatasetCollectionElement, int(value[len("dce:") :])) elif str(value).startswith("hdca:"): return app.model.context.get(HistoryDatasetCollectionAssociation, int(value[len("hdca:") :])) else: return app.model.context.get(HistoryDatasetAssociation, int(value))
[docs] def validate(self, value, trans=None): def do_validate(v): for validator in self.validators: if ( validator.requires_dataset_metadata and v and hasattr(v, "dataset") and v.dataset.state != Dataset.states.OK ): return else: try: validator.validate(v, trans) except ValueError as e: raise ParameterValueError(str(e), self.name, v) from None dataset_count = 0 if value: if self.multiple: if not isinstance(value, list): value = [value] else: value = [value] for v in value: if isinstance(v, HistoryDatasetCollectionAssociation): for dataset_instance in v.collection.dataset_instances: dataset_count += 1 do_validate(dataset_instance) elif isinstance(v, DatasetCollectionElement): if v.hda: dataset_count += 1 do_validate(v.hda) else: assert v.child_collection for dataset_instance in v.child_collection.dataset_instances: dataset_count += 1 do_validate(dataset_instance) else: dataset_count += 1 do_validate(v) if self.min is not None: if self.min > dataset_count: raise ParameterValueError(f"at least {self.min} datasets are required", self.name) if self.max is not None: if self.max < dataset_count: raise ParameterValueError(f"at most {self.max} datasets are required", self.name)
ItemFromSrcAny = ( DatasetCollectionElement | HistoryDatasetAssociation | HistoryDatasetCollectionAssociation | LibraryDatasetDatasetAssociation | CollectionAdapter ) ItemFromSrcCollection = DatasetCollectionElement | HistoryDatasetCollectionAssociation | CollectionAdapter def _decode_dataset_id(value, security: "IdEncodingHelper", parameter_name: str) -> int: """Coerce a value into an integer dataset PK or raise ParameterValueError. Accepts int, digit string, or 16-char encoded id. Anything else (including ``src:...``-prefixed strings, which should arrive as ``{src, id}`` dicts via :func:`src_id_to_item`) is rejected so malformed input surfaces as a 4xx instead of a SQL crash. """ if isinstance(value, int): return value s = str(value) if s.isdigit(): return int(s) if len(s) == 16: log.warning("Encoded ID where unencoded ID expected.") return int(security.decode_id(s)) raise ParameterValueError(f"invalid dataset id {value!r}", parameter_name)
[docs] def src_id_to_item( sa_session: "Session", value: typing.MutableMapping[str, Any], security: "IdEncodingHelper" ) -> ItemFromSrcAny: adapter_model = None if value["src"] == "CollectionAdapter": adapter_model = validate_collection_adapter_src_dict(value) adapting = adapter_model.adapting if isinstance(adapting, list): elements = [] for item in adapting: element = TransientCollectionAdapterDatasetInstanceElement( item.name, cast(HistoryDatasetAssociation, src_id_to_item(sa_session, item.model_dump(), security)), ) elements.append(element) return recover_adapter(elements, adapter_model) else: value = adapting.model_dump() src_to_class = { "hda": HistoryDatasetAssociation, "ldda": LibraryDatasetDatasetAssociation, "dce": DatasetCollectionElement, "hdca": HistoryDatasetCollectionAssociation, } id_value = value["id"] decoded_id = id_value if isinstance(id_value, int) else security.decode_id(id_value) try: item = sa_session.get(src_to_class[value["src"]], decoded_id) except KeyError: raise ValueError(f"Unknown input source {value['src']} passed to job submission API.") if not item: raise ValueError("Invalid input id passed to job submission API.") if adapter_model is not None: item = recover_adapter(item, adapter_model) item.extra_params = {k: v for k, v in value.items() if k not in ("src", "id")} return item
[docs] def src_id_to_item_collection( sa_session: "Session", value: typing.MutableMapping[str, Any], security: "IdEncodingHelper" ) -> ItemFromSrcCollection: rval = src_id_to_item(sa_session, value, security) if isinstance(rval, (LibraryDatasetDatasetAssociation, HistoryDatasetAssociation)): raise ValueError("Expected to find collection, but got single dataset wrapper") return rval
[docs] class DataToolParameter(BaseDataToolParameter): # TODO, Nate: Make sure the following unit tests appropriately test the dataset security # components. Add as many additional tests as necessary. """ Parameter that takes on one (or many) or a specific set of values. TODO: There should be an alternate display that allows single selects to be displayed as radio buttons and multiple selects as a set of checkboxes TODO: The following must be fixed to test correctly for the new security_check tag in the DataToolParameter (the last test below is broken) Nate's next pass at the dataset security stuff will dramatically alter this anyway. """
[docs] def __init__(self, tool: Optional["Tool"], input_source, trans=None): input_source = ensure_input_source(input_source) super().__init__(tool, input_source, trans) self.load_contents = int(input_source.get("load_contents", 0)) # Add metadata validator if not input_source.get_bool("no_validation", False): self.validators.append(validation.MetadataValidator.default_metadata_validator()) self._parse_formats(trans, input_source) tag = input_source.get("tag") self.multiple = input_source.get_bool("multiple", False) if not self.multiple and (self.min is not None): raise ParameterValueError( "cannot specify 'min' property on single data parameter. Set multiple=\"true\" to enable this option", self.name, ) if not self.multiple and (self.max is not None): raise ParameterValueError( "cannot specify 'max' property on single data parameter. Set multiple=\"true\" to enable this option", self.name, ) self.tag = tag self.is_dynamic = True self._parse_options(input_source) self._parse_allow_uri_if_protocol(input_source) # Load conversions required for the dataset input self.conversions = [] self.default_object = input_source.parse_default() if self.optional and self.default_object is not None: raise ParameterValueError( "Cannot specify a Galaxy tool data parameter to be both optional and have a default value.", self.name ) for name, conv_extension in input_source.parse_conversion_tuples(): assert None not in [ name, conv_extension, ], f"A name ({name}) and type ({conv_extension}) are required for explicit conversion" if self.datatypes_registry: conv_type = self.datatypes_registry.get_datatype_by_extension(conv_extension.lower()) if conv_type is None: raise ParameterValueError( f"datatype class not found for extension '{conv_type}', which is used as 'type' attribute in conversion of data parameter", self.name, ) self.conversions.append((name, conv_extension, [conv_type]))
def _parse_allow_uri_if_protocol(self, input_source): # In case of deferred datasets, if the source URI is prefixed with one of the values in this list, # the dataset will behave as an URI and will not be materialized into a file path. allow_uri_if_protocol = input_source.get("allow_uri_if_protocol", None) self.allow_uri_if_protocol = allow_uri_if_protocol.split(",") if allow_uri_if_protocol else []
[docs] def from_json(self, value, trans, other_values=None): session = trans.sa_session other_values = other_values or {} if trans.workflow_building_mode is workflow_building_modes.ENABLED or is_runtime_value(value): return None if not value and not self.optional and not self.default_object: raise ParameterValueError("specify a dataset of the required format / build for parameter", self.name) if value in [None, "None", ""]: if self.default_object: return raw_to_galaxy(trans.app, trans.history, self.default_object) return None batch_wrapper = False if isinstance(value, MutableMapping) and "values" in value: batch_wrapper = bool(value.get("batch")) value = self.to_python(value, trans.app) if isinstance(value, str) and value.find(",") > 0: value = [int(value_part) for value_part in value.split(",")] rval: list[ DatasetCollectionElement | HistoryDatasetAssociation | HistoryDatasetCollectionAssociation | LibraryDatasetDatasetAssociation | CollectionAdapter ] = [] if isinstance(value, list): found_srcs = set() for single_value in value: if isinstance(single_value, MutableMapping) and "src" in single_value and "id" in single_value: found_srcs.add(single_value["src"]) rval.append( src_id_to_item(sa_session=trans.sa_session, value=single_value, security=trans.security) ) elif isinstance( single_value, ( HistoryDatasetCollectionAssociation, DatasetCollectionElement, HistoryDatasetAssociation, LibraryDatasetDatasetAssociation, ), ): rval.append(single_value) elif single_value in (None, "None", ""): # An unset optional data input (e.g. connected to a multiple # data parameter) arrives as a null list element. Tolerate it # as "no dataset" the way to_python() filters it, rather than # treating null as a malformed id (regression from #22617). continue else: pk = _decode_dataset_id(single_value, trans.security, self.name) rval.append(trans.sa_session.get(HistoryDatasetAssociation, pk)) if len(found_srcs) > 1 and "hdca" in found_srcs: raise ParameterValueError( "if collections are supplied to multiple data input parameter, only collections may be used", self.name, ) elif isinstance(value, (HistoryDatasetAssociation, LibraryDatasetDatasetAssociation)): rval.append(value) elif isinstance(value, MutableMapping) and "src" in value and "id" in value: rval.append(src_id_to_item(sa_session=trans.sa_session, value=value, security=trans.security)) elif str(value).startswith("__collection_reduce__|"): encoded_ids = [v[len("__collection_reduce__|") :] for v in str(value).split(",")] decoded_ids = map(trans.security.decode_id, encoded_ids) rval = [] for decoded_id in decoded_ids: hdca = session.get(HistoryDatasetCollectionAssociation, decoded_id) rval.append(hdca) elif isinstance(value, HistoryDatasetCollectionAssociation) or isinstance(value, DatasetCollectionElement): rval.append(value) else: pk = _decode_dataset_id(value, trans.security, self.name) rval.append(session.get(HistoryDatasetAssociation, pk)) dataset_matcher_factory = get_dataset_matcher_factory(trans) dataset_matcher = dataset_matcher_factory.dataset_matcher(self, other_values) for v in rval: value_to_check: ( DatasetInstance | DatasetCollection | DatasetCollectionElement | HistoryDatasetCollectionAssociation | CollectionAdapter ) = v if isinstance(v, DatasetCollectionElement): if hda := v.hda: value_to_check = hda elif ldda := v.ldda: value_to_check = ldda elif collection := v.child_collection: value_to_check = collection elif v.collection and not v.collection.populated_optimized: raise ParameterValueError("the selected collection has not been populated.", self.name) else: raise ParameterValueError("Collection element in unexpected state", self.name) if isinstance(value_to_check, DatasetInstance): if value_to_check.deleted: raise ParameterValueError( "the previously selected dataset has been deleted.", self.name, value_to_check ) elif value_to_check.dataset and value_to_check.dataset.state in [ Dataset.states.ERROR, Dataset.states.DISCARDED, ]: raise ParameterValueError( "the previously selected dataset has entered an unusable state", self.name ) match = dataset_matcher.hda_match(value_to_check) if match and match.implicit_conversion: value_to_check.implicit_conversion = True # type: ignore[attr-defined] elif isinstance(value_to_check, HistoryDatasetCollectionAssociation): if value_to_check.deleted: raise ParameterValueError( "the previously selected dataset collection has been deleted.", self.name, value_to_check, ) value_to_check = value_to_check.collection if isinstance(value_to_check, DatasetCollection): if value_to_check.elements_deleted: raise ParameterValueError( "the previously selected dataset collection has elements that are deleted.", self.name ) if not self.multiple: if len(rval) > 1: raise ParameterValueError("more than one dataset supplied to single input dataset parameter", self.name) if len(rval) > 0: single_value = rval[0] if isinstance(single_value, HistoryDatasetCollectionAssociation): if batch_wrapper: # A batch wrapper ({"batch": true, "values": [...]}) # only reaches ``from_json`` during tool form building # (``/api/tools/{id}/build``); at execution time # ``expand_meta_parameters`` has already replaced the # wrapper with per-element values. Returning the HDCA # lets the form re-render and preserves the user's # map-over selection. return single_value # A non-multiple data parameter cannot reduce a dataset # collection. Without this check the HDCA is silently # accepted here, then ``collect_input_dataset_collections`` # rewrites the state to a list of the collection's HDAs and # ``wrap_values`` later crashes with a raw ``TypeError`` # (https://github.com/galaxyproject/galaxy/issues/22401). # Map-over is still supported via ``{"batch": true, ...}``; # at execution time that wrapper is expanded to per-element # jobs before reaching ``from_json``. raise ParameterValueError( "dataset collection supplied to single input dataset parameter; " "to run the tool over each element of the collection, use the map-over option", self.name, ) return single_value else: raise ParameterValueError("invalid dataset supplied to single input dataset parameter", self.name) return rval
[docs] def to_param_dict_string(self, value, other_values=None): if value is None: return "None" return value.get_file_name()
[docs] def to_text(self, value): if value and not isinstance(value, list): value = [value] if value: try: return ", ".join(f"{item.hid}: {item.name}" for item in value) except Exception: pass return "No dataset."
[docs] def get_dependencies(self): """ Get the *names* of the other params this param depends on. """ if self.options: return self.options.get_dependency_names() else: return []
[docs] def converter_safe(self, other_values, trans): if ( self.tool is None or self.tool.has_multiple_pages or not hasattr(trans, "workflow_building_mode") or trans.workflow_building_mode ): return False if other_values is None: return True # we don't know other values, so we can't check, assume ok converter_safe = [True] def visitor(prefix, input, value, parent=None): if isinstance(input, SelectToolParameter) and self.name in input.get_dependencies(): if input.is_dynamic and ( input.dynamic_options or (not input.dynamic_options and not input.options) or not input.options.converter_safe ): converter_safe[0] = ( False # This option does not allow for conversion, i.e. uses contents of dataset file to generate options ) self.tool.visit_inputs(other_values, visitor) return False not in converter_safe
[docs] def get_options_filter_attribute(self, value): # HACK to get around current hardcoded limitation of when a set of dynamic options is defined for a DataToolParameter # it always causes available datasets to be filtered by dbkey # this behavior needs to be entirely reworked (in a backwards compatible manner) options_filter_attribute = self.options_filter_attribute if options_filter_attribute is None: return value.get_dbkey() if options_filter_attribute.endswith("()"): call_attribute = True options_filter_attribute = options_filter_attribute[:-2] else: call_attribute = False ref = value for attribute in options_filter_attribute.split("."): ref = getattr(ref, attribute) if call_attribute: ref = ref() return str(ref)
[docs] def to_dict(self, trans, other_values=None, pagination: ParameterPaginationT | None = None): other_values = other_values or {} d = super().to_dict(trans) self._fill_to_dict_static(d) builder = DataOptionsBuilder(trans.security, pagination) builder.write_into(d) history = trans.history if history is None or trans.workflow_building_mode is workflow_building_modes.ENABLED: return d if _is_connected_value(other_values.get(self.name)): # Input is wired to an upstream step: the run form renders it as # "connected" (no dropdown) and never uses these options, so skip the # per-parameter history scan entirely (issue #22927). return d dataset_matcher_factory = get_dataset_matcher_factory(trans) dataset_matcher = dataset_matcher_factory.dataset_matcher(self, other_values) # When rerunning a job, other_values carries the original job's input # values (HDAs, HDCAs, DCEs, LDDAs). Track them as we walk pages so # the survivors land in `pinned` (live HDAs outside the page) or # carry-over `options` entries (deleted/hidden/foreign-history items, # DCEs, LDDAs) — clients rely on this to pre-select the rerun input # regardless of pagination state. job_input_values = util.listify(other_values.get(self.name)) job_input_values = self._page_hda_matches( trans, builder, history, dataset_matcher, dataset_matcher_factory, job_input_values ) unresolved = self._pin_live_hda_inputs(builder, history, dataset_matcher, job_input_values) self._carry_unresolved_inputs(builder, history, unresolved) self._page_hdca_matches( builder, trans, history, dataset_matcher_factory.dataset_collection_matcher(dataset_matcher), ) builder.sort_by_hid() return d
def _fill_to_dict_static(self, d: dict) -> None: """Populate the non-history-dependent fields of the ``to_dict`` response (extensions, EDAM mapping, multiplicity bounds, tag).""" extensions = self.extensions all_edam_formats = ( self.datatypes_registry.edam_formats if hasattr(self.datatypes_registry, "edam_formats") else {} ) all_edam_data = self.datatypes_registry.edam_data if hasattr(self.datatypes_registry, "edam_formats") else {} d["extensions"] = extensions d["edam"] = { "edam_formats": [all_edam_formats.get(ext, None) for ext in extensions], "edam_data": [all_edam_data.get(ext, None) for ext in extensions], } d["multiple"] = self.multiple if self.multiple: # For consistency, should these just always be in the dict? d["min"] = self.min d["max"] = self.max d["tag"] = self.tag def _page_hda_matches( self, trans, builder: DataOptionsBuilder, history, dataset_matcher, dataset_matcher_factory, job_input_values: list, ) -> list: """Emit one page of matching HDAs into ``builder.options['hda']``. Returns ``job_input_values`` with the matched HDAs removed so the caller does not double-add them via the pinned/carried paths. """ _hda_offset, _hda_limit, hda_search = builder.page("hda") acceptable_extensions = self._acceptable_extensions() valid_states = dataset_matcher_factory.valid_input_states def hda_query(*, offset, limit): return _paginated_visible_datasets( trans, history, extensions=acceptable_extensions, valid_states=valid_states, search=hda_search, offset=offset, limit=limit, ) # Pure-SQL path skips chunked iteration when the predicate is fully # captured by the DB filter; chunked path covers tools that need # per-row Python filtering (options_filter_attribute, data_destination). chunked = self._uses_python_options_filter() page_matches, _total, _has_more = builder.paginate( "hda", query=hda_query, filter=dataset_matcher.hda_match, chunked=chunked ) # Dedup implicit conversions by HID; also consume matched HDAs from # ``job_input_values`` so they aren't double-added via pinned. # ``HdaDirectMatch`` has no ``original_hda``; ``getattr`` falls back # to the matched HDA itself for direct matches. matches_by_hid: dict[int, list] = {} for match in page_matches: m = match.hda original = getattr(match, "original_hda", m) job_input_values = [h for h in job_input_values if h != m and h != original] matches_by_hid.setdefault(m.hid, []).append(match) for matches in matches_by_hid.values(): match = matches[0] if len(matches) > 1: # Multiple matches for the same hid → prefer the original HDA and skip implicit conversions. match = next((m for m in matches if len(m.hda.implicitly_converted_parent_datasets) == 0), match) m_name = ( f"{match.original_hda.name} (as {match.target_ext})" if match.implicit_conversion else match.hda.name ) builder.options["hda"].append(make_hda_entry(builder.security, match.hda, m_name)) return job_input_values def _pin_live_hda_inputs( self, builder: DataOptionsBuilder, history, dataset_matcher, job_input_values: list ) -> list: """Pin selected HDAs that still live in this history but landed outside the current page window. Returns the inputs that did not pin (deleted/hidden/foreign-history HDAs, plus all non-HDA values) for the carry-forward step. """ unresolved: list = [] for value in job_input_values: if ( isinstance(value, HistoryDatasetAssociation) and value.history_id == history.id and not value.deleted and value.visible ): match = dataset_matcher.hda_match(value) if match: name = ( f"{match.original_hda.name} (as {match.target_ext})" if match.implicit_conversion else match.hda.name ) builder.pinned["hda"].append(make_hda_entry(builder.security, match.hda, name, keep=True)) continue unresolved.append(value) return unresolved def _carry_unresolved_inputs(self, builder: DataOptionsBuilder, history, unresolved: list) -> None: """Anything left over from the rerun input list has no live HDA match (deleted/hidden/wrong-history, or HDCA/DCE/LDDA from the original job). Carry each forward as an ``options.X`` entry with ``keep=True`` and a state-prefixed name so the client can re-select the original input even though it no longer matches the current history view. """ for value in unresolved: if isinstance(value, HistoryDatasetCollectionAssociation): if value.deleted or not value.visible or value.history != history: state = _carried_state_label(value) builder.options["hdca"].append( make_hdca_entry(builder.security, value, f"({state}) {value.name}", keep=True) ) elif isinstance(value, HistoryDatasetAssociation): state = _carried_state_label(value) builder.options["hda"].append( make_hda_entry(builder.security, value, f"({state}) {value.name}", keep=True) ) elif isinstance(value, DatasetCollectionElement): builder.options["dce"].append(make_dce_entry(builder.security, value)) elif isinstance(value, LibraryDatasetDatasetAssociation): builder.options["ldda"].append(make_ldda_entry(builder.security, value)) def _page_hdca_matches( self, builder: DataOptionsBuilder, trans, history, dataset_collection_matcher, ) -> None: """Emit one page of matching HDCAs into ``builder.options['hdca']``. Collection matching needs per-row Python inspection (subcollection mapping, implicit conversion), so this always uses chunked pagination. """ multiple = self.multiple _offset, _limit, hdca_search = builder.page("hdca") def hdca_query(*, offset, limit): return _paginated_dataset_collections( trans, history, visible_only=True, search=hdca_search, offset=offset, limit=limit ) def hdca_filter(hdca): match = dataset_collection_matcher.hdca_match(hdca) if not match: return None subcollection_type = None if multiple and hdca.collection.collection_type != "list": collection_type_description = self._history_query(trans).can_map_over(hdca) if collection_type_description: subcollection_type = collection_type_description.collection_type else: return None return (hdca, match, subcollection_type) matches, _total, _has_more = builder.paginate("hdca", query=hdca_query, filter=hdca_filter) for hdca, match, subcollection_type in matches: name = hdca.name if match.implicit_conversion: name = f"{name} (with implicit datatype conversion)" builder.options["hdca"].append( make_hdca_entry(builder.security, hdca, name, keep=False, subcollection_type=subcollection_type) ) def _history_query(self, trans): assert self.multiple dataset_collection_type_descriptions = trans.app.dataset_collection_manager.collection_type_descriptions # If multiple data parameter, treat like a list parameter. return query.HistoryQuery.from_collection_type("list", dataset_collection_type_descriptions)
[docs] class DataCollectionToolParameter(BaseDataToolParameter): """ """
[docs] def __init__(self, tool: Optional["Tool"], input_source, trans=None): input_source = ensure_input_source(input_source) super().__init__(tool, input_source, trans) self._parse_formats(trans, input_source) collection_types = input_source.get("collection_type", None) tag = input_source.get("tag") if collection_types: collection_types = [t.strip() for t in collection_types.split(",")] else: collection_types = None self._collection_types = collection_types self.tag = tag self.multiple = False # Accessed on DataToolParameter a lot, may want in future self.is_dynamic = True self._fields = input_source.get("fields", None) self._column_definitions = input_source.get("column_definitions", None) self._parse_options(input_source) # TODO: Review and test. self.default_object = input_source.parse_default() if self.optional and self.default_object is not None: raise ParameterValueError( "Cannot specify a Galaxy tool data parameter to be both optional and have a default value.", self.name )
@property def collection_types(self) -> list[str] | None: return self._collection_types def _history_query(self, trans): dataset_collection_type_descriptions = trans.app.dataset_collection_manager.collection_type_descriptions return query.HistoryQuery.from_parameter(self, dataset_collection_type_descriptions)
[docs] def match_collections(self, trans, history, dataset_collection_matcher): dataset_collections = trans.app.dataset_collection_manager.history_dataset_collections( history, self._history_query(trans) ) for dataset_collection_instance in dataset_collections: match = dataset_collection_matcher.hdca_match(dataset_collection_instance) if not match: continue # Filter sample sheet collections by column_definitions compatibility if self._column_definitions: collection_cols = dataset_collection_instance.collection.column_definitions if not column_definitions_compatible(collection_cols, self._column_definitions): continue yield dataset_collection_instance, match.implicit_conversion
[docs] def match_multirun_collections(self, trans, history, dataset_collection_matcher): for history_dataset_collection in history.active_visible_dataset_collections: if not self._history_query(trans).can_map_over(history_dataset_collection): continue match = dataset_collection_matcher.hdca_match(history_dataset_collection) if match: yield history_dataset_collection, match.implicit_conversion
[docs] def from_json(self, value, trans, other_values=None): session = trans.sa_session other_values = other_values or {} rval: ItemFromSrcCollection | None = None if trans.workflow_building_mode is workflow_building_modes.ENABLED: return None if not value and not self.optional and not self.default_object: raise ParameterValueError("specify a dataset collection of the correct type", self.name) if value in [None, "None"]: if self.default_object: return raw_to_galaxy(trans.app, trans.history, self.default_object) return None if isinstance(value, MutableMapping) and "values" in value: value = self.to_python(value, trans.app) if isinstance(value, str) and value.find(",") > 0: value = [int(value_part) for value_part in value.split(",")] elif isinstance(value, HistoryDatasetCollectionAssociation): rval = value elif isinstance(value, DatasetCollectionElement): # When mapping over nested collection - this parameter will receive # a DatasetCollectionElement instead of a # HistoryDatasetCollectionAssociation. rval = value elif isinstance(value, CollectionAdapter): rval = value elif ( isinstance(value, MutableMapping) and "src" in value and ("id" in value or value["src"] == "CollectionAdapter") ): rval = src_id_to_item_collection(sa_session=trans.sa_session, value=value, security=trans.security) elif isinstance(value, list): if len(value) > 0: value = value[0] if isinstance(value, MutableMapping) and "src" in value and "id" in value: if value["src"] == "hdca": rval = session.get(HistoryDatasetCollectionAssociation, trans.security.decode_id(value["id"])) elif value["src"] == "dce": rval = session.get(DatasetCollectionElement, trans.security.decode_id(value["id"])) elif isinstance(value, str): if value.startswith("dce:"): rval = session.get(DatasetCollectionElement, int(value[len("dce:") :])) elif value.startswith("hdca:"): rval = session.get(HistoryDatasetCollectionAssociation, int(value[len("hdca:") :])) else: rval = session.get(HistoryDatasetCollectionAssociation, int(value)) if rval: if isinstance(rval, HistoryDatasetCollectionAssociation): if rval.deleted: raise ParameterValueError( "the previously selected dataset collection has been deleted", self.name, rval ) if rval.collection.elements_deleted: raise ParameterValueError( "the previously selected dataset collection has elements that are deleted.", self.name ) if isinstance(rval, DatasetCollectionElement): if (child_collection := rval.child_collection) and child_collection.elements_deleted: raise ParameterValueError( "the previously selected dataset collection has elements that are deleted.", self.name ) return rval
[docs] def to_text(self, value): try: if isinstance(value, HistoryDatasetCollectionAssociation): display_text = f"{value.hid}: {value.name}" else: display_text = f"Element {value.identifier_index}:{value.identifier_name}" except AttributeError: display_text = "No dataset collection." return display_text
[docs] def to_dict(self, trans, other_values=None, pagination: ParameterPaginationT | None = None): other_values = other_values or {} d = super().to_dict(trans) d["collection_types"] = self.collection_types d["fields"] = self._fields d["column_definitions"] = self._column_definitions d["extensions"] = self.extensions d["multiple"] = self.multiple d["tag"] = self.tag builder = DataOptionsBuilder(trans.security, pagination, sources=("hda", "hdca", "dce")) builder.write_into(d) history = trans.history if history is None or trans.workflow_building_mode is workflow_building_modes.ENABLED: return d if _is_connected_value(other_values.get(self.name)): # Connected collection input: fed by an upstream step, rendered as # "connected" with no dropdown, so skip the history scan (issue #22927). return d dataset_matcher_factory = get_dataset_matcher_factory(trans) dataset_matcher = dataset_matcher_factory.dataset_matcher(self, other_values) dataset_collection_matcher = dataset_matcher_factory.dataset_collection_matcher(dataset_matcher) # Pin a selected DCE rerun input (collection elements never paginate # through the listing path; they're always carried). Bypasses # ``make_dce_entry`` deliberately: that factory sets ``keep=True`` for # the carry-forward callers, but the legacy pinned-DCE shape has no # ``keep`` field and adding one is a client-visible JSON diff. if isinstance(other_values.get(self.name), DatasetCollectionElement): dce = other_values[self.name] builder.pinned["dce"].append( { "id": trans.security.encode_id(dce.id), "hid": -1, "name": dce.element_identifier, "src": "dce", "tags": [], } ) self._page_hdca_matches(builder, trans, history, dataset_collection_matcher) builder.sort_by_hid("hdca") return d
def _page_hdca_matches( self, builder: DataOptionsBuilder, trans, history, dataset_collection_matcher, ) -> None: """Emit one page of matching HDCAs into ``builder.options['hdca']``. Walks all active HDCAs (incl. hidden) in HID-desc order; the classifier in :meth:`_classify_hdca` demotes hidden HDCAs to direct-only so they cannot appear as multirun (map-over) entries. """ _offset, _limit, hdca_search = builder.page("hdca") history_query = self._history_query(trans) def hdca_query(*, offset, limit): return _paginated_dataset_collections( trans, history, visible_only=False, search=hdca_search, offset=offset, limit=limit ) def hdca_filter(hdca): return self._classify_hdca(hdca, dataset_collection_matcher, history_query) matches, _total, _has_more = builder.paginate("hdca", query=hdca_query, filter=hdca_filter) for kind, hdca, implicit_conversion, subcollection_type in matches: name = f"{hdca.name} (with implicit datatype conversion)" if implicit_conversion else hdca.name builder.options["hdca"].append( make_hdca_entry( builder.security, hdca, name, subcollection_type=subcollection_type if kind == "multirun" else None, include_column_definitions=True, ) ) def _classify_hdca(self, hdca, dataset_collection_matcher, history_query): """Per-HDCA classifier returning 0, 1, or 2 match entries. Both ``direct_match`` and ``can_map_over`` can fire for the same HDCA when the parameter accepts multiple collection types (e.g., ``list,list:list`` with a ``list:list`` HDCA matches ``list:list`` directly AND can be mapped over to feed ``list``); both entries are emitted in that case. Direct entries come first so the stable HID-desc sort places them above the multirun entry. Hidden HDCAs emit only the direct-match entry — they are excluded from multirun. """ match = dataset_collection_matcher.hdca_match(hdca) if not match: return None entries: list = [] if history_query.direct_match(hdca): column_definitions_ok = True if self._column_definitions: collection_cols = hdca.collection.column_definitions column_definitions_ok = column_definitions_compatible(collection_cols, self._column_definitions) if column_definitions_ok: entries.append(("direct", hdca, match.implicit_conversion, None)) if hdca.visible: can_map = history_query.can_map_over(hdca) if can_map: subcollection_type = can_map.collection_type collection_type = hdca.collection.collection_type if subcollection_type == "paired_or_unpaired" and not collection_type.endswith("paired_or_unpaired"): if collection_type.endswith("paired"): subcollection_type = "paired" else: subcollection_type = "single_datasets" entries.append(("multirun", hdca, match.implicit_conversion, subcollection_type)) return entries or None
[docs] class HiddenDataToolParameter(HiddenToolParameter, DataToolParameter): """ Hidden parameter that behaves as a DataToolParameter. As with all hidden parameters, this is a HACK. """
[docs] def __init__(self, tool: Optional["Tool"], elem): DataToolParameter.__init__(self, tool, elem) self.value = "None" self.type = "hidden_data" self.hidden = True # hidden_data params are broken without optional="true" - the job runner's # parameter validation rejects them when no dataset is provided. The only # known tool using hidden_data (cufflinks) sets optional="true". if not self.optional: raise ParameterValueError( 'hidden_data parameters must declare optional="true" to function correctly', self.name, )
[docs] class BaseJsonToolParameter(ToolParameter): """ Class of parameter that tries to keep values as close to JSON as possible. In particular value_to_basic is overloaded to prevent params_to_strings from double encoding JSON and to_python using loads to produce values. """
[docs] def value_to_basic(self, value, app, use_security=False): if is_runtime_value(value): return runtime_to_json(value) return value
[docs] def to_json(self, value, app, use_security): """Convert a value to a string representation suitable for persisting""" return json.dumps(value)
[docs] def to_python(self, value, app): """Convert a value created with to_json back to an object representation""" return json.loads(value)
[docs] class DirectoryUriToolParameter(SimpleTextToolParameter): """galaxy.files URIs for directories."""
[docs] def validate(self, value, trans=None): super().validate(value, trans=trans) if not value: return # value is not set yet, do not validate # Skip file source validation in workflow building mode to allow workflows # referencing removed file sources to be exported/viewed. Users can then # download and edit them. Validation still occurs during tool execution. if trans.workflow_building_mode: return file_source_path = trans.app.file_sources.get_file_source_path(value) file_source = file_source_path.file_source if file_source is None: raise ParameterValueError(f"'{value}' is not a valid file source uri.", self.name) user_context = ProvidesFileSourcesUserContext(trans) user_has_access = file_source.user_has_access(user_context) if not user_has_access: raise ParameterValueError(f"The user cannot access {value}.", self.name)
[docs] def to_param_dict_string(self, value, other_values=None) -> str: """Called via __str__ when used in the Cheetah template""" if value is None: value = "" elif not isinstance(value, str): value = str(value) return value
[docs] class RulesListToolParameter(BaseJsonToolParameter): """ Parameter that allows for the creation of a list of rules using the Galaxy rules DSL. """
[docs] def __init__(self, tool: Optional["Tool"], input_source, context=None): input_source = ensure_input_source(input_source) super().__init__(tool, input_source) self.data_ref = input_source.get("data_ref", None)
[docs] def to_dict(self, trans, other_values=None): other_values = other_values or {} d = ToolParameter.to_dict(self, trans) if target := other_values.get(self.data_ref): if not is_runtime_value(target): d["target"] = { "src": "hdca" if hasattr(target, "collection") else "hda", "id": trans.app.security.encode_id(target.id), } return d
[docs] def validate(self, value, trans=None): super().validate(value, trans=trans) if not isinstance(value, MutableMapping): raise ValueError("No rules specified for rules parameter.") if "rules" not in value: raise ValueError("No rules specified for rules parameter") mappings = value.get("mapping", None) if not mappings: raise ValueError("No column definitions defined for rules parameter.")
[docs] def to_text(self, value): if value: rule_set = RuleSet(value) return rule_set.display else: return ""
# Code from CWL branch to massage in order to be shared across tools and workflows, # and for CWL artifacts as well as Galaxy ones.
[docs] def raw_to_galaxy( app: "MinimalApp", history: "History", as_dict_value: dict[str, Any], commit: bool = True ) -> "HistoryItem": object_class = as_dict_value["class"] if object_class == "File": # TODO: relative_to = "/" location = as_dict_value.get("location") name = ( as_dict_value.get("identifier") or as_dict_value.get("basename") or os.path.basename(urllib.parse.urlparse(location).path) ) extension = as_dict_value.get("format") or "data" dataset = Dataset() source = DatasetSource() source.source_uri = location # TODO: validate this... source.requested_transform = as_dict_value.get("transform") dataset.sources.append(source) for hash_name in HASH_NAMES: # TODO: Convert md5 -> MD5 during tool parsing. if hash_name in as_dict_value: hash_object = DatasetHash() hash_object.hash_function = hash_name hash_object.hash_value = as_dict_value[hash_name] dataset.hashes.append(hash_object) if "created_from_basename" in as_dict_value: dataset.created_from_basename = as_dict_value["created_from_basename"] dataset.state = Dataset.states.DEFERRED primary_data = HistoryDatasetAssociation( name=name, extension=extension, metadata_deferred=True, designation=None, visible=True, dbkey="?", dataset=dataset, flush=False, sa_session=app.model.session, ) primary_data.state = Dataset.states.DEFERRED permissions = app.security_agent.history_get_default_permissions(history) assert primary_data.dataset is not None app.security_agent.set_all_dataset_permissions(primary_data.dataset, permissions, new=True, flush=False) app.model.session.add(primary_data) history.stage_addition(primary_data) history.add_pending_items() if commit: app.model.session.commit() return primary_data else: name = as_dict_value.get("name") collection_type = as_dict_value.get("collection_type") collection = DatasetCollection( collection_type=collection_type, ) hdca = HistoryDatasetCollectionAssociation( name=name, collection=collection, ) app.model.session.add(hdca) def write_elements_to_collection(has_elements, collection_builder): element_dicts = has_elements.get("elements") for element_dict in element_dicts: element_class = element_dict["class"] identifier = element_dict["identifier"] if element_class == "File": # Don't commit for inner elements hda = raw_to_galaxy(app, history, element_dict, commit=False) collection_builder.add_dataset(identifier, hda) else: subcollection_builder = collection_builder.get_level(identifier) write_elements_to_collection(element_dict, subcollection_builder) collection_builder = builder.BoundCollectionBuilder(collection) write_elements_to_collection(as_dict_value, collection_builder) collection_builder.populate() history.stage_addition(hdca) history.add_pending_items() if commit: app.model.session.commit() return hdca
parameter_types: dict[str, type[ToolParameter]] = dict( text=TextToolParameter, integer=IntegerToolParameter, float=FloatToolParameter, boolean=BooleanToolParameter, genomebuild=GenomeBuildParameter, select=SelectToolParameter, color=ColorToolParameter, group_tag=SelectTagParameter, data_column=ColumnListParameter, hidden=HiddenToolParameter, hidden_data=HiddenDataToolParameter, baseurl=BaseURLToolParameter, file=FileToolParameter, ftpfile=FTPFileToolParameter, data=DataToolParameter, data_collection=DataCollectionToolParameter, rules=RulesListToolParameter, directory_uri=DirectoryUriToolParameter, drill_down=DrillDownSelectToolParameter, )
[docs] def history_item_dict_to_python(value, app, name): if not (isinstance(value, MutableMapping) and "src" in value): raise ParameterValueError(f"Invalid value {value}", name) if value["src"] not in ("hda", "dce", "ldda", "hdca", "CollectionAdapter"): raise ParameterValueError(f"Invalid value {value}", name) return src_id_to_item(sa_session=app.model.context, security=app.security, value=value)
[docs] def history_item_to_json(value, app, use_security): src = None # unwrap adapter collection_adapter: CollectionAdapter | None = None if isinstance(value, CollectionAdapter): collection_adapter = value return collection_adapter.to_adapter_model().model_dump() if isinstance(value, MutableMapping) and "src" in value and "id" in value: return value elif isinstance(value, DatasetCollectionElement): src = "dce" elif isinstance(value, HistoryDatasetCollectionAssociation): src = "hdca" elif isinstance(value, LibraryDatasetDatasetAssociation): src = "ldda" elif isinstance(value, HistoryDatasetAssociation) or hasattr(value, "id"): # hasattr 'id' fires a query on persistent objects after a flush so better # to do the isinstance check. Not sure we need the hasattr check anymore - it'd be # nice to drop it. src = "hda" if src is not None: object_id = cached_id(value) rval = {"id": app.security.encode_id(object_id) if use_security else object_id, "src": src} return rval