Source code for cwltool.context

"""Shared context objects that replace use of kwargs."""

import copy
import os
import shutil
import tempfile
import threading
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Literal,
    Optional,
    TextIO,
    Tuple,
    Union,
)

from ruamel.yaml.comments import CommentedMap
from schema_salad.avro.schema import Names
from schema_salad.ref_resolver import Loader
from schema_salad.utils import FetcherCallableType

from .mpi import MpiConfig
from .pathmapper import PathMapper
from .stdfsaccess import StdFsAccess
from .utils import DEFAULT_TMP_PREFIX, CWLObjectType, HasReqsHints, ResolverType

if TYPE_CHECKING:
    from cwl_utils.parser.cwl_v1_2 import LoadingOptions

    from .builder import Builder
    from .cwlprov.provenance_profile import ProvenanceProfile
    from .cwlprov.ro import ResearchObject
    from .mutation import MutationManager
    from .process import Process
    from .secrets import SecretStore
    from .software_requirements import DependenciesConfiguration


class ContextBase:
    """Shared kwargs based initializer for :py:class:`RuntimeContext` and :py:class:`LoadingContext`."""

    def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
        """Initialize."""
        if kwargs:
            for k, v in kwargs.items():
                if hasattr(self, k):
                    setattr(self, k, v)


def make_tool_notimpl(toolpath_object: CommentedMap, loadingContext: "LoadingContext") -> "Process":
    """Fake implementation of the make tool function."""
    raise NotImplementedError()


default_make_tool = make_tool_notimpl


def log_handler(
    outdir: str,
    base_path_logs: str,
    stdout_path: Optional[str],
    stderr_path: Optional[str],
) -> None:
    """Move logs from log location to final output."""
    if outdir != base_path_logs:
        if stdout_path:
            new_stdout_path = stdout_path.replace(base_path_logs, outdir)
            shutil.copy2(stdout_path, new_stdout_path)
        if stderr_path:
            new_stderr_path = stderr_path.replace(base_path_logs, outdir)
            shutil.copy2(stderr_path, new_stderr_path)


def set_log_dir(outdir: str, log_dir: str, subdir_name: str) -> str:
    """Set the log directory."""
    if log_dir == "":
        return outdir
    else:
        return log_dir + "/" + subdir_name


[docs]class LoadingContext(ContextBase):
[docs] def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None: """Initialize the LoadingContext from the kwargs.""" self.debug: bool = False self.metadata: CWLObjectType = {} self.requirements: Optional[List[CWLObjectType]] = None self.hints: Optional[List[CWLObjectType]] = None self.overrides_list: List[CWLObjectType] = [] self.loader: Optional[Loader] = None self.avsc_names: Optional[Names] = None self.disable_js_validation: bool = False self.js_hint_options_file: Optional[str] = None self.do_validate: bool = True self.enable_dev: bool = False self.strict: bool = True self.resolver: Optional[ResolverType] = None self.fetcher_constructor: Optional[FetcherCallableType] = None self.construct_tool_object = default_make_tool self.research_obj: Optional[ResearchObject] = None self.orcid: str = "" self.cwl_full_name: str = "" self.host_provenance: bool = False self.user_provenance: bool = False self.prov_obj: Optional["ProvenanceProfile"] = None self.do_update: Optional[bool] = None self.jobdefaults: Optional[CommentedMap] = None self.doc_cache: bool = True self.relax_path_checks: bool = False self.singularity: bool = False self.podman: bool = False self.eval_timeout: float = 60 self.codegen_idx: Dict[str, Tuple[Any, "LoadingOptions"]] = {} self.fast_parser = False self.skip_resolve_all = False self.skip_schemas = False super().__init__(kwargs)
[docs] def copy(self) -> "LoadingContext": """Return a copy of this :py:class:`LoadingContext`.""" return copy.copy(self)
[docs]class RuntimeContext(ContextBase): outdir: Optional[str] = None tmpdir: str = "" tmpdir_prefix: str = DEFAULT_TMP_PREFIX tmp_outdir_prefix: str = "" stagedir: str = ""
[docs] def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None: """Initialize the RuntimeContext from the kwargs.""" select_resources_callable = Callable[ [Dict[str, Union[int, float]], RuntimeContext], Dict[str, Union[int, float]], ] self.user_space_docker_cmd: Optional[str] = None self.secret_store: Optional["SecretStore"] = None self.no_read_only: bool = False self.custom_net: Optional[str] = None self.no_match_user: bool = False self.preserve_environment: Optional[Iterable[str]] = None self.preserve_entire_environment: bool = False self.use_container: bool = True self.force_docker_pull: bool = False self.rm_tmpdir: bool = True self.pull_image: bool = True self.rm_container: bool = True self.move_outputs: Union[Literal["move"], Literal["leave"], Literal["copy"]] = "move" self.log_dir: str = "" self.set_log_dir = set_log_dir self.log_dir_handler = log_handler self.streaming_allowed: bool = False self.singularity: bool = False self.podman: bool = False self.debug: bool = False self.compute_checksum: bool = True self.name: str = "" self.default_container: Optional[str] = "" self.find_default_container: Optional[Callable[[HasReqsHints], Optional[str]]] = None self.cachedir: Optional[str] = None self.part_of: str = "" self.basedir: str = "" self.toplevel: bool = False self.mutation_manager: Optional["MutationManager"] = None self.make_fs_access = StdFsAccess self.path_mapper = PathMapper self.builder: Optional["Builder"] = None self.docker_outdir: str = "" self.docker_tmpdir: str = "" self.docker_stagedir: str = "" self.js_console: bool = False self.job_script_provider: Optional[DependenciesConfiguration] = None self.select_resources: Optional[select_resources_callable] = None self.eval_timeout: float = 60 self.postScatterEval: Optional[Callable[[CWLObjectType], Optional[CWLObjectType]]] = None self.on_error: Union[Literal["stop"], Literal["continue"]] = "stop" self.strict_memory_limit: bool = False self.strict_cpu_limit: bool = False self.cidfile_dir: Optional[str] = None self.cidfile_prefix: Optional[str] = None self.workflow_eval_lock: Optional[threading.Condition] = None self.research_obj: Optional[ResearchObject] = None self.orcid: str = "" self.cwl_full_name: str = "" self.process_run_id: Optional[str] = None self.prov_obj: Optional[ProvenanceProfile] = None self.mpi_config: MpiConfig = MpiConfig() self.default_stdout: Optional[Union[IO[bytes], TextIO]] = None self.default_stderr: Optional[Union[IO[bytes], TextIO]] = None self.validate_only: bool = False self.validate_stdout: Optional[Union[IO[bytes], TextIO, IO[str]]] = None super().__init__(kwargs) if self.tmp_outdir_prefix == "": self.tmp_outdir_prefix = self.tmpdir_prefix
[docs] def get_outdir(self) -> str: """Return :py:attr:`outdir` or create one with :py:attr:`tmp_outdir_prefix`.""" if self.outdir: return self.outdir return self.create_outdir()
[docs] def get_tmpdir(self) -> str: """Return :py:attr:`tmpdir` or create one with :py:attr:`tmpdir_prefix`.""" if self.tmpdir: return self.tmpdir return self.create_tmpdir()
[docs] def get_stagedir(self) -> str: """Return :py:attr:`stagedir` or create one with :py:attr:`tmpdir_prefix`.""" if self.stagedir: return self.stagedir tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix) return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
[docs] def create_tmpdir(self) -> str: """Create a temporary directory that respects :py:attr:`tmpdir_prefix`.""" tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix) return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
[docs] def create_outdir(self) -> str: """Create a temporary directory that respects :py:attr:`tmp_outdir_prefix`.""" out_dir, out_prefix = os.path.split(self.tmp_outdir_prefix) return tempfile.mkdtemp(prefix=out_prefix, dir=out_dir)
[docs] def copy(self) -> "RuntimeContext": """Return a copy of this :py:class:`RuntimeContext`.""" return copy.copy(self)
[docs]def getdefault(val: Any, default: Any) -> Any: """Return the ``val`` using the ``default`` as backup in case the val is ``None``.""" if val is None: return default else: return val