Source code for galaxy.tools.parameters.basic

"""
Basic tool parameters.
"""
from __future__ import print_function

import json
import logging
import os
import os.path
import re
from xml.etree.ElementTree import XML

from six import string_types
from webob.compat import cgi_FieldStorage

import galaxy.model
from galaxy import util
from galaxy.tool_util.parser import get_input_source as ensure_input_source
from galaxy.util import (
    sanitize_param,
    string_as_bool,
    unicodify
)
from galaxy.util.bunch import Bunch
from galaxy.util.dictifiable import Dictifiable
from galaxy.util.expressions import ExpressionContext
from galaxy.util.rules_dsl import RuleSet
from galaxy.web import url_for
from . import (
    dynamic_options,
    history_query,
    validation
)
from .dataset_matcher import (
    get_dataset_matcher_factory,
)
from .sanitize import ToolParameterSanitizer

log = logging.getLogger(__name__)

workflow_building_modes = Bunch(DISABLED=False, ENABLED=True, USE_HISTORY=1)

WORKFLOW_PARAMETER_REGULAR_EXPRESSION = re.compile(r'\$\{.+?\}')


class ImplicitConversionRequired(Exception):
    pass

def contains_workflow_parameter(value, search=False):
    if not isinstance(value, string_types):
        return False
    if search and WORKFLOW_PARAMETER_REGULAR_EXPRESSION.search(value):
        return True
    if not search and WORKFLOW_PARAMETER_REGULAR_EXPRESSION.match(value):
        return True
    return False
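
# A minimal usage sketch, following the doctest conventions used throughout
# this module. Note that match() only anchors at the start of the string, so
# embedded workflow parameters are only detected when search=True:
#
# >>> contains_workflow_parameter('${input}')
# True
# >>> contains_workflow_parameter('prefix ${input}')
# False
# >>> contains_workflow_parameter('prefix ${input}', search=True)
# True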

def is_runtime_value(value):
    return isinstance(value, RuntimeValue) or (isinstance(value, dict) and value.get("__class__") in ["RuntimeValue", "ConnectedValue"])
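
# Usage sketch: both the RuntimeValue marker class (defined later in this
# module) and its two dict serializations are recognized:
#
# >>> is_runtime_value({'__class__': 'RuntimeValue'})
# True
# >>> is_runtime_value({'__class__': 'ConnectedValue'})
# True
# >>> is_runtime_value('10')
# False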

def is_runtime_context(trans, other_values):
    if trans.workflow_building_mode:
        return True
    for context_value in other_values.values():
        if is_runtime_value(context_value):
            return True
        for v in util.listify(context_value):
            if isinstance(v, trans.app.model.HistoryDatasetAssociation) and \
                    ((hasattr(v, 'state') and v.state != galaxy.model.Dataset.states.OK)
                     or hasattr(v, 'implicit_conversion')):
                return True
    return False

def parse_dynamic_options(param, input_source):
    options_elem = input_source.parse_dynamic_options_elem()
    if options_elem is not None:
        return dynamic_options.DynamicOptions(options_elem, param)
    return None


class ToolParameter(Dictifiable):
    """
    Describes a parameter accepted by a tool. This is just a simple stub at the
    moment but in the future should encapsulate more complex parameters (lists
    of valid choices, validation logic, ...)

    >>> from galaxy.util.bunch import Bunch
    >>> trans = Bunch(app=None)
    >>> p = ToolParameter(None, XML('<param argument="--parameter-name" type="text" value="default" />'))
    >>> assert p.name == 'parameter_name'
    >>> assert sorted(p.to_dict(trans).items()) == [('argument', '--parameter-name'), ('help', ''), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'ToolParameter'), ('name', 'parameter_name'), ('optional', False), ('refresh_on_change', False), ('type', 'text'), ('value', None)]
    """
    dict_collection_visible_keys = ['name', 'argument', 'type', 'label', 'help', 'refresh_on_change']

    def __init__(self, tool, input_source, context=None):
        input_source = ensure_input_source(input_source)
        self.tool = tool
        self.argument = input_source.get("argument")
        self.name = self.__class__.parse_name(input_source)
        self.type = input_source.get("type")
        self.hidden = input_source.get_bool("hidden", False)
        self.load_contents = int(input_source.get("load_contents", 0))
        self.refresh_on_change = input_source.get_bool("refresh_on_change", False)
        self.optional = input_source.parse_optional()
        self.is_dynamic = False
        self.label = input_source.parse_label()
        self.help = input_source.parse_help()
        sanitizer_elem = input_source.parse_sanitizer_elem()
        if sanitizer_elem is not None:
            self.sanitizer = ToolParameterSanitizer.from_element(sanitizer_elem)
        else:
            self.sanitizer = None
        self.validators = []
        for elem in input_source.parse_validator_elems():
            self.validators.append(validation.Validator.from_element(self, elem))

    @property
    def visible(self):
        """Return true if the parameter should be rendered on the form"""
        return True

    def get_label(self):
        """Return user friendly name for the parameter"""
        return self.label if self.label else self.name

    def from_json(self, value, trans=None, other_values={}):
        """
        Convert a value from an HTML POST into the parameter's preferred value
        format.
        """
        return value

    def get_initial_value(self, trans, other_values):
        """
        Return the starting value of the parameter
        """
        return None

    def get_required_enctype(self):
        """
        If this parameter needs the form to have a specific encoding
        return it, otherwise return None (indicating compatibility with
        any encoding)
        """
        return None

    def get_dependencies(self):
        """
        Return the names of any other parameters this parameter depends on
        """
        return []

    def to_json(self, value, app, use_security):
        """Convert a value to a string representation suitable for persisting"""
        return unicodify(value)

    def to_python(self, value, app):
        """Convert a value created with to_json back to an object representation"""
        return value

    def value_to_basic(self, value, app, use_security=False):
        if is_runtime_value(value):
            return runtime_to_json(value)
        return self.to_json(value, app, use_security)

    def value_from_basic(self, value, app, ignore_errors=False):
        # Handle Runtime and Unvalidated values
        if is_runtime_value(value):
            return runtime_to_object(value)
        elif isinstance(value, dict) and value.get('__class__') == 'UnvalidatedValue':
            return value['value']
        # Delegate to the 'to_python' method
        if ignore_errors:
            try:
                return self.to_python(value, app)
            except Exception:
                return value
        else:
            return self.to_python(value, app)

    def value_to_display_text(self, value):
        if is_runtime_value(value):
            return "Not available."
        return self.to_text(value)

    def to_text(self, value):
        """
        Convert a value to a text representation suitable for displaying to
        the user

        >>> p = ToolParameter(None, XML('<param name="_name" />'))
        >>> print(p.to_text(None))
        Not available.
        >>> print(p.to_text(''))
        Empty.
        >>> print(p.to_text('text'))
        text
        >>> print(p.to_text(True))
        True
        >>> print(p.to_text(False))
        False
        >>> print(p.to_text(0))
        0
        """
        if value is not None:
            str_value = unicodify(value)
            if not str_value:
                return "Empty."
            return str_value
        return "Not available."

    def to_param_dict_string(self, value, other_values={}):
        """Called via __str__ when used in the Cheetah template"""
        if value is None:
            value = ""
        elif not isinstance(value, string_types):
            value = str(value)
        if self.tool is None or self.tool.options.sanitize:
            if self.sanitizer:
                value = self.sanitizer.sanitize_param(value)
            else:
                value = sanitize_param(value)
        return value

    def validate(self, value, trans=None):
        if value in ["", None] and self.optional:
            return
        for validator in self.validators:
            validator.validate(value, trans)

    def to_dict(self, trans, other_values={}):
        """ to_dict tool parameter. This can be overridden by subclasses. """
        tool_dict = super(ToolParameter, self).to_dict()
        tool_dict['model_class'] = self.__class__.__name__
        tool_dict['optional'] = self.optional
        tool_dict['hidden'] = self.hidden
        tool_dict['is_dynamic'] = self.is_dynamic
        tool_dict['value'] = self.value_to_basic(self.get_initial_value(trans, other_values), trans.app, use_security=True)
        return tool_dict

    @classmethod
    def build(cls, tool, input_source):
        """Factory method to create parameter of correct type"""
        input_source = ensure_input_source(input_source)
        param_name = cls.parse_name(input_source)
        param_type = input_source.get('type')
        if not param_type:
            raise ValueError("parameter '%s' requires a 'type'" % (param_name))
        elif param_type not in parameter_types:
            raise ValueError("parameter '%s' uses an unknown type '%s'" % (param_name, param_type))
        else:
            return parameter_types[param_type](tool, input_source)

    @staticmethod
    def parse_name(input_source):
        return input_source.parse_name()
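
# Usage sketch for the build() factory: the 'type' attribute selects the
# concrete subclass via the parameter_types registry (defined later in the
# full module, not shown in this excerpt), and a missing or unknown type
# raises a ValueError:
#
# >>> p = ToolParameter.build(None, XML('<param name="_name" type="integer" value="1"/>'))
# >>> type(p).__name__
# 'IntegerToolParameter'
# >>> ToolParameter.build(None, XML('<param name="_name"/>'))
# Traceback (most recent call last):
#     ...
# ValueError: parameter '_name' requires a 'type'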


class TextToolParameter(ToolParameter):
    """
    Parameter that can take on any text value.

    >>> from galaxy.util.bunch import Bunch
    >>> trans = Bunch(app=None)
    >>> p = TextToolParameter(None, XML('<param name="_name" type="text" value="default" />'))
    >>> print(p.name)
    _name
    >>> assert sorted(p.to_dict(trans).items()) == [('area', False), ('argument', None), ('datalist', []), ('help', ''), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'TextToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('type', 'text'), ('value', u'default')]
    """

    def __init__(self, tool, input_source):
        input_source = ensure_input_source(input_source)
        super(TextToolParameter, self).__init__(tool, input_source)
        self.datalist = []
        for (title, value, selected) in input_source.parse_static_options():
            self.datalist.append({'label': title, 'value': value})
        self.value = input_source.get('value')
        self.area = input_source.get_bool('area', False)

    def to_json(self, value, app, use_security):
        """Convert a value to a string representation suitable for persisting"""
        if value is None:
            rval = ''
        else:
            rval = unicodify(value)
        return rval

    def validate(self, value, trans=None):
        search = self.type == "text"
        if not (trans and trans.workflow_building_mode is workflow_building_modes.ENABLED and contains_workflow_parameter(value, search=search)):
            return super(TextToolParameter, self).validate(value, trans)

    def get_initial_value(self, trans, other_values):
        return self.value

    def to_dict(self, trans, other_values={}):
        d = super(TextToolParameter, self).to_dict(trans)
        d['area'] = self.area
        d['datalist'] = self.datalist
        return d


class IntegerToolParameter(TextToolParameter):
    """
    Parameter that takes an integer value.

    >>> from galaxy.util.bunch import Bunch
    >>> trans = Bunch(app=None, history=Bunch(), workflow_building_mode=True)
    >>> p = IntegerToolParameter(None, XML('<param name="_name" type="integer" value="10" />'))
    >>> print(p.name)
    _name
    >>> assert sorted(p.to_dict(trans).items()) == [('area', False), ('argument', None), ('datalist', []), ('help', ''), ('hidden', False), ('is_dynamic', False), ('label', ''), ('max', None), ('min', None), ('model_class', 'IntegerToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('type', 'integer'), ('value', u'10')]
    >>> assert type(p.from_json("10", trans)) == int
    >>> type(p.from_json("_string", trans))
    Traceback (most recent call last):
        ...
    ValueError: parameter '_name': an integer or workflow parameter is required
    """
    dict_collection_visible_keys = ToolParameter.dict_collection_visible_keys + ['min', 'max']

    def __init__(self, tool, input_source):
        super(IntegerToolParameter, self).__init__(tool, input_source)
        if self.value:
            try:
                int(self.value)
            except ValueError:
                raise ValueError("parameter '%s': the attribute 'value' must be an integer" % self.name)
        elif self.value is None and not self.optional:
            raise ValueError("parameter '%s': the attribute 'value' must be set for non optional parameters" % self.name)
        self.min = input_source.get('min')
        self.max = input_source.get('max')
        if self.min:
            try:
                self.min = int(self.min)
            except ValueError:
                raise ValueError("parameter '%s': attribute 'min' must be an integer" % self.name)
        if self.max:
            try:
                self.max = int(self.max)
            except ValueError:
                raise ValueError("parameter '%s': attribute 'max' must be an integer" % self.name)
        if self.min is not None or self.max is not None:
            self.validators.append(validation.InRangeValidator(None, self.min, self.max))

    def from_json(self, value, trans, other_values={}):
        try:
            return int(value)
        except (TypeError, ValueError):
            if contains_workflow_parameter(value) and trans.workflow_building_mode is workflow_building_modes.ENABLED:
                return value
            if not value and self.optional:
                return ""
            if trans.workflow_building_mode is workflow_building_modes.ENABLED:
                raise ValueError("parameter '%s': an integer or workflow parameter is required" % self.name)
            else:
                raise ValueError("parameter '%s': the attribute 'value' must be set for non optional parameters" % self.name)

    def to_python(self, value, app):
        try:
            return int(value)
        except (TypeError, ValueError) as err:
            if contains_workflow_parameter(value):
                return value
            if not value and self.optional:
                return None
            raise err

    def get_initial_value(self, trans, other_values):
        if self.value:
            return int(self.value)
        else:
            return None
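
# Usage sketch: from_json() coerces to int where possible and, in workflow
# building mode, lets ${...} workflow parameters pass through unchanged:
#
# >>> from galaxy.util.bunch import Bunch
# >>> trans = Bunch(app=None, history=Bunch(), workflow_building_mode=True)
# >>> p = IntegerToolParameter(None, XML('<param name="_name" type="integer" value="10"/>'))
# >>> p.from_json('7', trans)
# 7
# >>> p.from_json('${threshold}', trans)
# '${threshold}'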


class FloatToolParameter(TextToolParameter):
    """
    Parameter that takes a real number value.

    >>> from galaxy.util.bunch import Bunch
    >>> trans = Bunch(app=None, history=Bunch(), workflow_building_mode=True)
    >>> p = FloatToolParameter(None, XML('<param name="_name" type="float" value="3.141592" />'))
    >>> print(p.name)
    _name
    >>> assert sorted(p.to_dict(trans).items()) == [('area', False), ('argument', None), ('datalist', []), ('help', ''), ('hidden', False), ('is_dynamic', False), ('label', ''), ('max', None), ('min', None), ('model_class', 'FloatToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('type', 'float'), ('value', u'3.141592')]
    >>> assert type(p.from_json("36.1", trans)) == float
    >>> type(p.from_json("_string", trans))
    Traceback (most recent call last):
        ...
    ValueError: parameter '_name': a real number or workflow parameter is required
    """
    dict_collection_visible_keys = ToolParameter.dict_collection_visible_keys + ['min', 'max']

    def __init__(self, tool, input_source):
        super(FloatToolParameter, self).__init__(tool, input_source)
        self.min = input_source.get('min')
        self.max = input_source.get('max')
        if self.value:
            try:
                float(self.value)
            except ValueError:
                raise ValueError("parameter '%s': the attribute 'value' must be a real number" % self.name)
        elif self.value is None and not self.optional:
            raise ValueError("parameter '%s': the attribute 'value' must be set for non optional parameters" % self.name)
        if self.min:
            try:
                self.min = float(self.min)
            except ValueError:
                raise ValueError("parameter '%s': attribute 'min' must be a real number" % self.name)
        if self.max:
            try:
                self.max = float(self.max)
            except ValueError:
                raise ValueError("parameter '%s': attribute 'max' must be a real number" % self.name)
        if self.min is not None or self.max is not None:
            self.validators.append(validation.InRangeValidator(None, self.min, self.max))

    def from_json(self, value, trans, other_values={}):
        try:
            return float(value)
        except (TypeError, ValueError):
            if contains_workflow_parameter(value) and trans.workflow_building_mode is workflow_building_modes.ENABLED:
                return value
            if not value and self.optional:
                return ""
            if trans.workflow_building_mode is workflow_building_modes.ENABLED:
                raise ValueError("parameter '%s': a real number or workflow parameter is required" % self.name)
            else:
                raise ValueError("parameter '%s': the attribute 'value' must be set for non optional parameters" % self.name)

    def to_python(self, value, app):
        try:
            return float(value)
        except (TypeError, ValueError) as err:
            if contains_workflow_parameter(value):
                return value
            if not value and self.optional:
                return None
            raise err

    def get_initial_value(self, trans, other_values):
        try:
            return float(self.value)
        except Exception:
            return None


class BooleanToolParameter(ToolParameter):
    """
    Parameter that takes one of two values.

    >>> from galaxy.util.bunch import Bunch
    >>> trans = Bunch(app=None, history=Bunch())
    >>> p = BooleanToolParameter(None, XML('<param name="_name" type="boolean" checked="yes" truevalue="_truevalue" falsevalue="_falsevalue" />'))
    >>> print(p.name)
    _name
    >>> assert sorted(p.to_dict(trans).items()) == [('argument', None), ('falsevalue', '_falsevalue'), ('help', ''), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'BooleanToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('truevalue', '_truevalue'), ('type', 'boolean'), ('value', 'true')]
    >>> print(p.from_json('true'))
    True
    >>> print(p.to_param_dict_string(True))
    _truevalue
    >>> print(p.from_json('false'))
    False
    >>> print(p.to_param_dict_string(False))
    _falsevalue
    """

    def __init__(self, tool, input_source):
        input_source = ensure_input_source(input_source)
        super(BooleanToolParameter, self).__init__(tool, input_source)
        self.truevalue = input_source.get('truevalue', 'true')
        self.falsevalue = input_source.get('falsevalue', 'false')
        self.checked = input_source.get_bool('checked', False)

    def from_json(self, value, trans=None, other_values={}):
        return self.to_python(value)

    def to_python(self, value, app=None):
        return (value in [True, 'True', 'true'])

    def to_json(self, value, app, use_security):
        if self.to_python(value, app):
            return 'true'
        else:
            return 'false'

    def get_initial_value(self, trans, other_values):
        return self.checked

    def to_param_dict_string(self, value, other_values={}):
        if self.to_python(value):
            return self.truevalue
        else:
            return self.falsevalue

    def to_dict(self, trans, other_values={}):
        d = super(BooleanToolParameter, self).to_dict(trans)
        d['truevalue'] = self.truevalue
        d['falsevalue'] = self.falsevalue
        return d

    @property
    def legal_values(self):
        return [self.truevalue, self.falsevalue]
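
# Round-trip sketch: to_json() persists the boolean as 'true'/'false',
# to_python() restores it, and to_param_dict_string() maps it onto the
# tool-author-supplied truevalue/falsevalue strings:
#
# >>> p = BooleanToolParameter(None, XML('<param name="_name" type="boolean" truevalue="--verbose" falsevalue=""/>'))
# >>> p.to_json(True, None, False)
# 'true'
# >>> p.to_python('true')
# True
# >>> p.to_param_dict_string(True)
# '--verbose'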


class FileToolParameter(ToolParameter):
    """
    Parameter that takes an uploaded file as a value.

    >>> from galaxy.util.bunch import Bunch
    >>> trans = Bunch(app=None, history=Bunch())
    >>> p = FileToolParameter(None, XML('<param name="_name" type="file"/>'))
    >>> print(p.name)
    _name
    >>> sorted(p.to_dict(trans).items())
    [('argument', None), ('help', ''), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'FileToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('type', 'file'), ('value', None)]
    """

    def __init__(self, tool, input_source):
        super(FileToolParameter, self).__init__(tool, input_source)

    def from_json(self, value, trans=None, other_values={}):
        # Middleware or proxies may encode files in special ways (TODO: this
        # should be pluggable)
        if isinstance(value, dict):
            if 'session_id' in value:
                # handle api upload
                session_id = value["session_id"]
                upload_store = trans.app.config.new_file_path
                if re.match(r'^[\w-]+$', session_id) is None:
                    raise ValueError("Invalid session id format.")
                local_filename = os.path.abspath(os.path.join(upload_store, session_id))
            else:
                # handle nginx upload
                upload_store = trans.app.config.nginx_upload_store
                assert upload_store, "Request appears to have been processed by nginx_upload_module but Galaxy is not configured to recognize it."
                local_filename = os.path.abspath(value['path'])
                assert local_filename.startswith(upload_store), "Filename provided by nginx (%s) is not in correct directory (%s)." % (local_filename, upload_store)
            value = dict(filename=value["name"], local_filename=local_filename)
        return value

    def get_required_enctype(self):
        """
        File upload elements require the multipart/form-data encoding
        """
        return "multipart/form-data"

    def to_json(self, value, app, use_security):
        if value in [None, '']:
            return None
        elif isinstance(value, string_types):
            return value
        elif isinstance(value, dict):
            # or should we jsonify?
            try:
                return value['local_filename']
            except KeyError:
                return None
        elif isinstance(value, cgi_FieldStorage):
            return value.filename
        raise Exception("FileToolParameter cannot be persisted")

    def to_python(self, value, app):
        if value is None:
            return None
        elif isinstance(value, string_types):
            return value
        else:
            raise Exception("FileToolParameter cannot be persisted")
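
# Persistence sketch: to_json() reduces the various upload representations to
# a plain path (or None), which to_python() then accepts back as a string:
#
# >>> p = FileToolParameter(None, XML('<param name="_name" type="file"/>'))
# >>> p.to_json({'local_filename': '/tmp/upload_1', 'filename': 'upload_1'}, None, False)
# '/tmp/upload_1'
# >>> p.to_json('', None, False) is None
# True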


class FTPFileToolParameter(ToolParameter):
    """
    Parameter that takes a file uploaded via FTP as a value.

    >>> from galaxy.util.bunch import Bunch
    >>> trans = Bunch(app=None, history=Bunch(), user=None)
    >>> p = FTPFileToolParameter(None, XML('<param name="_name" type="ftpfile"/>'))
    >>> print(p.name)
    _name
    >>> sorted(p.to_dict(trans).items())
    [('argument', None), ('help', ''), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'FTPFileToolParameter'), ('multiple', True), ('name', '_name'), ('optional', True), ('refresh_on_change', False), ('type', 'ftpfile'), ('value', None)]
    """

    def __init__(self, tool, input_source):
        input_source = ensure_input_source(input_source)
        super(FTPFileToolParameter, self).__init__(tool, input_source)
        self.multiple = input_source.get_bool('multiple', True)
        self.optional = input_source.parse_optional(True)
        self.user_ftp_dir = ''

    def get_initial_value(self, trans, other_values):
        if trans is not None:
            if trans.user is not None:
                self.user_ftp_dir = "%s/" % trans.user_ftp_dir
        return None

    @property
    def visible(self):
        if self.tool.app.config.ftp_upload_dir is None or self.tool.app.config.ftp_upload_site is None:
            return False
        return True

    def to_param_dict_string(self, value, other_values={}):
        if value == '':
            return 'None'
        lst = ['%s%s' % (self.user_ftp_dir, dataset) for dataset in value]
        if self.multiple:
            return lst
        else:
            return lst[0]

    def from_json(self, value, trans=None, other_values={}):
        return self.to_python(value, trans.app, validate=True)

    def to_json(self, value, app, use_security):
        return self.to_python(value, app)

    def to_python(self, value, app, validate=False):
        if not isinstance(value, list):
            value = [value]
        lst = []
        for val in value:
            if val in [None, '']:
                lst = []
                break
            if isinstance(val, dict):
                lst.append(val['name'])
            else:
                lst.append(val)
        if len(lst) == 0:
            if not self.optional and validate:
                raise ValueError("Please select a valid FTP file.")
            return None
        if validate and self.tool.app.config.ftp_upload_dir is None:
            raise ValueError("The FTP directory is not configured.")
        return lst

    def to_dict(self, trans, other_values=None):
        d = super(FTPFileToolParameter, self).to_dict(trans)
        d['multiple'] = self.multiple
        return d
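
# Normalization sketch: to_python() flattens single values and dicts coming
# from the FTP directory listing into a list of file names (validate=False
# skips the FTP configuration checks):
#
# >>> p = FTPFileToolParameter(None, XML('<param name="_name" type="ftpfile"/>'))
# >>> p.to_python({'name': 'reads.fastq'}, None)
# ['reads.fastq']
# >>> p.to_python('', None) is None
# True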


class GenomespaceFileToolParameter(ToolParameter):
    """
    Parameter that takes a GenomeSpace file as a value.
    """

    def __init__(self, tool, input_source):
        super(GenomespaceFileToolParameter, self).__init__(tool, input_source)
        self.value = input_source.get('value')

    def get_initial_value(self, trans, other_values):
        return self.value


class HiddenToolParameter(ToolParameter):
    """
    Parameter whose value is fixed by the tool author and rendered as a
    hidden form field.

    >>> from galaxy.util.bunch import Bunch
    >>> trans = Bunch(app=None, history=Bunch())
    >>> p = HiddenToolParameter(None, XML('<param name="_name" type="hidden" value="_value"/>'))
    >>> print(p.name)
    _name
    >>> assert sorted(p.to_dict(trans).items()) == [('argument', None), ('help', ''), ('hidden', True), ('is_dynamic', False), ('label', ''), ('model_class', 'HiddenToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('type', 'hidden'), ('value', u'_value')]
    """

    def __init__(self, tool, input_source):
        super(HiddenToolParameter, self).__init__(tool, input_source)
        self.value = input_source.get('value')
        self.hidden = True

    def get_initial_value(self, trans, other_values):
        return self.value

    def get_label(self):
        return None


class ColorToolParameter(ToolParameter):
    """
    Parameter that stores a color.

    >>> from galaxy.util.bunch import Bunch
    >>> trans = Bunch(app=None, history=Bunch())
    >>> p = ColorToolParameter(None, XML('<param name="_name" type="color" value="#ffffff"/>'))
    >>> print(p.name)
    _name
    >>> print(p.to_param_dict_string("#ffffff"))
    #ffffff
    >>> assert sorted(p.to_dict(trans).items()) == [('argument', None), ('help', ''), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'ColorToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('type', 'color'), ('value', u'#ffffff')]
    >>> p = ColorToolParameter(None, XML('<param name="_name" type="color"/>'))
    >>> print(p.get_initial_value(trans, {}))
    #000000
    >>> p = ColorToolParameter(None, XML('<param name="_name" type="color" value="#ffffff" rgb="True"/>'))
    >>> print(p.to_param_dict_string("#ffffff"))
    (255, 255, 255)
    >>> print(p.to_param_dict_string(None))
    Traceback (most recent call last):
        ...
    ValueError: Failed to convert 'None' to RGB.
    """

    def __init__(self, tool, input_source):
        input_source = ensure_input_source(input_source)
        super(ColorToolParameter, self).__init__(tool, input_source)
        self.value = input_source.get('value', '#000000')
        self.rgb = input_source.get_bool('rgb', False)

    def get_initial_value(self, trans, other_values):
        if self.value is not None:
            return self.value.lower()

    def to_param_dict_string(self, value, other_values={}):
        if self.rgb:
            try:
                return str(tuple(int(value.lstrip('#')[i:i + 2], 16) for i in (0, 2, 4)))
            except Exception:
                raise ValueError("Failed to convert '%s' to RGB." % value)
        return str(value)
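
# Conversion sketch: with rgb enabled each pair of hex digits is parsed with
# int(..., 16), so '#1a2b3c' becomes (26, 43, 60):
#
# >>> p = ColorToolParameter(None, XML('<param name="_name" type="color" rgb="True"/>'))
# >>> p.to_param_dict_string('#1a2b3c')
# '(26, 43, 60)'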


class BaseURLToolParameter(HiddenToolParameter):
    """
    Parameter whose value is its configured value prefixed with the current
    server's base URL. Used in all redirects.

    >>> from galaxy.util.bunch import Bunch
    >>> trans = Bunch(app=None, history=Bunch())
    >>> p = BaseURLToolParameter(None, XML('<param name="_name" type="base_url" value="_value"/>'))
    >>> print(p.name)
    _name
    >>> assert sorted(p.to_dict(trans).items()) == [('argument', None), ('help', ''), ('hidden', True), ('is_dynamic', False), ('label', ''), ('model_class', 'BaseURLToolParameter'), ('name', '_name'), ('optional', False), ('refresh_on_change', False), ('type', 'base_url'), ('value', u'_value')]
    """

    def __init__(self, tool, input_source):
        super(BaseURLToolParameter, self).__init__(tool, input_source)
        self.value = input_source.get('value', '')

    def get_initial_value(self, trans, other_values):
        return self._get_value()

    def from_json(self, value=None, trans=None, other_values={}):
        return self._get_value()

    def _get_value(self):
        try:
            return url_for(self.value, qualified=True)
        except Exception as e:
            log.debug('Url creation failed for "%s": %s', self.name, unicodify(e))
            return self.value

    def to_dict(self, trans, other_values={}):
        d = super(BaseURLToolParameter, self).to_dict(trans)
        return d


class SelectToolParameter(ToolParameter):
    """
    Parameter that takes on one (or many) of a specific set of values.

    >>> from galaxy.util.bunch import Bunch
    >>> trans = Bunch(app=None, history=Bunch(), workflow_building_mode=False)
    >>> p = SelectToolParameter(None, XML(
    ... '''
    ... <param name="_name" type="select">
    ...     <option value="x">x_label</option>
    ...     <option value="y" selected="true">y_label</option>
    ...     <option value="z">z_label</option>
    ... </param>
    ... '''))
    >>> print(p.name)
    _name
    >>> sorted(p.to_dict(trans).items())
    [('argument', None), ('display', None), ('help', ''), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'SelectToolParameter'), ('multiple', False), ('name', '_name'), ('optional', False), ('options', [('x_label', 'x', False), ('y_label', 'y', True), ('z_label', 'z', False)]), ('refresh_on_change', False), ('textable', False), ('type', 'select'), ('value', 'y')]
    >>> p = SelectToolParameter(None, XML(
    ... '''
    ... <param name="_name" type="select" multiple="true">
    ...     <option value="x">x_label</option>
    ...     <option value="y" selected="true">y_label</option>
    ...     <option value="z" selected="true">z_label</option>
    ... </param>
    ... '''))
    >>> print(p.name)
    _name
    >>> sorted(p.to_dict(trans).items())
    [('argument', None), ('display', None), ('help', ''), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'SelectToolParameter'), ('multiple', True), ('name', '_name'), ('optional', True), ('options', [('x_label', 'x', False), ('y_label', 'y', True), ('z_label', 'z', True)]), ('refresh_on_change', False), ('textable', False), ('type', 'select'), ('value', ['y', 'z'])]
    >>> print(p.to_param_dict_string(["y", "z"]))
    y,z
    """

    def __init__(self, tool, input_source, context=None):
        input_source = ensure_input_source(input_source)
        super(SelectToolParameter, self).__init__(tool, input_source)
        self.multiple = input_source.get_bool('multiple', False)
        # Multiple selects are optional by default, single selection is the inverse.
        self.optional = input_source.parse_optional(self.multiple)
        self.display = input_source.get('display', None)
        self.separator = input_source.get('separator', ',')
        self.legal_values = set()
        self.dynamic_options = input_source.get('dynamic_options', None)
        self.options = parse_dynamic_options(self, input_source)
        if self.options is not None:
            for validator in self.options.validators:
                self.validators.append(validator)
        if self.dynamic_options is None and self.options is None:
            self.static_options = input_source.parse_static_options()
            for (title, value, selected) in self.static_options:
                self.legal_values.add(value)
        self.is_dynamic = ((self.dynamic_options is not None) or (self.options is not None))

    def _get_dynamic_options_call_other_values(self, trans, other_values):
        call_other_values = ExpressionContext({'__trans__': trans})
        if other_values:
            call_other_values.parent = other_values.parent
            call_other_values.update(other_values.dict)
        return call_other_values

    def get_options(self, trans, other_values):
        if self.options:
            return self.options.get_options(trans, other_values)
        elif self.dynamic_options:
            call_other_values = self._get_dynamic_options_call_other_values(trans, other_values)
            try:
                return eval(self.dynamic_options, self.tool.code_namespace, call_other_values)
            except Exception as e:
                log.debug("Error determining dynamic options for parameter '%s' in tool '%s':", self.name, self.tool.id, exc_info=e)
                return []
        else:
            return self.static_options

    def from_json(self, value, trans, other_values={}, require_legal_value=True):
        try:
            legal_values = self.get_legal_values(trans, other_values)
        except ImplicitConversionRequired:
            return value
        if (not legal_values or not require_legal_value) and is_runtime_context(trans, other_values):
            if self.multiple:
                # While it is generally allowed that a select value can be '',
                # we do not allow this to be the case in a dynamically
                # generated multiple select list being set in workflow building
                # mode; we instead treat '' as 'No option Selected' (None).
                if value == '':
                    value = None
                else:
                    if isinstance(value, string_types):
                        # Split on all whitespace. This not only provides flexibility
                        # in interpreting values but also is needed because many browsers
                        # use \r\n to separate lines.
                        value = value.split()
            return value
        elif value is None:
            if self.optional:
                return None
            raise ValueError("parameter '%s': an invalid option (None) was selected, please verify" % self.name)
        elif not legal_values:
            if self.optional and self.tool.profile < 18.09:
                # Covers optional parameters with default values that reference other optional parameters.
                # These will have a value but no legal_values.
                return None
            raise ValueError("parameter '%s': requires a value, but no legal values defined" % self.name)
        if isinstance(value, list):
            if not self.multiple:
                raise ValueError("parameter '%s': multiple values provided but parameter is not expecting multiple values" % (self.name))
            rval = []
            for v in value:
                if v not in legal_values:
                    raise ValueError("parameter '%s': an invalid option (%r) was selected (valid options: %s)" % (self.name, v, ",".join(legal_values)))
                rval.append(v)
            return rval
        else:
            value_is_none = (value == "None" and "None" not in legal_values)
            if value_is_none or not value:
                if self.multiple:
                    if self.optional:
                        return []
                    else:
                        raise ValueError("parameter '%s': no option was selected for non optional parameter" % (self.name))
            if value not in legal_values and require_legal_value:
                raise ValueError("parameter '%s': an invalid option (%r) was selected (valid options: %s)" % (self.name, value, ",".join(legal_values)))
            return value

    def to_param_dict_string(self, value, other_values={}):
        if value is None:
            return "None"
        if isinstance(value, list):
            if not self.multiple:
                raise ValueError("parameter '%s': multiple values provided but parameter is not expecting multiple values" % (self.name))
            value = list(map(str, value))
        else:
            value = str(value)
        if self.tool is None or self.tool.options.sanitize:
            if self.sanitizer:
                value = self.sanitizer.sanitize_param(value)
            else:
                value = sanitize_param(value)
        if isinstance(value, list):
            value = self.separator.join(value)
        return value

    def to_json(self, value, app, use_security):
        return value

    def get_initial_value(self, trans, other_values):
        try:
            options = list(self.get_options(trans, other_values))
        except ImplicitConversionRequired:
            return None
        if not options:
            return None
        value = [optval for _, optval, selected in options if selected]
        if len(value) == 0:
            if not self.optional and not self.multiple and options:
                # Nothing selected, but not optional and not a multiple select, with some values,
                # so we have to default to something (the HTML form will anyway)
                value = options[0][1]
            else:
                value = None
        elif len(value) == 1 or not self.multiple:
            value = value[0]
        return value

    def to_text(self, value):
        if not isinstance(value, list):
            value = [value]
        # FIXME: Currently only translating values back to labels if they
        # are not dynamic
        if self.is_dynamic:
            rval = map(str, value)
        else:
            options = list(self.static_options)
            rval = []
            for t, v, s in options:
                if v in value:
                    rval.append(t)
        if rval:
            return "\n".join(rval)
        return "Nothing selected."

    def get_dependencies(self):
        """
        Get the *names* of the other params this param depends on.
        """
        if self.options:
            return self.options.get_dependency_names()
        else:
            return []

    def to_dict(self, trans, other_values={}):
        d = super(SelectToolParameter, self).to_dict(trans, other_values)
        # Get options, value.
        options = self.get_options(trans, other_values)
        d['options'] = options
        d['display'] = self.display
        d['multiple'] = self.multiple
        d['textable'] = is_runtime_context(trans, other_values)
        return d
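
# Validation sketch: from_json() checks submitted values against the legal
# values derived from the options (get_legal_values() is part of this class
# in the full module source, not shown in this excerpt):
#
# >>> from galaxy.util.bunch import Bunch
# >>> trans = Bunch(app=None, history=Bunch(), workflow_building_mode=False)
# >>> p = SelectToolParameter(None, XML('<param name="_name" type="select"><option value="x">X</option><option value="y" selected="true">Y</option></param>'))
# >>> p.from_json('x', trans)
# 'x'
# >>> try:
# ...     p.from_json('z', trans)
# ... except ValueError:
# ...     print('invalid option rejected')
# invalid option rejected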


class GenomeBuildParameter(SelectToolParameter):
    """
    Select list that sets the last used genome build for the current history as "selected".

    >>> # Create a mock transaction with 'hg17' as the current build
    >>> from galaxy.util.bunch import Bunch
    >>> trans = Bunch(app=None, history=Bunch(genome_build='hg17'), db_builds=util.read_dbnames(None))
    >>> p = GenomeBuildParameter(None, XML('<param name="_name" type="genomebuild" value="hg17" />'))
    >>> print(p.name)
    _name
    >>> d = p.to_dict(trans)
    >>> o = d['options']
    >>> [i for i in o if i[2] == True]
    [('Human May 2004 (NCBI35/hg17) (hg17)', 'hg17', True)]
    >>> [i for i in o if i[1] == 'hg18']
    [('Human Mar. 2006 (NCBI36/hg18) (hg18)', 'hg18', False)]
    """

    def __init__(self, *args, **kwds):
        super(GenomeBuildParameter, self).__init__(*args, **kwds)
        if self.tool:
            self.static_options = [(value, key, False) for key, value in self._get_dbkey_names()]

    def get_options(self, trans, other_values):
        last_used_build = object()
        if trans.history:
            last_used_build = trans.history.genome_build
        for dbkey, build_name in self._get_dbkey_names(trans=trans):
            yield build_name, dbkey, (dbkey == last_used_build)

    def to_dict(self, trans, other_values={}):
        # skip SelectToolParameter (the immediate parent) bc we need to get options in a different way here
        d = ToolParameter.to_dict(self, trans)
        # Get options, value - options is a generator here, so compile to list
        options = list(self.get_options(trans, {}))
        value = options[0][1]
        for option in options:
            if option[2]:
                # Found selected option.
                value = option[1]
        d.update({
            'options': options,
            'value': value,
            'display': self.display,
            'multiple': self.multiple,
        })
        return d

    def _get_dbkey_names(self, trans=None):
        if not self.tool:
            # Hack for unit tests, since we have no tool
            return util.read_dbnames(None)
        return self.tool.app.genome_builds.get_genome_build_names(trans=trans)


class SelectTagParameter(SelectToolParameter):
    """
    Select list composed of the tags available on an associated input dataset.
    """

    def __init__(self, tool, input_source):
        input_source = ensure_input_source(input_source)
        super(SelectTagParameter, self).__init__(tool, input_source)
        self.tool = tool
        self.tag_key = input_source.get("group", False)
        self.optional = input_source.get("optional", False)
        self.multiple = input_source.get("multiple", False)
        self.accept_default = input_source.get_bool("accept_default", False)
        if self.accept_default:
            self.optional = True
        self.data_ref = input_source.get("data_ref", None)
        self.ref_input = None
        # Legacy style default value specification...
        self.default_value = input_source.get("default_value", None)
        if self.default_value is None:
            # Newer style... more in line with other parameters.
            self.default_value = input_source.get("value", None)
        self.is_dynamic = True

    def from_json(self, value, trans, other_values={}):
        if self.multiple:
            tag_list = []
            # split on newline and ,
            if isinstance(value, list) or isinstance(value, string_types):
                if not isinstance(value, list):
                    value = value.split('\n')
                for tag_str in value:
                    for tag in str(tag_str).split(','):
                        tag = tag.strip()
                        if tag:
                            tag_list.append(tag)
            value = tag_list
        else:
            if not value:
                value = None
        # We skip requiring legal values -- this is similar to optional, but allows only subset of datasets to be positive
        # TODO: May not actually be required for (nested) collection input ?
        return super(SelectTagParameter, self).from_json(value, trans, other_values, require_legal_value=False)

    def get_tag_list(self, other_values):
        """
        Generate a select list containing the tags of the associated dataset (if found).
        """
        # Get the value of the associated data reference (a dataset)
        history_items = other_values.get(self.data_ref, None)
        # Check if a dataset is selected
        if is_runtime_value(history_items):
            return []
        if not history_items:
            return []
        tags = set()
        for history_item in util.listify(history_items):
            if hasattr(history_item, 'dataset_instances'):
                for dataset in history_item.dataset_instances:
                    for tag in dataset.tags:
                        if tag.user_tname == 'group':
                            tags.add(tag.user_value)
            else:
                for tag in history_item.tags:
                    if tag.user_tname == 'group':
                        tags.add(tag.user_value)
        return list(tags)

    def get_options(self, trans, other_values):
        """
        Show tags
        """
        options = []
        for tag in self.get_tag_list(other_values):
            options.append(('Tags: ' + tag, tag, False))
        return options

    def get_initial_value(self, trans, other_values):
        if self.default_value is not None:
            return self.default_value
        return SelectToolParameter.get_initial_value(self, trans, other_values)

    def get_dependencies(self):
        return [self.data_ref]

    def to_dict(self, trans, other_values={}):
        d = super(SelectTagParameter, self).to_dict(trans, other_values=other_values)
        d['data_ref'] = self.data_ref
        return d


class ColumnListParameter(SelectToolParameter):
    """
    Select list that consists of either the total number of columns or only
    those columns that contain numerical values in the associated DataToolParameter.

    # TODO: we need better testing here, but not sure how to associate a DataToolParameter with a ColumnListParameter
    # from a twill perspective...

    >>> # Mock up a history (not connected to database)
    >>> from galaxy.model import History, HistoryDatasetAssociation
    >>> from galaxy.util.bunch import Bunch
    >>> from galaxy.model.mapping import init
    >>> sa_session = init("/tmp", "sqlite:///:memory:", create_tables=True).session
    >>> hist = History()
    >>> sa_session.add(hist)
    >>> sa_session.flush()
    >>> hda = hist.add_dataset(HistoryDatasetAssociation(id=1, extension='interval', create_dataset=True, sa_session=sa_session))
    >>> dtp = DataToolParameter(None, XML('<param name="blah" type="data" format="interval"/>'))
    >>> print(dtp.name)
    blah
    >>> clp = ColumnListParameter(None, XML('<param name="numerical_column" type="data_column" data_ref="blah" numerical="true"/>'))
    >>> print(clp.name)
    numerical_column
    """

    def __init__(self, tool, input_source):
        input_source = ensure_input_source(input_source)
        super(ColumnListParameter, self).__init__(tool, input_source)
        self.numerical = input_source.get_bool("numerical", False)
        self.optional = input_source.parse_optional(False)
        self.accept_default = input_source.get_bool("accept_default", False)
        if self.accept_default:
            self.optional = True
        self.data_ref = input_source.get("data_ref", None)
        self.ref_input = None
        # Legacy style default value specification...
        self.default_value = input_source.get("default_value", None)
        if self.default_value is None:
            # Newer style... more in line with other parameters.
            self.default_value = input_source.get("value", None)
        if self.default_value is not None:
            self.default_value = ColumnListParameter._strip_c(self.default_value)
        self.is_dynamic = True
        self.usecolnames = input_source.get_bool("use_header_names", False)

    def from_json(self, value, trans, other_values={}):
        """
        Label convention prepends column number with a 'c', but tool uses the integer. This
        removes the 'c' when entered into a workflow.
        """
        if self.multiple:
            # split on newline and ,
            if isinstance(value, list) or isinstance(value, string_types):
                column_list = []
                if not isinstance(value, list):
                    value = value.split('\n')
                for column in value:
                    for column2 in str(column).split(','):
                        column2 = column2.strip()
                        if column2:
                            column_list.append(column2)
                value = list(map(ColumnListParameter._strip_c, column_list))
            else:
                value = []
        else:
            if value:
                value = ColumnListParameter._strip_c(value)
            else:
                value = None
        if not value and self.accept_default:
            value = self.default_value or '1'
            return [value] if self.multiple else value
        return super(ColumnListParameter, self).from_json(value, trans, other_values)

    @staticmethod
    def _strip_c(column):
        if isinstance(column, string_types):
            if column.startswith('c'):
                column = column.strip().lower()[1:]
        return column

    def get_column_list(self, trans, other_values):
        """
        Generate a select list containing the columns of the associated dataset (if found).
        """
        # Get the value of the associated data reference (a dataset)
        dataset = other_values.get(self.data_ref)
        # Check if a dataset is selected
        if not dataset:
            return []
        column_list = None
        for dataset in util.listify(dataset):
            # Use representative dataset if a dataset collection is parsed
            if isinstance(dataset, trans.app.model.HistoryDatasetCollectionAssociation):
                dataset = dataset.to_hda_representative()
            if isinstance(dataset, trans.app.model.HistoryDatasetAssociation) and self.ref_input and self.ref_input.formats:
                target_ext, converted_dataset = dataset.find_conversion_destination(self.ref_input.formats)
                if target_ext:
                    if not converted_dataset:
                        raise ImplicitConversionRequired
                    else:
                        dataset = converted_dataset
            # Columns can only be identified if the dataset is ready and metadata is available
            if not hasattr(dataset, 'metadata') or \
                    not hasattr(dataset.metadata, 'columns') or \
                    not dataset.metadata.columns:
                return []
            # Build up possible columns for this dataset
            this_column_list = []
            if self.numerical:
                # If numerical was requested, filter columns based on metadata
                for i, col in enumerate(dataset.metadata.column_types):
                    if col == 'int' or col == 'float':
                        this_column_list.append(str(i + 1))
            else:
                this_column_list = [str(i) for i in range(1, dataset.metadata.columns + 1)]
            # Take the intersection of these columns with the other columns.
            if column_list is None:
                column_list = this_column_list
            else:
                column_list = [c for c in column_list if c in this_column_list]
        return column_list

    def get_options(self, trans, other_values):
        """
        Show column labels rather than c1..cn if use_header_names=True
        """
        options = []
        if self.usecolnames:
            # read first row - assume is a header with metadata useful for making good choices
            dataset = other_values.get(self.data_ref, None)
            try:
                with open(dataset.get_file_name(), 'r') as f:
                    head = f.readline()
                cnames = head.rstrip().split('\t')
                column_list = [('%d' % (i + 1), 'c%d: %s' % (i + 1, x)) for i, x in enumerate(cnames)]
                if self.numerical:
                    # If numerical was requested, filter columns based on metadata
                    if hasattr(dataset, 'metadata') and hasattr(dataset.metadata, 'column_types'):
                        if len(dataset.metadata.column_types) >= len(cnames):
                            numerics = [i for i, x in enumerate(dataset.metadata.column_types) if x in ['int', 'float']]
                            column_list = [column_list[i] for i in numerics]
            except Exception:
                column_list = self.get_column_list(trans, other_values)
        else:
            column_list = self.get_column_list(trans, other_values)
        for col in column_list:
            if isinstance(col, tuple) and len(col) == 2:
                options.append((col[1], col[0], False))
            else:
                options.append(('Column: ' + col, col, False))
        return options

    def get_initial_value(self, trans, other_values):
        if self.default_value is not None:
            return self.default_value
        return super(ColumnListParameter, self).get_initial_value(trans, other_values)

    def get_dependencies(self):
        return [self.data_ref]

    def to_dict(self, trans, other_values={}):
        d = super(ColumnListParameter, self).to_dict(trans, other_values=other_values)
        d['data_ref'] = self.data_ref
        d['numerical'] = self.numerical
        return d
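
# Sketch of the 'c' prefix handling: workflow forms label columns c1..cn, but
# the tool receives bare integers, so _strip_c() normalizes on the way in:
#
# >>> ColumnListParameter._strip_c('c4')
# '4'
# >>> ColumnListParameter._strip_c('4')
# '4'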


class DrillDownSelectToolParameter(SelectToolParameter):
    """
    Parameter that takes on one (or many) of a specific set of values.
    It presents a hierarchical select menu, which allows users to 'drill down'
    into a tree-like set of options.

    >>> from galaxy.util.bunch import Bunch
    >>> trans = Bunch(app=None, history=Bunch(genome_build='hg17'), db_builds=util.read_dbnames(None))
    >>> p = DrillDownSelectToolParameter(None, XML(
    ... '''
    ... <param name="_name" type="drill_down" display="checkbox" hierarchy="recurse" multiple="true">
    ...   <options>
    ...     <option name="Heading 1" value="heading1">
    ...       <option name="Option 1" value="option1"/>
    ...       <option name="Option 2" value="option2"/>
    ...       <option name="Heading 2" value="heading2">
    ...         <option name="Option 3" value="option3"/>
    ...         <option name="Option 4" value="option4"/>
    ...       </option>
    ...     </option>
    ...     <option name="Option 5" value="option5"/>
    ...   </options>
    ... </param>
    ... '''))
    >>> print(p.name)
    _name
    >>> d = p.to_dict(trans)
    >>> assert d['multiple'] == True
    >>> assert d['display'] == 'checkbox'
    >>> assert d['options'][0]['name'] == 'Heading 1'
    >>> assert d['options'][0]['value'] == 'heading1'
    >>> assert d['options'][0]['options'][0]['name'] == 'Option 1'
    >>> assert d['options'][0]['options'][0]['value'] == 'option1'
    >>> assert d['options'][0]['options'][1]['name'] == 'Option 2'
    >>> assert d['options'][0]['options'][1]['value'] == 'option2'
    >>> assert d['options'][0]['options'][2]['name'] == 'Heading 2'
    >>> assert d['options'][0]['options'][2]['value'] == 'heading2'
    >>> assert d['options'][0]['options'][2]['options'][0]['name'] == 'Option 3'
    >>> assert d['options'][0]['options'][2]['options'][0]['value'] == 'option3'
    >>> assert d['options'][0]['options'][2]['options'][1]['name'] == 'Option 4'
    >>> assert d['options'][0]['options'][2]['options'][1]['value'] == 'option4'
    >>> assert d['options'][1]['name'] == 'Option 5'
    >>> assert d['options'][1]['value'] == 'option5'
    """

    def __init__(self, tool, input_source, context=None):
        def recurse_option_elems(cur_options, option_elems):
            for option_elem in option_elems:
                selected = string_as_bool(option_elem.get('selected', False))
                cur_options.append({'name': option_elem.get('name'), 'value': option_elem.get('value'), 'options': [], 'selected': selected})
                recurse_option_elems(cur_options[-1]['options'], option_elem.findall('option'))
        input_source = ensure_input_source(input_source)
        ToolParameter.__init__(self, tool, input_source)
        # TODO: abstract XML out of here - so non-XML InputSources can
        # specify DrillDown parameters.
        elem = input_source.elem()
        self.multiple = string_as_bool(elem.get('multiple', False))
        self.display = elem.get('display', None)
        self.hierarchy = elem.get('hierarchy', 'exact')  # exact or recurse
        self.separator = elem.get('separator', ',')
        from_file = elem.get('from_file', None)
        if from_file:
            if not os.path.isabs(from_file):
                from_file = os.path.join(tool.app.config.tool_data_path, from_file)
            with open(from_file) as f:
                elem = XML("<root>%s</root>" % f.read())
        self.dynamic_options = elem.get('dynamic_options', None)
        if self.dynamic_options:
            self.is_dynamic = True
        self.options = []
        self.filtered = {}
        if elem.find('filter') is not None:
            self.is_dynamic = True
            for filter in elem.findall('filter'):
                # currently only filtering by metadata key matching input file is allowed
                if filter.get('type') == 'data_meta':
                    if filter.get('data_ref') not in self.filtered:
                        self.filtered[filter.get('data_ref')] = {}
                    if filter.get('meta_key') not in self.filtered[filter.get('data_ref')]:
                        self.filtered[filter.get('data_ref')][filter.get('meta_key')] = {}
                    if filter.get('value') not in self.filtered[filter.get('data_ref')][filter.get('meta_key')]:
                        self.filtered[filter.get('data_ref')][filter.get('meta_key')][filter.get('value')] = []
                    recurse_option_elems(self.filtered[filter.get('data_ref')][filter.get('meta_key')][filter.get('value')], filter.find('options').findall('option'))
        elif not self.dynamic_options:
            recurse_option_elems(self.options, elem.find('options').findall('option'))

    def _get_options_from_code(self, trans=None, value=None, other_values=None):
        assert self.dynamic_options, "dynamic_options was not specified"
        call_other_values = ExpressionContext({'__trans__': trans, '__value__': value})
        if other_values:
            call_other_values.parent = other_values.parent
            call_other_values.update(other_values.dict)
        try:
            return eval(self.dynamic_options, self.tool.code_namespace, call_other_values)
        except Exception:
            return []

    def get_options(self, trans=None, value=None, other_values={}):
        if self.is_dynamic:
            if self.dynamic_options:
                options = self._get_options_from_code(trans=trans, value=value, other_values=other_values)
            else:
                options = []
                for filter_key, filter_value in self.filtered.items():
                    dataset = other_values.get(filter_key)
                    if dataset.__class__.__name__.endswith("DatasetFilenameWrapper"):
                        # this is a bad way to check for this, but problems importing class (due to circular imports?)
                        dataset = dataset.dataset
                    if dataset:
                        for meta_key, meta_dict in filter_value.items():
                            if hasattr(dataset, 'metadata') and hasattr(dataset.metadata, 'spec'):
                                check_meta_val = dataset.metadata.spec[meta_key].param.to_string(dataset.metadata.get(meta_key))
                                if check_meta_val in meta_dict:
                                    options.extend(meta_dict[check_meta_val])
            return options
        return self.options

    def from_json(self, value, trans, other_values={}):
        legal_values = self.get_legal_values(trans, other_values)
        if not legal_values and trans.workflow_building_mode:
            if self.multiple:
                if value == '':
                    # No option selected
                    value = None
                else:
                    value = value.split("\n")
            return value
        elif value is None:
            if self.optional:
                return None
            raise ValueError("parameter '%s': an invalid option (%r) was selected" % (self.name, value))
        elif not legal_values:
            raise ValueError("parameter '%s': requires a value, but no legal values defined" % (self.name))
        if not isinstance(value, list):
            value = [value]
        if len(value) > 1 and not self.multiple:
            raise ValueError("parameter '%s': multiple values provided but parameter is not expecting multiple values" % (self.name))
        rval = []
        for val in value:
            if val not in legal_values:
                raise ValueError("parameter '%s': an invalid option (%r) was selected (valid options: %s)" % (self.name, val, ",".join(legal_values)))
            rval.append(val)
        return rval

    def to_param_dict_string(self, value, other_values={}):
        def get_options_list(value):
            def get_base_option(value, options):
                for option in options:
                    if value == option['value']:
                        return option
                    rval = get_base_option(value, option['options'])
                    if rval:
                        return rval
                return None  # not found

            def recurse_option(option_list, option):
                if not option['options']:
                    option_list.append(option['value'])
                else:
                    for opt in option['options']:
                        recurse_option(option_list, opt)
            rval = []
            recurse_option(rval, get_base_option(value, self.get_options(other_values=other_values)))
            return rval or [value]

        if value is None:
            return "None"
        rval = []
        if self.hierarchy == "exact":
            rval = value
        else:
            for val in value:
                options = get_options_list(val)
                rval.extend(options)
        if len(rval) > 1 and not self.multiple:
            raise ValueError("parameter '%s': multiple values provided but parameter is not expecting multiple values" % (self.name))
        rval = self.separator.join(rval)
        if self.tool is None or self.tool.options.sanitize:
            if self.sanitizer:
                rval = self.sanitizer.sanitize_param(rval)
            else:
                rval = sanitize_param(rval)
        return rval

    def get_initial_value(self, trans, other_values):
        def recurse_options(initial_values, options):
            for option in options:
                if option['selected']:
                    initial_values.append(option['value'])
                recurse_options(initial_values, option['options'])
        # More working around dynamic options for workflow
        options = self.get_options(trans=trans, other_values=other_values)
        if not options:
            return None
        initial_values = []
        recurse_options(initial_values, options)
        if len(initial_values) == 0:
            initial_values = None
        return initial_values

    def to_text(self, value):
        def get_option_display(value, options):
            for option in options:
                if value == option['value']:
                    return option['name']
                rval = get_option_display(value, option['options'])
                if rval:
                    return rval
            return None  # not found

        if not value:
            value = []
        elif not isinstance(value, list):
            value = [value]
        # FIXME: Currently only translating values back to labels if they
        # are not dynamic
        if self.is_dynamic:
            if value:
                if isinstance(value, list):
                    rval = value
                else:
                    rval = [value]
            else:
                rval = []
        else:
            rval = []
            for val in value:
                rval.append(get_option_display(val, self.options) or val)
        if rval:
            return "\n".join(map(str, rval))
        return "Nothing selected."

    def get_dependencies(self):
        """
        Get the *names* of the other params this param depends on.
        """
        return list(self.filtered.keys())

    def to_dict(self, trans, other_values={}):
        # skip SelectToolParameter (the immediate parent) bc we need to get options in a different way here
        d = ToolParameter.to_dict(self, trans)
        d['options'] = self.get_options(trans=trans, other_values=other_values)
        d['display'] = self.display
        d['multiple'] = self.multiple
        return d


class BaseDataToolParameter(ToolParameter):

    def __init__(self, tool, input_source, trans):
        super(BaseDataToolParameter, self).__init__(tool, input_source)
        self.refresh_on_change = True
        # Find datatypes_registry
        if self.tool is None:
            if trans:
                # Must account for "Input Dataset" types, which while not a tool still
                # need access to the real registry.
                # A handle to the transaction (and thus app) will be given by the module.
                self.datatypes_registry = trans.app.datatypes_registry
            else:
                # This occurs for things such as unit tests
                import galaxy.datatypes.registry
                self.datatypes_registry = galaxy.datatypes.registry.Registry()
                self.datatypes_registry.load_datatypes()
        else:
            self.datatypes_registry = self.tool.app.datatypes_registry  # can be None if self.tool.app is a ValidationContext

    def _parse_formats(self, trans, input_source):
        """
        Build list of classes for supported data formats
        """
        self.extensions = input_source.get('format', 'data').split(",")
        formats = []
        if self.datatypes_registry:  # This may be None when self.tool.app is a ValidationContext
            normalized_extensions = [extension.strip().lower() for extension in self.extensions]
            for extension in normalized_extensions:
                datatype = self.datatypes_registry.get_datatype_by_extension(extension)
                if datatype is not None:
                    formats.append(datatype)
                else:
                    log.warning("Datatype class not found for extension '%s', which is used in the 'format' attribute of parameter '%s'" % (extension, self.name))
        self.formats = formats

    def _parse_options(self, input_source):
        # TODO: Enhance dynamic options for DataToolParameters. Currently,
        # only the special case key='build' of type='data_meta' is
        # a valid filter
        self.options_filter_attribute = None
        self.options = parse_dynamic_options(self, input_source)
        if self.options:
            # TODO: Abstract away XML handling here.
            options_elem = input_source.elem().find('options')
            self.options_filter_attribute = options_elem.get('options_filter_attribute', None)
        self.is_dynamic = self.options is not None

    def get_initial_value(self, trans, other_values):
        if trans.workflow_building_mode is workflow_building_modes.ENABLED or trans.app.name == 'tool_shed':
            return RuntimeValue()
        if self.optional:
            return None
        history = trans.history
        if history is not None:
            dataset_matcher_factory = get_dataset_matcher_factory(trans)
            dataset_matcher = dataset_matcher_factory.dataset_matcher(self, other_values)
            if isinstance(self, DataToolParameter):
                for hda in reversed(history.active_visible_datasets_and_roles):
                    match = dataset_matcher.hda_match(hda)
                    if match:
                        return match.hda
            else:
                dataset_collection_matcher = dataset_matcher_factory.dataset_collection_matcher(dataset_matcher)
                for hdca in reversed(history.active_visible_dataset_collections):
                    if dataset_collection_matcher.hdca_match(hdca):
                        return hdca

    def to_json(self, value, app, use_security):
        def single_to_json(value):
            src = None
            if isinstance(value, dict) and 'src' in value and 'id' in value:
                return value
            elif isinstance(value, galaxy.model.DatasetCollectionElement):
                src = 'dce'
            elif isinstance(value, app.model.HistoryDatasetCollectionAssociation):
                src = 'hdca'
            elif isinstance(value, app.model.LibraryDatasetDatasetAssociation):
                src = 'ldda'
            elif isinstance(value, app.model.HistoryDatasetAssociation) or hasattr(value, 'id'):
                # hasattr 'id' fires a query on persistent objects after a flush so better
                # to do the isinstance check. Not sure we need the hasattr check anymore - it'd be
                # nice to drop it.
                src = 'hda'
            if src is not None:
                object_id = galaxy.model.cached_id(value)
                return {'id': app.security.encode_id(object_id) if use_security else object_id, 'src': src}

        if value not in [None, '', 'None']:
            if isinstance(value, list) and len(value) > 0:
                values = [single_to_json(v) for v in value]
            else:
                values = [single_to_json(value)]
            return {'values': values}
        return None
[docs]    def to_python(self, value, app):
        def single_to_python(value):
            if isinstance(value, dict) and 'src' in value:
                id = value['id'] if isinstance(value['id'], int) else app.security.decode_id(value['id'])
                if value['src'] == 'dce':
                    return app.model.context.query(app.model.DatasetCollectionElement).get(id)
                elif value['src'] == 'hdca':
                    return app.model.context.query(app.model.HistoryDatasetCollectionAssociation).get(id)
                elif value['src'] == 'ldda':
                    return app.model.context.query(app.model.LibraryDatasetDatasetAssociation).get(id)
                else:
                    return app.model.context.query(app.model.HistoryDatasetAssociation).get(id)

        if isinstance(value, dict) and 'values' in value:
            if hasattr(self, 'multiple') and self.multiple is True:
                return [single_to_python(v) for v in value['values']]
            elif len(value['values']) > 0:
                return single_to_python(value['values'][0])

        # Handle legacy string values potentially stored in databases
        none_values = [None, '', 'None']
        if value in none_values:
            return None
        if isinstance(value, string_types) and value.find(',') > -1:
            return [app.model.context.query(app.model.HistoryDatasetAssociation).get(int(v)) for v in value.split(',') if v not in none_values]
        elif str(value).startswith("__collection_reduce__|"):
            decoded_id = str(value)[len("__collection_reduce__|"):]
            if not decoded_id.isdigit():
                decoded_id = app.security.decode_id(decoded_id)
            return app.model.context.query(app.model.HistoryDatasetCollectionAssociation).get(int(decoded_id))
        elif str(value).startswith("dce:"):
            return app.model.context.query(app.model.DatasetCollectionElement).get(int(value[len("dce:"):]))
        elif str(value).startswith("hdca:"):
            return app.model.context.query(app.model.HistoryDatasetCollectionAssociation).get(int(value[len("hdca:"):]))
        else:
            return app.model.context.query(app.model.HistoryDatasetAssociation).get(int(value))
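    # Hedged examples (not in the original source) of the legacy string encodings
    # that to_python still accepts from older databases; `app` is a live Galaxy app:
    #
    #     param.to_python("1,2,3", app)                    # list of HDAs by raw id
    #     param.to_python("__collection_reduce__|3", app)  # an HDCA by raw or encoded id
    #     param.to_python("dce:5", app)                    # a DatasetCollectionElement
    #     param.to_python("hdca:7", app)                   # an HDCA
    #     param.to_python("9", app)                        # a single HDA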
[docs]class DataToolParameter(BaseDataToolParameter):
    # TODO, Nate: Make sure the following unit tests appropriately test the dataset security
    # components. Add as many additional tests as necessary.
    """
    Parameter that takes on one (or many) of a specific set of values.

    TODO: There should be an alternate display that allows single selects to be
    displayed as radio buttons and multiple selects as a set of checkboxes

    TODO: The following must be fixed to test correctly for the new security_check tag in
    the DataToolParameter (the last test below is broken). Nate's next pass at the dataset
    security stuff will dramatically alter this anyway.
    """
[docs]    def __init__(self, tool, input_source, trans=None):
        input_source = ensure_input_source(input_source)
        super(DataToolParameter, self).__init__(tool, input_source, trans)
        # Add metadata validator
        if not input_source.get_bool('no_validation', False):
            self.validators.append(validation.MetadataValidator())
        self._parse_formats(trans, input_source)
        self.multiple = input_source.get_bool('multiple', False)
        self.min = input_source.get('min')
        self.max = input_source.get('max')
        if self.min:
            try:
                self.min = int(self.min)
            except ValueError:
                raise ValueError("parameter '%s': attribute 'min' must be an integer" % self.name)
        if self.max:
            try:
                self.max = int(self.max)
            except ValueError:
                raise ValueError("parameter '%s': attribute 'max' must be an integer" % self.name)
        if not self.multiple and (self.min is not None):
            raise ValueError("parameter '%s': cannot specify 'min' property on single data parameter. Set multiple=\"true\" to enable this option" % self.name)
        if not self.multiple and (self.max is not None):
            raise ValueError("parameter '%s': cannot specify 'max' property on single data parameter. Set multiple=\"true\" to enable this option" % self.name)
        self.is_dynamic = True
        self._parse_options(input_source)
        # Load conversions required for the dataset input
        self.conversions = []
        for name, conv_extension in input_source.parse_conversion_tuples():
            assert None not in [name, conv_extension], 'A name (%s) and type (%s) are required for explicit conversion' % (name, conv_extension)
            if self.datatypes_registry:
                conv_type = self.datatypes_registry.get_datatype_by_extension(conv_extension.lower())
                if conv_type is None:
                    raise ValueError("parameter '%s': datatype class not found for extension '%s', which is used as 'type' attribute in conversion of data parameter" % (self.name, conv_extension))
                self.conversions.append((name, conv_extension, [conv_type]))
[docs]    def from_json(self, value, trans, other_values={}):
        if trans.workflow_building_mode is workflow_building_modes.ENABLED or is_runtime_value(value):
            return None
        if not value and not self.optional:
            raise ValueError("parameter '%s': specify a dataset of the required format / build for parameter" % self.name)
        if value in [None, "None", '']:
            return None
        if isinstance(value, dict) and 'values' in value:
            value = self.to_python(value, trans.app)
        if isinstance(value, string_types) and value.find(",") > 0:
            value = [int(value_part) for value_part in value.split(",")]
        if isinstance(value, list):
            rval = []
            found_hdca = False
            for single_value in value:
                if isinstance(single_value, dict) and 'src' in single_value and 'id' in single_value:
                    if single_value['src'] == 'hda':
                        decoded_id = trans.security.decode_id(single_value['id'])
                        rval.append(trans.sa_session.query(trans.app.model.HistoryDatasetAssociation).get(decoded_id))
                    elif single_value['src'] == 'hdca':
                        found_hdca = True
                        decoded_id = trans.security.decode_id(single_value['id'])
                        rval.append(trans.sa_session.query(trans.app.model.HistoryDatasetCollectionAssociation).get(decoded_id))
                    elif single_value['src'] == 'ldda':
                        decoded_id = trans.security.decode_id(single_value['id'])
                        rval.append(trans.sa_session.query(trans.app.model.LibraryDatasetDatasetAssociation).get(decoded_id))
                    else:
                        raise ValueError("Unknown input source %s passed to job submission API." % single_value['src'])
                elif isinstance(single_value, trans.app.model.HistoryDatasetCollectionAssociation):
                    rval.append(single_value)
                elif isinstance(single_value, trans.app.model.DatasetCollectionElement):
                    rval.append(single_value)
                elif isinstance(single_value, trans.app.model.HistoryDatasetAssociation):
                    rval.append(single_value)
                elif isinstance(single_value, trans.app.model.LibraryDatasetDatasetAssociation):
                    rval.append(single_value)
                else:
                    if len(str(single_value)) == 16:
                        # Could never really have an ID this big anyway - postgres doesn't
                        # support that for integer column types.
                        log.warning("Encoded ID where unencoded ID expected.")
                        single_value = trans.security.decode_id(single_value)
                    rval.append(trans.sa_session.query(trans.app.model.HistoryDatasetAssociation).get(single_value))
            if found_hdca:
                for val in rval:
                    if not isinstance(val, trans.app.model.HistoryDatasetCollectionAssociation):
                        raise ValueError("parameter '%s': if collections are supplied to a multiple data input parameter, only collections may be used" % self.name)
        elif isinstance(value, trans.app.model.HistoryDatasetAssociation):
            rval = value
        elif isinstance(value, dict) and 'src' in value and 'id' in value:
            if value['src'] == 'hda':
                decoded_id = trans.security.decode_id(value['id'])
                rval = trans.sa_session.query(trans.app.model.HistoryDatasetAssociation).get(decoded_id)
            elif value['src'] == 'hdca':
                decoded_id = trans.security.decode_id(value['id'])
                rval = trans.sa_session.query(trans.app.model.HistoryDatasetCollectionAssociation).get(decoded_id)
            else:
                raise ValueError("Unknown input source %s passed to job submission API." % value['src'])
        elif str(value).startswith("__collection_reduce__|"):
            encoded_ids = [v[len("__collection_reduce__|"):] for v in str(value).split(",")]
            decoded_ids = map(trans.security.decode_id, encoded_ids)
            rval = []
            for decoded_id in decoded_ids:
                hdca = trans.sa_session.query(trans.app.model.HistoryDatasetCollectionAssociation).get(decoded_id)
                rval.append(hdca)
        elif isinstance(value, trans.app.model.HistoryDatasetCollectionAssociation) or isinstance(value, trans.app.model.DatasetCollectionElement):
            rval = value
        else:
            rval = trans.sa_session.query(trans.app.model.HistoryDatasetAssociation).get(value)
        values = util.listify(rval)
        dataset_matcher_factory = get_dataset_matcher_factory(trans)
        dataset_matcher = dataset_matcher_factory.dataset_matcher(self, other_values)
        for v in values:
            if v:
                if hasattr(v, "deleted") and v.deleted:
                    raise ValueError("parameter '%s': the previously selected dataset has been deleted." % self.name)
                elif hasattr(v, "dataset") and v.dataset.state in [galaxy.model.Dataset.states.ERROR, galaxy.model.Dataset.states.DISCARDED]:
                    raise ValueError("parameter '%s': the previously selected dataset has entered an unusable state" % self.name)
                elif hasattr(v, "dataset"):
                    match = dataset_matcher.hda_match(v)
                    if match and match.implicit_conversion:
                        v.implicit_conversion = True
        if not self.multiple:
            if len(values) > 1:
                raise ValueError("parameter '%s': more than one dataset supplied to single input dataset parameter" % self.name)
            if len(values) > 0:
                rval = values[0]
            else:
                raise ValueError("parameter '%s': invalid dataset supplied to single input dataset parameter" % self.name)
        return rval
[docs]    def to_param_dict_string(self, value, other_values={}):
        if value is None:
            return "None"
        return value.file_name
[docs]    def to_text(self, value):
        if value and not isinstance(value, list):
            value = [value]
        if value:
            try:
                return ", ".join(["%s: %s" % (item.hid, item.name) for item in value])
            except Exception:
                pass
        return "No dataset."
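    # Hedged example (not in the original source): for two selected datasets with
    # hids 1 and 2, to_text renders "1: my_reads.fastq, 2: trimmed.fastq"; any
    # failure while formatting falls through to "No dataset."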
[docs]    def validate(self, value, trans=None):
        dataset_count = 0

        def do_validate(v):
            for validator in self.validators:
                if validator.requires_dataset_metadata and v and hasattr(v, 'dataset') and v.dataset.state != galaxy.model.Dataset.states.OK:
                    # Skip metadata-dependent validators while the dataset is not ready.
                    continue
                validator.validate(v, trans)

        if value and self.multiple:
            if not isinstance(value, list):
                value = [value]
            for v in value:
                if isinstance(v, galaxy.model.HistoryDatasetCollectionAssociation):
                    for dataset_instance in v.collection.dataset_instances:
                        dataset_count += 1
                        do_validate(dataset_instance)
                elif isinstance(v, galaxy.model.DatasetCollectionElement):
                    for dataset_instance in v.child_collection.dataset_instances:
                        dataset_count += 1
                        do_validate(dataset_instance)
                else:
                    dataset_count += 1
                    do_validate(v)
        else:
            if value:
                dataset_count += 1
                do_validate(value)

        if self.min is not None:
            if self.min > dataset_count:
                raise ValueError("At least %d datasets are required for %s" % (self.min, self.name))
        if self.max is not None:
            if self.max < dataset_count:
                raise ValueError("At most %d datasets are allowed for %s" % (self.max, self.name))
[docs]    def get_dependencies(self):
        """
        Get the *names* of the other params this param depends on.
        """
        if self.options:
            return self.options.get_dependency_names()
        else:
            return []
[docs]    def converter_safe(self, other_values, trans):
        if self.tool is None or self.tool.has_multiple_pages or not hasattr(trans, 'workflow_building_mode') or trans.workflow_building_mode:
            return False
        if other_values is None:
            return True  # we don't know the other values, so we can't check; assume ok
        converter_safe = [True]

        def visitor(prefix, input, value, parent=None):
            if isinstance(input, SelectToolParameter) and self.name in input.get_dependencies():
                if input.is_dynamic and (input.dynamic_options or (not input.dynamic_options and not input.options) or not input.options.converter_safe):
                    # This option does not allow for conversion, i.e. it uses the contents of the dataset file to generate options.
                    converter_safe[0] = False

        self.tool.visit_inputs(other_values, visitor)
        return False not in converter_safe
[docs]    def get_options_filter_attribute(self, value):
        # HACK to get around the current hardcoded limitation that when a set of dynamic
        # options is defined for a DataToolParameter it always causes available datasets
        # to be filtered by dbkey. This behavior needs to be entirely reworked (in a
        # backwards compatible manner).
        options_filter_attribute = self.options_filter_attribute
        if options_filter_attribute is None:
            return value.get_dbkey()
        if options_filter_attribute.endswith("()"):
            call_attribute = True
            options_filter_attribute = options_filter_attribute[:-2]
        else:
            call_attribute = False
        ref = value
        for attribute in options_filter_attribute.split('.'):
            ref = getattr(ref, attribute)
        if call_attribute:
            ref = ref()
        return ref
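    # Hedged example (not in the original source): with
    # options_filter_attribute="metadata.dbkey" the chain value.metadata.dbkey is
    # returned, while a setting ending in "()" such as "get_dbkey()" resolves
    # value.get_dbkey and then calls it.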
[docs]    def to_dict(self, trans, other_values={}):
        # create dictionary and fill default parameters
        d = super(DataToolParameter, self).to_dict(trans)
        extensions = self.extensions
        all_edam_formats = self.datatypes_registry.edam_formats if hasattr(self.datatypes_registry, 'edam_formats') else {}
        all_edam_data = self.datatypes_registry.edam_data if hasattr(self.datatypes_registry, 'edam_data') else {}
        edam_formats = [all_edam_formats.get(ext, None) for ext in extensions]
        edam_data = [all_edam_data.get(ext, None) for ext in extensions]
        d['extensions'] = extensions
        d['edam'] = {'edam_formats': edam_formats, 'edam_data': edam_data}
        d['multiple'] = self.multiple
        if self.multiple:
            # For consistency, should these just always be in the dict?
            d['min'] = self.min
            d['max'] = self.max
        d['options'] = {'hda': [], 'hdca': []}

        # return dictionary without options if context is unavailable
        history = trans.history
        if history is None or trans.workflow_building_mode is workflow_building_modes.ENABLED:
            return d

        # prepare dataset/collection matching
        dataset_matcher_factory = get_dataset_matcher_factory(trans)
        dataset_matcher = dataset_matcher_factory.dataset_matcher(self, other_values)
        multiple = self.multiple

        # build and append a new select option
        def append(options, hda, name, src, keep=False, subcollection_type=None):
            value = {
                'id' : trans.security.encode_id(hda.id),
                'hid' : hda.hid,
                'name' : name,
                'tags' : [t.user_tname if not t.value else "%s:%s" % (t.user_tname, t.value) for t in hda.tags],
                'src' : src,
                'keep' : keep
            }
            if subcollection_type:
                value["map_over_type"] = subcollection_type
            return options.append(value)

        # add datasets
        hda_list = util.listify(other_values.get(self.name))
        # Prefetch all at once, big list of visible, non-deleted datasets.
        for hda in history.active_visible_datasets_and_roles:
            match = dataset_matcher.hda_match(hda)
            if match:
                m = match.hda
                hda_list = [h for h in hda_list if h != m and h != hda]
                m_name = '%s (as %s)' % (match.original_hda.name, match.target_ext) if match.implicit_conversion else m.name
                append(d['options']['hda'], m, m_name, 'hda')
        for hda in hda_list:
            if hasattr(hda, 'hid'):
                if hda.deleted:
                    hda_state = 'deleted'
                elif not hda.visible:
                    hda_state = 'hidden'
                else:
                    hda_state = 'unavailable'
                append(d['options']['hda'], hda, '(%s) %s' % (hda_state, hda.name), 'hda', True)

        # add dataset collections
        dataset_collection_matcher = dataset_matcher_factory.dataset_collection_matcher(dataset_matcher)
        for hdca in history.active_visible_dataset_collections:
            match = dataset_collection_matcher.hdca_match(hdca)
            if match:
                subcollection_type = None
                if multiple and hdca.collection.collection_type != 'list':
                    collection_type_description = self._history_query(trans).can_map_over(hdca)
                    if collection_type_description:
                        subcollection_type = collection_type_description.collection_type
                    else:
                        continue
                name = hdca.name
                if match.implicit_conversion:
                    name = "%s (with implicit datatype conversion)" % name
                append(d['options']['hdca'], hdca, name, 'hdca', subcollection_type=subcollection_type)

        # sort both lists
        d['options']['hda'] = sorted(d['options']['hda'], key=lambda k: k['hid'], reverse=True)
        d['options']['hdca'] = sorted(d['options']['hdca'], key=lambda k: k['hid'], reverse=True)

        # return final dictionary
        return d
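    # Hedged sketch (not in the original source) of the shape of each select
    # option appended above, e.g. for an HDA with hid 3:
    #
    #     {'id': '<encoded id>', 'hid': 3, 'name': 'my_reads.fastq',
    #      'tags': ['group:treated'], 'src': 'hda', 'keep': False}
    #
    # HDCA entries look the same but use src 'hdca' and may add 'map_over_type'.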
    def _history_query(self, trans):
        assert self.multiple
        dataset_collection_type_descriptions = trans.app.dataset_collections_service.collection_type_descriptions
        # If multiple data parameter, treat like a list parameter.
        return history_query.HistoryQuery.from_collection_type("list", dataset_collection_type_descriptions)
[docs]class DataCollectionToolParameter(BaseDataToolParameter):
    """
    Parameter that takes on a dataset collection value.
    """
[docs]    def __init__(self, tool, input_source, trans=None):
        input_source = ensure_input_source(input_source)
        super(DataCollectionToolParameter, self).__init__(tool, input_source, trans)
        self._parse_formats(trans, input_source)
        collection_types = input_source.get("collection_type", None)
        if collection_types:
            collection_types = [t.strip() for t in collection_types.split(",")]
        self._collection_types = collection_types
        self.multiple = False  # Accessed on DataToolParameter a lot, may want in future
        self.is_dynamic = True
        self._parse_options(input_source)  # TODO: Review and test.
    @property
    def collection_types(self):
        return self._collection_types

    def _history_query(self, trans):
        dataset_collection_type_descriptions = trans.app.dataset_collections_service.collection_type_descriptions
        return history_query.HistoryQuery.from_parameter(self, dataset_collection_type_descriptions)
[docs]    def match_collections(self, trans, history, dataset_collection_matcher):
        dataset_collections = trans.app.dataset_collections_service.history_dataset_collections(history, self._history_query(trans))

        for dataset_collection_instance in dataset_collections:
            match = dataset_collection_matcher.hdca_match(dataset_collection_instance)
            if not match:
                continue
            yield dataset_collection_instance, match.implicit_conversion
[docs]    def match_multirun_collections(self, trans, history, dataset_collection_matcher):
        for history_dataset_collection in history.active_visible_dataset_collections:
            if not self._history_query(trans).can_map_over(history_dataset_collection):
                continue

            match = dataset_collection_matcher.hdca_match(history_dataset_collection)
            if match:
                yield history_dataset_collection, match.implicit_conversion
[docs]    def from_json(self, value, trans, other_values={}):
        rval = None
        if trans.workflow_building_mode is workflow_building_modes.ENABLED:
            return None
        if not value and not self.optional:
            raise ValueError("parameter '%s': specify a dataset collection of the correct type" % self.name)
        if value in [None, "None"]:
            return None
        if isinstance(value, dict) and 'values' in value:
            value = self.to_python(value, trans.app)
        if isinstance(value, string_types) and value.find(",") > 0:
            value = [int(value_part) for value_part in value.split(",")]
        elif isinstance(value, trans.app.model.HistoryDatasetCollectionAssociation):
            rval = value
        elif isinstance(value, trans.app.model.DatasetCollectionElement):
            # When mapping over a nested collection, this parameter will receive
            # a DatasetCollectionElement instead of a
            # HistoryDatasetCollectionAssociation.
            rval = value
        elif isinstance(value, dict) and 'src' in value and 'id' in value:
            if value['src'] == 'hdca':
                rval = trans.sa_session.query(trans.app.model.HistoryDatasetCollectionAssociation).get(trans.security.decode_id(value['id']))
        elif isinstance(value, list):
            if len(value) > 0:
                value = value[0]
                if isinstance(value, dict) and 'src' in value and 'id' in value:
                    if value['src'] == 'hdca':
                        rval = trans.sa_session.query(trans.app.model.HistoryDatasetCollectionAssociation).get(trans.security.decode_id(value['id']))
        elif isinstance(value, string_types):
            if value.startswith("dce:"):
                rval = trans.sa_session.query(trans.app.model.DatasetCollectionElement).get(value[len("dce:"):])
            elif value.startswith("hdca:"):
                rval = trans.sa_session.query(trans.app.model.HistoryDatasetCollectionAssociation).get(value[len("hdca:"):])
            else:
                rval = trans.sa_session.query(trans.app.model.HistoryDatasetCollectionAssociation).get(value)
        if rval and isinstance(rval, trans.app.model.HistoryDatasetCollectionAssociation):
            if rval.deleted:
                raise ValueError("parameter '%s': the previously selected dataset collection has been deleted" % self.name)
            # TODO: Handle error states, implement error states ...
        return rval
[docs]    def to_text(self, value):
        try:
            if isinstance(value, galaxy.model.HistoryDatasetCollectionAssociation):
                display_text = "%s: %s" % (value.hid, value.name)
            else:
                display_text = "Element %d:%s" % (value.identifier_index, value.identifier_name)
        except AttributeError:
            display_text = "No dataset collection."
        return display_text
[docs]    def validate(self, value, trans=None):
        return True  # TODO
[docs]    def to_dict(self, trans, other_values=None):
        # create dictionary and fill default parameters
        other_values = other_values or {}
        d = super(DataCollectionToolParameter, self).to_dict(trans)
        d['extensions'] = self.extensions
        d['multiple'] = self.multiple
        d['options'] = {'hda': [], 'hdca': []}

        # return dictionary without options if context is unavailable
        history = trans.history
        if history is None or trans.workflow_building_mode is workflow_building_modes.ENABLED:
            return d

        # prepare dataset/collection matching
        dataset_matcher_factory = get_dataset_matcher_factory(trans)
        dataset_matcher = dataset_matcher_factory.dataset_matcher(self, other_values)
        dataset_collection_matcher = dataset_matcher_factory.dataset_collection_matcher(dataset_matcher)

        # append directly matched collections
        for hdca, implicit_conversion in self.match_collections(trans, history, dataset_collection_matcher):
            name = hdca.name
            if implicit_conversion:
                name = "%s (with implicit datatype conversion)" % name
            d['options']['hdca'].append({
                'id' : trans.security.encode_id(hdca.id),
                'hid' : hdca.hid,
                'name' : name,
                'src' : 'hdca',
                'tags' : [t.user_tname if not t.value else "%s:%s" % (t.user_tname, t.value) for t in hdca.tags]
            })

        # append matching subcollections
        for hdca, implicit_conversion in self.match_multirun_collections(trans, history, dataset_collection_matcher):
            subcollection_type = self._history_query(trans).can_map_over(hdca).collection_type
            name = hdca.name
            if implicit_conversion:
                name = "%s (with implicit datatype conversion)" % name
            d['options']['hdca'].append({
                'id' : trans.security.encode_id(hdca.id),
                'hid' : hdca.hid,
                'name' : name,
                'src' : 'hdca',
                'tags' : [t.user_tname if not t.value else "%s:%s" % (t.user_tname, t.value) for t in hdca.tags],
                'map_over_type': subcollection_type
            })

        # sort the list
        d['options']['hdca'] = sorted(d['options']['hdca'], key=lambda k: k['hid'], reverse=True)

        # return final dictionary
        return d
[docs]class HiddenDataToolParameter(HiddenToolParameter, DataToolParameter):
    """
    Hidden parameter that behaves as a DataToolParameter. As with all hidden
    parameters, this is a HACK.
    """
[docs]    def __init__(self, tool, elem):
        DataToolParameter.__init__(self, tool, elem)
        self.value = "None"
        self.type = "hidden_data"
        self.hidden = True
[docs]class LibraryDatasetToolParameter(ToolParameter):
    """
    Parameter that lets users select an LDDA from a modal window, then use it within the wrapper.
    """
[docs]    def __init__(self, tool, input_source, context=None):
        input_source = ensure_input_source(input_source)
        super(LibraryDatasetToolParameter, self).__init__(tool, input_source)
        self.multiple = input_source.get_bool('multiple', True)
[docs]    def from_json(self, value, trans, other_values={}):
        return self.to_python(value, trans.app, other_values=other_values, validate=True)
[docs]    def to_param_dict_string(self, value, other_values={}):
        if value is None:
            return 'None'
        elif self.multiple:
            return [dataset.get_file_name() for dataset in value]
        else:
            return value[0].get_file_name()
    # Converts values to a JSON representation:
    #     { id: LibraryDatasetDatasetAssociation.id, name: LibraryDatasetDatasetAssociation.name, src: 'ldda' }
[docs]    def to_json(self, value, app, use_security):
        if not isinstance(value, list):
            value = [value]
        lst = []
        for item in value:
            lda_id = lda_name = None
            if isinstance(item, app.model.LibraryDatasetDatasetAssociation):
                lda_id = app.security.encode_id(item.id) if use_security else item.id
                lda_name = item.name
            elif isinstance(item, dict):
                lda_id = item.get('id')
                lda_name = item.get('name')
            else:
                lst = []
                break
            if lda_id is not None:
                lst.append({
                    'id' : lda_id,
                    'name' : lda_name,
                    'src' : 'ldda'
                })
        if len(lst) == 0:
            return None
        else:
            return lst
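    # Hedged example (not in the original source): for a single LDDA with id 12
    # named "reference.fa", to_json returns
    #     [{'id': 12, 'name': 'reference.fa', 'src': 'ldda'}]
    # (with 'id' encoded when use_security is set); an unrecognized item clears
    # the whole list, so the method returns None.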
    # Converts values into a Python representation:
    #     LibraryDatasetDatasetAssociation
    # Valid input values (incl. arrays of mixed sets) are:
    #     1. LibraryDatasetDatasetAssociation
    #     2. LibraryDatasetDatasetAssociation.id
    #     3. { id: LibraryDatasetDatasetAssociation.id, ... }
[docs]    def to_python(self, value, app, other_values={}, validate=False):
        if not isinstance(value, list):
            value = [value]
        lst = []
        for item in value:
            if isinstance(item, app.model.LibraryDatasetDatasetAssociation):
                lst.append(item)
            else:
                lda_id = None
                if isinstance(item, dict):
                    lda_id = item.get('id')
                elif isinstance(item, string_types):
                    lda_id = item
                else:
                    lst = []
                    break
                lda = app.model.context.query(app.model.LibraryDatasetDatasetAssociation).get(lda_id if isinstance(lda_id, int) else app.security.decode_id(lda_id))
                if lda is not None:
                    lst.append(lda)
                elif validate:
                    raise ValueError("parameter '%s': one of the selected library datasets is invalid or not available anymore" % self.name)
        if len(lst) == 0:
            if not self.optional and validate:
                raise ValueError("parameter '%s': invalid library dataset selected" % self.name)
            return None
        else:
            return lst
[docs]    def to_dict(self, trans, other_values=None):
        d = super(LibraryDatasetToolParameter, self).to_dict(trans)
        d['multiple'] = self.multiple
        return d
[docs]class BaseJsonToolParameter(ToolParameter):
    """
    Class of parameter that tries to keep values as close to JSON as possible.

    In particular, value_to_basic is overloaded to prevent params_to_strings from
    double encoding JSON, and to_python uses json.loads to produce values.
    """
[docs]    def value_to_basic(self, value, app, use_security=False):
        if is_runtime_value(value):
            return runtime_to_json(value)
        return value
[docs]    def to_json(self, value, app, use_security):
        """Convert a value to a string representation suitable for persisting"""
        return json.dumps(value)
[docs]    def to_python(self, value, app):
        """Convert a value created with to_json back to an object representation"""
        return json.loads(value)
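    # Hedged usage sketch (not in the original source): since to_json/to_python
    # delegate to the standard json module, persisted values round-trip directly.
    # Assuming `param` is any BaseJsonToolParameter and `app` is unused by either call:
    #
    #     persisted = param.to_json({'a': [1, 2]}, app, use_security=False)  # '{"a": [1, 2]}'
    #     assert param.to_python(persisted, app) == {'a': [1, 2]}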
[docs]class RulesListToolParameter(BaseJsonToolParameter):
    """
    Parameter that allows for the creation of a list of rules using the Galaxy rules DSL.
    """
[docs]    def __init__(self, tool, input_source, context=None):
        input_source = ensure_input_source(input_source)
        BaseJsonToolParameter.__init__(self, tool, input_source)
        self.data_ref = input_source.get("data_ref", None)
[docs]    def to_dict(self, trans, other_values={}):
        d = ToolParameter.to_dict(self, trans)
        target_name = self.data_ref
        if target_name in other_values:
            target = other_values[target_name]
            if not is_runtime_value(target):
                d["target"] = {
                    "src": "hdca" if hasattr(target, "collection") else "hda",
                    "id": trans.app.security.encode_id(target.id),
                }
        return d
[docs]    def validate(self, value, trans=None):
        super(RulesListToolParameter, self).validate(value, trans=trans)
        if not isinstance(value, dict):
            raise ValueError("No rules specified for rules parameter.")
        if "rules" not in value:
            raise ValueError("No rules specified for rules parameter.")
        mappings = value.get("mapping", None)
        if not mappings:
            raise ValueError("No column definitions defined for rules parameter.")
[docs]    def to_text(self, value):
        if value:
            rule_set = RuleSet(value)
            return rule_set.display
        else:
            return ""
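    # Hedged example (not in the original source): a minimal value accepted by
    # validate above carries "rules" and "mapping" lists understood by the rules
    # DSL (see galaxy.util.rules_dsl); the entry contents below are illustrative
    # assumptions, not a schema reference:
    #
    #     value = {
    #         "rules": [{"type": "add_column_metadata", "value": "identifier0"}],
    #         "mapping": [{"type": "list_identifiers", "columns": [0]}],
    #     }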
parameter_types = dict(
    text=TextToolParameter,
    integer=IntegerToolParameter,
    float=FloatToolParameter,
    boolean=BooleanToolParameter,
    genomebuild=GenomeBuildParameter,
    select=SelectToolParameter,
    color=ColorToolParameter,
    group_tag=SelectTagParameter,
    data_column=ColumnListParameter,
    hidden=HiddenToolParameter,
    hidden_data=HiddenDataToolParameter,
    baseurl=BaseURLToolParameter,
    file=FileToolParameter,
    ftpfile=FTPFileToolParameter,
    genomespacefile=GenomespaceFileToolParameter,
    data=DataToolParameter,
    data_collection=DataCollectionToolParameter,
    library_data=LibraryDatasetToolParameter,
    rules=RulesListToolParameter,
    drill_down=DrillDownSelectToolParameter
)
[docs]def runtime_to_json(runtime_value):
    if isinstance(runtime_value, ConnectedValue) or (isinstance(runtime_value, dict) and runtime_value.get("__class__") == "ConnectedValue"):
        return {"__class__": "ConnectedValue"}
    else:
        return {"__class__": "RuntimeValue"}
[docs]def runtime_to_object(runtime_value):
    if isinstance(runtime_value, ConnectedValue) or (isinstance(runtime_value, dict) and runtime_value.get("__class__") == "ConnectedValue"):
        return ConnectedValue()
    else:
        return RuntimeValue()
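# Hedged examples (not in the original source) of the round trip between the
# dict and object forms handled by the two helpers above:
#
#     runtime_to_json(ConnectedValue())                   # -> {"__class__": "ConnectedValue"}
#     runtime_to_json(RuntimeValue())                     # -> {"__class__": "RuntimeValue"}
#     runtime_to_object({"__class__": "ConnectedValue"})  # -> ConnectedValue instance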
[docs]class RuntimeValue(object):
    """
    Wrapper to note a value that is not yet set, but will be required at runtime.
    """
    pass
[docs]class ConnectedValue(RuntimeValue):
    """
    Wrapper to note a value that is not yet set, but will be inferred from a connection.
    """
    pass