Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tool_util.cwl.representation

""" This module is responsible for converting between Galaxy's tool
input description and the CWL description for a job json. """

import json
import logging
import os
from enum import Enum
from typing import Any, NamedTuple, Optional

from galaxy.exceptions import RequestParameterInvalidException
from galaxy.util import safe_makedirs, string_as_bool
from .util import set_basename_and_derived_properties


log = logging.getLogger(__name__)

# Sentinel used by to_galaxy_parameters as the default of its request lookup,
# so an input that is missing from the request can be told apart from one
# that was explicitly supplied as None.
NOT_PRESENT = object()

# Sentinel stored as the galaxy_param_type of a type representation that has
# no Galaxy parameter behind it (the "null" representation); it is what
# TypeRepresentation.uses_param tests against.
NO_GALAXY_INPUT = object()


class INPUT_TYPE(str, Enum):
    """Names of the Galaxy parameter types used by the type representations.

    Members also subclass ``str``, so each one compares equal to its plain
    string value (``INPUT_TYPE.DATA == "data"``).
    """

    DATA = "data"
    INTEGER = "integer"
    FLOAT = "float"
    TEXT = "text"
    BOOLEAN = "boolean"
    SELECT = "select"
    FIELD = "field"
    CONDITIONAL = "conditional"
    # Historical misspelling; kept as the canonical member because existing
    # code refers to it by this name.
    DATA_COLLECTON = "data_collection"
    # Correctly spelled alias. It repeats the value above, so Enum resolves it
    # to the very same member and iteration/len of the enum are unchanged.
    DATA_COLLECTION = "data_collection"
# There are two approaches to mapping CWL tool state to Galaxy tool state # one is to map CWL types to compound Galaxy tool parameters combinations # with conditionals and the other is to use a new Galaxy parameter type that # allows unions, optional specifications, etc.... The problem with the former # is that it doesn't work with the workflow parameters for instance and is # very complex on the backend. The problem with the latter is that the GUI # for this parameter type is undefined curently. USE_FIELD_TYPES = True # There are two approaches to mapping CWL workflow inputs to Galaxy workflow # steps. The first is to simply map everything to expressions and stick them into # files and use data inputs - the second is to use parameter_input steps with # fields types. We are dispatching on USE_FIELD_TYPES for now - to choose but # may diverge later? # There are open issues with each approach: # - Mapping everything to files makes the GUI harder to imagine but the backend # easier to manage in someways. USE_STEP_PARAMETERS = USE_FIELD_TYPES
class TypeRepresentation(NamedTuple):
    """One way a CWL value can be represented on the Galaxy side."""

    # Identifier that the name-based lookups in this module match against.
    name: str
    # Either an INPUT_TYPE member or the NO_GALAXY_INPUT sentinel.
    galaxy_param_type: Any
    # Human-readable description of the representation.
    label: str
    # Collection type string for collection-backed representations, else None.
    collection_type: Optional[str]

    @property
    def uses_param(self):
        """Whether a Galaxy parameter backs this representation."""
        backed_by_nothing = self.galaxy_param_type is NO_GALAXY_INPUT
        return not backed_by_nothing
# Every known representation, one row per entry:
# (name, galaxy_param_type, label, collection_type).
TYPE_REPRESENTATIONS = [
    TypeRepresentation(*row)
    for row in (
        ("null", NO_GALAXY_INPUT, "no input", None),
        ("integer", INPUT_TYPE.INTEGER, "an integer", None),
        ("float", INPUT_TYPE.FLOAT, "a decimal number", None),
        ("double", INPUT_TYPE.FLOAT, "a decimal number", None),
        ("file", INPUT_TYPE.DATA, "a dataset", None),
        ("directory", INPUT_TYPE.DATA, "a directory", None),
        ("boolean", INPUT_TYPE.BOOLEAN, "a boolean", None),
        ("text", INPUT_TYPE.TEXT, "a simple text field", None),
        ("record", INPUT_TYPE.DATA_COLLECTON, "record as a dataset collection", "record"),
        ("json", INPUT_TYPE.TEXT, "arbitrary JSON structure", None),
        ("array", INPUT_TYPE.DATA_COLLECTON, "as a dataset list", "list"),
        ("enum", INPUT_TYPE.TEXT, "enum value", None),  # TODO: make this a select...
        ("field", INPUT_TYPE.FIELD, "arbitrary JSON structure", None),
    )
]

# The "field" representation is, by construction, the last row above.
FIELD_TYPE_REPRESENTATION = TYPE_REPRESENTATIONS[-1]

# Maps a CWL type name to the names of the representations that can hold it.
# Only the "Any" entry and the trailing "enum"/"double" entries depend on
# USE_FIELD_TYPES; everything in between is common to both modes.
CWL_TYPE_TO_REPRESENTATIONS = {
    "Any": (
        ["field"]
        if USE_FIELD_TYPES
        else ["integer", "float", "file", "boolean", "text", "record", "json"]
    ),
    "array": ["array"],
    "string": ["text"],
    "boolean": ["boolean"],
    "int": ["integer"],
    "float": ["float"],
    "File": ["file"],
    "Directory": ["directory"],
    "null": ["null"],
    "record": ["record"],
}
if USE_FIELD_TYPES:
    CWL_TYPE_TO_REPRESENTATIONS["enum"] = ["enum"]
    CWL_TYPE_TO_REPRESENTATIONS["double"] = ["double"]
def type_representation_from_name(type_representation_name):
    """Return the entry of TYPE_REPRESENTATIONS with the given name.

    The first entry whose ``name`` equals ``type_representation_name`` wins.

    :raises ValueError: if no entry carries that name.
    """
    candidates = (
        candidate
        for candidate in TYPE_REPRESENTATIONS
        if candidate.name == type_representation_name
    )
    found = next(candidates, None)
    if found is None:
        raise ValueError(f"No type representation for {type_representation_name}")
    return found
def type_descriptions_for_field_types(field_types):
    """Return the type representations able to hold any of ``field_types``.

    Each element of ``field_types`` is looked up in
    CWL_TYPE_TO_REPRESENTATIONS; an element that is a dict with a truthy
    ``"type"`` entry is replaced by that entry first. The result lists the
    matching entries of TYPE_REPRESENTATIONS, in that table's order and
    without duplicates.

    :raises Exception: if a field type is unhashable or has no mapping.
    """
    wanted_names = set()
    for field_type in field_types:
        if isinstance(field_type, dict) and field_type.get("type"):
            field_type = field_type.get("type")

        try:
            names_for_field_type = CWL_TYPE_TO_REPRESENTATIONS.get(field_type)
        except TypeError as exc:
            # Unhashable spec (e.g. a list, or a dict without a "type" entry).
            raise Exception(f"Failed to convert field_type {field_type}") from exc
        if names_for_field_type is None:
            # f-string rather than "%s" % field_type: the latter itself raises
            # TypeError when field_type is a tuple, hiding this message.
            raise Exception(f"Failed to convert type {field_type}")
        wanted_names.update(names_for_field_type)

    return [
        type_representation
        for type_representation in TYPE_REPRESENTATIONS
        if type_representation.name in wanted_names
    ]
def dataset_wrapper_to_file_json(inputs_dir, dataset_wrapper):
    """Build the CWL job JSON value for a single wrapped dataset.

    * ``expression.json`` datasets are returned as their parsed JSON content.
    * ``directory`` datasets are delegated to
      ``dataset_wrapper_to_directory_json``.
    * Anything else becomes a ``{"class": "File", ...}`` object. When the
      dataset's extra files path holds a ``__secondary_files__`` directory,
      the dataset and each entry of that directory are symlinked into
      ``inputs_dir`` and the entries are reported under ``secondaryFiles``.
    """
    if dataset_wrapper.ext == "expression.json":
        with open(dataset_wrapper.file_name) as handle:
            return json.load(handle)
    if dataset_wrapper.ext == "directory":
        return dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper)

    secondary_dir = os.path.join(dataset_wrapper.extra_files_path, "__secondary_files__")
    location = str(dataset_wrapper)
    file_json = {"class": "File"}

    if os.path.exists(secondary_dir):
        # Stage the primary file next to its secondary files so that they all
        # sit in the same directory.
        safe_makedirs(inputs_dir)
        staged_path = os.path.join(inputs_dir, os.path.basename(location))
        os.symlink(location, staged_path)

        linked = []
        for entry in os.listdir(secondary_dir):
            source = os.path.join(secondary_dir, entry)
            target = os.path.join(inputs_dir, entry)
            log.info(f"linking [{source}] to [{target}]")
            os.symlink(source, target)
            cwl_class = "Directory" if os.path.isdir(os.path.realpath(source)) else "File"
            linked.append({"class": cwl_class, "location": target})

        file_json["secondaryFiles"] = linked
        location = staged_path

    file_json["location"] = location

    # Verify it isn't a NoneDataset
    if dataset_wrapper.unsanitized:
        file_json["size"] = int(dataset_wrapper.get_size())

    basename = str(dataset_wrapper.created_from_basename or dataset_wrapper.name)
    set_basename_and_derived_properties(file_json, basename)
    return file_json
def dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper):
    """Build the CWL ``Directory`` JSON object for a ``directory`` dataset.

    ``inputs_dir`` is accepted for signature parity with
    ``dataset_wrapper_to_file_json`` but is not used here.
    """
    assert dataset_wrapper.ext == "directory"

    # Assume the archive file name (minus its extension) is the directory name.
    archive_name = str(dataset_wrapper.created_from_basename or dataset_wrapper.name)
    stem, suffix = os.path.splitext(archive_name)

    # Best effort: any failure to reach the underlying file leaves it as None.
    try:
        archive_location = dataset_wrapper.unsanitized.file_name
    except Exception:
        archive_location = None

    return {
        "location": dataset_wrapper.extra_files_path,
        "class": "Directory",
        "name": stem,
        "archive_location": archive_location,
        "archive_nameext": suffix,
        "archive_nameroot": stem,
    }
def collection_wrapper_to_array(inputs_dir, wrapped_value):
    """Convert each element of ``wrapped_value`` with
    ``dataset_wrapper_to_file_json`` and return the results as a list,
    in iteration order.
    """
    return [dataset_wrapper_to_file_json(inputs_dir, element) for element in wrapped_value]
def collection_wrapper_to_record(inputs_dir, wrapped_value):
    """Convert each value of the mapping-like ``wrapped_value`` with
    ``dataset_wrapper_to_file_json`` and return a dict under the same keys.
    """
    return {
        key: dataset_wrapper_to_file_json(inputs_dir, element)
        for key, element in wrapped_value.items()
    }
def to_cwl_job(tool, param_dict, local_working_directory):
    """Build the CWL job input JSON from Galaxy tool state.

    tool is Galaxy's representation of the tool and param_dict is the
    parameter dictionary with wrapped values.

    Every entry of ``tool.inputs`` is handled according to its ``type``
    ("repeat", "conditional" or anything else) and converted by the nested
    ``simple_value`` helper. File-like values are converted relative to
    ``<local_working_directory>/_inputs``.

    Returns the resulting dict keyed by CWL input name.
    """
    tool_proxy = tool._cwl_tool_proxy
    input_fields = tool_proxy.input_fields()
    inputs = tool.inputs
    input_json = {}

    inputs_dir = os.path.join(local_working_directory, "_inputs")

    def simple_value(input, param_dict_value, type_representation_name=None):
        # Convert one wrapped Galaxy value into its plain CWL JSON value,
        # dispatching on the name of the type representation.
        type_representation = type_representation_from_name(type_representation_name)
        # Hmm... cwl_type isn't really the cwl type in every case,
        # like in the case of json for instance.

        if type_representation.galaxy_param_type == NO_GALAXY_INPUT:
            assert param_dict_value is None
            return None

        if type_representation.name == "file":
            dataset_wrapper = param_dict_value
            return dataset_wrapper_to_file_json(inputs_dir, dataset_wrapper)
        elif type_representation.name == "directory":
            dataset_wrapper = param_dict_value
            return dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper)
        elif type_representation.name == "integer":
            # Wrapped values are converted via their string form.
            return int(str(param_dict_value))
        elif type_representation.name == "long":
            # NOTE(review): TYPE_REPRESENTATIONS in this module has no "long"
            # entry, so this branch looks unreachable - confirm.
            return int(str(param_dict_value))
        elif type_representation.name in ["float", "double"]:
            return float(str(param_dict_value))
        elif type_representation.name == "boolean":
            return string_as_bool(param_dict_value)
        elif type_representation.name == "text":
            return str(param_dict_value)
        elif type_representation.name == "enum":
            return str(param_dict_value)
        elif type_representation.name == "json":
            raw_value = param_dict_value.value
            return json.loads(raw_value)
        elif type_representation.name == "field":
            if param_dict_value is None:
                return None
            if hasattr(param_dict_value, "value"):
                # Is InputValueWrapper
                rval = param_dict_value.value
                if isinstance(rval, dict) and "src" in rval and rval["src"] == "json":
                    # needed for wf_step_connect_undeclared_param, so non-file defaults?
                    return rval["value"]
                return rval
            elif not param_dict_value.is_collection:
                # Is DatasetFilenameWrapper
                return dataset_wrapper_to_file_json(inputs_dir, param_dict_value)
            else:
                # Is DatasetCollectionWrapper
                # NOTE(review): the "list" test compares collection_type itself
                # while the "record" test compares collection_type.collection_type
                # - confirm both are intended. Any other collection type falls
                # through and the helper implicitly returns None.
                hdca_wrapper = param_dict_value
                if hdca_wrapper.collection_type == "list":
                    # TODO: generalize to lists of lists and lists of non-files...
                    return collection_wrapper_to_array(inputs_dir, hdca_wrapper)
                elif hdca_wrapper.collection_type.collection_type == "record":
                    return collection_wrapper_to_record(inputs_dir, hdca_wrapper)

        elif type_representation.name == "array":
            # TODO: generalize to lists of lists and lists of non-files...
            return collection_wrapper_to_array(inputs_dir, param_dict_value)
        elif type_representation.name == "record":
            return collection_wrapper_to_record(inputs_dir, param_dict_value)
        else:
            return str(param_dict_value)

    for input_name, input in inputs.items():
        if input.type == "repeat":
            # The CWL name is the Galaxy input name minus its "_repeat" suffix.
            # NOTE(review): simple_value is called without a type
            # representation name here, so the lookup receives None; with the
            # TYPE_REPRESENTATIONS table in this module that raises ValueError
            # - confirm whether this path is still used.
            only_input = next(iter(input.inputs.values()))
            array_value = []
            for instance in param_dict[input_name]:
                array_value.append(simple_value(only_input, instance[input_name[:-len("_repeat")]]))
            input_json[input_name[:-len("_repeat")]] = array_value
        elif input.type == "conditional":
            assert input_name in param_dict, f"No value for {input_name} in {param_dict}"
            current_case = param_dict[input_name]["_cwl__type_"]
            # A "null" case contributes no entry to the job JSON at all.
            if str(current_case) != "null":  # str because it is a wrapped...
                case_index = input.get_current_case(current_case)
                case_input = input.cases[case_index].inputs["_cwl__value_"]
                case_value = param_dict[input_name]["_cwl__value_"]
                input_json[input_name] = simple_value(case_input, case_value, current_case)
        else:
            # Find the CWL field declared under this input's name; if several
            # match, the last one wins.
            # NOTE(review): if none matches, matched_field stays None and
            # field_to_field_type fails on the subscript - confirm a match is
            # guaranteed by the caller.
            matched_field = None
            for field in input_fields:
                if field["name"] == input_name:
                    matched_field = field
            field_type = field_to_field_type(matched_field)
            if isinstance(field_type, list):
                # A list of types is only representable through the generic
                # "field" representation.
                assert USE_FIELD_TYPES
                type_descriptions = [FIELD_TYPE_REPRESENTATION]
            else:
                type_descriptions = type_descriptions_for_field_types([field_type])
            assert len(type_descriptions) == 1
            type_description_name = type_descriptions[0].name
            input_json[input_name] = simple_value(input, param_dict[input_name], type_description_name)

    log.debug("Galaxy Tool State is CWL State is %s" % input_json)
    return input_json
def to_galaxy_parameters(tool, as_dict):
    """Translate a CWL job description into a flat Galaxy request dict.

    Tool is Galaxy's representation of the tool and as_dict is a Galaxified
    representation of the input json (no paths, HDA references for instance).

    Every entry of ``tool.inputs`` is handled according to its ``type``:
    "repeat" inputs, "conditional" inputs (written as a ``<name>|_cwl__type_``
    key plus, unless the chosen case is "null", a ``<name>|_cwl__value_``
    key), and everything else (copied under the input name when present).

    :raises RequestParameterInvalidException: if the value of a conditional
        input cannot be matched to any of that input's ``case_strings``.
    """
    inputs = tool.inputs
    galaxy_request = {}

    def from_simple_value(input, param_dict_value, type_representation_name=None):
        # Only "json" values are serialized to a string; everything else is
        # passed through untouched.
        if type_representation_name == "json":
            return json.dumps(param_dict_value)
        else:
            return param_dict_value

    for input_name, input in inputs.items():
        as_dict_value = as_dict.get(input_name, NOT_PRESENT)
        galaxy_input_type = input.type

        if galaxy_input_type == "repeat":
            if input_name not in as_dict:
                continue

            only_input = next(iter(input.inputs.values()))
            for value in as_dict_value:
                # NOTE(review): the key always uses index 0, so each value
                # overwrites the previous one and only the last survives -
                # confirm whether a per-element index was intended.
                key = f"{input_name}_repeat_0|{only_input.name}"
                galaxy_value = from_simple_value(only_input, value)
                galaxy_request[key] = galaxy_value
        elif galaxy_input_type == "conditional":
            case_strings = input.case_strings
            # The first matching branch wins, so the order of the tests below
            # is significant.
            # TODO: less crazy handling of defaults...
            if (as_dict_value is NOT_PRESENT or as_dict_value is None) and "null" in case_strings:
                type_representation_name = "null"
            elif (as_dict_value is NOT_PRESENT or as_dict_value is None):
                raise RequestParameterInvalidException(
                    "Cannot translate CWL datatype - value [{}] of type [{}] with case_strings [{}]. Non-null property must be set.".format(
                        as_dict_value, type(as_dict_value), case_strings
                    )
                )
            # bool is tested before int because bool is a subclass of int.
            elif isinstance(as_dict_value, bool) and "boolean" in case_strings:
                type_representation_name = "boolean"
            elif isinstance(as_dict_value, int) and "integer" in case_strings:
                type_representation_name = "integer"
            elif isinstance(as_dict_value, int) and "long" in case_strings:
                type_representation_name = "long"
            elif isinstance(as_dict_value, (int, float)) and "float" in case_strings:
                type_representation_name = "float"
            elif isinstance(as_dict_value, (int, float)) and "double" in case_strings:
                type_representation_name = "double"
            # NOTE(review): this case is named "string" whereas the type
            # representation table in this module calls it "text" - confirm
            # which name the conditional's cases actually use.
            elif isinstance(as_dict_value, str) and "string" in case_strings:
                type_representation_name = "string"
            elif isinstance(as_dict_value, dict) and "src" in as_dict_value and "id" in as_dict_value and "file" in case_strings:
                type_representation_name = "file"
            elif isinstance(as_dict_value, dict) and "src" in as_dict_value and "id" in as_dict_value and "directory" in case_strings:
                # TODO: can't disambiguate with above if both are available...
                type_representation_name = "directory"
            elif "field" in case_strings:
                type_representation_name = "field"
            elif "json" in case_strings and as_dict_value is not None:
                type_representation_name = "json"
            else:
                raise RequestParameterInvalidException(
                    "Cannot translate CWL datatype - value [{}] of type [{}] with case_strings [{}].".format(
                        as_dict_value, type(as_dict_value), case_strings
                    )
                )
            galaxy_request["%s|_cwl__type_" % input_name] = type_representation_name
            if type_representation_name != "null":
                current_case_index = input.get_current_case(type_representation_name)
                current_case_inputs = input.cases[current_case_index].inputs
                current_case_input = current_case_inputs["_cwl__value_"]
                galaxy_value = from_simple_value(current_case_input, as_dict_value, type_representation_name)
                galaxy_request["%s|_cwl__value_" % input_name] = galaxy_value
        elif as_dict_value is NOT_PRESENT:
            # Plain input that was not supplied: leave it out of the request.
            continue
        else:
            galaxy_value = from_simple_value(input, as_dict_value)
            galaxy_request[input_name] = galaxy_value

    log.info("Converted galaxy_request is %s" % galaxy_request)
    return galaxy_request
def field_to_field_type(field):
    """Return the effective type of a field description.

    Reads ``field["type"]``; when that is a dict, its own ``"type"`` entry is
    used instead. A one-element list collapses to its only element; longer
    lists and non-list values are returned as they are.

    :raises Exception: if the type is an empty list.
    """
    declared = field["type"]
    if isinstance(declared, dict):
        declared = declared["type"]

    if not isinstance(declared, list):
        return declared

    if len(declared) == 0:
        raise Exception("Zero-length type list encountered, invalid CWL?")
    return declared[0] if len(declared) == 1 else declared