Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for cwltool.load_tool

"""Loads a CWL document."""

import copy
import hashlib
import logging
import os
import re
import urllib
import uuid
from functools import partial
from typing import (
    Any,
    Dict,
    List,
    MutableMapping,
    MutableSequence,
    Optional,
    Tuple,
    Union,
    cast,
)

from cwl_utils.parser import cwl_v1_2, cwl_v1_2_utils
from ruamel.yaml.comments import CommentedMap, CommentedSeq
from schema_salad.exceptions import ValidationException
from schema_salad.fetcher import Fetcher
from schema_salad.ref_resolver import Loader, file_uri
from schema_salad.schema import validate_doc
from schema_salad.sourceline import SourceLine, cmap
from schema_salad.utils import (
    ContextType,
    FetcherCallableType,
    IdxResultType,
    ResolveType,
    json_dumps,
)

from . import CWL_CONTENT_TYPES, process, update
from .context import LoadingContext
from .errors import GraphTargetMissingException
from .loghandler import _logger
from .process import Process, get_schema, shortname
from .update import ALLUPDATES
from .utils import CWLObjectType, ResolverType, visit_class

docloaderctx: ContextType = {
    "cwl": "https://w3id.org/cwl/cwl#",
    "cwltool": "http://commonwl.org/cwltool#",
    "path": {"@type": "@id"},
    "location": {"@type": "@id"},
    "id": "@id",
}

jobloader_id_name = "__id"
jobloaderctx: ContextType = {
    "cwl": "https://w3id.org/cwl/cwl#",
    "cwltool": "http://commonwl.org/cwltool#",
    "path": {"@type": "@id"},
    "location": {"@type": "@id"},
    jobloader_id_name: "@id",
}


overrides_ctx: ContextType = {
    "overrideTarget": {"@type": "@id"},
    "cwltool": "http://commonwl.org/cwltool#",
    "http://commonwl.org/cwltool#overrides": {
        "@id": "cwltool:overrides",
        "mapSubject": "overrideTarget",
    },
    "requirements": {
        "@id": "https://w3id.org/cwl/cwl#requirements",
        "mapSubject": "class",
    },
}


[docs]def default_loader( fetcher_constructor: Optional[FetcherCallableType] = None, enable_dev: bool = False, doc_cache: bool = True, ) -> Loader: return Loader( docloaderctx, fetcher_constructor=fetcher_constructor, allow_attachments=lambda r: enable_dev, doc_cache=doc_cache, )
def resolve_tool_uri( argsworkflow: str, resolver: Optional[ResolverType] = None, fetcher_constructor: Optional[FetcherCallableType] = None, document_loader: Optional[Loader] = None, ) -> Tuple[str, str]: uri = None # type: Optional[str] split = urllib.parse.urlsplit(argsworkflow) # In case of Windows path, urlsplit misjudge Drive letters as scheme, here we are skipping that if split.scheme and split.scheme in ["http", "https", "file"]: uri = argsworkflow elif os.path.exists(os.path.abspath(argsworkflow)): uri = file_uri(str(os.path.abspath(argsworkflow))) elif resolver is not None: uri = resolver(document_loader or default_loader(fetcher_constructor), argsworkflow) if uri is None: raise ValidationException("Not found: '%s'" % argsworkflow) if argsworkflow != uri: _logger.info("Resolved '%s' to '%s'", argsworkflow, uri) fileuri = urllib.parse.urldefrag(uri)[0] return uri, fileuri def fetch_document( argsworkflow: Union[str, CWLObjectType], loadingContext: Optional[LoadingContext] = None, ) -> Tuple[LoadingContext, CommentedMap, str]: """Retrieve a CWL document.""" if loadingContext is None: loadingContext = LoadingContext() loadingContext.loader = default_loader() else: loadingContext = loadingContext.copy() if loadingContext.loader is None: loadingContext.loader = default_loader( loadingContext.fetcher_constructor, enable_dev=loadingContext.enable_dev, doc_cache=loadingContext.doc_cache, ) if isinstance(argsworkflow, str): uri, fileuri = resolve_tool_uri( argsworkflow, resolver=loadingContext.resolver, document_loader=loadingContext.loader, ) workflowobj = cast( CommentedMap, loadingContext.loader.fetch(fileuri, content_types=CWL_CONTENT_TYPES), ) return loadingContext, workflowobj, uri if isinstance(argsworkflow, MutableMapping): uri = cast(str, argsworkflow["id"]) if argsworkflow.get("id") else "_:" + str(uuid.uuid4()) workflowobj = cast(CommentedMap, cmap(cast(Dict[str, Any], argsworkflow), fn=uri)) loadingContext.loader.idx[uri] = workflowobj return loadingContext, workflowobj, uri raise ValidationException("Must be URI or object: '%s'" % argsworkflow) def _convert_stdstreams_to_files( workflowobj: Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str, int]], str] ) -> None: if isinstance(workflowobj, MutableMapping): if workflowobj.get("class") == "CommandLineTool": with SourceLine( workflowobj, "outputs", ValidationException, _logger.isEnabledFor(logging.DEBUG), ): outputs = workflowobj.get("outputs", []) if not isinstance(outputs, CommentedSeq): raise ValidationException('"outputs" section is not ' "valid.") for out in cast(MutableSequence[CWLObjectType], workflowobj.get("outputs", [])): if not isinstance(out, CommentedMap): raise ValidationException(f"Output {out!r} is not a valid OutputParameter.") for streamtype in ["stdout", "stderr"]: if out.get("type") == streamtype: if "outputBinding" in out: raise ValidationException( "Not allowed to specify outputBinding when" " using %s shortcut." % streamtype ) if streamtype in workflowobj: filename = workflowobj[streamtype] else: filename = str( hashlib.sha1( # nosec json_dumps(workflowobj, sort_keys=True).encode("utf-8") ).hexdigest() ) workflowobj[streamtype] = filename out["type"] = "File" out["outputBinding"] = cmap({"glob": filename}) for inp in cast(MutableSequence[CWLObjectType], workflowobj.get("inputs", [])): if inp.get("type") == "stdin": if "inputBinding" in inp: raise ValidationException( "Not allowed to specify inputBinding when" " using stdin shortcut." ) if "stdin" in workflowobj: raise ValidationException( "Not allowed to specify stdin path when" " using stdin type shortcut." ) else: workflowobj["stdin"] = ( "$(inputs.%s.path)" % cast(str, inp["id"]).rpartition("#")[2].split("/")[-1] ) inp["type"] = "File" else: for entry in workflowobj.values(): _convert_stdstreams_to_files( cast( Union[ CWLObjectType, MutableSequence[Union[CWLObjectType, str, int]], str, ], entry, ) ) if isinstance(workflowobj, MutableSequence): for entry in workflowobj: _convert_stdstreams_to_files( cast( Union[ CWLObjectType, MutableSequence[Union[CWLObjectType, str, int]], str, ], entry, ) ) def _add_blank_ids( workflowobj: Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]] ) -> None: if isinstance(workflowobj, MutableMapping): if ( "run" in workflowobj and isinstance(workflowobj["run"], MutableMapping) and "id" not in workflowobj["run"] and "$import" not in workflowobj["run"] ): workflowobj["run"]["id"] = str(uuid.uuid4()) for entry in workflowobj.values(): _add_blank_ids( cast( Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]], entry, ) ) if isinstance(workflowobj, MutableSequence): for entry in workflowobj: _add_blank_ids( cast( Union[CWLObjectType, MutableSequence[Union[CWLObjectType, str]]], entry, ) ) def _fast_parser_convert_stdstreams_to_files( processobj: Union[cwl_v1_2.Process, MutableSequence[cwl_v1_2.Process]] ) -> None: if isinstance(processobj, cwl_v1_2.CommandLineTool): cwl_v1_2_utils.convert_stdstreams_to_files(processobj) elif isinstance(processobj, cwl_v1_2.Workflow): for st in processobj.steps: _fast_parser_convert_stdstreams_to_files(st.run) elif isinstance(processobj, MutableSequence): for p in processobj: _fast_parser_convert_stdstreams_to_files(p) def _fast_parser_expand_hint_class( hints: Optional[Any], loadingOptions: cwl_v1_2.LoadingOptions ) -> None: if isinstance(hints, MutableSequence): for h in hints: if isinstance(h, MutableMapping) and "class" in h: for k, v in loadingOptions.namespaces.items(): if h["class"].startswith(k + ":"): h["class"] = v + h["class"][len(k) + 1 :] def _fast_parser_handle_hints( processobj: Union[cwl_v1_2.Process, MutableSequence[cwl_v1_2.Process]], loadingOptions: cwl_v1_2.LoadingOptions, ) -> None: if isinstance(processobj, (cwl_v1_2.CommandLineTool, cwl_v1_2.Workflow)): _fast_parser_expand_hint_class(processobj.hints, loadingOptions) if isinstance(processobj, cwl_v1_2.Workflow): for st in processobj.steps: _fast_parser_expand_hint_class(st.hints, loadingOptions) _fast_parser_handle_hints(st.run, loadingOptions) elif isinstance(processobj, MutableSequence): for p in processobj: _fast_parser_handle_hints(p, loadingOptions) def update_index(document_loader: Loader, pr: CommentedMap) -> None: if "id" in pr: document_loader.idx[pr["id"]] = pr def fast_parser( workflowobj: Union[CommentedMap, CommentedSeq, None], fileuri: Optional[str], uri: str, loadingContext: LoadingContext, fetcher: Fetcher, ) -> Tuple[Union[CommentedMap, CommentedSeq], CommentedMap]: lopt = cwl_v1_2.LoadingOptions(idx=loadingContext.codegen_idx, fileuri=fileuri, fetcher=fetcher) if uri not in loadingContext.codegen_idx: cwl_v1_2.load_document_with_metadata( workflowobj, fileuri, loadingOptions=lopt, addl_metadata_fields=["id", "cwlVersion"], ) objects, loadopt = loadingContext.codegen_idx[uri] _fast_parser_convert_stdstreams_to_files(objects) _fast_parser_handle_hints(objects, loadopt) processobj: Union[MutableMapping[str, Any], MutableSequence[Any], float, str, None] processobj = cwl_v1_2.save(objects, relative_uris=False) metadata: Dict[str, Any] = {} metadata["id"] = loadopt.fileuri if loadopt.namespaces: metadata["$namespaces"] = loadopt.namespaces if loadopt.schemas: metadata["$schemas"] = loadopt.schemas if loadopt.baseuri: metadata["$base"] = loadopt.baseuri for k, v in loadopt.addl_metadata.items(): if isinstance(processobj, MutableMapping) and k in processobj: metadata[k] = processobj[k] else: metadata[k] = v if loadingContext.loader: loadingContext.loader.graph += loadopt.graph # Need to match the document loader's index with the fast parser index # Get the base URI (no fragments) for documents that use $graph nofrag = urllib.parse.urldefrag(uri)[0] flag = "fastparser-idx-from:" + nofrag if not loadingContext.loader.idx.get(flag): objects, loadopt = loadingContext.codegen_idx[nofrag] fileobj = cmap( cast( Union[int, float, str, Dict[str, Any], List[Any], None], cwl_v1_2.save(objects, relative_uris=False), ) ) visit_class( fileobj, ("CommandLineTool", "Workflow", "ExpressionTool"), partial(update_index, loadingContext.loader), ) loadingContext.loader.idx[flag] = flag for u in lopt.imports: loadingContext.loader.idx["import:" + u] = "import:" + u for u in lopt.includes: loadingContext.loader.idx["include:" + u] = "include:" + u return cast( Union[CommentedMap, CommentedSeq], cmap(cast(Union[Dict[str, Any], List[Any]], processobj)), ), cast(CommentedMap, cmap(metadata))
[docs]def resolve_and_validate_document( loadingContext: LoadingContext, workflowobj: Union[CommentedMap, CommentedSeq], uri: str, preprocess_only: bool = False, ) -> Tuple[LoadingContext, str]: """Validate a CWL document.""" if not loadingContext.loader: raise ValueError("loadingContext must have a loader.") else: loader = loadingContext.loader loadingContext = loadingContext.copy() if not isinstance(workflowobj, MutableMapping): raise ValueError(f"workflowjobj must be a dict, got {type(workflowobj)!r}: {workflowobj}") jobobj = None if "cwl:tool" in workflowobj: jobobj, _ = loader.resolve_all(workflowobj, uri) uri = urllib.parse.urljoin(uri, workflowobj["https://w3id.org/cwl/cwl#tool"]) del cast(Dict[str, Any], jobobj)["https://w3id.org/cwl/cwl#tool"] workflowobj = fetch_document(uri, loadingContext)[1] fileuri = urllib.parse.urldefrag(uri)[0] metadata: CWLObjectType cwlVersion = loadingContext.metadata.get("cwlVersion") if not cwlVersion: cwlVersion = workflowobj.get("cwlVersion") if not cwlVersion and fileuri != uri: # The tool we're loading is a fragment of a bigger file. Get # the document root element and look for cwlVersion there. metadata = cast(CWLObjectType, fetch_document(fileuri, loadingContext)[1]) cwlVersion = cast(str, metadata.get("cwlVersion")) if not cwlVersion: raise ValidationException( "No cwlVersion found. " "Use the following syntax in your CWL document to declare " "the version: cwlVersion: <version>.\n" "Note: if this is a CWL draft-3 (pre v1.0) document then it " "will need to be upgraded first using https://pypi.org/project/cwl-upgrader/ . " "'sbg:draft-2' documents can be upgraded using " "https://pypi.org/project/sevenbridges-cwl-draft2-upgrader/ ." ) if not isinstance(cwlVersion, str): with SourceLine(workflowobj, "cwlVersion", ValidationException, loadingContext.debug): raise ValidationException(f"'cwlVersion' must be a string, got {type(cwlVersion)}") # strip out version cwlVersion = re.sub(r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "", cwlVersion) if cwlVersion not in list(ALLUPDATES): # print out all the Supported Versions of cwlVersion versions = [] for version in list(ALLUPDATES): if "dev" in version: version += " (with --enable-dev flag only)" versions.append(version) versions.sort() raise ValidationException( "The CWL reference runner no longer supports pre CWL v1.0 " "documents. Supported versions are: " "\n{}".format("\n".join(versions)) ) if isinstance(jobobj, CommentedMap) and "http://commonwl.org/cwltool#overrides" in jobobj: loadingContext.overrides_list.extend(resolve_overrides(jobobj, uri, uri)) del jobobj["http://commonwl.org/cwltool#overrides"] if isinstance(jobobj, CommentedMap) and "https://w3id.org/cwl/cwl#requirements" in jobobj: if cwlVersion not in ("v1.1.0-dev1", "v1.1"): raise ValidationException( "`cwl:requirements` in the input object is not part of CWL " "v1.0. You can adjust to use `cwltool:overrides` instead; or you " "can set the cwlVersion to v1.1 or greater." ) loadingContext.overrides_list.append( { "overrideTarget": uri, "requirements": jobobj["https://w3id.org/cwl/cwl#requirements"], } ) del jobobj["https://w3id.org/cwl/cwl#requirements"] (sch_document_loader, avsc_names) = process.get_schema(cwlVersion)[:2] if isinstance(avsc_names, Exception): raise avsc_names processobj: ResolveType document_loader = Loader( sch_document_loader.ctx, schemagraph=sch_document_loader.graph, idx=loader.idx, cache=sch_document_loader.cache, fetcher_constructor=loadingContext.fetcher_constructor, skip_schemas=loadingContext.skip_schemas, doc_cache=loadingContext.doc_cache, ) loadingContext.loader = document_loader if cwlVersion == "v1.0": _add_blank_ids(workflowobj) if cwlVersion != "v1.2": loadingContext.fast_parser = False if loadingContext.skip_resolve_all: # Some integrations (e.g. Arvados) loads documents, makes # in-memory changes to them (which are applied to the objects # in the document_loader index), and then sends them back # through the loading machinery. # # In this case, the functions of resolve_all() have already # happened. Because resolve_all() is expensive, we don't want # to do it again if it's going to be a no-op, so the # skip_resolve_all flag tells us just to use the document # as-is from the loader index. # # Note that at the moment, fast_parser code path is considered # functionally the same as resolve_all() for this case. # processobj, metadata = document_loader.resolve_ref(uri) elif loadingContext.fast_parser: processobj, metadata = fast_parser( workflowobj, fileuri, uri, loadingContext, document_loader.fetcher ) else: document_loader.resolve_all(workflowobj, fileuri) processobj, metadata = document_loader.resolve_ref(uri) if not isinstance(processobj, (CommentedMap, CommentedSeq)): raise ValidationException("Workflow must be a CommentedMap or CommentedSeq.") if not hasattr(processobj.lc, "filename"): processobj.lc.filename = fileuri if loadingContext.metadata: metadata = loadingContext.metadata # Make a shallow copy. If we do a version update later, metadata # will be updated, we don't want to write through and change the # original object. metadata = copy.copy(metadata) if not isinstance(metadata, CommentedMap): raise ValidationException("metadata must be a CommentedMap, was %s" % type(metadata)) if isinstance(processobj, CommentedMap): uri = processobj["id"] if not loadingContext.fast_parser: _convert_stdstreams_to_files(workflowobj) if isinstance(jobobj, CommentedMap): loadingContext.jobdefaults = jobobj loadingContext.avsc_names = avsc_names loadingContext.metadata = metadata if preprocess_only: return loadingContext, uri if loadingContext.do_validate: validate_doc(avsc_names, processobj, document_loader, loadingContext.strict) # None means default behavior (do update) if loadingContext.do_update in (True, None): if "cwlVersion" not in metadata: metadata["cwlVersion"] = cwlVersion processobj = update.update( processobj, document_loader, fileuri, loadingContext.enable_dev, metadata ) document_loader.idx[processobj["id"]] = processobj visit_class( processobj, ("CommandLineTool", "Workflow", "ExpressionTool"), partial(update_index, document_loader), ) return loadingContext, uri
def make_tool( uri: Union[str, CommentedMap, CommentedSeq], loadingContext: LoadingContext ) -> Process: """Make a Python CWL object.""" if loadingContext.loader is None: raise ValueError("loadingContext must have a loader") resolveduri: Union[float, str, CommentedMap, CommentedSeq, None] metadata: CWLObjectType if loadingContext.fast_parser and isinstance(uri, str) and not loadingContext.skip_resolve_all: resolveduri, metadata = fast_parser( None, None, uri, loadingContext, loadingContext.loader.fetcher ) else: resolveduri, metadata = loadingContext.loader.resolve_ref(uri) processobj = None if isinstance(resolveduri, MutableSequence): for obj in resolveduri: if obj["id"].endswith("#main"): processobj = obj break if not processobj: raise GraphTargetMissingException( "Tool file contains graph of multiple objects, must specify " "one of #%s" % ", #".join(urllib.parse.urldefrag(i["id"])[1] for i in resolveduri if "id" in i) ) elif isinstance(resolveduri, MutableMapping): processobj = resolveduri else: raise Exception("Must resolve to list or dict") tool = loadingContext.construct_tool_object(processobj, loadingContext) if loadingContext.jobdefaults: jobobj = loadingContext.jobdefaults for inp in tool.tool["inputs"]: if shortname(inp["id"]) in jobobj: inp["default"] = jobobj[shortname(inp["id"])] return tool def load_tool( argsworkflow: Union[str, CWLObjectType], loadingContext: Optional[LoadingContext] = None, ) -> Process: loadingContext, workflowobj, uri = fetch_document(argsworkflow, loadingContext) loadingContext, uri = resolve_and_validate_document( loadingContext, workflowobj, uri, ) return make_tool(uri, loadingContext) def resolve_overrides( ov: IdxResultType, ov_uri: str, baseurl: str, ) -> List[CWLObjectType]: ovloader = Loader(overrides_ctx) ret, _ = ovloader.resolve_all(ov, baseurl) if not isinstance(ret, CommentedMap): raise Exception("Expected CommentedMap, got %s" % type(ret)) cwl_docloader = get_schema("v1.0")[0] cwl_docloader.resolve_all(ret, ov_uri) return cast(List[CWLObjectType], ret["http://commonwl.org/cwltool#overrides"]) def load_overrides(ov: str, base_url: str) -> List[CWLObjectType]: ovloader = Loader(overrides_ctx) return resolve_overrides(ovloader.fetch(ov), ov, base_url) def recursive_resolve_and_validate_document( loadingContext: LoadingContext, workflowobj: Union[CommentedMap, CommentedSeq], uri: str, preprocess_only: bool = False, ) -> Tuple[LoadingContext, str, Process]: """Validate a CWL document, checking that a tool object can be built.""" loadingContext, uri = resolve_and_validate_document( loadingContext, workflowobj, uri, preprocess_only=preprocess_only, ) tool = make_tool(uri, loadingContext) return loadingContext, uri, tool