Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for cwltool.context

"""Shared context objects that replace use of kwargs."""
import copy
import os
import tempfile
import threading
from typing import IO, Any, Callable, Dict, Iterable, List, Optional, TextIO, Union

# move to a regular typing import when Python 3.3-3.6 is no longer supported
from ruamel.yaml.comments import CommentedMap
from schema_salad.avro.schema import Names
from schema_salad.ref_resolver import Loader
from schema_salad.utils import FetcherCallableType
from typing_extensions import TYPE_CHECKING

from .builder import Builder
from .mpi import MpiConfig
from .mutation import MutationManager
from .pathmapper import PathMapper
from .secrets import SecretStore
from .software_requirements import DependenciesConfiguration
from .stdfsaccess import StdFsAccess
from .utils import DEFAULT_TMP_PREFIX, CWLObjectType, HasReqsHints, ResolverType

if TYPE_CHECKING:
    from .process import Process
    from .provenance import ResearchObject  # pylint: disable=unused-import
    from .provenance_profile import ProvenanceProfile


class ContextBase:
    """Shared kwargs based initilizer for {Runtime,Loading}Context."""

    def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
        """Initialize."""
        if kwargs:
            for k, v in kwargs.items():
                if hasattr(self, k):
                    setattr(self, k, v)


def make_tool_notimpl(
    toolpath_object: CommentedMap, loadingContext: "LoadingContext"
) -> "Process":
    raise NotImplementedError()


default_make_tool = make_tool_notimpl


[docs]class LoadingContext(ContextBase):
[docs] def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None: """Initialize the LoadingContext from the kwargs.""" self.debug = False # type: bool self.metadata = {} # type: CWLObjectType self.requirements = None # type: Optional[List[CWLObjectType]] self.hints = None # type: Optional[List[CWLObjectType]] self.overrides_list = [] # type: List[CWLObjectType] self.loader = None # type: Optional[Loader] self.avsc_names = None # type: Optional[Names] self.disable_js_validation = False # type: bool self.js_hint_options_file: Optional[str] = None self.do_validate = True # type: bool self.enable_dev = False # type: bool self.strict = True # type: bool self.resolver = None # type: Optional[ResolverType] self.fetcher_constructor = None # type: Optional[FetcherCallableType] self.construct_tool_object = default_make_tool self.research_obj = None # type: Optional[ResearchObject] self.orcid = "" # type: str self.cwl_full_name = "" # type: str self.host_provenance = False # type: bool self.user_provenance = False # type: bool self.prov_obj = None # type: Optional[ProvenanceProfile] self.do_update = None # type: Optional[bool] self.jobdefaults = None # type: Optional[CommentedMap] self.doc_cache = True # type: bool self.relax_path_checks = False # type: bool self.singularity = False # type: bool self.podman = False # type: bool super().__init__(kwargs)
[docs] def copy(self): # type: () -> LoadingContext return copy.copy(self)
[docs]class RuntimeContext(ContextBase):
[docs] def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None: """Initialize the RuntimeContext from the kwargs.""" select_resources_callable = Callable[ # pylint: disable=unused-variable [Dict[str, Union[int, float]], RuntimeContext], Dict[str, Union[int, float]], ] self.user_space_docker_cmd = "" # type: Optional[str] self.secret_store = None # type: Optional[SecretStore] self.no_read_only = False # type: bool self.custom_net = None # type: Optional[str] self.no_match_user = False # type: bool self.preserve_environment = None # type: Optional[Iterable[str]] self.preserve_entire_environment = False # type: bool self.use_container = True # type: bool self.force_docker_pull = False # type: bool self.tmp_outdir_prefix = "" # type: str self.tmpdir_prefix = DEFAULT_TMP_PREFIX # type: str self.tmpdir = "" # type: str self.rm_tmpdir = True # type: bool self.pull_image = True # type: bool self.rm_container = True # type: bool self.move_outputs = "move" # type: str self.streaming_allowed: bool = False self.singularity = False # type: bool self.podman = False # type: bool self.debug = False # type: bool self.compute_checksum = True # type: bool self.name = "" # type: str self.default_container = "" # type: Optional[str] self.find_default_container = ( None ) # type: Optional[Callable[[HasReqsHints], Optional[str]]] self.cachedir = None # type: Optional[str] self.outdir = None # type: Optional[str] self.stagedir = "" # type: str self.part_of = "" # type: str self.basedir = "" # type: str self.toplevel = False # type: bool self.mutation_manager = None # type: Optional[MutationManager] self.make_fs_access = StdFsAccess # type: Callable[[str], StdFsAccess] self.path_mapper = PathMapper self.builder = None # type: Optional[Builder] self.docker_outdir = "" # type: str self.docker_tmpdir = "" # type: str self.docker_stagedir = "" # type: str self.js_console = False # type: bool self.job_script_provider = None # type: Optional[DependenciesConfiguration] self.select_resources = None # type: Optional[select_resources_callable] self.eval_timeout = 20 # type: float self.postScatterEval = ( None ) # type: Optional[Callable[[CWLObjectType], Optional[CWLObjectType]]] self.on_error = "stop" # type: str self.strict_memory_limit = False # type: bool self.cidfile_dir = None # type: Optional[str] self.cidfile_prefix = None # type: Optional[str] self.workflow_eval_lock = None # type: Optional[threading.Condition] self.research_obj = None # type: Optional[ResearchObject] self.orcid = "" # type: str self.cwl_full_name = "" # type: str self.process_run_id = None # type: Optional[str] self.prov_obj = None # type: Optional[ProvenanceProfile] self.mpi_config = MpiConfig() # type: MpiConfig self.default_stdout = None # type: Optional[Union[IO[bytes], TextIO]] self.default_stderr = None # type: Optional[Union[IO[bytes], TextIO]] super().__init__(kwargs) if self.tmp_outdir_prefix == "": self.tmp_outdir_prefix = self.tmpdir_prefix
[docs] def get_outdir(self) -> str: """Return self.outdir or create one with self.tmp_outdir_prefix.""" if self.outdir: return self.outdir return self.create_outdir()
[docs] def get_tmpdir(self) -> str: """Return self.tmpdir or create one with self.tmpdir_prefix.""" if self.tmpdir: return self.tmpdir return self.create_tmpdir()
[docs] def get_stagedir(self) -> str: """Return self.stagedir or create one with self.tmpdir_prefix.""" if self.stagedir: return self.stagedir tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix) return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
[docs] def create_tmpdir(self) -> str: """Create a temporary directory that respects self.tmpdir_prefix.""" tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix) return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
[docs] def create_outdir(self) -> str: """Create a temporary directory that respects self.tmp_outdir_prefix.""" out_dir, out_prefix = os.path.split(self.tmp_outdir_prefix) return tempfile.mkdtemp(prefix=out_prefix, dir=out_dir)
[docs] def copy(self): # type: () -> RuntimeContext return copy.copy(self)
[docs]def getdefault(val, default): # type: (Any, Any) -> Any if val is None: return default else: return val