Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tools.cwl.parser
""" This module provides proxy objects around objects from the common
workflow language reference implementation library cwltool. These proxies
adapt cwltool to Galaxy features and abstract the library away from the rest
of the framework.
"""
from __future__ import absolute_import
import json
import logging
import os
from abc import ABCMeta, abstractmethod
import six
from galaxy.util import safe_makedirs
from galaxy.util.bunch import Bunch
from galaxy.util.odict import odict
from .cwltool_deps import (
ensure_cwltool_available,
process,
)
from .schema import non_strict_schema_loader, schema_loader
log = logging.getLogger(__name__)
JOB_JSON_FILE = ".cwl_job.json"
SECONDARY_FILES_EXTRA_PREFIX = "__secondary_files__"
SUPPORTED_TOOL_REQUIREMENTS = [
"CreateFileRequirement",
"DockerRequirement",
"EnvVarRequirement",
"InlineJavascriptRequirement",
]
SUPPORTED_WORKFLOW_REQUIREMENTS = SUPPORTED_TOOL_REQUIREMENTS + [
]
[docs]def tool_proxy(tool_path, strict_cwl_validation=True):
""" Provide a proxy object to cwltool data structures to just
grab relevant data.
"""
ensure_cwltool_available()
tool = to_cwl_tool_object(tool_path, strict_cwl_validation=strict_cwl_validation)
return tool
[docs]def workflow_proxy(workflow_path, strict_cwl_validation=True):
ensure_cwltool_available()
workflow = to_cwl_workflow_object(workflow_path, strict_cwl_validation=strict_cwl_validation)
return workflow
[docs]def load_job_proxy(job_directory, strict_cwl_validation=True):
ensure_cwltool_available()
job_objects_path = os.path.join(job_directory, JOB_JSON_FILE)
job_objects = json.load(open(job_objects_path, "r"))
tool_path = job_objects["tool_path"]
job_inputs = job_objects["job_inputs"]
output_dict = job_objects["output_dict"]
cwl_tool = tool_proxy(tool_path, strict_cwl_validation=strict_cwl_validation)
cwl_job = cwl_tool.job_proxy(job_inputs, output_dict, job_directory=job_directory)
return cwl_job
def to_cwl_tool_object(tool_path, strict_cwl_validation=True):
proxy_class = None
cwl_tool = _schema_loader(strict_cwl_validation).tool(path=tool_path)
if isinstance(cwl_tool, int):
raise Exception("Failed to load tool.")
raw_tool = cwl_tool.tool
check_requirements(raw_tool)
if "class" not in raw_tool:
raise Exception("File does not declare a class, not a valid Draft 3+ CWL tool.")
process_class = raw_tool["class"]
if process_class == "CommandLineTool":
proxy_class = CommandLineToolProxy
elif process_class == "ExpressionTool":
proxy_class = ExpressionToolProxy
else:
raise Exception("File not a CWL CommandLineTool.")
if "cwlVersion" not in raw_tool:
raise Exception("File does not declare a CWL version, pre-draft 3 CWL tools are not supported.")
proxy = proxy_class(cwl_tool, tool_path)
return proxy
def to_cwl_workflow_object(workflow_path, strict_cwl_validation=None):
proxy_class = WorkflowProxy
cwl_workflow = _schema_loader(strict_cwl_validation).tool(path=workflow_path)
raw_workflow = cwl_workflow.tool
check_requirements(raw_workflow, tool=False)
proxy = proxy_class(cwl_workflow, workflow_path)
return proxy
def _schema_loader(strict_cwl_validation):
target_schema_loader = schema_loader if strict_cwl_validation else non_strict_schema_loader
return target_schema_loader
def check_requirements(rec, tool=True):
if isinstance(rec, dict):
if "requirements" in rec:
for r in rec["requirements"]:
if tool:
possible = SUPPORTED_TOOL_REQUIREMENTS
else:
possible = SUPPORTED_WORKFLOW_REQUIREMENTS
if r["class"] not in possible:
raise Exception("Unsupported requirement %s" % r["class"])
for d in rec:
check_requirements(rec[d], tool=tool)
if isinstance(rec, list):
for d in rec:
check_requirements(d, tool=tool)
@six.add_metaclass(ABCMeta)
class ToolProxy(object):
def __init__(self, tool, tool_path):
self._tool = tool
self._tool_path = tool_path
def job_proxy(self, input_dict, output_dict, job_directory="."):
""" Build a cwltool.job.Job describing computation using a input_json
Galaxy will generate mapping the Galaxy description of the inputs into
a cwltool compatible variant.
"""
return JobProxy(self, input_dict, output_dict, job_directory=job_directory)
@abstractmethod
def input_instances(self):
""" Return InputInstance objects describing mapping to Galaxy inputs. """
@abstractmethod
def output_instances(self):
""" Return OutputInstance objects describing mapping to Galaxy inputs. """
@abstractmethod
def docker_identifier(self):
""" Return docker identifier for embedding in tool description. """
@abstractmethod
def description(self):
""" Return description to tool. """
@abstractmethod
def label(self):
""" Return label for tool. """
class CommandLineToolProxy(ToolProxy):
def description(self):
return self._tool.tool.get('doc')
def label(self):
return self._tool.tool.get('label')
def input_instances(self):
return self._find_inputs(self._tool.inputs_record_schema)
def _find_inputs(self, schema):
schema_type = schema["type"]
if isinstance(schema_type, list):
raise Exception("Union types not yet implemented.")
elif isinstance(schema_type, dict):
return self._find_inputs(schema_type)
else:
if schema_type in self._tool.schemaDefs:
schema = self._tool.schemaDefs[schema_type]
if schema["type"] == "record":
return [_simple_field_to_input(_) for _ in schema["fields"]]
def output_instances(self):
outputs_schema = self._tool.outputs_record_schema
return self._find_outputs(outputs_schema)
def _find_outputs(self, schema):
rval = []
if not rval and schema["type"] == "record":
for output in schema["fields"]:
# output_type = output.get("type", None)
# if output_type != "File":
# template = "Unhandled output type [%s] encountered."
# raise Exception(template % output_type)
rval.append(_simple_field_to_output(output))
return rval
def docker_identifier(self):
tool = self._tool.tool
reqs_and_hints = tool.get("requirements", []) + tool.get("hints", [])
for hint in reqs_and_hints:
if hint["class"] == "DockerRequirement":
if "dockerImageId" in hint:
return hint["dockerImageId"]
else:
return hint["dockerPull"]
return None
class ExpressionToolProxy(CommandLineToolProxy):
pass
class JobProxy(object):
def __init__(self, tool_proxy, input_dict, output_dict, job_directory):
self._tool_proxy = tool_proxy
self._input_dict = input_dict
self._output_dict = output_dict
self._job_directory = job_directory
self._final_output = []
self._ok = True
self._cwl_job = None
self._is_command_line_job = None
def cwl_job(self):
self._ensure_cwl_job_initialized()
return self._cwl_job
@property
def is_command_line_job(self):
self._ensure_cwl_job_initialized()
assert self._is_command_line_job is not None
return self._is_command_line_job
def _ensure_cwl_job_initialized(self):
if self._cwl_job is None:
self._cwl_job = next(self._tool_proxy._tool.job(
self._input_dict,
self._output_callback,
basedir=self._job_directory,
select_resources=self._select_resources,
use_container=False
))
self._is_command_line_job = hasattr(self._cwl_job, "command_line")
def _select_resources(self, request):
new_request = request.copy()
new_request["cores"] = "$GALAXY_SLOTS"
return new_request
@property
def command_line(self):
if self.is_command_line_job:
return self.cwl_job().command_line
else:
return ["true"]
@property
def stdin(self):
if self.is_command_line_job:
return self.cwl_job().stdin
else:
return None
@property
def stdout(self):
if self.is_command_line_job:
return self.cwl_job().stdout
else:
return None
@property
def environment(self):
if self.is_command_line_job:
return self.cwl_job().environment
else:
return {}
@property
def generate_files(self):
if self.is_command_line_job:
return self.cwl_job().generatefiles
else:
return {}
def _output_callback(self, out, process_status):
if process_status == "success":
self._final_output = out
else:
self._ok = False
log.info("Output are %s, status is %s" % (out, process_status))
def collect_outputs(self, tool_working_directory):
if not self.is_command_line_job:
self.cwl_job().run(
)
return self._final_output
else:
return self.cwl_job().collect_outputs(tool_working_directory)
def save_job(self):
job_file = JobProxy._job_file(self._job_directory)
job_objects = {
"tool_path": os.path.abspath(self._tool_proxy._tool_path),
"job_inputs": self._input_dict,
"output_dict": self._output_dict,
}
json.dump(job_objects, open(job_file, "w"))
def _output_extra_files_dir(self, output_name):
output_id = self.output_id(output_name)
return os.path.join(self._job_directory, "dataset_%s_files" % output_id)
def output_id(self, output_name):
output_id = self._output_dict[output_name]["id"]
return output_id
def output_path(self, output_name):
output_id = self._output_dict[output_name]["path"]
return output_id
def output_secondary_files_dir(self, output_name, create=False):
extra_files_dir = self._output_extra_files_dir(output_name)
secondary_files_dir = os.path.join(extra_files_dir, SECONDARY_FILES_EXTRA_PREFIX)
if create and not os.path.exists(secondary_files_dir):
safe_makedirs(secondary_files_dir)
return secondary_files_dir
def stage_files(self):
cwl_job = self.cwl_job()
if hasattr(cwl_job, "pathmapper"):
process.stageFiles(self.cwl_job().pathmapper, os.symlink, ignoreWritable=True)
# else: expression tools do not have a path mapper.
@staticmethod
def _job_file(job_directory):
return os.path.join(job_directory, JOB_JSON_FILE)
class WorkflowProxy(object):
def __init__(self, workflow, workflow_path):
self._workflow = workflow
self._workflow_path = workflow_path
def step_proxies(self):
proxies = []
for step in self._workflow.steps:
proxies.append(StepProxy(self, step))
return proxies
@property
def runnables(self):
runnables = []
for step in self._workflow.steps:
if "run" in step.tool:
runnables.append(step.tool["run"])
return runnables
def to_dict(self):
name = os.path.basename(self._workflow_path)
steps = {}
index = 0
for i, input_dict in self._workflow.tool['inputs']:
index += 1
steps[index] = input_dict
for i, step_proxy in enumerate(self.step_proxies()):
index += 1
steps[index] = step_proxy.to_dict()
return {
'name': name,
'steps': steps,
}
class StepProxy(object):
def __init__(self, workflow_proxy, step):
self._workflow_proxy = workflow_proxy
self._step = step
def to_dict(self):
return {}
def _simple_field_union(field):
field_type = _field_to_field_type(field) # Must be a list if in here?
def any_of_in_field_type(types):
return any([t in field_type for t in types])
name, label, description = _field_metadata(field)
case_name = "_cwl__type_"
case_label = "Specify Parameter %s As" % label
def value_input(**kwds):
value_name = "_cwl__value_"
value_label = label
value_description = description
return InputInstance(
value_name,
value_label,
value_description,
**kwds
)
select_options = []
case_options = []
if "null" in field_type:
select_options.append({"value": "null", "label": "None", "selected": True})
case_options.append(("null", []))
if any_of_in_field_type(["Any", "string"]):
select_options.append({"value": "string", "label": "Simple String"})
case_options.append(("string", [value_input(input_type=INPUT_TYPE.TEXT)]))
if any_of_in_field_type(["Any", "boolean"]):
select_options.append({"value": "boolean", "label": "Boolean"})
case_options.append(("boolean", [value_input(input_type=INPUT_TYPE.BOOLEAN)]))
if any_of_in_field_type(["Any", "int"]):
select_options.append({"value": "int", "label": "Integer"})
case_options.append(("int", [value_input(input_type=INPUT_TYPE.INTEGER)]))
if any_of_in_field_type(["Any", "float"]):
select_options.append({"value": "float", "label": "Floating Point Number"})
case_options.append(("float", [value_input(input_type=INPUT_TYPE.FLOAT)]))
if any_of_in_field_type(["Any", "File"]):
select_options.append({"value": "data", "label": "Dataset"})
case_options.append(("data", [value_input(input_type=INPUT_TYPE.DATA)]))
if "Any" in field_type:
select_options.append({"value": "json", "label": "JSON Data Structure"})
case_options.append(("json", [value_input(input_type=INPUT_TYPE.TEXT, area=True)]))
case_input = SelectInputInstance(
name=case_name,
label=case_label,
description=False,
options=select_options,
)
return ConditionalInstance(name, case_input, case_options)
def _simple_field_to_input(field):
field_type = _field_to_field_type(field)
if isinstance(field_type, list):
# Length must be greater than 1...
return _simple_field_union(field)
name, label, description = _field_metadata(field)
type_kwds = _simple_field_to_input_type_kwds(field)
return InputInstance(name, label, description, **type_kwds)
def _simple_field_to_input_type_kwds(field, field_type=None):
simple_map_type_map = {
"File": INPUT_TYPE.DATA,
"int": INPUT_TYPE.INTEGER,
"long": INPUT_TYPE.INTEGER,
"float": INPUT_TYPE.INTEGER,
"double": INPUT_TYPE.INTEGER,
"string": INPUT_TYPE.TEXT,
"boolean": INPUT_TYPE.BOOLEAN,
}
if field_type is None:
field_type = _field_to_field_type(field)
if field_type in simple_map_type_map.keys():
input_type = simple_map_type_map[field_type]
return {"input_type": input_type, "array": False}
elif field_type == "array":
if isinstance(field["type"], dict):
array_type = field["type"]["items"]
else:
array_type = field["items"]
if array_type in simple_map_type_map.keys():
input_type = simple_map_type_map[array_type]
return {"input_type": input_type, "array": True}
else:
raise Exception("Unhandled simple field type encountered - [%s]." % field_type)
def _field_to_field_type(field):
field_type = field["type"]
if isinstance(field_type, dict):
field_type = field_type["type"]
if isinstance(field_type, list):
field_type_length = len(field_type)
if field_type_length == 0:
raise Exception("Zero-length type list encountered, invalid CWL?")
elif len(field_type) == 1:
field_type = field_type[0]
if field_type == "Any":
field_type = ["Any"]
return field_type
def _field_metadata(field):
name = field["name"]
label = field.get("label", None)
description = field.get("doc", None)
return name, label, description
def _simple_field_to_output(field):
name = field["name"]
output_data_class = field["type"]
output_instance = OutputInstance(
name,
output_data_type=output_data_class,
output_type=OUTPUT_TYPE.GLOB
)
return output_instance
INPUT_TYPE = Bunch(
DATA="data",
INTEGER="integer",
FLOAT="float",
TEXT="text",
BOOLEAN="boolean",
SELECT="select",
CONDITIONAL="conditional",
)
class ConditionalInstance(object):
def __init__(self, name, case, whens):
self.input_type = INPUT_TYPE.CONDITIONAL
self.name = name
self.case = case
self.whens = whens
def to_dict(self):
as_dict = dict(
name=self.name,
type=INPUT_TYPE.CONDITIONAL,
test=self.case.to_dict(),
when=odict(),
)
for value, block in self.whens:
as_dict["when"][value] = [i.to_dict() for i in block]
return as_dict
class SelectInputInstance(object):
def __init__(self, name, label, description, options):
self.input_type = INPUT_TYPE.SELECT
self.name = name
self.label = label
self.description = description
self.options = options
def to_dict(self):
# TODO: serialize options...
as_dict = dict(
name=self.name,
label=self.label or self.name,
help=self.description,
type=self.input_type,
options=self.options,
)
return as_dict
class InputInstance(object):
def __init__(self, name, label, description, input_type, array=False, area=False):
self.input_type = input_type
self.name = name
self.label = label
self.description = description
self.required = True
self.array = array
self.area = area
def to_dict(self, itemwise=True):
if itemwise and self.array:
as_dict = dict(
type="repeat",
name="%s_repeat" % self.name,
title="%s" % self.name,
blocks=[
self.to_dict(itemwise=False)
]
)
else:
as_dict = dict(
name=self.name,
label=self.label or self.name,
help=self.description,
type=self.input_type,
optional=not self.required,
)
if self.area:
as_dict["area"] = True
if self.input_type == INPUT_TYPE.INTEGER:
as_dict["value"] = "0"
if self.input_type == INPUT_TYPE.FLOAT:
as_dict["value"] = "0.0"
return as_dict
OUTPUT_TYPE = Bunch(
GLOB="glob",
STDOUT="stdout",
)
class OutputInstance(object):
def __init__(self, name, output_data_type, output_type, path=None):
self.name = name
self.output_data_type = output_data_type
self.output_type = output_type
self.path = path
__all__ = (
'tool_proxy',
'load_job_proxy',
)