Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tools.cwl.schema
"""Abstraction around cwltool and related libraries for loading a CWL artifact."""
import os
from collections import namedtuple
from six.moves.urllib.parse import urldefrag
from .cwltool_deps import (
ensure_cwltool_available,
load_tool,
schema_salad,
workflow,
)
# A not-yet-validated process object paired with the URI it is identified by.
RawProcessReference = namedtuple(
    "RawProcessReference",
    ("process_object", "uri"),
)

# The results of validating a raw process reference, bundled together with
# the raw reference they were produced from.
ProcessDefinition = namedtuple(
    "ProcessDefinition",
    (
        "process_object",
        "metadata",
        "document_loader",
        "avsc_names",
        "raw_process_reference",
    ),
)
class SchemaLoader(object):
    """Load CWL process documents and turn them into cwltool tool objects.

    The usual flow is ``raw_process_reference`` (or
    ``raw_process_reference_for_object``) -> ``process_definition`` ->
    ``tool``; ``tool`` can also run the earlier steps itself when given
    only a ``path``.
    """

    def __init__(self, strict=True):
        """Create a loader.

        :param strict: value forwarded as the ``"strict"`` option when
            ``tool`` calls ``load_tool.make_tool``.
        """
        # ``tool`` reads this attribute, so it must exist on every instance.
        self._strict = strict

    @property
    def raw_document_loader(self):
        """Return a new ``schema_salad`` ``Loader`` built from cwltool's ``jobloaderctx``.

        A fresh loader is constructed on every access.
        """
        ensure_cwltool_available()
        # Imported here, after the availability check, rather than at module
        # import time.
        from cwltool.load_tool import jobloaderctx
        return schema_salad.ref_resolver.Loader(jobloaderctx)

    def raw_process_reference(self, path):
        """Fetch the document at ``path`` and wrap it in a ``RawProcessReference``.

        The returned reference keeps the full ``file://`` URI; only the
        fragment-free part of that URI is used to fetch the document.
        """
        uri = "file://" + os.path.abspath(path)
        fileuri, _ = urldefrag(uri)
        return RawProcessReference(self.raw_document_loader.fetch(fileuri), uri)

    def raw_process_reference_for_object(self, object, uri=None):
        """Wrap an already-loaded process ``object`` in a ``RawProcessReference``.

        :param object: the process object to wrap (stored as-is).
        :param uri: URI to associate with the object; ``"galaxy://"`` is
            used when none is given.
        """
        if uri is None:
            uri = "galaxy://"
        return RawProcessReference(object, uri)

    def process_definition(self, raw_reference):
        """Validate ``raw_reference`` and return a ``ProcessDefinition``.

        Validation is delegated to ``load_tool.validate_document``; its
        results are bundled together with the originating raw reference.
        """
        # The URI returned by validate_document is not used; the raw
        # reference (which carries its own URI) is stored instead.
        document_loader, avsc_names, process_object, metadata, uri = load_tool.validate_document(
            self.raw_document_loader,
            raw_reference.process_object,
            raw_reference.uri,
        )
        process_def = ProcessDefinition(
            process_object,
            metadata,
            document_loader,
            avsc_names,
            raw_reference,
        )
        return process_def

    def tool(self, **kwds):
        """Build a tool object via ``load_tool.make_tool``.

        Recognized keyword arguments, in order of precedence:

        * ``process_definition`` - an existing ``ProcessDefinition``.
        * ``raw_process_reference`` - a ``RawProcessReference`` to validate.
        * ``path`` - path of a document to fetch and validate.
        * ``make_tool`` - factory passed to ``load_tool.make_tool``
          (defaults to ``workflow.defaultMakeTool``).

        :raises KeyError: if none of ``process_definition``,
            ``raw_process_reference`` or ``path`` is supplied.
        """
        process_definition = kwds.get("process_definition", None)
        if process_definition is None:
            raw_process_reference = kwds.get("raw_process_reference", None)
            if raw_process_reference is None:
                raw_process_reference = self.raw_process_reference(kwds["path"])
            process_definition = self.process_definition(raw_process_reference)

        make_tool = kwds.get("make_tool", workflow.defaultMakeTool)
        tool = load_tool.make_tool(
            process_definition.document_loader,
            process_definition.avsc_names,
            process_definition.metadata,
            process_definition.raw_process_reference.uri,
            make_tool,
            {"strict": self._strict},
        )
        return tool
# Module-level loader instances: one built with the default constructor
# arguments and one explicitly constructed with strict=False.
schema_loader = SchemaLoader()
non_strict_schema_loader = SchemaLoader(strict=False)