Source code for galaxy.tools.parameters.meta
from __future__ import print_function
import copy
import itertools
import logging
from collections import OrderedDict
from galaxy import (
    exceptions,
    model,
    util
)
from galaxy.model.dataset_collections import matching, subcollections
from galaxy.util import permutations
from . import visit_input_values
log = logging.getLogger(__name__)
def expand_workflow_inputs(inputs):
    """
    Expands incoming encoded multiple payloads into the set of all individual payload combinations.

    >>> params, param_keys = expand_workflow_inputs({'1': {'input': {'batch': True, 'product': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}})
    >>> print(["%s" % (p['1']['input']['hid']) for p in params])
    ['1', '2']
    >>> params, param_keys = expand_workflow_inputs({'1': {'input': {'batch': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}})
    >>> print(["%s" % (p['1']['input']['hid']) for p in params])
    ['1', '2']
    >>> params, param_keys = expand_workflow_inputs({'1': {'input': {'batch': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}, '2': {'input': {'batch': True, 'values': [{'hid': '3'}, {'hid': '4'}] }}})
    >>> print(["%s%s" % (p['1']['input']['hid'], p['2']['input']['hid']) for p in params])
    ['13', '24']
    >>> params, param_keys = expand_workflow_inputs({'1': {'input': {'batch': True, 'product': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}, '2': {'input': {'batch': True, 'values': [{'hid': '3'}, {'hid': '4'}, {'hid': '5'}] }}})
    >>> print(["%s%s" % (p['1']['input']['hid'], p['2']['input']['hid']) for p in params])
    ['13', '23', '14', '24', '15', '25']
    >>> params, param_keys = expand_workflow_inputs({'1': {'input': {'batch': True, 'product': True, 'values': [{'hid': '1'}, {'hid': '2'}] }}, '2': {'input': {'batch': True, 'product': True, 'values': [{'hid': '3'}, {'hid': '4'}, {'hid': '5'}] }}, '3': {'input': {'batch': True, 'product': True, 'values': [{'hid': '6'}, {'hid': '7'}, {'hid': '8'}] }}})
    >>> print(["%s%s%s" % (p['1']['input']['hid'], p['2']['input']['hid'], p['3']['input']['hid']) for p in params])
    ['136', '137', '138', '146', '147', '148', '156', '157', '158', '236', '237', '238', '246', '247', '248', '256', '257', '258']
    """
    linked_n = None
    linked = []
    product = []
    linked_keys = []
    product_keys = []
    for step_id, step in sorted(inputs.items()):
        for key, value in sorted(step.items()):
            if isinstance(value, dict) and 'batch' in value and value['batch'] is True and 'values' in value and isinstance(value['values'], list):
                nval = len(value['values'])
                if 'product' in value and value['product'] is True:
                    product.append(value['values'])
                    product_keys.append((step_id, key))
                else:
                    if linked_n is None:
                        linked_n = nval
                    elif linked_n != nval or nval == 0:
                        raise exceptions.RequestParameterInvalidException('Failed to match linked batch selections. Please select an equal number of data files.')
                    linked.append(value['values'])
                    linked_keys.append((step_id, key))
    params = []
    params_keys = []
    linked = linked or [[None]]
    product = product or [[None]]
    linked_keys = linked_keys or [(None, None)]
    product_keys = product_keys or [(None, None)]
    for linked_values, product_values in itertools.product(zip(*linked), itertools.product(*product)):
        new_params = copy.deepcopy(inputs)
        new_keys = []
        for (step_id, key), value in list(zip(linked_keys, linked_values)) + list(zip(product_keys, product_values)):
            if step_id is not None:
                new_params[step_id][key] = value
                new_keys.append(str(value['hid']))
        params_keys.append(new_keys)
        params.append(new_params)
    return params, params_keys
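
# A minimal sketch (in the doctest style used above) of the guard in
# expand_workflow_inputs: linked batches of unequal length are rejected
# rather than silently truncated. The payload below is hypothetical.
#
#     >>> try:
#     ...     expand_workflow_inputs({'1': {'input': {'batch': True, 'values': [{'hid': '1'}]}},
#     ...                             '2': {'input': {'batch': True, 'values': [{'hid': '2'}, {'hid': '3'}]}}})
#     ... except exceptions.RequestParameterInvalidException:
#     ...     print('unequal linked batches rejected')
#     unequal linked batches rejected
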
def process_key(incoming_key, incoming_value, d):
    key_parts = incoming_key.split('|')
    if len(key_parts) == 1:
        # Regular parameter
        if incoming_key in d and not incoming_value:
            # In case we get an empty repeat after we already filled in a repeat element
            return
        d[incoming_key] = incoming_value
    elif key_parts[0].rsplit('_', 1)[-1].isdigit():
        # Repeat
        input_name, index = key_parts[0].rsplit('_', 1)
        index = int(index)
        d.setdefault(input_name, [])
        newlist = [{} for _ in range(index + 1)]
        d[input_name].extend(newlist[len(d[input_name]):])
        subdict = d[input_name][index]
        process_key("|".join(key_parts[1:]), incoming_value=incoming_value, d=subdict)
    else:
        # Section / Conditional
        input_name = key_parts[0]
        subdict = {}
        d[input_name] = subdict
        process_key("|".join(key_parts[1:]), incoming_value=incoming_value, d=subdict)
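
# A minimal sketch of how process_key turns Galaxy's flat "|"-delimited
# parameter names into a nested structure (the key names are hypothetical;
# the "_<n>" suffix marks a repeat index):
#
#     >>> d = {}
#     >>> process_key('queries_0|input2', incoming_value='a', d=d)
#     >>> process_key('queries_1|input2', incoming_value='b', d=d)
#     >>> process_key('cond|sel', incoming_value='yes', d=d)
#     >>> d == {'queries': [{'input2': 'a'}, {'input2': 'b'}], 'cond': {'sel': 'yes'}}
#     True
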
def expand_meta_parameters(trans, tool, incoming):
    """
    Take in a dictionary of raw incoming parameters and expand to a list
    of expanded incoming parameters (one set of parameters per tool
    execution).
    """
    to_remove = []
    for key in incoming.keys():
        if key.endswith("|__identifier__"):
            to_remove.append(key)
    for key in to_remove:
        incoming.pop(key)
    # If we're going to multiply input dataset combinations,
    # order matters, so the following reorders incoming
    # according to tool.inputs (which is ordered).
    incoming_copy = incoming.copy()
    nested_dict = {}
    for incoming_key, incoming_value in incoming_copy.items():
        if not incoming_key.startswith('__'):
            process_key(incoming_key, incoming_value=incoming_value, d=nested_dict)
    reordered_incoming = OrderedDict()
    def visitor(input, value, prefix, prefixed_name, prefixed_label, error, **kwargs):
        if prefixed_name in incoming_copy:
            reordered_incoming[prefixed_name] = incoming_copy[prefixed_name]
            del incoming_copy[prefixed_name]
    visit_input_values(inputs=tool.inputs, input_values=nested_dict, callback=visitor)
    reordered_incoming.update(incoming_copy)
    def classifier(input_key):
        value = incoming[input_key]
        if isinstance(value, dict) and 'values' in value:
            # Explicit meta wrapper for inputs...
            is_batch = value.get('batch', False)
            is_linked = value.get('linked', True)
            if is_batch and is_linked:
                classification = permutations.input_classification.MATCHED
            elif is_batch:
                classification = permutations.input_classification.MULTIPLIED
            else:
                classification = permutations.input_classification.SINGLE
            if __collection_multirun_parameter(value):
                collection_value = value['values'][0]
                values = __expand_collection_parameter(trans, input_key, collection_value, collections_to_match, linked=is_linked)
            else:
                values = value['values']
        else:
            classification = permutations.input_classification.SINGLE
            values = value
        return classification, values
    collections_to_match = matching.CollectionsToMatch()
    # Stick in an unexpanded version of multirun keys so they can be replaced
    # by expand_multi_inputs.
    incoming_template = reordered_incoming
    expanded_incomings = permutations.expand_multi_inputs(incoming_template, classifier)
    if collections_to_match.has_collections():
        collection_info = trans.app.dataset_collections_service.match_collections(collections_to_match)
    else:
        collection_info = None
    return expanded_incomings, collection_info
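
# A minimal standalone sketch of the classifier contract expected by
# permutations.expand_multi_inputs: the callable maps an input key to a
# (classification, values) pair. The toy inputs below are hypothetical and
# sidestep the batch/linked wrappers handled by the real classifier above.
#
#     >>> toy = {'a': {'batch': True, 'values': [1, 2]}, 'b': 'x'}
#     >>> def classify(key):
#     ...     value = toy[key]
#     ...     if isinstance(value, dict) and value.get('batch'):
#     ...         return permutations.input_classification.MATCHED, value['values']
#     ...     return permutations.input_classification.SINGLE, value
#     >>> expanded = permutations.expand_multi_inputs(toy, classify)
#     >>> sorted((e['a'], e['b']) for e in expanded)
#     [(1, 'x'), (2, 'x')]
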
def __expand_collection_parameter(trans, input_key, incoming_val, collections_to_match, linked=False):
    # If this is a subcollection multirun of a data_collection param, the value
    # will be "hdca_id|subcollection_type"; otherwise it will just be hdca_id.
    if "|" in incoming_val:
        encoded_hdc_id, subcollection_type = incoming_val.split("|", 1)
    else:
        try:
            src = incoming_val["src"]
            if src != "hdca":
                raise exceptions.ToolMetaParameterException("Invalid dataset collection source type %s" % src)
            encoded_hdc_id = incoming_val["id"]
            subcollection_type = incoming_val.get('map_over_type', None)
        except TypeError:
            encoded_hdc_id = incoming_val
            subcollection_type = None
    hdc_id = trans.app.security.decode_id(encoded_hdc_id)
    hdc = trans.sa_session.query(model.HistoryDatasetCollectionAssociation).get(hdc_id)
    collections_to_match.add(input_key, hdc, subcollection_type=subcollection_type, linked=linked)
    if subcollection_type is not None:
        subcollection_elements = subcollections.split_dataset_collection_instance(hdc, subcollection_type)
        return subcollection_elements
    else:
        hdas = []
        for element in hdc.collection.dataset_elements:
            hda = element.dataset_instance
            hda.element_identifier = element.element_identifier
            hdas.append(hda)
        return hdas
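
# A minimal standalone sketch of the three value shapes accepted above
# (the encoded id is hypothetical; no trans or database is involved):
#
#     >>> def parse(incoming_val):
#     ...     if isinstance(incoming_val, str) and '|' in incoming_val:
#     ...         return tuple(incoming_val.split('|', 1))
#     ...     try:
#     ...         return incoming_val['id'], incoming_val.get('map_over_type')
#     ...     except TypeError:
#     ...         return incoming_val, None
#     >>> parse('abc123|paired')
#     ('abc123', 'paired')
#     >>> parse({'src': 'hdca', 'id': 'abc123', 'map_over_type': 'paired'})
#     ('abc123', 'paired')
#     >>> parse('abc123')
#     ('abc123', None)
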
def __collection_multirun_parameter(value):
    is_batch = value.get('batch', False)
    if not is_batch:
        return False
    batch_values = util.listify(value['values'])
    if len(batch_values) == 1:
        batch_over = batch_values[0]
        if isinstance(batch_over, dict) and ('src' in batch_over) and (batch_over['src'] == 'hdca'):
            return True
    return False
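
# A minimal sketch of the predicate above: only a singleton batch whose one
# value is an hdca source reference counts as a collection multirun (the
# encoded id is hypothetical):
#
#     >>> __collection_multirun_parameter({'batch': True, 'values': [{'src': 'hdca', 'id': 'abc123'}]})
#     True
#     >>> __collection_multirun_parameter({'batch': True, 'values': [{'src': 'hda', 'id': 'abc123'}]})
#     False
#     >>> __collection_multirun_parameter({'batch': False, 'values': []})
#     False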