This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tool_util.deps.container_classes

import os
import string
from abc import (
from logging import getLogger
from typing import (
from uuid import uuid4

from packaging.version import Version
from typing_extensions import Protocol

from galaxy.util import (
from . import (
from .container_volumes import DockerVolume
from .requirements import (

    from .dependencies import (
    from .requirements import ContainerDescription

log = getLogger(__name__)

# Bash trap installed around containerized commands so that `_on_exit` (which
# kills the named container) runs whenever the job script exits.
TRAP_KILL_CONTAINER = "trap _on_exit EXIT"

# Shell snippet (rendered with string.Template: ${cached_image_file},
# ${images_cmd}, ${load_cmd}) that loads a cached image tarball into the
# container engine unless an image with the same repository/tag/id is already
# listed by ${images_cmd}.
LOAD_CACHED_IMAGE_COMMAND_TEMPLATE = r"""
python << EOF
from __future__ import print_function

import json
import re
import subprocess
import tarfile

t = tarfile.TarFile("${cached_image_file}")
meta_str = t.extractfile('repositories').read()
meta = json.loads(meta_str)
tag, tag_value = next(iter(meta.items()))
rev, rev_value = next(iter(tag_value.items()))
cmd = "${images_cmd}"
# universal_newlines=True makes stdout text (not bytes) on Python 3 so that it
# can be split on "\n" below; it is also accepted by Python 2.
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, universal_newlines=True)
stdo, stde = proc.communicate()
found = False
for line in stdo.split("\n"):
    tmp = re.split(r'\s+', line)
    if len(tmp) >= 3 and tmp[0] == tag and tmp[1] == rev and tmp[2] == rev_value:
        found = True
if not found:
    print("Loading image")
    cmd = "cat ${cached_image_file} | ${load_cmd}"
    subprocess.check_call(cmd, shell=True)
EOF
"""

SOURCE_CONDA_ACTIVATE = """
# Check if container was created by installing conda packages,
# and if so, source scripts to populate environment variables
# that would be set by activating the conda environment.
if [ -d /usr/local/etc/conda/activate.d ]; then
  export CONDA_PREFIX=/usr/local
  for f in /usr/local/etc/conda/activate.d/*.sh; do
    case "$f" in
      "/usr/local/etc/conda/activate.d/activate-"*) :;;
      *) . "$f" ;;
    esac
  done
fi
"""

class ContainerProtocol(Protocol):
    """Structural type used to annotate ``self`` in the HasDockerLikeVolumes mixin.

    Any object exposing these three read-only attributes satisfies it.
    """

    @property
    def app_info(self) -> "AppInfo":
        """Return the container's ``AppInfo``."""

    @property
    def tool_info(self) -> "ToolInfo":
        """Return the container's ``ToolInfo``."""

    @property
    def job_info(self) -> Optional["JobInfo"]:
        """Return the container's ``JobInfo``, which may be ``None``."""
class Container(metaclass=ABCMeta):
    """Abstract container that can wrap a job command.

    TODO The container resolvers currently initialize job_info as None,
    ContainerFinder.find_container fixes this by constructing a new container
    with job_info (see __destination_container)
    """

    # Defined by concrete subclasses; used as the key prefix in ``prop``.
    container_type: str

    def __init__(
        self,
        container_id: str,
        app_info: "AppInfo",
        tool_info: "ToolInfo",
        destination_info: Dict[str, Any],
        job_info: Optional["JobInfo"],
        container_description: Optional["ContainerDescription"],
        container_name: Optional[str] = None,
    ) -> None:
        self.container_id = container_id
        # A random hex name is generated when the caller does not supply one.
        self.container_name = container_name or uuid4().hex
        self.container_description = container_description
        self.app_info = app_info
        self.tool_info = tool_info
        self.job_info = job_info
        self.destination_info = destination_info
        self.container_info: Dict[str, Any] = {}

    def prop(self, name: str, default: Any) -> Any:
        """Return destination parameter ``<container_type>_<name>``, or ``default`` if unset."""
        return self.destination_info.get(f"{self.container_type}_{name}", default)

    @property
    def resolve_dependencies(self) -> bool:
        """Whether dependencies should be resolved, per the container description if any."""
        description = self.container_description
        if description:
            return description.resolve_dependencies
        return DEFAULT_CONTAINER_RESOLVE_DEPENDENCIES

    @property
    def shell(self) -> str:
        """Shell to use inside the container, per the container description if any."""
        description = self.container_description
        if description:
            return description.shell
        return DEFAULT_CONTAINER_SHELL

    @property
    def source_environment(self) -> str:
        """Shell snippet to source conda activation scripts for non-explicit descriptions, else ``""``."""
        description = self.container_description
        if not description or description.explicit:
            return ""
        return SOURCE_CONDA_ACTIVATE

    @abstractmethod
    def containerize_command(self, command: str) -> str:
        """
        Use destination supplied container configuration parameters,
        container_id, and command to build a new command that runs
        input command in container.
        """
class Volume:
    """Helper class to manage a single ``source:target:mode`` container volume string."""

    def __init__(self, rawstr: str, container_type: str):
        self.source, self.target, self.mode = Volume.parse_volume_str(rawstr)
        self.container_type = container_type

    @staticmethod
    def parse_volume_str(rawstr: str) -> Tuple[str, str, str]:
        """Split a volume string into ``(source, target, mode)``.

        Accepted forms are ``source:target:mode``, ``source:target``,
        ``source:mode`` (mode being ``rw``, ``ro`` or ``default_ro``) and
        ``source``. A missing target defaults to the source and a missing mode
        to ``rw``. Whitespace around each component is ignored.

        :raises Exception: for a blank string or more than three components.

        >>> Volume.parse_volume_str('A:B:rw')
        ('A', 'B', 'rw')
        >>> Volume.parse_volume_str('A : B: ro')
        ('A', 'B', 'ro')
        >>> Volume.parse_volume_str('A:B')
        ('A', 'B', 'rw')
        >>> Volume.parse_volume_str('A:ro')
        ('A', 'A', 'ro')
        >>> Volume.parse_volume_str('A : ro')
        ('A', 'A', 'ro')
        >>> Volume.parse_volume_str('A')
        ('A', 'A', 'rw')
        >>> Volume.parse_volume_str(' ')
        Traceback (most recent call last):
        ...
        Exception: Unparsable volumes string in configuration [ ]
        >>> Volume.parse_volume_str('A:B:C:D')
        Traceback (most recent call last):
        ...
        Exception: Unparsable volumes string in configuration [A:B:C:D]
        """
        if rawstr.strip() == "":
            raise Exception(f"Unparsable volumes string in configuration [{rawstr}]")
        # Strip every component *before* classifying it, so that e.g. "A : ro"
        # is recognized as a mode rather than treated as the target path "ro".
        volume_parts = [part.strip() for part in rawstr.split(":")]
        if len(volume_parts) > 3:
            raise Exception(f"Unparsable volumes string in configuration [{rawstr}]")
        if len(volume_parts) == 3:
            source, target, mode = volume_parts
        elif len(volume_parts) == 2:
            if volume_parts[1] in ("rw", "ro", "default_ro"):
                # "source:mode" - mount at the same path inside the container.
                source = target = volume_parts[0]
                mode = volume_parts[1]
            else:
                source, target = volume_parts
                mode = "rw"
        else:
            source = target = volume_parts[0]
            mode = "rw"
        return source, target, mode

    def __str__(self):
        """
        >>> str(Volume('A:A:rw', 'docker'))
        'A:rw'
        >>> str(Volume('A:B:rw', 'docker'))
        'A:B:rw'
        >>> str(Volume('A:A:ro', 'singularity'))
        'A:ro'
        >>> str(Volume('A:B:ro', 'singularity'))
        'A:B:ro'
        >>> str(Volume('A:A:rw', 'singularity'))
        'A'
        >>> str(Volume('A:B:rw', 'singularity'))
        'A:B'
        """
        if self.source == self.target:
            path = self.source
        else:
            path = f"{self.source}:{self.target}"
        # TODO remove this, we require quite recent singularity anyway
        # for a while singularity did not allow to specify the bind type rw
        # (which is the default). so we omit this default
        # see https://github.com/hpcng/singularity/pull/5487
        if self.container_type == SINGULARITY_CONTAINER_TYPE and self.mode == "rw":
            return path
        else:
            return f"{path}:{self.mode}"
def preprocess_volumes(volumes_raw_str: str, container_type: str) -> List[str]:
    """Process Galaxy volume specification string to either Docker or Singularity specification.

    Galaxy allows the mount type "default_ro", which translates to ro for Docker,
    and to ro for Singularity iff no subdirectories are rw (Singularity does not
    allow ro parent directories with rw subdirectories).

    Volumes that share a target directory are deduplicated, since neither Docker
    nor Singularity allows that. Volumes specified later in volumes_raw_str win,
    which allows admins to override defaults.

    >>> preprocess_volumes("", DOCKER_CONTAINER_TYPE)
    []
    >>> preprocess_volumes("/a/b", DOCKER_CONTAINER_TYPE)
    ['/a/b:rw']
    >>> preprocess_volumes("/a/b:ro,/a/b/c:rw", DOCKER_CONTAINER_TYPE)
    ['/a/b:ro', '/a/b/c:rw']
    >>> preprocess_volumes("/a/b:/a:ro,/a/b/c:/a/b:rw", DOCKER_CONTAINER_TYPE)
    ['/a/b:/a:ro', '/a/b/c:/a/b:rw']
    >>> preprocess_volumes("/a/b:default_ro,/a/b/c:rw", DOCKER_CONTAINER_TYPE)
    ['/a/b:ro', '/a/b/c:rw']
    >>> preprocess_volumes("/a/b:default_ro,/a/b/c:ro", SINGULARITY_CONTAINER_TYPE)
    ['/a/b:ro', '/a/b/c:ro']
    >>> preprocess_volumes("/a/b:default_ro,/a/b/c:rw", SINGULARITY_CONTAINER_TYPE)
    ['/a/b', '/a/b/c']
    >>> preprocess_volumes("/x:/a/b:default_ro,/y:/a/b/c:ro", SINGULARITY_CONTAINER_TYPE)
    ['/x:/a/b:ro', '/y:/a/b/c:ro']
    >>> preprocess_volumes("/x:/a/b:default_ro,/y:/a/b/c:rw", SINGULARITY_CONTAINER_TYPE)
    ['/x:/a/b', '/y:/a/b/c']
    >>> preprocess_volumes("/x:/x,/y:/x", SINGULARITY_CONTAINER_TYPE)
    ['/y:/x']
    """
    if not volumes_raw_str:
        return []
    parsed = [Volume(spec, container_type) for spec in volumes_raw_str.split(",")]
    # Targets requested read-write explicitly, captured before any default_ro
    # entries are resolved below.
    writable_targets = [vol.target for vol in parsed if vol.mode == "rw"]
    is_singularity = container_type == SINGULARITY_CONTAINER_TYPE
    for vol in parsed:
        if vol.mode != "default_ro":
            continue
        # For Singularity, a default_ro volume whose target contains an rw target
        # (per in_directory) must itself be mounted rw.
        if is_singularity and any(in_directory(rw_target, vol.target) for rw_target in writable_targets):
            vol.mode = "rw"
        else:
            vol.mode = "ro"
    # Keyed by target: the last volume for a given target wins.
    by_target = {}
    for vol in parsed:
        by_target[vol.target] = str(vol)
    return list(by_target.values())
class HasDockerLikeVolumes:
    """Mixin to share functionality related to Docker volume handling.

    Singularity seems to have a fairly compatible syntax for volume handling.
    """

    def _expand_volume_str(self: ContainerProtocol, value: str) -> str:
        """Expand ``$variable`` placeholders in a comma-separated volume string.

        Job and app paths (``$working_directory``, ``$galaxy_root``, ...) are
        substituted with ``string.Template.safe_substitute``, so unknown
        placeholders such as ``$TMPDIR`` are left in place. ``$defaults`` expands
        to a default volume list derived from the job and app configuration.
        Entries that still reference ``$tool_directory`` after substitution
        (i.e. the tool has no tool directory) are removed.

        Returns ``value`` unchanged when it is empty.
        """
        if not value:
            return value

        template = string.Template(value)
        variables = {}

        def add_var(name, value):
            # Only define variables that have a value. Paths are made absolute,
            # except values that are themselves "$..." references, which are kept.
            if value:
                if not value.startswith("$"):
                    value = os.path.abspath(value)
                variables[name] = value

        assert self.job_info is not None
        add_var("working_directory", self.job_info.working_directory)
        add_var("tmp_directory", self.job_info.tmp_directory)
        add_var("job_directory", self.job_info.job_directory)
        add_var("tool_directory", self.job_info.tool_directory)
        add_var("home_directory", self.job_info.home_directory)
        add_var("galaxy_root", self.app_info.galaxy_root_dir)
        add_var("default_file_path", self.app_info.default_file_path)
        add_var("library_import_dir", self.app_info.library_import_dir)
        add_var("tool_data_path", self.app_info.tool_data_path)
        add_var("galaxy_data_manager_data_path", self.app_info.galaxy_data_manager_data_path)
        add_var("shed_tool_data_path", self.app_info.shed_tool_data_path)

        if self.job_info.job_directory and self.job_info.job_directory_type == "pulsar":
            # We have a Pulsar job directory, so everything needed (excluding index
            # files) should be available in job_directory...
            defaults = "$job_directory:default_ro"
            if self.job_info.tool_directory:
                defaults += ",$tool_directory:default_ro"
            defaults += ",$job_directory/outputs:rw,$working_directory:rw"
        else:
            if self.job_info.tmp_directory is not None:
                defaults = "$tmp_directory:rw"
                # If a tool definitely has a temp directory available set it to /tmp in container for compat.
                # with CWL. This is part of that spec and should make it easier to share containers between CWL
                # and Galaxy.
                defaults += ",$tmp_directory:/tmp:rw"
            else:
                defaults = "$_GALAXY_JOB_TMP_DIR:rw,$TMPDIR:rw,$TMP:rw,$TEMP:rw"
            defaults += ",$galaxy_root:default_ro"
            if self.job_info.tool_directory:
                defaults += ",$tool_directory:default_ro"
            if self.job_info.job_directory:
                defaults += ",$job_directory:default_ro,$job_directory/outputs:rw"
                if Version(str(self.tool_info.profile)) <= Version("19.09"):
                    defaults += ",$job_directory/configs:rw"
            if self.job_info.home_directory is not None:
                defaults += ",$home_directory:rw"
            if self.app_info.outputs_to_working_directory:
                # Should need default_file_path (which is of course an estimate given
                # object stores anyway).
                defaults += ",$working_directory:rw,$default_file_path:default_ro"
            else:
                defaults += ",$working_directory:rw,$default_file_path:rw"

        if self.app_info.library_import_dir:
            defaults += ",$library_import_dir:default_ro"
        if self.app_info.tool_data_path:
            defaults += ",$tool_data_path:default_ro"
        if self.app_info.galaxy_data_manager_data_path:
            defaults += ",$galaxy_data_manager_data_path:default_ro"
        if self.app_info.shed_tool_data_path:
            defaults += ",$shed_tool_data_path:default_ro"

        # Define $defaults that can easily be extended with external library and
        # index data without deployer worrying about above details.
        variables["defaults"] = string.Template(defaults).safe_substitute(variables)

        volumes_str = template.safe_substitute(variables)

        # Not all tools have a tool_directory - if job_conf references it anyway
        # the placeholder survives substitution. Drop every whole entry that
        # still contains it (wherever it occurs). This avoids leaving empty ",,"
        # elements behind, which preprocess_volumes would reject.
        if "$tool_directory" in volumes_str:
            volumes_str = ",".join(
                entry for entry in volumes_str.split(",") if "$tool_directory" not in entry
            )
        return volumes_str
class DockerContainer(Container, HasDockerLikeVolumes):
    """Container implementation that runs job commands with Docker."""

    container_type = DOCKER_CONTAINER_TYPE

    @property
    def docker_host_props(self) -> Dict[str, Any]:
        """Docker CLI/host settings from ``docker_*`` destination params, with docker_util defaults."""
        docker_host_props = dict(
            docker_cmd=self.prop("cmd", docker_util.DEFAULT_DOCKER_COMMAND),
            sudo=asbool(self.prop("sudo", docker_util.DEFAULT_SUDO)),
            sudo_cmd=self.prop("sudo_cmd", docker_util.DEFAULT_SUDO_COMMAND),
            host=self.prop("host", docker_util.DEFAULT_HOST),
        )
        return docker_host_props

    @property
    def connection_configuration(self) -> Dict[str, Any]:
        return self.docker_host_props

    def build_pull_command(self) -> List[str]:
        """Return the command (as an argument list) that pulls this container's image."""
        return docker_util.build_pull_command(self.container_id, **self.docker_host_props)

    def containerize_command(self, command: str) -> str:
        """Wrap ``command`` in a shell script that runs it inside a Docker container.

        The script installs an exit trap that kills the named container, then
        loads/caches the image and runs the command.

        :raises Exception: if the job has no working directory.
        """
        env_directives = []
        for pass_through_var in self.tool_info.env_pass_through:
            env_directives.append(f'"{pass_through_var}=${pass_through_var}"')

        # Allow destinations to explicitly set environment variables just for
        # docker container. Better approach is to set for destination and then
        # pass through only what tool needs however. (See todo in ToolInfo.)
        for key, value in self.destination_info.items():
            if key.startswith("docker_env_"):
                env = key[len("docker_env_") :]
                env_directives.append(f'"{env}={value}"')

        assert self.job_info is not None
        working_directory = self.job_info.working_directory
        if not working_directory:
            raise Exception(f"Cannot containerize command [{command}] without defined working directory.")

        volumes_raw = self._expand_volume_str(self.destination_info.get("docker_volumes", "$defaults"))
        preprocessed_volumes_list = preprocess_volumes(volumes_raw, self.container_type)
        # TODO: Remove redundant volumes...
        volumes = [DockerVolume.from_str(v) for v in preprocessed_volumes_list]
        volumes_from = self.destination_info.get("docker_volumes_from", docker_util.DEFAULT_VOLUMES_FROM)

        docker_host_props = self.docker_host_props

        cached_image_file = self.__get_cached_image_file()
        if not cached_image_file:
            # TODO: Add option to cache it once here and create cached_image_file.
            cache_command = docker_util.build_docker_cache_command(self.container_id, **docker_host_props)
        else:
            cache_command = self.__cache_from_file_command(cached_image_file, docker_host_props)
        run_command = docker_util.build_docker_run_command(
            command,
            self.container_id,
            volumes=volumes,
            volumes_from=volumes_from,
            env_directives=env_directives,
            working_directory=working_directory,
            net=self.prop("net", None),  # By default, docker instance has networking disabled
            auto_rm=asbool(self.prop("auto_rm", docker_util.DEFAULT_AUTO_REMOVE)),
            set_user=self.prop("set_user", docker_util.DEFAULT_SET_USER),
            run_extra_arguments=self.prop("run_extra_arguments", docker_util.DEFAULT_RUN_EXTRA_ARGUMENTS),
            guest_ports=self.tool_info.guest_ports,
            container_name=self.container_name,
            **docker_host_props,
        )
        kill_command = docker_util.build_docker_simple_command(
            "kill", container_name=self.container_name, **docker_host_props
        )
        # Suppress standard error below in the kill command because it can cause jobs that otherwise would work
        # to fail. Likely, in these cases the container has been stopped normally and so cannot be stopped again.
        # A less hacky approach might be to check if the container is running first before trying to kill.
        # https://stackoverflow.com/questions/34228864/stop-and-delete-docker-container-if-its-running
        # Standard error is:
        #    Error response from daemon: Cannot kill container: 2b0b961527574ebc873256b481bbe72e: No such container: 2b0b961527574ebc873256b481bbe72e
        return f"""
_on_exit() {{
  {kill_command} &> /dev/null
}}
{TRAP_KILL_CONTAINER}
{cache_command}
{run_command}"""

    def __cache_from_file_command(self, cached_image_file: str, docker_host_props: Dict[str, Any]) -> str:
        """Render LOAD_CACHED_IMAGE_COMMAND_TEMPLATE for ``cached_image_file``."""
        images_cmd = docker_util.build_docker_images_command(truncate=False, **docker_host_props)
        load_cmd = docker_util.build_docker_load_command(**docker_host_props)

        return string.Template(LOAD_CACHED_IMAGE_COMMAND_TEMPLATE).safe_substitute(
            cached_image_file=cached_image_file, images_cmd=images_cmd, load_cmd=load_cmd
        )

    def __get_cached_image_file(self) -> Optional[str]:
        """Return the path of an existing cached image tarball for this container, else None."""
        container_id = self.container_id
        cache_directory = self.__get_destination_overridable_property("container_image_cache_path")
        if cache_directory is None:
            # No cache path configured; os.path.abspath(None) would raise TypeError.
            return None
        cache_path = docker_cache_path(os.path.abspath(cache_directory), container_id)
        return cache_path if os.path.exists(cache_path) else None

    def __get_destination_overridable_property(self, name: str) -> Any:
        """Return destination param ``docker_<name>`` if set, otherwise ``app_info.<name>``."""
        prop_name = f"docker_{name}"
        if prop_name in self.destination_info:
            return self.destination_info[prop_name]
        else:
            return getattr(self.app_info, name)
def docker_cache_path(cache_directory: str, container_id: str) -> str:
    """Return where the cached image tarball for ``container_id`` lives in ``cache_directory``.

    Every ``/`` in the image identifier is encoded as ``_slash_`` so the
    identifier maps to a single file name of the form ``docker_<id>.tar``.
    """
    flattened_id = container_id.replace("/", "_slash_")
    return os.path.join(cache_directory, "docker_" + flattened_id + ".tar")
class SingularityContainer(Container, HasDockerLikeVolumes):
    """Container implementation that runs job commands with Singularity."""

    container_type = SINGULARITY_CONTAINER_TYPE

    def get_singularity_target_kwds(self) -> Dict[str, Any]:
        """Singularity CLI settings from ``singularity_*`` destination params, with singularity_util defaults."""
        return dict(
            singularity_cmd=self.prop("cmd", singularity_util.DEFAULT_SINGULARITY_COMMAND),
            sudo=asbool(self.prop("sudo", singularity_util.DEFAULT_SUDO)),
            sudo_cmd=self.prop("sudo_cmd", singularity_util.DEFAULT_SUDO_COMMAND),
        )

    @property
    def connection_configuration(self) -> Dict[str, Any]:
        return self.get_singularity_target_kwds()

    def build_mulled_singularity_pull_command(
        self, cache_directory: str, namespace: str = "biocontainers"
    ) -> List[str]:
        """Return the command that pulls this (mulled) image into ``cache_directory``."""
        return singularity_util.pull_mulled_singularity_command(
            docker_image_identifier=self.container_id,
            cache_directory=cache_directory,
            namespace=namespace,
            **self.get_singularity_target_kwds(),
        )

    def build_singularity_pull_command(self, cache_path: str) -> List[str]:
        """Return the command that pulls this image to ``cache_path``."""
        return singularity_util.pull_singularity_command(
            image_identifier=self.container_id, cache_path=cache_path, **self.get_singularity_target_kwds()
        )

    def containerize_command(self, command: str) -> str:
        """Build the command that runs ``command`` inside this Singularity container.

        :raises Exception: if the job has no working directory.
        """
        env = []
        for pass_through_var in self.tool_info.env_pass_through:
            env.append((pass_through_var, f"${pass_through_var}"))

        # Allow destinations to explicitly set environment variables just for
        # the singularity container. Better approach is to set for destination and then
        # pass through only what tool needs however. (See todo in ToolInfo.)
        for key, value in self.destination_info.items():
            if key.startswith("singularity_env_"):
                real_key = key[len("singularity_env_") :]
                env.append((real_key, value))

        assert self.job_info is not None
        working_directory = self.job_info.working_directory
        if not working_directory:
            raise Exception(f"Cannot containerize command [{command}] without defined working directory.")

        volumes_raw = self._expand_volume_str(self.destination_info.get("singularity_volumes", "$defaults"))
        preprocessed_volumes_list = preprocess_volumes(volumes_raw, self.container_type)
        volumes = [DockerVolume.from_str(v) for v in preprocessed_volumes_list]

        run_command = singularity_util.build_singularity_run_command(
            command,
            self.container_id,
            volumes=volumes,
            env=env,
            working_directory=working_directory,
            run_extra_arguments=self.prop("run_extra_arguments", singularity_util.DEFAULT_RUN_EXTRA_ARGUMENTS),
            guest_ports=self.tool_info.guest_ports,
            container_name=self.container_name,
            cleanenv=asbool(self.prop("cleanenv", singularity_util.DEFAULT_CLEANENV)),
            no_mount=self.prop("no_mount", singularity_util.DEFAULT_NO_MOUNT),
            **self.get_singularity_target_kwds(),
        )
        return run_command
# Maps a container type name to the Container subclass implementing it.
CONTAINER_CLASSES: Dict[str, Type[Container]] = {
    "docker": DockerContainer,
    "singularity": SingularityContainer,
}