Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for cwltool.context

"""Shared context objects that replace use of kwargs."""
import copy
import os
import shutil
import tempfile
import threading
from typing import (
    IO,
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    TextIO,
    Tuple,
    Union,
)

# move to a regular typing import when Python 3.3-3.6 is no longer supported
from ruamel.yaml.comments import CommentedMap
from schema_salad.avro.schema import Names
from schema_salad.ref_resolver import Loader
from schema_salad.utils import FetcherCallableType
from typing_extensions import TYPE_CHECKING

from .builder import Builder
from .mpi import MpiConfig
from .mutation import MutationManager
from .pathmapper import PathMapper
from .secrets import SecretStore
from .software_requirements import DependenciesConfiguration
from .stdfsaccess import StdFsAccess
from .utils import DEFAULT_TMP_PREFIX, CWLObjectType, HasReqsHints, ResolverType

if TYPE_CHECKING:
    from cwl_utils.parser.cwl_v1_2 import LoadingOptions

    from .process import Process
    from .provenance import ResearchObject  # pylint: disable=unused-import
    from .provenance_profile import ProvenanceProfile


class ContextBase:
    """Common base whose initializer applies a kwargs dict to existing attributes."""

    def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
        """Overwrite already-defined attributes with matching entries of ``kwargs``.

        Keys that do not correspond to an attribute already present on the
        instance are ignored, so subclasses must assign their defaults before
        delegating to this initializer.
        """
        if not kwargs:
            return
        for name, value in kwargs.items():
            if not hasattr(self, name):
                # Unknown option: leave the instance untouched.
                continue
            setattr(self, name, value)


def make_tool_notimpl(
    toolpath_object: CommentedMap, loadingContext: "LoadingContext"
) -> "Process":
    """Stand-in tool constructor: ignore both arguments and raise NotImplementedError."""
    raise NotImplementedError


# Module-level alias used as the initial ``construct_tool_object`` of a
# LoadingContext; calling it raises NotImplementedError.
default_make_tool = make_tool_notimpl


def log_handler(
    outdir: str,
    base_path_logs: str,
    stdout_path: Optional[str],
    stderr_path: Optional[str],
) -> None:
    """Copy the stdout/stderr logs from the log location to the final output.

    The source files are left in place (``shutil.copy2``). Nothing happens when
    ``outdir`` and ``base_path_logs`` are the same string.

    :param outdir: directory that should receive the log copies.
    :param base_path_logs: directory string the logs were written under.
    :param stdout_path: path of the captured stdout log, or a falsy value to skip.
    :param stderr_path: path of the captured stderr log, or a falsy value to skip.
    :raises OSError: propagated from ``shutil.copy2`` (e.g. missing source file
        or missing destination directory).
    """
    if outdir == base_path_logs:
        return
    for log_path in (stdout_path, stderr_path):
        if not log_path:
            continue
        # Rewrite only the first occurrence of the base directory: an
        # unbounded ``str.replace`` would also rewrite any later repetition of
        # that string inside the path and produce a bogus destination.
        destination = log_path.replace(base_path_logs, outdir, 1)
        shutil.copy2(log_path, destination)


def set_log_dir(outdir: str, log_dir: str, subdir_name: str) -> str:
    """Pick the log directory: ``outdir`` if ``log_dir`` is empty, else a subdir of it.

    When ``log_dir`` is the empty string the logs stay in ``outdir``;
    otherwise the result is ``log_dir`` and ``subdir_name`` joined with "/".
    """
    if log_dir == "":
        return outdir
    return "/".join((log_dir, subdir_name))


class LoadingContext(ContextBase):
    """Settings object consulted while loading a document; replaces loose kwargs."""

    def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
        """Initialize the LoadingContext from the kwargs.

        Every attribute is first given its default; ``ContextBase.__init__`` is
        then called so that entries of ``kwargs`` naming one of these
        attributes override the default.

        :param kwargs: optional mapping of attribute name to override value.
        """
        # Names imported only under TYPE_CHECKING are quoted in annotations.
        self.debug: bool = False
        self.metadata: CWLObjectType = {}
        self.requirements: Optional[List[CWLObjectType]] = None
        self.hints: Optional[List[CWLObjectType]] = None
        self.overrides_list: List[CWLObjectType] = []
        self.loader: Optional[Loader] = None
        self.avsc_names: Optional[Names] = None
        self.disable_js_validation: bool = False
        self.js_hint_options_file: Optional[str] = None
        self.do_validate: bool = True
        self.enable_dev: bool = False
        self.strict: bool = True
        self.resolver: Optional[ResolverType] = None
        self.fetcher_constructor: Optional[FetcherCallableType] = None
        self.construct_tool_object = default_make_tool
        self.research_obj: Optional["ResearchObject"] = None
        self.orcid: str = ""
        self.cwl_full_name: str = ""
        self.host_provenance: bool = False
        self.user_provenance: bool = False
        self.prov_obj: Optional["ProvenanceProfile"] = None
        self.do_update: Optional[bool] = None
        self.jobdefaults: Optional[CommentedMap] = None
        self.doc_cache: bool = True
        self.relax_path_checks: bool = False
        self.singularity: bool = False
        self.podman: bool = False
        self.eval_timeout: float = 60
        self.codegen_idx: Dict[str, Tuple[Any, "LoadingOptions"]] = {}
        self.fast_parser = False
        self.skip_resolve_all = False

        # Must run last: the base initializer only overrides attributes that
        # already exist on the instance.
        super().__init__(kwargs)

    def copy(self) -> "LoadingContext":
        """Return a shallow copy (``copy.copy``) of this context."""
        return copy.copy(self)
class RuntimeContext(ContextBase):
    """Settings object consulted while running jobs; replaces loose kwargs."""

    def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
        """Initialize the RuntimeContext from the kwargs.

        Every attribute is first given its default; ``ContextBase.__init__`` is
        then called so that entries of ``kwargs`` naming one of these
        attributes override the default. Afterwards an empty
        ``tmp_outdir_prefix`` falls back to ``tmpdir_prefix``.

        :param kwargs: optional mapping of attribute name to override value.
        """
        # Names imported only under TYPE_CHECKING are quoted in annotations.
        self.user_space_docker_cmd: Optional[str] = ""
        self.secret_store: Optional[SecretStore] = None
        self.no_read_only: bool = False
        self.custom_net: Optional[str] = None
        self.no_match_user: bool = False
        self.preserve_environment: Optional[Iterable[str]] = None
        self.preserve_entire_environment: bool = False
        self.use_container: bool = True
        self.force_docker_pull: bool = False

        self.tmp_outdir_prefix: str = ""
        self.tmpdir_prefix: str = DEFAULT_TMP_PREFIX
        self.tmpdir: str = ""
        self.rm_tmpdir: bool = True
        self.pull_image: bool = True
        self.rm_container: bool = True
        self.move_outputs: str = "move"
        self.log_dir: str = ""
        self.set_log_dir = set_log_dir
        self.log_dir_handler = log_handler
        self.streaming_allowed: bool = False

        self.singularity: bool = False
        self.podman: bool = False
        self.debug: bool = False
        self.compute_checksum: bool = True
        self.name: str = ""
        self.default_container: Optional[str] = ""
        self.find_default_container: Optional[
            Callable[[HasReqsHints], Optional[str]]
        ] = None
        self.cachedir: Optional[str] = None
        self.outdir: Optional[str] = None
        self.stagedir: str = ""
        self.part_of: str = ""
        self.basedir: str = ""
        self.toplevel: bool = False
        self.mutation_manager: Optional[MutationManager] = None
        self.make_fs_access = StdFsAccess
        self.path_mapper = PathMapper
        self.builder: Optional[Builder] = None
        self.docker_outdir: str = ""
        self.docker_tmpdir: str = ""
        self.docker_stagedir: str = ""
        self.js_console: bool = False
        self.job_script_provider: Optional[DependenciesConfiguration] = None
        # The callable's signature is spelled out in the annotation instead of
        # building a throwaway typing alias on every construction.
        self.select_resources: Optional[
            Callable[
                [Dict[str, Union[int, float]], "RuntimeContext"],
                Dict[str, Union[int, float]],
            ]
        ] = None
        self.eval_timeout: float = 60
        self.postScatterEval: Optional[
            Callable[[CWLObjectType], Optional[CWLObjectType]]
        ] = None
        self.on_error: str = "stop"
        self.strict_memory_limit: bool = False
        self.strict_cpu_limit: bool = False
        self.cidfile_dir: Optional[str] = None
        self.cidfile_prefix: Optional[str] = None

        self.workflow_eval_lock: Optional[threading.Condition] = None
        self.research_obj: Optional["ResearchObject"] = None
        self.orcid: str = ""
        self.cwl_full_name: str = ""
        self.process_run_id: Optional[str] = None
        self.prov_obj: Optional["ProvenanceProfile"] = None
        self.mpi_config: MpiConfig = MpiConfig()
        self.default_stdout: Optional[Union[IO[bytes], TextIO]] = None
        self.default_stderr: Optional[Union[IO[bytes], TextIO]] = None

        # Must run last: the base initializer only overrides attributes that
        # already exist on the instance.
        super().__init__(kwargs)
        if self.tmp_outdir_prefix == "":
            self.tmp_outdir_prefix = self.tmpdir_prefix

    def get_outdir(self) -> str:
        """Return self.outdir or create one with self.tmp_outdir_prefix."""
        if self.outdir:
            return self.outdir
        return self.create_outdir()

    def get_tmpdir(self) -> str:
        """Return self.tmpdir or create one with self.tmpdir_prefix."""
        if self.tmpdir:
            return self.tmpdir
        return self.create_tmpdir()

    def get_stagedir(self) -> str:
        """Return self.stagedir or create one with self.tmpdir_prefix."""
        if self.stagedir:
            return self.stagedir
        tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix)
        return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)

    def create_tmpdir(self) -> str:
        """Create a temporary directory that respects self.tmpdir_prefix.

        The prefix is split into a parent directory and a name prefix, which
        are passed to ``tempfile.mkdtemp`` as ``dir`` and ``prefix``.
        """
        tmp_dir, tmp_prefix = os.path.split(self.tmpdir_prefix)
        return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)

    def create_outdir(self) -> str:
        """Create a temporary directory that respects self.tmp_outdir_prefix.

        The prefix is split into a parent directory and a name prefix, which
        are passed to ``tempfile.mkdtemp`` as ``dir`` and ``prefix``.
        """
        out_dir, out_prefix = os.path.split(self.tmp_outdir_prefix)
        return tempfile.mkdtemp(prefix=out_prefix, dir=out_dir)

    def copy(self) -> "RuntimeContext":
        """Return a shallow copy (``copy.copy``) of this context."""
        return copy.copy(self)
def getdefault(val: Any, default: Any) -> Any:
    """Return ``val`` unless it is ``None``, in which case return ``default``.

    Only ``None`` triggers the fallback; other falsy values such as ``0``,
    ``""`` or ``False`` are returned unchanged.
    """
    return default if val is None else val