Warning
This documentation is for an in-development version of Galaxy. Alternatively, you can view this page in the latest release (if it exists there) or go to the top of the latest release's documentation.
Source code for cwltool.context
import copy
import threading # pylint: disable=unused-import
from .utils import DEFAULT_TMP_PREFIX
from .stdfsaccess import StdFsAccess
from typing import (Any, Callable, Dict, # pylint: disable=unused-import
Generator, Iterable, List, Optional, Text, Union, AnyStr)
from schema_salad.ref_resolver import ( # pylint: disable=unused-import
ContextType, Fetcher, Loader)
import schema_salad.schema as schema
from .builder import Builder, HasReqsHints
from .mutation import MutationManager
from .software_requirements import DependenciesConfiguration
from .secrets import SecretStore
import six
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .process import Process
from .provenance import (ResearchObject, # pylint: disable=unused-import
CreateProvProfile)
class ContextBase(object):
    """Base class for context objects configured from a dictionary.

    The subclasses in this module assign their default attributes first and
    then call this initializer so that supplied values replace the defaults.
    """

    def __init__(self, kwargs=None):
        # type: (Optional[Dict[str, Any]]) -> None
        """Copy entries of *kwargs* onto attributes that already exist.

        Keys that do not name an existing attribute of the instance are
        skipped.  A missing or empty *kwargs* leaves the instance untouched.
        """
        if not kwargs:
            return
        for name, value in kwargs.items():
            if not hasattr(self, name):
                # Unknown option: ignored rather than added to the instance.
                continue
            setattr(self, name, value)
def make_tool_notimpl(toolpath_object, loadingContext):
    # type: (Dict[Text, Any], LoadingContext) -> Process
    """Placeholder tool constructor that always fails.

    Neither argument is inspected.

    Raises:
        NotImplementedError: unconditionally.
    """
    raise NotImplementedError()
# Module-level default tool constructor.  LoadingContext.__init__ uses it as
# the initial value of ``construct_tool_object``; as bound here it always
# raises NotImplementedError.
default_make_tool = make_tool_notimpl # type: Callable[[Dict[Text, Any], LoadingContext], Process]
class LoadingContext(ContextBase):
    """Context object carrying the attributes initialised in ``__init__``.

    NOTE(review): judging by the name these are the options used while a
    document is being loaded -- confirm against the callers.
    """

    def __init__(self, kwargs=None):
        # type: (Optional[Dict[str, Any]]) -> None
        """Set every attribute to its default, then apply *kwargs*.

        The defaults must be assigned before the base initializer runs,
        because ``ContextBase.__init__`` only overrides attributes that
        already exist on the instance.

        Args:
            kwargs: optional mapping of attribute name to value; entries
                whose key is not one of the attributes below are ignored.
        """
        self.debug = False  # type: bool
        self.metadata = {}  # type: Dict[Text, Any]
        self.requirements = None
        self.hints = None
        self.overrides_list = []  # type: List[Dict[Text, Any]]
        self.loader = None  # type: Optional[Loader]
        self.avsc_names = None  # type: Optional[schema.Names]
        self.disable_js_validation = False  # type: bool
        self.js_hint_options_file = None
        self.do_validate = True  # type: bool
        self.enable_dev = False  # type: bool
        self.strict = True  # type: bool
        self.resolver = None
        self.fetcher_constructor = None
        # Starts as the module-level default constructor.
        self.construct_tool_object = default_make_tool

        # Attributes typed with classes from the ``.provenance`` module,
        # plus their neighbouring fields.
        self.research_obj = None  # type: Optional[ResearchObject]
        self.orcid = None
        self.cwl_full_name = None
        self.host_provenance = False  # type: bool
        self.user_provenance = False  # type: bool
        self.prov_obj = None  # type: Optional[CreateProvProfile]

        super(LoadingContext, self).__init__(kwargs)
class RuntimeContext(ContextBase):
    """Context object carrying the attributes initialised in ``__init__``.

    NOTE(review): judging by the name these are the options used while a
    process is executed -- confirm against the callers.
    """

    def __init__(self, kwargs=None):
        # type: (Optional[Dict[str, Any]]) -> None
        """Set every attribute to its default, then apply *kwargs*.

        The defaults are assigned before the base initializer runs because
        ``ContextBase.__init__`` only overrides attributes that already
        exist on the instance.
        """
        # Alias referenced only by the type comment on ``select_resources``.
        select_resources_callable = Callable[  # pylint: disable=unused-variable
            [Dict[str, int], RuntimeContext],
            Dict[str, int],
        ]

        self.user_space_docker_cmd = ""  # type: Text
        self.secret_store = None  # type: Optional[SecretStore]
        self.no_read_only = False  # type: bool
        self.custom_net = ""  # type: Text
        self.no_match_user = False  # type: bool
        self.preserve_environment = ""  # type: Optional[Iterable[str]]
        self.preserve_entire_environment = False  # type: bool
        self.use_container = True  # type: bool
        self.force_docker_pull = False  # type: bool

        # Both prefixes start from the same imported constant.
        self.tmp_outdir_prefix = DEFAULT_TMP_PREFIX  # type: Text
        self.tmpdir_prefix = DEFAULT_TMP_PREFIX  # type: Text
        self.tmpdir = ""  # type: Text
        self.rm_tmpdir = True  # type: bool
        self.pull_image = True  # type: bool
        self.rm_container = True  # type: bool
        self.move_outputs = ""  # type: Text

        self.singularity = False  # type: bool
        self.disable_net = None
        self.debug = False  # type: bool
        self.compute_checksum = True  # type: bool
        self.name = ""  # type: Text
        self.default_container = ""  # type: Text
        self.find_default_container = None  # type: Optional[Callable[[HasReqsHints], Optional[Text]]]
        self.cachedir = None  # type: Optional[Text]
        self.outdir = None  # type: Optional[Text]
        self.stagedir = ""  # type: Text
        self.part_of = ""  # type: Text
        self.basedir = ""  # type: Text
        self.toplevel = False  # type: bool
        self.mutation_manager = None  # type: Optional[MutationManager]
        # The class itself is stored as the default factory.
        self.make_fs_access = StdFsAccess  # type: Callable[[Text], StdFsAccess]
        self.builder = None  # type: Optional[Builder]

        self.docker_outdir = ""  # type: Text
        self.docker_tmpdir = ""  # type: Text
        self.docker_stagedir = ""  # type: Text

        self.js_console = False  # type: bool
        self.job_script_provider = None  # type: Optional[DependenciesConfiguration]
        self.select_resources = None  # type: Optional[select_resources_callable]
        self.eval_timeout = 20  # type: float
        self.postScatterEval = None  # type: Optional[Callable[[Dict[Text, Any]], Dict[Text, Any]]]
        self.on_error = "stop"  # type: Text
        self.strict_memory_limit = False  # type: bool

        self.record_container_id = None
        self.cidfile_dir = None
        self.cidfile_prefix = None

        self.workflow_eval_lock = None  # type: Optional[threading.Condition]

        self.research_obj = None  # type: Optional[ResearchObject]
        self.orcid = None
        self.cwl_full_name = None
        self.process_run_id = None  # type: Optional[str]
        self.prov_obj = None  # type: Optional[CreateProvProfile]
        self.reference_locations = {}  # type: Dict[Text, Text]

        super(RuntimeContext, self).__init__(kwargs)

    def copy(self):
        # type: () -> RuntimeContext
        """Return a shallow copy of this context made with ``copy.copy``.

        Attribute values are not duplicated, so mutable values such as
        ``reference_locations`` are shared between the copy and the original.
        """
        duplicate = copy.copy(self)
        return duplicate
def getdefault(val, default):
    # type: (Any, Any) -> Any
    """Return *val*, falling back to *default* only when *val* is ``None``.

    Other falsy values (``0``, ``""``, ``False``, empty containers) are
    returned unchanged.
    """
    return default if val is None else val