Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tool_util.cwl.representation
""" This module is responsible for converting between Galaxy's tool
input description and the CWL description for a job json. """
import json
import logging
import os
from enum import Enum
from typing import Any, NamedTuple, Optional
from galaxy.exceptions import RequestParameterInvalidException
from galaxy.util import safe_makedirs, string_as_bool
from .util import set_basename_and_derived_properties
# Module-level logger used by the conversion helpers below.
log = logging.getLogger(__name__)
# Sentinel passed as the default to ``as_dict.get(...)`` in
# to_galaxy_parameters so that a missing key can be told apart from an
# explicit ``None`` value.
NOT_PRESENT = object()
# Sentinel stored as ``galaxy_param_type`` on a type representation that has
# no corresponding Galaxy input (the "null" representation).
NO_GALAXY_INPUT = object()
class INPUT_TYPE(str, Enum):
    """Names of Galaxy tool parameter types referenced by this module.

    The enum mixes in ``str``, so every member compares equal to its plain
    string value (e.g. ``INPUT_TYPE.DATA == "data"``).
    """

    DATA = "data"
    INTEGER = "integer"
    FLOAT = "float"
    TEXT = "text"
    BOOLEAN = "boolean"
    SELECT = "select"
    FIELD = "field"
    CONDITIONAL = "conditional"
    # Historical misspelling; kept as the canonical member because existing
    # code refers to it by this name.
    DATA_COLLECTON = "data_collection"
    # Correctly spelled alias. Because the value repeats the one above, Enum
    # treats this as an alias of DATA_COLLECTON rather than a new member, so
    # iteration and lookup-by-value are unchanged.
    DATA_COLLECTION = "data_collection"
# There are two approaches to mapping CWL tool state to Galaxy tool state:
# one is to map CWL types to compound Galaxy tool parameter combinations
# with conditionals and the other is to use a new Galaxy parameter type that
# allows unions, optional specifications, etc.... The problem with the former
# is that it doesn't work with the workflow parameters for instance and is
# very complex on the backend. The problem with the latter is that the GUI
# for this parameter type is currently undefined.
USE_FIELD_TYPES = True
# There are two approaches to mapping CWL workflow inputs to Galaxy workflow
# steps. The first is to simply map everything to expressions and stick them into
# files and use data inputs - the second is to use parameter_input steps with
# field types. We are dispatching on USE_FIELD_TYPES for now to choose, but
# the two settings may diverge later.
# There are open issues with each approach:
# - Mapping everything to files makes the GUI harder to imagine but the backend
# easier to manage in some ways.
USE_STEP_PARAMETERS = USE_FIELD_TYPES
class TypeRepresentation(NamedTuple):
    """One way a CWL value can be represented as a Galaxy tool input."""

    # Identifier used to look the representation up by name.
    name: str
    # An INPUT_TYPE member, or the NO_GALAXY_INPUT sentinel.
    galaxy_param_type: Any
    # Human-readable description of the representation.
    label: str
    # Collection type string for collection-backed representations, else None.
    collection_type: Optional[str]

    @property
    def uses_param(self) -> bool:
        """Whether this representation is backed by a Galaxy input at all."""
        has_no_galaxy_input = self.galaxy_param_type is NO_GALAXY_INPUT
        return not has_no_galaxy_input
# Every known type representation, as (name, galaxy_param_type, label,
# collection_type). Order matters: type_descriptions_for_field_types returns
# its matches in this order, and FIELD_TYPE_REPRESENTATION below relies on
# the "field" entry being last.
TYPE_REPRESENTATIONS = [
    TypeRepresentation("null", NO_GALAXY_INPUT, "no input", None),
    TypeRepresentation("integer", INPUT_TYPE.INTEGER, "an integer", None),
    TypeRepresentation("float", INPUT_TYPE.FLOAT, "a decimal number", None),
    TypeRepresentation("double", INPUT_TYPE.FLOAT, "a decimal number", None),
    TypeRepresentation("file", INPUT_TYPE.DATA, "a dataset", None),
    TypeRepresentation("directory", INPUT_TYPE.DATA, "a directory", None),
    TypeRepresentation("boolean", INPUT_TYPE.BOOLEAN, "a boolean", None),
    TypeRepresentation("text", INPUT_TYPE.TEXT, "a simple text field", None),
    TypeRepresentation("record", INPUT_TYPE.DATA_COLLECTON, "record as a dataset collection", "record"),
    TypeRepresentation("json", INPUT_TYPE.TEXT, "arbitrary JSON structure", None),
    TypeRepresentation("array", INPUT_TYPE.DATA_COLLECTON, "as a dataset list", "list"),
    TypeRepresentation("enum", INPUT_TYPE.TEXT, "enum value", None),  # TODO: make this a select...
    TypeRepresentation("field", INPUT_TYPE.FIELD, "arbitrary JSON structure", None),
]
# The generic "field" representation (the last entry of the list above).
FIELD_TYPE_REPRESENTATION = TYPE_REPRESENTATIONS[-1]
# Maps a CWL type name to the names of the type representations that can
# hold a value of that type. The entries below are identical in both modes;
# only "Any" (and the extra "enum"/"double" entries) depend on
# USE_FIELD_TYPES.
_COMMON_CWL_TYPE_TO_REPRESENTATIONS = {
    "array": ["array"],
    "string": ["text"],
    "boolean": ["boolean"],
    "int": ["integer"],
    "float": ["float"],
    "File": ["file"],
    "Directory": ["directory"],
    "null": ["null"],
    "record": ["record"],
}
if USE_FIELD_TYPES:
    # "Any" is handled by the single generic field representation.
    CWL_TYPE_TO_REPRESENTATIONS = {
        "Any": ["field"],
        **_COMMON_CWL_TYPE_TO_REPRESENTATIONS,
        "enum": ["enum"],
        "double": ["double"],
    }
else:
    # "Any" expands to every concrete representation a value could take.
    CWL_TYPE_TO_REPRESENTATIONS = {
        "Any": ["integer", "float", "file", "boolean", "text", "record", "json"],
        **_COMMON_CWL_TYPE_TO_REPRESENTATIONS,
    }
def type_representation_from_name(type_representation_name):
    """Return the entry of TYPE_REPRESENTATIONS with the given name.

    :param type_representation_name: value compared against each entry's ``name``
    :returns: the first matching TypeRepresentation
    :raises ValueError: if no entry has that name
    """
    candidates = (
        representation
        for representation in TYPE_REPRESENTATIONS
        if representation.name == type_representation_name
    )
    found = next(candidates, None)
    if found is None:
        raise ValueError(f"No type representation for {type_representation_name}")
    return found
def type_descriptions_for_field_types(field_types):
    """Collect the type representations able to hold any of ``field_types``.

    :param field_types: iterable of CWL type names, or dicts carrying the
        type name under a truthy ``"type"`` key
    :returns: list of matching TypeRepresentation objects, in the order they
        appear in TYPE_REPRESENTATIONS
    :raises Exception: if a field type is unhashable or has no entry in
        CWL_TYPE_TO_REPRESENTATIONS
    """
    wanted_names = set()
    for cwl_type in field_types:
        # Unwrap {"type": ...} style declarations to the bare type.
        if isinstance(cwl_type, dict) and cwl_type.get("type"):
            cwl_type = cwl_type.get("type")

        try:
            names_for_type = CWL_TYPE_TO_REPRESENTATIONS.get(cwl_type)
        except TypeError:
            # Unhashable type declaration (e.g. a list or dict) used as a key.
            raise Exception("Failed to convert field_type %s" % cwl_type)

        if names_for_type is None:
            raise Exception("Failed to convert type %s" % cwl_type)
        wanted_names.update(names_for_type)

    return [
        representation
        for representation in TYPE_REPRESENTATIONS
        if representation.name in wanted_names
    ]
def dataset_wrapper_to_file_json(inputs_dir, dataset_wrapper):
    """Describe a wrapped dataset as a CWL job JSON value.

    ``expression.json`` datasets are read and returned as parsed JSON, and
    ``directory`` datasets are delegated to dataset_wrapper_to_directory_json.
    Anything else becomes a ``{"class": "File", ...}`` dict. When a
    ``__secondary_files__`` directory exists under the dataset's extra files
    path, the dataset and each entry of that directory are symlinked into
    ``inputs_dir`` and the links are reported as ``secondaryFiles``.

    :param inputs_dir: directory that receives the symlinks (created on demand)
    :param dataset_wrapper: wrapped dataset to describe
    :returns: parsed JSON, a Directory dict or a File dict
    """
    extension = dataset_wrapper.ext
    if extension == "expression.json":
        with open(dataset_wrapper.file_name) as handle:
            return json.load(handle)
    if extension == "directory":
        return dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper)

    secondary_root = os.path.join(dataset_wrapper.extra_files_path, "__secondary_files__")
    location = str(dataset_wrapper)
    file_json = {"class": "File"}

    if os.path.exists(secondary_root):
        safe_makedirs(inputs_dir)
        # Link the primary file next to its secondary files.
        staged_path = os.path.join(inputs_dir, os.path.basename(location))
        os.symlink(location, staged_path)

        secondary_entries = []
        for entry_name in os.listdir(secondary_root):
            source = os.path.join(secondary_root, entry_name)
            target = os.path.join(inputs_dir, entry_name)
            log.info(f"linking [{source}] to [{target}]")
            os.symlink(source, target)
            entry_class = "Directory" if os.path.isdir(os.path.realpath(source)) else "File"
            secondary_entries.append({"class": entry_class, "location": target})

        file_json["secondaryFiles"] = secondary_entries
        location = staged_path

    file_json["location"] = location

    # Verify it isn't a NoneDataset
    if dataset_wrapper.unsanitized:
        file_json["size"] = int(dataset_wrapper.get_size())

    basename = str(dataset_wrapper.created_from_basename or dataset_wrapper.name)
    set_basename_and_derived_properties(file_json, basename)
    return file_json
def dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper):
    """Describe a wrapped ``directory`` dataset as a CWL Directory dict.

    :param inputs_dir: accepted for signature parity with
        dataset_wrapper_to_file_json; not used here
    :param dataset_wrapper: wrapped dataset whose ``ext`` is ``"directory"``
    :returns: dict with ``location``, ``class``, ``name`` and ``archive_*`` keys
    """
    assert dataset_wrapper.ext == "directory"

    # The archive file name is assumed to contain the directory name.
    archive_name = str(dataset_wrapper.created_from_basename or dataset_wrapper.name)
    archive_root, archive_ext = os.path.splitext(archive_name)

    # Best effort: the archive location is reported as None when it cannot
    # be read from the wrapper.
    try:
        archive_location = dataset_wrapper.unsanitized.file_name
    except Exception:
        archive_location = None

    directory_json = {}
    directory_json["location"] = dataset_wrapper.extra_files_path
    directory_json["class"] = "Directory"
    directory_json["name"] = archive_root
    directory_json["archive_location"] = archive_location
    directory_json["archive_nameext"] = archive_ext
    directory_json["archive_nameroot"] = archive_root
    return directory_json
def collection_wrapper_to_array(inputs_dir, wrapped_value):
    """Convert each element yielded by ``wrapped_value`` to file JSON.

    :param inputs_dir: passed through to dataset_wrapper_to_file_json
    :param wrapped_value: iterable of wrapped datasets
    :returns: list of converted values, in iteration order
    """
    return [dataset_wrapper_to_file_json(inputs_dir, element) for element in wrapped_value]
def collection_wrapper_to_record(inputs_dir, wrapped_value):
    """Convert each ``(key, element)`` pair of ``wrapped_value`` to file JSON.

    :param inputs_dir: passed through to dataset_wrapper_to_file_json
    :param wrapped_value: object whose ``items()`` yields key/wrapped-dataset pairs
    :returns: dict mapping each key to its converted value
    """
    return {
        key: dataset_wrapper_to_file_json(inputs_dir, element)
        for key, element in wrapped_value.items()
    }
def to_cwl_job(tool, param_dict, local_working_directory):
    """Build the CWL job JSON for ``tool`` from Galaxy's wrapped parameters.

    tool is Galaxy's representation of the tool and param_dict is the
    parameter dictionary with wrapped values.

    :param tool: object exposing ``inputs`` (name -> tool input) and
        ``_cwl_tool_proxy`` (whose ``input_fields()`` lists the CWL fields)
    :param param_dict: maps input names to wrapped values
    :param local_working_directory: directory under which the ``_inputs``
        path used for linked input files is built
    :returns: dict mapping CWL input names to job values
    :raises ValueError: if a plain tool input has no CWL input field of the
        same name, or a type representation name is unknown
    """
    tool_proxy = tool._cwl_tool_proxy
    input_fields = tool_proxy.input_fields()
    inputs = tool.inputs
    input_json = {}
    inputs_dir = os.path.join(local_working_directory, "_inputs")

    def simple_value(_galaxy_input, param_dict_value, type_representation_name=None):
        """Convert one wrapped value according to the named representation."""
        type_representation = type_representation_from_name(type_representation_name)
        # Hmm... cwl_type isn't really the cwl type in every case,
        # like in the case of json for instance.
        if type_representation.galaxy_param_type is NO_GALAXY_INPUT:
            assert param_dict_value is None
            return None

        kind = type_representation.name
        if kind == "file":
            return dataset_wrapper_to_file_json(inputs_dir, param_dict_value)
        if kind == "directory":
            return dataset_wrapper_to_directory_json(inputs_dir, param_dict_value)
        # NOTE(review): TYPE_REPRESENTATIONS has no "long" entry, so the
        # "long" name cannot currently get past the lookup above.
        if kind in ("integer", "long"):
            return int(str(param_dict_value))
        if kind in ("float", "double"):
            return float(str(param_dict_value))
        if kind == "boolean":
            return string_as_bool(param_dict_value)
        if kind in ("text", "enum"):
            return str(param_dict_value)
        if kind == "json":
            return json.loads(param_dict_value.value)
        if kind == "array":
            # TODO: generalize to lists of lists and lists of non-files...
            return collection_wrapper_to_array(inputs_dir, param_dict_value)
        if kind == "record":
            return collection_wrapper_to_record(inputs_dir, param_dict_value)
        if kind != "field":
            return str(param_dict_value)

        # Generic "field" representation: dispatch on what was supplied.
        if param_dict_value is None:
            return None
        if hasattr(param_dict_value, "value"):
            # Is InputValueWrapper
            rval = param_dict_value.value
            if isinstance(rval, dict) and "src" in rval and rval["src"] == "json":
                # needed for wf_step_connect_undeclared_param, so non-file defaults?
                return rval["value"]
            return rval
        if not param_dict_value.is_collection:
            # Is DatasetFilenameWrapper
            return dataset_wrapper_to_file_json(inputs_dir, param_dict_value)

        # Is DatasetCollectionWrapper
        hdca_wrapper = param_dict_value
        # NOTE(review): the two checks below read the collection type through
        # different attribute paths - confirm both are intended.
        if hdca_wrapper.collection_type == "list":
            # TODO: generalize to lists of lists and lists of non-files...
            return collection_wrapper_to_array(inputs_dir, hdca_wrapper)
        if hdca_wrapper.collection_type.collection_type == "record":
            return collection_wrapper_to_record(inputs_dir, hdca_wrapper)
        # Any other collection type yields no value.
        return None

    for input_name, tool_input in inputs.items():
        if tool_input.type == "repeat":
            only_input = next(iter(tool_input.inputs.values()))
            # The CWL name is the repeat's name minus its "_repeat" suffix.
            cwl_name = input_name[:-len("_repeat")]
            # NOTE(review): no type representation name is passed here, so
            # simple_value raises ValueError for any non-empty repeat -
            # confirm whether this branch is still in use.
            input_json[cwl_name] = [
                simple_value(only_input, instance[cwl_name])
                for instance in param_dict[input_name]
            ]
        elif tool_input.type == "conditional":
            assert input_name in param_dict, f"No value for {input_name} in {param_dict}"
            current_case = param_dict[input_name]["_cwl__type_"]
            if str(current_case) != "null":  # str because it is a wrapped...
                case_index = tool_input.get_current_case(current_case)
                case_input = tool_input.cases[case_index].inputs["_cwl__value_"]
                case_value = param_dict[input_name]["_cwl__value_"]
                input_json[input_name] = simple_value(case_input, case_value, current_case)
        else:
            matched_field = None
            for field in input_fields:
                if field["name"] == input_name:
                    matched_field = field
            if matched_field is None:
                # Fail with a clear message instead of an opaque TypeError
                # from subscripting None further down.
                raise ValueError(f"No CWL input field found for tool input [{input_name}]")

            field_type = field_to_field_type(matched_field)
            if isinstance(field_type, list):
                # A list of several types is only representable as a field.
                assert USE_FIELD_TYPES
                type_descriptions = [FIELD_TYPE_REPRESENTATION]
            else:
                type_descriptions = type_descriptions_for_field_types([field_type])
            assert len(type_descriptions) == 1
            type_description_name = type_descriptions[0].name
            input_json[input_name] = simple_value(tool_input, param_dict[input_name], type_description_name)

    log.debug("Galaxy Tool State is CWL State is %s", input_json)
    return input_json
def _cwl_conditional_case_name(as_dict_value, case_strings):
    """Choose which conditional case (by name) should carry ``as_dict_value``.

    Checks run in a fixed order and the first case that both fits the value
    and is listed in ``case_strings`` wins.

    :raises RequestParameterInvalidException: if no listed case fits
    """
    if as_dict_value is NOT_PRESENT or as_dict_value is None:
        # TODO: less crazy handling of defaults...
        if "null" in case_strings:
            return "null"
        raise RequestParameterInvalidException(
            "Cannot translate CWL datatype - value [{}] of type [{}] with case_strings [{}]. Non-null property must be set.".format(
                as_dict_value, type(as_dict_value), case_strings
            )
        )

    # bool is checked before int because bool is a subclass of int.
    if isinstance(as_dict_value, bool) and "boolean" in case_strings:
        return "boolean"
    if isinstance(as_dict_value, int):
        if "integer" in case_strings:
            return "integer"
        if "long" in case_strings:
            return "long"
    if isinstance(as_dict_value, (int, float)):
        if "float" in case_strings:
            return "float"
        if "double" in case_strings:
            return "double"
    # NOTE(review): the representation for CWL strings is named "text"
    # elsewhere in this module; confirm that "string" is the case name
    # actually produced for conditionals.
    if isinstance(as_dict_value, str) and "string" in case_strings:
        return "string"

    is_dataset_reference = isinstance(as_dict_value, dict) and "src" in as_dict_value and "id" in as_dict_value
    if is_dataset_reference and "file" in case_strings:
        return "file"
    if is_dataset_reference and "directory" in case_strings:
        # TODO: can't disambiguate with above if both are available...
        return "directory"

    if "field" in case_strings:
        return "field"
    if "json" in case_strings:
        return "json"
    raise RequestParameterInvalidException(
        "Cannot translate CWL datatype - value [{}] of type [{}] with case_strings [{}].".format(
            as_dict_value, type(as_dict_value), case_strings
        )
    )


def to_galaxy_parameters(tool, as_dict):
    """Translate a Galaxified CWL job description into a Galaxy tool request.

    Tool is Galaxy's representation of the tool and as_dict is a Galaxified
    representation of the input json (no paths, HDA references for instance).

    :param tool: object exposing ``inputs`` (name -> tool input)
    :param as_dict: maps input names to job values
    :returns: flat dict of Galaxy request parameters
    :raises RequestParameterInvalidException: if a conditional input's value
        fits none of its cases
    """
    inputs = tool.inputs
    galaxy_request = {}

    def from_simple_value(_galaxy_input, param_dict_value, type_representation_name=None):
        # Only the "json" representation is serialized; everything else is
        # passed through untouched.
        if type_representation_name == "json":
            return json.dumps(param_dict_value)
        return param_dict_value

    for input_name, tool_input in inputs.items():
        as_dict_value = as_dict.get(input_name, NOT_PRESENT)
        galaxy_input_type = tool_input.type

        if galaxy_input_type == "repeat":
            if input_name not in as_dict:
                continue
            only_input = next(iter(tool_input.inputs.values()))
            # Each instance needs its own index in the key; a fixed index
            # would make every value overwrite the previous one.
            for index, value in enumerate(as_dict_value):
                key = f"{input_name}_repeat_{index}|{only_input.name}"
                galaxy_request[key] = from_simple_value(only_input, value)
        elif galaxy_input_type == "conditional":
            type_representation_name = _cwl_conditional_case_name(as_dict_value, tool_input.case_strings)
            galaxy_request["%s|_cwl__type_" % input_name] = type_representation_name
            if type_representation_name != "null":
                current_case_index = tool_input.get_current_case(type_representation_name)
                current_case_inputs = tool_input.cases[current_case_index].inputs
                current_case_input = current_case_inputs["_cwl__value_"]
                galaxy_value = from_simple_value(current_case_input, as_dict_value, type_representation_name)
                galaxy_request["%s|_cwl__value_" % input_name] = galaxy_value
        elif as_dict_value is NOT_PRESENT:
            continue
        else:
            galaxy_request[input_name] = from_simple_value(tool_input, as_dict_value)

    log.info("Converted galaxy_request is %s", galaxy_request)
    return galaxy_request
def field_to_field_type(field):
    """Extract the (possibly compound) type declared by a CWL input field.

    A dict-valued ``type`` is unwrapped to its own ``"type"`` entry, and a
    one-element type list is collapsed to that single element.

    :param field: mapping with a ``"type"`` key
    :returns: the declared type, or a list when several types are declared
    :raises Exception: if the type list is empty
    """
    declared = field["type"]
    if isinstance(declared, dict):
        declared = declared["type"]

    if not isinstance(declared, list):
        return declared
    if not declared:
        raise Exception("Zero-length type list encountered, invalid CWL?")
    return declared[0] if len(declared) == 1 else declared