Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.cwl.representation

""" This module is responsible for converting between Galaxy's tool
input description and the CWL description for a job json. """

import collections
import json
import logging
import os

from six import string_types

from galaxy.exceptions import RequestParameterInvalidException
from galaxy.util import safe_makedirs, string_as_bool
from galaxy.util.bunch import Bunch
from .util import set_basename_and_derived_properties

log = logging.getLogger(__name__)

NOT_PRESENT = object()

NO_GALAXY_INPUT = object()

INPUT_TYPE = Bunch(
    DATA="data",
    INTEGER="integer",
    FLOAT="float",
    TEXT="text",
    BOOLEAN="boolean",
    SELECT="select",
    FIELD="field",
    CONDITIONAL="conditional",
    DATA_COLLECTON="data_collection",
)

# There are two approaches to mapping CWL tool state to Galaxy tool state
# one is to map CWL types to compound Galaxy tool parameters combinations
# with conditionals and the other is to use a new Galaxy parameter type that
# allows unions, optional specifications, etc.... The problem with the former
# is that it doesn't work with the workflow parameters for instance and is
# very complex on the backend. The problem with the latter is that the GUI
# for this parameter type is undefined curently.
USE_FIELD_TYPES = True

# There are two approaches to mapping CWL workflow inputs to Galaxy workflow
# steps. The first is to simply map everything to expressions and stick them into
# files and use data inputs - the second is to use parameter_input steps with
# fields types. We are dispatching on USE_FIELD_TYPES for now - to choose but
# may diverge later?
# There are open issues with each approach:
#  - Mapping everything to files makes the GUI harder to imagine but the backend
#     easier to manage in someways.
USE_STEP_PARAMETERS = USE_FIELD_TYPES

TypeRepresentation = collections.namedtuple("TypeRepresentation", ["name", "galaxy_param_type", "label", "collection_type"])
TYPE_REPRESENTATIONS = [
    TypeRepresentation("null", NO_GALAXY_INPUT, "no input", None),
    TypeRepresentation("integer", INPUT_TYPE.INTEGER, "an integer", None),
    TypeRepresentation("float", INPUT_TYPE.FLOAT, "a decimal number", None),
    TypeRepresentation("double", INPUT_TYPE.FLOAT, "a decimal number", None),
    TypeRepresentation("file", INPUT_TYPE.DATA, "a dataset", None),
    TypeRepresentation("directory", INPUT_TYPE.DATA, "a directory", None),
    TypeRepresentation("boolean", INPUT_TYPE.BOOLEAN, "a boolean", None),
    TypeRepresentation("text", INPUT_TYPE.TEXT, "a simple text field", None),
    TypeRepresentation("record", INPUT_TYPE.DATA_COLLECTON, "record as a dataset collection", "record"),
    TypeRepresentation("json", INPUT_TYPE.TEXT, "arbitrary JSON structure", None),
    TypeRepresentation("array", INPUT_TYPE.DATA_COLLECTON, "as a dataset list", "list"),
    TypeRepresentation("enum", INPUT_TYPE.TEXT, "enum value", None),  # TODO: make this a select...
    TypeRepresentation("field", INPUT_TYPE.FIELD, "arbitrary JSON structure", None),
]
FIELD_TYPE_REPRESENTATION = TYPE_REPRESENTATIONS[-1]
TypeRepresentation.uses_param = lambda self: self.galaxy_param_type is not NO_GALAXY_INPUT

if not USE_FIELD_TYPES:
    CWL_TYPE_TO_REPRESENTATIONS = {
        "Any": ["integer", "float", "file", "boolean", "text", "record", "json"],
        "array": ["array"],
        "string": ["text"],
        "boolean": ["boolean"],
        "int": ["integer"],
        "float": ["float"],
        "File": ["file"],
        "Directory": ["directory"],
        "null": ["null"],
        "record": ["record"],
    }
else:
    CWL_TYPE_TO_REPRESENTATIONS = {
        "Any": ["field"],
        "array": ["array"],
        "string": ["text"],
        "boolean": ["boolean"],
        "int": ["integer"],
        "float": ["float"],
        "File": ["file"],
        "Directory": ["directory"],
        "null": ["null"],
        "record": ["record"],
        "enum": ["enum"],
        "double": ["double"],
    }


[docs]def type_representation_from_name(type_representation_name): for type_representation in TYPE_REPRESENTATIONS: if type_representation.name == type_representation_name: return type_representation assert False
[docs]def type_descriptions_for_field_types(field_types): type_representation_names = set([]) for field_type in field_types: if isinstance(field_type, dict) and field_type.get("type"): field_type = field_type.get("type") try: type_representation_names_for_field_type = CWL_TYPE_TO_REPRESENTATIONS.get(field_type) except TypeError: raise Exception("Failed to convert field_type %s" % field_type) assert type_representation_names_for_field_type is not None, field_type type_representation_names.update(type_representation_names_for_field_type) type_representations = [] for type_representation in TYPE_REPRESENTATIONS: if type_representation.name in type_representation_names: type_representations.append(type_representation) return type_representations
[docs]def dataset_wrapper_to_file_json(inputs_dir, dataset_wrapper): if dataset_wrapper.ext == "expression.json": with open(dataset_wrapper.file_name, "r") as f: return json.load(f) if dataset_wrapper.ext == "directory": return dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper) extra_files_path = dataset_wrapper.extra_files_path secondary_files_path = os.path.join(extra_files_path, "__secondary_files__") path = str(dataset_wrapper) raw_file_object = {"class": "File"} if os.path.exists(secondary_files_path): safe_makedirs(inputs_dir) name = os.path.basename(path) new_input_path = os.path.join(inputs_dir, name) os.symlink(path, new_input_path) secondary_files = [] for secondary_file_name in os.listdir(secondary_files_path): secondary_file_path = os.path.join(secondary_files_path, secondary_file_name) target = os.path.join(inputs_dir, secondary_file_name) log.info("linking [%s] to [%s]" % (secondary_file_path, target)) os.symlink(secondary_file_path, target) is_dir = os.path.isdir(os.path.realpath(secondary_file_path)) secondary_files.append({"class": "File" if not is_dir else "Directory", "location": target}) raw_file_object["secondaryFiles"] = secondary_files path = new_input_path raw_file_object["location"] = path raw_file_object["size"] = int(dataset_wrapper.get_size()) set_basename_and_derived_properties(raw_file_object, str(dataset_wrapper.cwl_filename or dataset_wrapper.name)) return raw_file_object
[docs]def dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper): assert dataset_wrapper.ext == "directory" return {"location": dataset_wrapper.extra_files_path, "class": "Directory"}
[docs]def collection_wrapper_to_array(inputs_dir, wrapped_value): rval = [] for value in wrapped_value: rval.append(dataset_wrapper_to_file_json(inputs_dir, value)) return rval
[docs]def collection_wrapper_to_record(inputs_dir, wrapped_value): rval = dict() # TODO: THIS NEEDS TO BE ORDERED BUT odict not json serializable! for key, value in wrapped_value.items(): rval[key] = dataset_wrapper_to_file_json(inputs_dir, value) return rval
[docs]def to_cwl_job(tool, param_dict, local_working_directory): """ tool is Galaxy's representation of the tool and param_dict is the parameter dictionary with wrapped values. """ tool_proxy = tool._cwl_tool_proxy input_fields = tool_proxy.input_fields() inputs = tool.inputs input_json = {} inputs_dir = os.path.join(local_working_directory, "_inputs") def simple_value(input, param_dict_value, type_representation_name=None): type_representation = type_representation_from_name(type_representation_name) # Hmm... cwl_type isn't really the cwl type in every case, # like in the case of json for instance. if type_representation.galaxy_param_type == NO_GALAXY_INPUT: assert param_dict_value is None return None if type_representation.name == "file": dataset_wrapper = param_dict_value return dataset_wrapper_to_file_json(inputs_dir, dataset_wrapper) elif type_representation.name == "directory": dataset_wrapper = param_dict_value return dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper) elif type_representation.name == "integer": return int(str(param_dict_value)) elif type_representation.name == "long": return int(str(param_dict_value)) elif type_representation.name in ["float", "double"]: return float(str(param_dict_value)) elif type_representation.name == "boolean": return string_as_bool(param_dict_value) elif type_representation.name == "text": return str(param_dict_value) elif type_representation.name == "enum": return str(param_dict_value) elif type_representation.name == "json": raw_value = param_dict_value.value return json.loads(raw_value) elif type_representation.name == "field": if param_dict_value is None: return None if hasattr(param_dict_value, "value"): # Is InputValueWrapper return param_dict_value.value["value"] elif not param_dict_value.is_collection: # Is DatasetFilenameWrapper return dataset_wrapper_to_file_json(inputs_dir, param_dict_value) else: # Is DatasetCollectionWrapper hdca_wrapper = param_dict_value if hdca_wrapper.collection_type == "list": # TODO: generalize to lists of lists and lists of non-files... return collection_wrapper_to_array(inputs_dir, hdca_wrapper) elif hdca_wrapper.collection_type.collection_type == "record": return collection_wrapper_to_record(inputs_dir, hdca_wrapper) elif type_representation.name == "array": # TODO: generalize to lists of lists and lists of non-files... return collection_wrapper_to_array(inputs_dir, param_dict_value) elif type_representation.name == "record": return collection_wrapper_to_record(inputs_dir, param_dict_value) else: return str(param_dict_value) for input_name, input in inputs.items(): if input.type == "repeat": only_input = next(iter(input.inputs.values())) array_value = [] for instance in param_dict[input_name]: array_value.append(simple_value(only_input, instance[input_name[:-len("_repeat")]])) input_json[input_name[:-len("_repeat")]] = array_value elif input.type == "conditional": assert input_name in param_dict, "No value for %s in %s" % (input_name, param_dict) current_case = param_dict[input_name]["_cwl__type_"] if str(current_case) != "null": # str because it is a wrapped... case_index = input.get_current_case(current_case) case_input = input.cases[case_index].inputs["_cwl__value_"] case_value = param_dict[input_name]["_cwl__value_"] input_json[input_name] = simple_value(case_input, case_value, current_case) else: matched_field = None for field in input_fields: if field["name"] == input_name: matched_field = field field_type = field_to_field_type(matched_field) if isinstance(field_type, list): assert USE_FIELD_TYPES type_descriptions = [FIELD_TYPE_REPRESENTATION] else: type_descriptions = type_descriptions_for_field_types([field_type]) assert len(type_descriptions) == 1 type_description_name = type_descriptions[0].name input_json[input_name] = simple_value(input, param_dict[input_name], type_description_name) log.debug("Galaxy Tool State is CWL State is %s" % input_json) return input_json
[docs]def to_galaxy_parameters(tool, as_dict): """ Tool is Galaxy's representation of the tool and as_dict is a Galaxified representation of the input json (no paths, HDA references for instance). """ inputs = tool.inputs galaxy_request = {} def from_simple_value(input, param_dict_value, type_representation_name=None): if type_representation_name == "json": return json.dumps(param_dict_value) else: return param_dict_value for input_name, input in inputs.items(): as_dict_value = as_dict.get(input_name, NOT_PRESENT) galaxy_input_type = input.type if galaxy_input_type == "repeat": if input_name not in as_dict: continue only_input = next(iter(input.inputs.values())) for index, value in enumerate(as_dict_value): key = "%s_repeat_0|%s" % (input_name, only_input.name) galaxy_value = from_simple_value(only_input, value) galaxy_request[key] = galaxy_value elif galaxy_input_type == "conditional": case_strings = input.case_strings # TODO: less crazy handling of defaults... if (as_dict_value is NOT_PRESENT or as_dict_value is None) and "null" in case_strings: type_representation_name = "null" elif (as_dict_value is NOT_PRESENT or as_dict_value is None): raise RequestParameterInvalidException( "Cannot translate CWL datatype - value [%s] of type [%s] with case_strings [%s]. Non-null property must be set." % ( as_dict_value, type(as_dict_value), case_strings ) ) elif isinstance(as_dict_value, bool) and "boolean" in case_strings: type_representation_name = "boolean" elif isinstance(as_dict_value, int) and "integer" in case_strings: type_representation_name = "integer" elif isinstance(as_dict_value, int) and "long" in case_strings: type_representation_name = "long" elif isinstance(as_dict_value, (int, float)) and "float" in case_strings: type_representation_name = "float" elif isinstance(as_dict_value, (int, float)) and "double" in case_strings: type_representation_name = "double" elif isinstance(as_dict_value, string_types) and "string" in case_strings: type_representation_name = "string" elif isinstance(as_dict_value, dict) and "src" in as_dict_value and "id" in as_dict_value and "file" in case_strings: type_representation_name = "file" elif isinstance(as_dict_value, dict) and "src" in as_dict_value and "id" in as_dict_value and "directory" in case_strings: # TODO: can't disambiuate with above if both are available... type_representation_name = "directory" elif "field" in case_strings: type_representation_name = "field" elif "json" in case_strings and as_dict_value is not None: type_representation_name = "json" else: raise RequestParameterInvalidException( "Cannot translate CWL datatype - value [%s] of type [%s] with case_strings [%s]." % ( as_dict_value, type(as_dict_value), case_strings ) ) galaxy_request["%s|_cwl__type_" % input_name] = type_representation_name if type_representation_name != "null": current_case_index = input.get_current_case(type_representation_name) current_case_inputs = input.cases[current_case_index].inputs current_case_input = current_case_inputs["_cwl__value_"] galaxy_value = from_simple_value(current_case_input, as_dict_value, type_representation_name) galaxy_request["%s|_cwl__value_" % input_name] = galaxy_value elif as_dict_value is NOT_PRESENT: continue else: galaxy_value = from_simple_value(input, as_dict_value) galaxy_request[input_name] = galaxy_value log.info("Converted galaxy_request is %s" % galaxy_request) return galaxy_request
[docs]def field_to_field_type(field): field_type = field["type"] if isinstance(field_type, dict): field_type = field_type["type"] if isinstance(field_type, list): field_type_length = len(field_type) if field_type_length == 0: raise Exception("Zero-length type list encountered, invalid CWL?") elif len(field_type) == 1: field_type = field_type[0] return field_type