Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for cwltool.context
"""Shared context objects that replace use of kwargs."""
import copy
import os
import shutil
import tempfile
import threading
from typing import (
IO,
Any,
Callable,
Dict,
Iterable,
List,
Optional,
TextIO,
Tuple,
Union,
)
# move to a regular typing import when Python 3.3-3.6 is no longer supported
from ruamel.yaml.comments import CommentedMap
from schema_salad.avro.schema import Names
from schema_salad.ref_resolver import Loader
from schema_salad.utils import FetcherCallableType
from typing_extensions import TYPE_CHECKING
from .builder import Builder
from .mpi import MpiConfig
from .mutation import MutationManager
from .pathmapper import PathMapper
from .secrets import SecretStore
from .software_requirements import DependenciesConfiguration
from .stdfsaccess import StdFsAccess
from .utils import DEFAULT_TMP_PREFIX, CWLObjectType, HasReqsHints, ResolverType
if TYPE_CHECKING:
from cwl_utils.parser.cwl_v1_2 import LoadingOptions
from .process import Process
from .provenance import ResearchObject # pylint: disable=unused-import
from .provenance_profile import ProvenanceProfile
class ContextBase:
"""Shared kwargs based initilizer for {Runtime,Loading}Context."""
def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
"""Initialize."""
if kwargs:
for k, v in kwargs.items():
if hasattr(self, k):
setattr(self, k, v)
def make_tool_notimpl(
toolpath_object: CommentedMap, loadingContext: "LoadingContext"
) -> "Process":
raise NotImplementedError()
default_make_tool = make_tool_notimpl
def log_handler(
outdir: str,
base_path_logs: str,
stdout_path: Optional[str],
stderr_path: Optional[str],
) -> None:
"""Move logs from log location to final output."""
if outdir != base_path_logs:
if stdout_path:
new_stdout_path = stdout_path.replace(base_path_logs, outdir)
shutil.copy2(stdout_path, new_stdout_path)
if stderr_path:
new_stderr_path = stderr_path.replace(base_path_logs, outdir)
shutil.copy2(stderr_path, new_stderr_path)
def set_log_dir(outdir: str, log_dir: str, subdir_name: str) -> str:
"""Default handler for setting the log directory."""
if log_dir == "":
return outdir
else:
return log_dir + "/" + subdir_name
[docs]class LoadingContext(ContextBase):
[docs] def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
"""Initialize the LoadingContext from the kwargs."""
self.debug = False # type: bool
self.metadata = {} # type: CWLObjectType
self.requirements = None # type: Optional[List[CWLObjectType]]
self.hints = None # type: Optional[List[CWLObjectType]]
self.overrides_list = [] # type: List[CWLObjectType]
self.loader = None # type: Optional[Loader]
self.avsc_names = None # type: Optional[Names]
self.disable_js_validation = False # type: bool
self.js_hint_options_file: Optional[str] = None
self.do_validate = True # type: bool
self.enable_dev = False # type: bool
self.strict = True # type: bool
self.resolver = None # type: Optional[ResolverType]
self.fetcher_constructor = None # type: Optional[FetcherCallableType]
self.construct_tool_object = default_make_tool
self.research_obj = None # type: Optional[ResearchObject]
self.orcid = "" # type: str
self.cwl_full_name = "" # type: str
self.host_provenance = False # type: bool
self.user_provenance = False # type: bool
self.prov_obj = None # type: Optional[ProvenanceProfile]
self.do_update = None # type: Optional[bool]
self.jobdefaults = None # type: Optional[CommentedMap]
self.doc_cache = True # type: bool
self.relax_path_checks = False # type: bool
self.singularity = False # type: bool
self.podman = False # type: bool
self.eval_timeout: float = 60
self.codegen_idx: Dict[str, Tuple[Any, "LoadingOptions"]] = {}
self.fast_parser = False
self.skip_resolve_all = False
super().__init__(kwargs)
[docs]class RuntimeContext(ContextBase):
[docs] def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
"""Initialize the RuntimeContext from the kwargs."""
select_resources_callable = Callable[ # pylint: disable=unused-variable
[Dict[str, Union[int, float]], RuntimeContext],
Dict[str, Union[int, float]],
]
self.user_space_docker_cmd = "" # type: Optional[str]
self.secret_store = None # type: Optional[SecretStore]
self.no_read_only = False # type: bool
self.custom_net = None # type: Optional[str]
self.no_match_user = False # type: bool
self.preserve_environment = None # type: Optional[Iterable[str]]
self.preserve_entire_environment = False # type: bool
self.use_container = True # type: bool
self.force_docker_pull = False # type: bool
self.tmp_outdir_prefix = "" # type: str
self.tmpdir_prefix = DEFAULT_TMP_PREFIX # type: str
self.tmpdir = "" # type: str
self.rm_tmpdir = True # type: bool
self.pull_image = True # type: bool
self.rm_container = True # type: bool
self.move_outputs = "move" # type: str
self.log_dir = "" # type: str
self.set_log_dir = set_log_dir
self.log_dir_handler = log_handler
self.streaming_allowed: bool = False
self.singularity = False # type: bool
self.podman = False # type: bool
self.debug = False # type: bool
self.compute_checksum = True # type: bool
self.name = "" # type: str
self.default_container = "" # type: Optional[str]
self.find_default_container = (
None
) # type: Optional[Callable[[HasReqsHints], Optional[str]]]
self.cachedir = None # type: Optional[str]
self.outdir = None # type: Optional[str]
self.stagedir = "" # type: str
self.part_of = "" # type: str
self.basedir = "" # type: str
self.toplevel = False # type: bool
self.mutation_manager = None # type: Optional[MutationManager]
self.make_fs_access = StdFsAccess
self.path_mapper = PathMapper
self.builder = None # type: Optional[Builder]
self.docker_outdir = "" # type: str
self.docker_tmpdir = "" # type: str
self.docker_stagedir = "" # type: str
self.js_console = False # type: bool
self.job_script_provider = None # type: Optional[DependenciesConfiguration]
self.select_resources = None # type: Optional[select_resources_callable]
self.eval_timeout = 60 # type: float
self.postScatterEval = (
None
) # type: Optional[Callable[[CWLObjectType], Optional[CWLObjectType]]]
self.on_error = "stop" # type: str
self.strict_memory_limit = False # type: bool
self.strict_cpu_limit = False # type: bool
self.cidfile_dir = None # type: Optional[str]
self.cidfile_prefix = None # type: Optional[str]
self.workflow_eval_lock = None # type: Optional[threading.Condition]
self.research_obj = None # type: Optional[ResearchObject]
self.orcid = "" # type: str
self.cwl_full_name = "" # type: str
self.process_run_id = None # type: Optional[str]
self.prov_obj = None # type: Optional[ProvenanceProfile]
self.mpi_config = MpiConfig() # type: MpiConfig
self.default_stdout = None # type: Optional[Union[IO[bytes], TextIO]]
self.default_stderr = None # type: Optional[Union[IO[bytes], TextIO]]
super().__init__(kwargs)
if self.tmp_outdir_prefix == "":
self.tmp_outdir_prefix = self.tmpdir_prefix
[docs] def get_outdir(self) -> str:
"""Return self.outdir or create one with self.tmp_outdir_prefix."""
if self.outdir:
return self.outdir
return self.create_outdir()
[docs] def get_tmpdir(self) -> str:
"""Return self.tmpdir or create one with self.tmpdir_prefix."""
if self.tmpdir:
return self.tmpdir
return self.create_tmpdir()
[docs] def get_stagedir(self) -> str:
"""Return self.stagedir or create one with self.tmpdir_prefix."""
if self.stagedir:
return self.stagedir
tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix)
return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
[docs] def create_tmpdir(self) -> str:
"""Create a temporary directory that respects self.tmpdir_prefix."""
tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix)
return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
[docs] def create_outdir(self) -> str:
"""Create a temporary directory that respects self.tmp_outdir_prefix."""
out_dir, out_prefix = os.path.split(self.tmp_outdir_prefix)
return tempfile.mkdtemp(prefix=out_prefix, dir=out_dir)
[docs]def getdefault(val, default):
# type: (Any, Any) -> Any
if val is None:
return default
else:
return val