Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for cwltool.context
"""Shared context objects that replace use of kwargs."""
import copy
import os
import tempfile
import threading
from typing import IO, Any, Callable, Dict, Iterable, List, Optional, TextIO, Union
# move to a regular typing import when Python 3.3-3.6 is no longer supported
from ruamel.yaml.comments import CommentedMap
from schema_salad.avro.schema import Names
from schema_salad.ref_resolver import Loader
from schema_salad.utils import FetcherCallableType
from typing_extensions import TYPE_CHECKING
from .builder import Builder
from .mpi import MpiConfig
from .mutation import MutationManager
from .pathmapper import PathMapper
from .secrets import SecretStore
from .software_requirements import DependenciesConfiguration
from .stdfsaccess import StdFsAccess
from .utils import DEFAULT_TMP_PREFIX, CWLObjectType, HasReqsHints, ResolverType
if TYPE_CHECKING:
from .process import Process
from .provenance import ResearchObject # pylint: disable=unused-import
from .provenance_profile import ProvenanceProfile
class ContextBase:
"""Shared kwargs based initilizer for {Runtime,Loading}Context."""
def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
"""Initialize."""
if kwargs:
for k, v in kwargs.items():
if hasattr(self, k):
setattr(self, k, v)
def make_tool_notimpl(
toolpath_object: CommentedMap, loadingContext: "LoadingContext"
) -> "Process":
raise NotImplementedError()
default_make_tool = make_tool_notimpl
[docs]class LoadingContext(ContextBase):
[docs] def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
"""Initialize the LoadingContext from the kwargs."""
self.debug = False # type: bool
self.metadata = {} # type: CWLObjectType
self.requirements = None # type: Optional[List[CWLObjectType]]
self.hints = None # type: Optional[List[CWLObjectType]]
self.overrides_list = [] # type: List[CWLObjectType]
self.loader = None # type: Optional[Loader]
self.avsc_names = None # type: Optional[Names]
self.disable_js_validation = False # type: bool
self.js_hint_options_file: Optional[str] = None
self.do_validate = True # type: bool
self.enable_dev = False # type: bool
self.strict = True # type: bool
self.resolver = None # type: Optional[ResolverType]
self.fetcher_constructor = None # type: Optional[FetcherCallableType]
self.construct_tool_object = default_make_tool
self.research_obj = None # type: Optional[ResearchObject]
self.orcid = "" # type: str
self.cwl_full_name = "" # type: str
self.host_provenance = False # type: bool
self.user_provenance = False # type: bool
self.prov_obj = None # type: Optional[ProvenanceProfile]
self.do_update = None # type: Optional[bool]
self.jobdefaults = None # type: Optional[CommentedMap]
self.doc_cache = True # type: bool
self.relax_path_checks = False # type: bool
self.singularity = False # type: bool
self.podman = False # type: bool
super().__init__(kwargs)
[docs]class RuntimeContext(ContextBase):
[docs] def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
"""Initialize the RuntimeContext from the kwargs."""
select_resources_callable = Callable[ # pylint: disable=unused-variable
[Dict[str, Union[int, float]], RuntimeContext],
Dict[str, Union[int, float]],
]
self.user_space_docker_cmd = "" # type: Optional[str]
self.secret_store = None # type: Optional[SecretStore]
self.no_read_only = False # type: bool
self.custom_net = None # type: Optional[str]
self.no_match_user = False # type: bool
self.preserve_environment = None # type: Optional[Iterable[str]]
self.preserve_entire_environment = False # type: bool
self.use_container = True # type: bool
self.force_docker_pull = False # type: bool
self.tmp_outdir_prefix = "" # type: str
self.tmpdir_prefix = DEFAULT_TMP_PREFIX # type: str
self.tmpdir = "" # type: str
self.rm_tmpdir = True # type: bool
self.pull_image = True # type: bool
self.rm_container = True # type: bool
self.move_outputs = "move" # type: str
self.streaming_allowed: bool = False
self.singularity = False # type: bool
self.podman = False # type: bool
self.debug = False # type: bool
self.compute_checksum = True # type: bool
self.name = "" # type: str
self.default_container = "" # type: Optional[str]
self.find_default_container = (
None
) # type: Optional[Callable[[HasReqsHints], Optional[str]]]
self.cachedir = None # type: Optional[str]
self.outdir = None # type: Optional[str]
self.stagedir = "" # type: str
self.part_of = "" # type: str
self.basedir = "" # type: str
self.toplevel = False # type: bool
self.mutation_manager = None # type: Optional[MutationManager]
self.make_fs_access = StdFsAccess # type: Callable[[str], StdFsAccess]
self.path_mapper = PathMapper
self.builder = None # type: Optional[Builder]
self.docker_outdir = "" # type: str
self.docker_tmpdir = "" # type: str
self.docker_stagedir = "" # type: str
self.js_console = False # type: bool
self.job_script_provider = None # type: Optional[DependenciesConfiguration]
self.select_resources = None # type: Optional[select_resources_callable]
self.eval_timeout = 20 # type: float
self.postScatterEval = (
None
) # type: Optional[Callable[[CWLObjectType], Optional[CWLObjectType]]]
self.on_error = "stop" # type: str
self.strict_memory_limit = False # type: bool
self.cidfile_dir = None # type: Optional[str]
self.cidfile_prefix = None # type: Optional[str]
self.workflow_eval_lock = None # type: Optional[threading.Condition]
self.research_obj = None # type: Optional[ResearchObject]
self.orcid = "" # type: str
self.cwl_full_name = "" # type: str
self.process_run_id = None # type: Optional[str]
self.prov_obj = None # type: Optional[ProvenanceProfile]
self.mpi_config = MpiConfig() # type: MpiConfig
self.default_stdout = None # type: Optional[Union[IO[bytes], TextIO]]
self.default_stderr = None # type: Optional[Union[IO[bytes], TextIO]]
super().__init__(kwargs)
if self.tmp_outdir_prefix == "":
self.tmp_outdir_prefix = self.tmpdir_prefix
[docs] def get_outdir(self) -> str:
"""Return self.outdir or create one with self.tmp_outdir_prefix."""
if self.outdir:
return self.outdir
return self.create_outdir()
[docs] def get_tmpdir(self) -> str:
"""Return self.tmpdir or create one with self.tmpdir_prefix."""
if self.tmpdir:
return self.tmpdir
return self.create_tmpdir()
[docs] def get_stagedir(self) -> str:
"""Return self.stagedir or create one with self.tmpdir_prefix."""
if self.stagedir:
return self.stagedir
tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix)
return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
[docs] def create_tmpdir(self) -> str:
"""Create a temporary directory that respects self.tmpdir_prefix."""
tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix)
return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
[docs] def create_outdir(self) -> str:
"""Create a temporary directory that respects self.tmp_outdir_prefix."""
out_dir, out_prefix = os.path.split(self.tmp_outdir_prefix)
return tempfile.mkdtemp(prefix=out_prefix, dir=out_dir)
[docs]def getdefault(val, default):
# type: (Any, Any) -> Any
if val is None:
return default
else:
return val