Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.parameters.grouping

"""
Constructs for grouping tool parameters
"""
import io
import logging
import os
import unicodedata

from galaxy.datatypes import sniff
from galaxy.exceptions import (
    AdminRequiredException,
    ConfigDoesNotAllowException,
)
from galaxy.util import (
    asbool,
    inflector,
    relpath,
    sanitize_for_filename
)
from galaxy.util.bunch import Bunch
from galaxy.util.dictifiable import Dictifiable
from galaxy.util.expressions import ExpressionContext

log = logging.getLogger(__name__)
URI_PREFIXES = ["%s://" % x for x in ["http", "https", "ftp", "file", "gxfiles", "gximport", "gxuserimport", "gxftp"]]


[docs]class Group(Dictifiable): dict_collection_visible_keys = ['name', 'type']
[docs] def __init__(self): self.name = None
@property def visible(self): return True
[docs] def value_to_basic(self, value, app, use_security=False): """ Convert value to a (possibly nested) representation using only basic types (dict, list, tuple, string_types, int, long, float, bool, None) """ return value
[docs] def value_from_basic(self, value, app, ignore_errors=False): """ Convert a basic representation as produced by `value_to_basic` back into the preferred value form. """ return value
[docs] def get_initial_value(self, trans, context): """ Return the initial state/value for this group """ raise TypeError("Not implemented")
[docs] def to_dict(self, trans): group_dict = super().to_dict() return group_dict
[docs]class Repeat(Group): dict_collection_visible_keys = ['name', 'type', 'title', 'help', 'default', 'min', 'max'] type = "repeat"
[docs] def __init__(self): Group.__init__(self) self._title = None self.inputs = None self.help = None self.default = 0 self.min = None self.max = None
@property def title(self): return self._title or self.name @title.setter def title(self, value): self._title = value @property def title_plural(self): return inflector.pluralize(self.title)
[docs] def label(self): return "Repeat (%s)" % self.title
[docs] def value_to_basic(self, value, app, use_security=False): rval = [] for d in value: rval_dict = {} # Propogate __index__ if '__index__' in d: rval_dict['__index__'] = d['__index__'] for input in self.inputs.values(): if input.name in d: rval_dict[input.name] = input.value_to_basic(d[input.name], app, use_security) rval.append(rval_dict) return rval
[docs] def value_from_basic(self, value, app, ignore_errors=False): rval = [] try: for i, d in enumerate(value): rval_dict = {} # If the special __index__ key is not set, create it (for backward # compatibility) rval_dict['__index__'] = d.get('__index__', i) # Restore child inputs for input in self.inputs.values(): if ignore_errors and input.name not in d: # If we do not have a value, and are ignoring errors, we simply # do nothing. There will be no value for the parameter in the # conditional's values dictionary. pass else: rval_dict[input.name] = input.value_from_basic(d[input.name], app, ignore_errors) rval.append(rval_dict) except Exception as e: if not ignore_errors: raise e return rval
[docs] def get_initial_value(self, trans, context): rval = [] for i in range(self.default): rval_dict = {'__index__': i} for input in self.inputs.values(): rval_dict[input.name] = input.get_initial_value(trans, context) rval.append(rval_dict) return rval
[docs] def to_dict(self, trans): repeat_dict = super().to_dict(trans) def input_to_dict(input): return input.to_dict(trans) repeat_dict["inputs"] = list(map(input_to_dict, self.inputs.values())) return repeat_dict
[docs]class Section(Group): dict_collection_visible_keys = ['name', 'type', 'title', 'help', 'expanded'] type = "section"
[docs] def __init__(self): Group.__init__(self) self.title = None self.inputs = None self.help = None self.expanded = False
@property def title_plural(self): return inflector.pluralize(self.title)
[docs] def label(self): return "Section (%s)" % self.title
[docs] def value_to_basic(self, value, app, use_security=False): rval = {} for input in self.inputs.values(): if input.name in value: # parameter might be absent in unverified workflow rval[input.name] = input.value_to_basic(value[input.name], app, use_security) return rval
[docs] def value_from_basic(self, value, app, ignore_errors=False): rval = {} try: for input in self.inputs.values(): if not ignore_errors or input.name in value: rval[input.name] = input.value_from_basic(value[input.name], app, ignore_errors) except Exception as e: if not ignore_errors: raise e return rval
[docs] def get_initial_value(self, trans, context): rval = {} child_context = ExpressionContext(rval, context) for child_input in self.inputs.values(): rval[child_input.name] = child_input.get_initial_value(trans, child_context) return rval
[docs] def to_dict(self, trans): section_dict = super().to_dict(trans) def input_to_dict(input): return input.to_dict(trans) section_dict["inputs"] = list(map(input_to_dict, self.inputs.values())) return section_dict
[docs]class UploadDataset(Group): type = "upload_dataset"
[docs] def __init__(self): Group.__init__(self) self.title = None self.inputs = None self.file_type_name = 'file_type' self.default_file_type = 'txt' self.file_type_to_ext = {'auto': self.default_file_type} self.metadata_ref = 'files_metadata'
[docs] def get_composite_dataset_name(self, context): # FIXME: HACK # Special case of using 'base_name' metadata for use as Dataset name needs to be done in a General Fashion, as defined within a particular Datatype. # We get two different types of contexts here, one straight from submitted parameters, the other after being parsed into tool inputs dataset_name = context.get('files_metadata|base_name', None) if dataset_name is None: dataset_name = context.get('files_metadata', {}).get('base_name', None) if dataset_name is None: filenames = list() for composite_file in context.get('files', []): if not composite_file.get('ftp_files', ''): filenames.append((composite_file.get('file_data') or {}).get('filename', '')) else: filenames.append(composite_file.get('ftp_files', [])[0]) dataset_name = os.path.commonprefix(filenames).rstrip('.') or None if dataset_name is None: dataset_name = 'Uploaded Composite Dataset (%s)' % self.get_file_type(context) return dataset_name
[docs] def get_file_base_name(self, context): fd = context.get('files_metadata|base_name', 'Galaxy_Composite_file') return fd
[docs] def get_file_type(self, context, parent_context=None): file_type = context.get(self.file_type_name, None) if file_type == "": if parent_context: file_type = parent_context.get(self.file_type_name, self.default_file_type) else: file_type = self.default_file_type return file_type
[docs] def get_dbkey(self, context, parent_context=None): dbkey = context.get("dbkey", None) if dbkey == "": if parent_context: dbkey = parent_context.get("dbkey", dbkey) return dbkey
[docs] def get_datatype_ext(self, trans, context, parent_context=None): ext = self.get_file_type(context, parent_context=parent_context) if ext in self.file_type_to_ext: ext = self.file_type_to_ext[ext] # when using autodetect, we will use composite info from 'text', i.e. only the main file return ext
[docs] def get_datatype(self, trans, context, parent_context=None): ext = self.get_datatype_ext(trans, context, parent_context=parent_context) return trans.app.datatypes_registry.get_datatype_by_extension(ext)
@property def title_plural(self): return inflector.pluralize(self.title)
[docs] def group_title(self, context): return "{} ({})".format(self.title, context.get(self.file_type_name, self.default_file_type))
[docs] def title_by_index(self, trans, index, context): d_type = self.get_datatype(trans, context) for i, (composite_name, composite_file) in enumerate(d_type.writable_files.items()): if i == index: rval = composite_name if composite_file.description: rval = f"{rval} ({composite_file.description})" if composite_file.optional: rval = "%s [optional]" % rval return rval if index < self.get_file_count(trans, context): return "Extra primary file" return None
[docs] def value_to_basic(self, value, app, use_security=False): rval = [] for d in value: rval_dict = {} # Propogate __index__ if '__index__' in d: rval_dict['__index__'] = d['__index__'] for input in self.inputs.values(): if input.name in d: rval_dict[input.name] = input.value_to_basic(d[input.name], app, use_security) rval.append(rval_dict) return rval
[docs] def value_from_basic(self, value, app, ignore_errors=False): rval = [] for i, d in enumerate(value): try: rval_dict = {} # If the special __index__ key is not set, create it (for backward # compatibility) rval_dict['__index__'] = d.get('__index__', i) # Restore child inputs for input in self.inputs.values(): if ignore_errors and input.name not in d: # this wasn't tested rval_dict[input.name] = input.get_initial_value(None, d) else: rval_dict[input.name] = input.value_from_basic(d[input.name], app, ignore_errors) rval.append(rval_dict) except Exception as e: if not ignore_errors: raise e return rval
[docs] def get_file_count(self, trans, context): file_count = context.get("file_count", "auto") if file_count == "auto": d_type = self.get_datatype(trans, context) return len(d_type.writable_files) if d_type else 1 else: return int(file_count)
[docs] def get_initial_value(self, trans, context): file_count = self.get_file_count(trans, context) rval = [] for i in range(file_count): rval_dict = {} rval_dict['__index__'] = i # create __index__ for input in self.inputs.values(): rval_dict[input.name] = input.get_initial_value(trans, context) rval.append(rval_dict) return rval
[docs] def get_uploaded_datasets(self, trans, context, override_name=None, override_info=None): def get_data_file_filename(data_file, override_name=None, override_info=None, purge=True): dataset_name = override_name def get_file_name(file_name): file_name = file_name.split('\\')[-1] file_name = file_name.split('/')[-1] return file_name try: # Use the existing file if not dataset_name and 'filename' in data_file: dataset_name = get_file_name(data_file['filename']) return Bunch(type='file', path=data_file['local_filename'], name=dataset_name, purge_source=purge) except Exception: # The uploaded file should've been persisted by the upload tool action return Bunch(type=None, path=None, name=None) def get_url_paste_urls_or_filename(group_incoming, override_name=None, override_info=None): url_paste_file = group_incoming.get('url_paste', None) if url_paste_file is not None: url_paste = open(url_paste_file).read() def start_of_url(content): start_of_url_paste = content.lstrip()[0:10].lower() looks_like_url = False for url_prefix in URI_PREFIXES: if start_of_url_paste.startswith(url_prefix): looks_like_url = True break return looks_like_url if start_of_url(url_paste): url_paste = url_paste.replace('\r', '').split('\n') for line in url_paste: line = line.strip() if line: if not start_of_url(line): continue # non-url line, ignore if "file://" in line: if not trans.user_is_admin: raise AdminRequiredException() elif not trans.app.config.allow_path_paste: raise ConfigDoesNotAllowException() upload_path = line[len("file://"):] dataset_name = os.path.basename(upload_path) else: dataset_name = line if override_name: dataset_name = override_name yield Bunch(type='url', path=line, name=dataset_name) else: dataset_name = 'Pasted Entry' # we need to differentiate between various url pastes here if override_name: dataset_name = override_name yield Bunch(type='file', path=url_paste_file, name=dataset_name) def get_one_filename(context): data_file = context['file_data'] url_paste = context['url_paste'] ftp_files = context['ftp_files'] name = context.get('NAME', None) info = context.get('INFO', None) uuid = context.get('uuid', None) or None # Turn '' to None file_type = context.get('file_type', None) dbkey = self.get_dbkey(context) warnings = [] to_posix_lines = False if context.get('to_posix_lines', None) not in ["None", None, False]: to_posix_lines = True auto_decompress = False if context.get('auto_decompress', None) not in ["None", None, False]: auto_decompress = True space_to_tab = False if context.get('space_to_tab', None) not in ["None", None, False]: space_to_tab = True file_bunch = get_data_file_filename(data_file, override_name=name, override_info=info) if file_bunch.path: if url_paste is not None and url_paste.strip(): warnings.append("All file contents specified in the paste box were ignored.") if ftp_files: warnings.append("All FTP uploaded file selections were ignored.") elif url_paste is not None and url_paste.strip(): # we need to use url_paste for file_bunch in get_url_paste_urls_or_filename(context, override_name=name, override_info=info): if file_bunch.path: break if file_bunch.path and ftp_files is not None: warnings.append("All FTP uploaded file selections were ignored.") elif ftp_files is not None and trans.user is not None: # look for files uploaded via FTP user_ftp_dir = trans.user_ftp_dir assert not os.path.islink(user_ftp_dir), "User FTP directory cannot be a symbolic link" for dirpath, _dirnames, filenames in os.walk(user_ftp_dir): for filename in filenames: for ftp_filename in ftp_files: if ftp_filename == filename: path = relpath(os.path.join(dirpath, filename), user_ftp_dir) if not os.path.islink(os.path.join(dirpath, filename)): ftp_data_file = {'local_filename': os.path.abspath(os.path.join(user_ftp_dir, path)), 'filename': os.path.basename(path)} purge = getattr(trans.app.config, 'ftp_upload_purge', True) file_bunch = get_data_file_filename( ftp_data_file, override_name=name, override_info=info, purge=purge, ) if file_bunch.path: break if file_bunch.path: break if file_bunch.path: break file_bunch.to_posix_lines = to_posix_lines file_bunch.auto_decompress = auto_decompress file_bunch.space_to_tab = space_to_tab file_bunch.uuid = uuid if file_type is not None: file_bunch.file_type = file_type if dbkey is not None: file_bunch.dbkey = dbkey return file_bunch, warnings def get_filenames(context): rval = [] data_file = context['file_data'] ftp_files = context['ftp_files'] uuid = context.get('uuid', None) or None # Turn '' to None name = context.get('NAME', None) info = context.get('INFO', None) file_type = context.get('file_type', None) dbkey = self.get_dbkey(context) to_posix_lines = False if context.get('to_posix_lines', None) not in ["None", None, False]: to_posix_lines = True auto_decompress = False if context.get('auto_decompress', None) not in ["None", None, False]: auto_decompress = True space_to_tab = False if context.get('space_to_tab', None) not in ["None", None, False]: space_to_tab = True file_bunch = get_data_file_filename(data_file, override_name=name, override_info=info) file_bunch.uuid = uuid if file_bunch.path: file_bunch.to_posix_lines = to_posix_lines file_bunch.auto_decompress = auto_decompress file_bunch.space_to_tab = space_to_tab if file_type is not None: file_bunch.file_type = file_type if dbkey is not None: file_bunch.dbkey = dbkey rval.append(file_bunch) for file_bunch in get_url_paste_urls_or_filename(context, override_name=name, override_info=info): if file_bunch.path: file_bunch.uuid = uuid file_bunch.to_posix_lines = to_posix_lines file_bunch.auto_decompress = auto_decompress file_bunch.space_to_tab = space_to_tab if file_type is not None: file_bunch.file_type = file_type if dbkey is not None: file_bunch.dbkey = dbkey rval.append(file_bunch) # look for files uploaded via FTP valid_files = [] if ftp_files is not None: # Normalize input paths to ensure utf-8 encoding is normal form c. # This allows for comparison when the filesystem uses a different encoding than the browser. ftp_files = [unicodedata.normalize('NFC', f) for f in ftp_files if isinstance(f, str)] if trans.user is None: log.warning('Anonymous user passed values in ftp_files: %s' % ftp_files) ftp_files = [] # TODO: warning to the user (could happen if session has become invalid) else: user_ftp_dir = trans.user_ftp_dir assert not os.path.islink(user_ftp_dir), "User FTP directory cannot be a symbolic link" for dirpath, _dirnames, filenames in os.walk(user_ftp_dir): for filename in filenames: path = relpath(os.path.join(dirpath, filename), user_ftp_dir) if not os.path.islink(os.path.join(dirpath, filename)): # Normalize filesystem paths if isinstance(path, str): valid_files.append(unicodedata.normalize('NFC', path)) else: valid_files.append(path) else: ftp_files = [] for ftp_file in ftp_files: if ftp_file not in valid_files: log.warning('User passed an invalid file path in ftp_files: %s' % ftp_file) continue # TODO: warning to the user (could happen if file is already imported) ftp_data_file = {'local_filename': os.path.abspath(os.path.join(user_ftp_dir, ftp_file)), 'filename': os.path.basename(ftp_file)} purge = getattr(trans.app.config, 'ftp_upload_purge', True) file_bunch = get_data_file_filename(ftp_data_file, override_name=name, override_info=info, purge=purge) if file_bunch.path: file_bunch.to_posix_lines = to_posix_lines file_bunch.auto_decompress = auto_decompress file_bunch.space_to_tab = space_to_tab if file_type is not None: file_bunch.file_type = file_type if dbkey is not None: file_bunch.dbkey = dbkey rval.append(file_bunch) return rval file_type = self.get_file_type(context) file_count = self.get_file_count(trans, context) d_type = self.get_datatype(trans, context) dbkey = self.get_dbkey(context) tag_using_filenames = context.get('tag_using_filenames', False) tags = context.get('tags', False) force_composite = asbool(context.get('force_composite', 'False')) writable_files = d_type.writable_files writable_files_offset = 0 groups_incoming = [None for _ in range(file_count)] for group_incoming in context.get(self.name, []): i = int(group_incoming['__index__']) groups_incoming[i] = group_incoming if d_type.composite_type is not None or force_composite: # handle uploading of composite datatypes # Only one Dataset can be created dataset = Bunch() dataset.type = 'composite' dataset.file_type = file_type dataset.dbkey = dbkey dataset.datatype = d_type dataset.warnings = [] dataset.metadata = {} dataset.composite_files = {} dataset.uuid = None dataset.tag_using_filenames = None dataset.tags = None # load metadata files_metadata = context.get(self.metadata_ref, {}) metadata_name_substition_default_dict = {composite_file.substitute_name_with_metadata: d_type.metadata_spec[composite_file.substitute_name_with_metadata].default for composite_file in d_type.composite_files.values() if composite_file.substitute_name_with_metadata} for meta_name, meta_spec in d_type.metadata_spec.items(): if meta_spec.set_in_upload: if meta_name in files_metadata: meta_value = files_metadata[meta_name] if meta_name in metadata_name_substition_default_dict: meta_value = sanitize_for_filename(meta_value, default=metadata_name_substition_default_dict[meta_name]) dataset.metadata[meta_name] = meta_value dataset.name = self.get_composite_dataset_name(context) if dataset.datatype.composite_type == 'auto_primary_file': # replace sniff here with just creating an empty file temp_name = sniff.stream_to_file(io.StringIO(d_type.generate_primary_file(dataset)), prefix='upload_auto_primary_file') dataset.primary_file = temp_name dataset.to_posix_lines = True dataset.auto_decompress = True dataset.space_to_tab = False else: file_bunch, warnings = get_one_filename(groups_incoming[0]) writable_files_offset = 1 dataset.primary_file = file_bunch.path dataset.to_posix_lines = file_bunch.to_posix_lines dataset.auto_decompress = file_bunch.auto_decompress dataset.space_to_tab = file_bunch.space_to_tab if file_bunch.file_type: dataset.file_type = file_type if file_bunch.dbkey: dataset.dbkey = dbkey dataset.warnings.extend(warnings) if dataset.primary_file is None: # remove this before finish, this should create an empty dataset raise Exception('No primary dataset file was available for composite upload') if not force_composite: keys = [value.name for value in writable_files.values()] else: keys = [str(index) for index in range(file_count)] for i, group_incoming in enumerate(groups_incoming[writable_files_offset:]): key = keys[i + writable_files_offset] if not force_composite and group_incoming is None and not writable_files[list(writable_files.keys())[keys.index(key)]].optional: dataset.warnings.append("A required composite file (%s) was not specified." % (key)) dataset.composite_files[key] = None else: file_bunch, warnings = get_one_filename(group_incoming) dataset.warnings.extend(warnings) if file_bunch.path: if force_composite: key = group_incoming.get("NAME") or i dataset.composite_files[key] = file_bunch.__dict__ elif not force_composite: dataset.composite_files[key] = None if not writable_files[list(writable_files.keys())[keys.index(key)]].optional: dataset.warnings.append("A required composite file (%s) was not specified." % (key)) return [dataset] else: rval = [] for i, file_contexts in enumerate(context[self.name]): datasets = get_filenames(file_contexts) for dataset in datasets: override_file_type = self.get_file_type(context[self.name][i], parent_context=context) d_type = self.get_datatype(trans, context[self.name][i], parent_context=context) dataset.file_type = override_file_type dataset.datatype = d_type dataset.ext = self.get_datatype_ext(trans, context[self.name][i], parent_context=context) dataset.dbkey = self.get_dbkey(context[self.name][i], parent_context=context) dataset.tag_using_filenames = tag_using_filenames dataset.tags = tags rval.append(dataset) return rval
[docs]class Conditional(Group): type = "conditional"
[docs] def __init__(self): Group.__init__(self) self.test_param = None self.cases = [] self.value_ref = None self.value_ref_in_group = True # When our test_param is not part of the conditional Group, this is False
@property def label(self): return "Conditional (%s)" % self.name
[docs] def get_current_case(self, value): # Convert value to user representation str_value = self.test_param.to_param_dict_string(value) # Find the matching case for index, case in enumerate(self.cases): if str_value == case.value: return index raise ValueError("No case matched value:", self.name, str_value)
[docs] def value_to_basic(self, value, app, use_security=False): rval = dict() rval[self.test_param.name] = self.test_param.value_to_basic(value[self.test_param.name], app) current_case = rval['__current_case__'] = self.get_current_case(value[self.test_param.name]) for input in self.cases[current_case].inputs.values(): if input.name in value: # parameter might be absent in unverified workflow rval[input.name] = input.value_to_basic(value[input.name], app, use_security=use_security) return rval
[docs] def value_from_basic(self, value, app, ignore_errors=False): rval = dict() try: rval[self.test_param.name] = self.test_param.value_from_basic(value.get(self.test_param.name), app, ignore_errors) current_case = rval['__current_case__'] = self.get_current_case(rval[self.test_param.name]) # Inputs associated with current case for input in self.cases[current_case].inputs.values(): # If we do not have a value, and are ignoring errors, we simply # do nothing. There will be no value for the parameter in the # conditional's values dictionary. if not ignore_errors or input.name in value: rval[input.name] = input.value_from_basic(value[input.name], app, ignore_errors) except Exception as e: if not ignore_errors: raise e return rval
[docs] def get_initial_value(self, trans, context): # State for a conditional is a plain dictionary. rval = {} # Get the default value for the 'test element' and use it # to determine the current case test_value = self.test_param.get_initial_value(trans, context) current_case = self.get_current_case(test_value) # Store the current case in a special value rval['__current_case__'] = current_case # Store the value of the test element rval[self.test_param.name] = test_value # Fill in state for selected case child_context = ExpressionContext(rval, context) for child_input in self.cases[current_case].inputs.values(): rval[child_input.name] = child_input.get_initial_value(trans, child_context) return rval
[docs] def to_dict(self, trans): cond_dict = super().to_dict(trans) def nested_to_dict(input): return input.to_dict(trans) cond_dict["cases"] = list(map(nested_to_dict, self.cases)) cond_dict["test_param"] = nested_to_dict(self.test_param) return cond_dict
[docs]class ConditionalWhen(Dictifiable): dict_collection_visible_keys = ['value']
[docs] def __init__(self): self.value = None self.inputs = None
[docs] def to_dict(self, trans): when_dict = super().to_dict() def input_to_dict(input): return input.to_dict(trans) when_dict["inputs"] = list(map(input_to_dict, self.inputs.values())) return when_dict