Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for cwltool.context
"""Shared context objects that replace use of kwargs."""
import copy
import os
import shutil
import tempfile
import threading
from typing import (
IO,
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Literal,
Optional,
TextIO,
Tuple,
Union,
)
from ruamel.yaml.comments import CommentedMap
from schema_salad.avro.schema import Names
from schema_salad.ref_resolver import Loader
from schema_salad.utils import FetcherCallableType
from .mpi import MpiConfig
from .pathmapper import PathMapper
from .stdfsaccess import StdFsAccess
from .utils import DEFAULT_TMP_PREFIX, CWLObjectType, HasReqsHints, ResolverType
if TYPE_CHECKING:
from _typeshed import SupportsWrite
from cwl_utils.parser.cwl_v1_2 import LoadingOptions
from .builder import Builder
from .cwlprov.provenance_profile import ProvenanceProfile
from .cwlprov.ro import ResearchObject
from .mutation import MutationManager
from .process import Process
from .secrets import SecretStore
from .software_requirements import DependenciesConfiguration
class ContextBase:
"""Shared kwargs based initializer for :py:class:`RuntimeContext` and :py:class:`LoadingContext`."""
def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
"""Initialize."""
if kwargs:
for k, v in kwargs.items():
if hasattr(self, k):
setattr(self, k, v)
def make_tool_notimpl(toolpath_object: CommentedMap, loadingContext: "LoadingContext") -> "Process":
"""Fake implementation of the make tool function."""
raise NotImplementedError()
default_make_tool = make_tool_notimpl
def log_handler(
outdir: str,
base_path_logs: str,
stdout_path: Optional[str],
stderr_path: Optional[str],
) -> None:
"""Move logs from log location to final output."""
if outdir != base_path_logs:
if stdout_path:
new_stdout_path = stdout_path.replace(base_path_logs, outdir)
shutil.copy2(stdout_path, new_stdout_path)
if stderr_path:
new_stderr_path = stderr_path.replace(base_path_logs, outdir)
shutil.copy2(stderr_path, new_stderr_path)
def set_log_dir(outdir: str, log_dir: str, subdir_name: str) -> str:
"""Set the log directory."""
if log_dir == "":
return outdir
else:
return log_dir + "/" + subdir_name
[docs]class LoadingContext(ContextBase):
[docs] def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
"""Initialize the LoadingContext from the kwargs."""
self.debug: bool = False
self.metadata: CWLObjectType = {}
self.requirements: Optional[List[CWLObjectType]] = None
self.hints: Optional[List[CWLObjectType]] = None
self.overrides_list: List[CWLObjectType] = []
self.loader: Optional[Loader] = None
self.avsc_names: Optional[Names] = None
self.disable_js_validation: bool = False
self.js_hint_options_file: Optional[str] = None
self.do_validate: bool = True
self.enable_dev: bool = False
self.strict: bool = True
self.resolver: Optional[ResolverType] = None
self.fetcher_constructor: Optional[FetcherCallableType] = None
self.construct_tool_object = default_make_tool
self.research_obj: Optional[ResearchObject] = None
self.orcid: str = ""
self.cwl_full_name: str = ""
self.host_provenance: bool = False
self.user_provenance: bool = False
self.prov_obj: Optional["ProvenanceProfile"] = None
self.do_update: Optional[bool] = None
self.jobdefaults: Optional[CommentedMap] = None
self.doc_cache: bool = True
self.relax_path_checks: bool = False
self.singularity: bool = False
self.podman: bool = False
self.eval_timeout: float = 60
self.codegen_idx: Dict[str, Tuple[Any, "LoadingOptions"]] = {}
self.fast_parser = False
self.skip_resolve_all = False
self.skip_schemas = False
super().__init__(kwargs)
[docs] def copy(self) -> "LoadingContext":
"""Return a copy of this :py:class:`LoadingContext`."""
return copy.copy(self)
[docs]class RuntimeContext(ContextBase):
outdir: Optional[str] = None
tmpdir: str = ""
tmpdir_prefix: str = DEFAULT_TMP_PREFIX
tmp_outdir_prefix: str = ""
stagedir: str = ""
[docs] def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
"""Initialize the RuntimeContext from the kwargs."""
select_resources_callable = Callable[
[Dict[str, Union[int, float]], RuntimeContext],
Dict[str, Union[int, float]],
]
self.user_space_docker_cmd: Optional[str] = None
self.secret_store: Optional["SecretStore"] = None
self.no_read_only: bool = False
self.custom_net: Optional[str] = None
self.no_match_user: bool = False
self.preserve_environment: Optional[Iterable[str]] = None
self.preserve_entire_environment: bool = False
self.use_container: bool = True
self.force_docker_pull: bool = False
self.rm_tmpdir: bool = True
self.pull_image: bool = True
self.rm_container: bool = True
self.move_outputs: Union[Literal["move"], Literal["leave"], Literal["copy"]] = "move"
self.log_dir: str = ""
self.set_log_dir = set_log_dir
self.log_dir_handler = log_handler
self.streaming_allowed: bool = False
self.singularity: bool = False
self.podman: bool = False
self.debug: bool = False
self.compute_checksum: bool = True
self.name: str = ""
self.default_container: Optional[str] = ""
self.find_default_container: Optional[Callable[[HasReqsHints], Optional[str]]] = None
self.cachedir: Optional[str] = None
self.part_of: str = ""
self.basedir: str = ""
self.toplevel: bool = False
self.mutation_manager: Optional["MutationManager"] = None
self.make_fs_access = StdFsAccess
self.path_mapper = PathMapper
self.builder: Optional["Builder"] = None
self.docker_outdir: str = ""
self.docker_tmpdir: str = ""
self.docker_stagedir: str = ""
self.js_console: bool = False
self.job_script_provider: Optional[DependenciesConfiguration] = None
self.select_resources: Optional[select_resources_callable] = None
self.eval_timeout: float = 60
self.postScatterEval: Optional[Callable[[CWLObjectType], Optional[CWLObjectType]]] = None
self.on_error: Union[Literal["stop"], Literal["continue"]] = "stop"
self.strict_memory_limit: bool = False
self.strict_cpu_limit: bool = False
self.cidfile_dir: Optional[str] = None
self.cidfile_prefix: Optional[str] = None
self.workflow_eval_lock: Optional[threading.Condition] = None
self.research_obj: Optional[ResearchObject] = None
self.orcid: str = ""
self.cwl_full_name: str = ""
self.process_run_id: Optional[str] = None
self.prov_obj: Optional[ProvenanceProfile] = None
self.mpi_config: MpiConfig = MpiConfig()
self.default_stdout: Optional[Union[IO[bytes], TextIO]] = None
self.default_stderr: Optional[Union[IO[bytes], TextIO]] = None
self.validate_only: bool = False
self.validate_stdout: Optional["SupportsWrite[str]"] = None
super().__init__(kwargs)
if self.tmp_outdir_prefix == "":
self.tmp_outdir_prefix = self.tmpdir_prefix
[docs] def get_outdir(self) -> str:
"""Return :py:attr:`outdir` or create one with :py:attr:`tmp_outdir_prefix`."""
if self.outdir:
return self.outdir
return self.create_outdir()
[docs] def get_tmpdir(self) -> str:
"""Return :py:attr:`tmpdir` or create one with :py:attr:`tmpdir_prefix`."""
if self.tmpdir:
return self.tmpdir
return self.create_tmpdir()
[docs] def get_stagedir(self) -> str:
"""Return :py:attr:`stagedir` or create one with :py:attr:`tmpdir_prefix`."""
if self.stagedir:
return self.stagedir
tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix)
return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
[docs] def create_tmpdir(self) -> str:
"""Create a temporary directory that respects :py:attr:`tmpdir_prefix`."""
tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix)
return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
[docs] def create_outdir(self) -> str:
"""Create a temporary directory that respects :py:attr:`tmp_outdir_prefix`."""
out_dir, out_prefix = os.path.split(self.tmp_outdir_prefix)
return tempfile.mkdtemp(prefix=out_prefix, dir=out_dir)
[docs] def copy(self) -> "RuntimeContext":
"""Return a copy of this :py:class:`RuntimeContext`."""
return copy.copy(self)
[docs]def getdefault(val: Any, default: Any) -> Any:
"""Return the ``val`` using the ``default`` as backup in case the val is ``None``."""
if val is None:
return default
else:
return val