Source code for galaxy.tool_util.deps.container_classes
import os
import string
from abc import (
ABCMeta,
abstractmethod,
)
from logging import getLogger
from uuid import uuid4
from galaxy.util import (
asbool,
in_directory,
)
from . import (
docker_util,
singularity_util,
)
from .container_volumes import DockerVolume
from .requirements import (
DEFAULT_CONTAINER_RESOLVE_DEPENDENCIES,
DEFAULT_CONTAINER_SHELL,
)
log = getLogger(__name__)
DOCKER_CONTAINER_TYPE = "docker"
SINGULARITY_CONTAINER_TYPE = "singularity"
TRAP_KILL_CONTAINER = "trap _on_exit EXIT"
LOAD_CACHED_IMAGE_COMMAND_TEMPLATE = r"""
python << EOF
from __future__ import print_function
import json
import re
import subprocess
import tarfile
t = tarfile.TarFile("${cached_image_file}")
meta_str = t.extractfile('repositories').read()
meta = json.loads(meta_str)
tag, tag_value = next(iter(meta.items()))
rev, rev_value = next(iter(tag_value.items()))
cmd = "${images_cmd}"
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, universal_newlines=True)  # text mode so stdout splits as str under Python 3
stdo, stde = proc.communicate()
found = False
for line in stdo.split("\n"):
tmp = re.split(r'\s+', line)
if tmp[0] == tag and tmp[1] == rev and tmp[2] == rev_value:
found = True
if not found:
print("Loading image")
cmd = "cat ${cached_image_file} | ${load_cmd}"
subprocess.check_call(cmd, shell=True)
EOF
"""
SOURCE_CONDA_ACTIVATE = """
# Check if container was created by installing conda packages,
# and if so, source scripts to populate environment variables
# that would be set by activating the conda environment.
if [ -d /usr/local/etc/conda/activate.d ]; then
export CONDA_PREFIX=/usr/local
for f in /usr/local/etc/conda/activate.d/*.sh; do
case "$f" in
"/usr/local/etc/conda/activate.d/activate-"*) :;;
*) . "$f" ;;
esac;
done
fi
"""
class Container(metaclass=ABCMeta):
    def __init__(
self, container_id, app_info, tool_info, destination_info, job_info, container_description, container_name=None
):
self.container_id = container_id
self.app_info = app_info
self.tool_info = tool_info
self.destination_info = destination_info
self.job_info = job_info
self.container_description = container_description
self.container_name = container_name or uuid4().hex
self.container_info = {}
    def prop(self, name, default):
destination_name = f"{self.container_type}_{name}"
return self.destination_info.get(destination_name, default)
@property
def resolve_dependencies(self):
return (
DEFAULT_CONTAINER_RESOLVE_DEPENDENCIES
if not self.container_description
else self.container_description.resolve_dependencies
)
@property
def shell(self):
return DEFAULT_CONTAINER_SHELL if not self.container_description else self.container_description.shell
@property
def source_environment(self):
if self.container_description and not self.container_description.explicit:
return SOURCE_CONDA_ACTIVATE
return ""
    @abstractmethod
def containerize_command(self, command):
"""
        Use destination-supplied container configuration parameters,
        container_id, and command to build a new command that runs the
        input command in the container.
"""
def preprocess_volumes(volumes_raw_str, container_type):
"""Process Galaxy volume specification string to either Docker or Singularity specification.
Galaxy allows the mount try "default_ro" which translates to ro for Docker and
ro for Singularity iff no subdirectories are rw (Singularity does not allow ro
parent directories with rw subdirectories).
>>> preprocess_volumes(None, DOCKER_CONTAINER_TYPE)
[]
>>> preprocess_volumes("", DOCKER_CONTAINER_TYPE)
[]
>>> preprocess_volumes("/a/b", DOCKER_CONTAINER_TYPE)
['/a/b:rw']
>>> preprocess_volumes("/a/b:ro,/a/b/c:rw", DOCKER_CONTAINER_TYPE)
['/a/b:ro', '/a/b/c:rw']
>>> preprocess_volumes("/a/b:/a:ro,/a/b/c:/a/b:rw", DOCKER_CONTAINER_TYPE)
['/a/b:/a:ro', '/a/b/c:/a/b:rw']
>>> preprocess_volumes("/a/b:default_ro,/a/b/c:rw", DOCKER_CONTAINER_TYPE)
['/a/b:ro', '/a/b/c:rw']
>>> preprocess_volumes("/a/b:default_ro,/a/b/c:ro", SINGULARITY_CONTAINER_TYPE)
['/a/b:ro', '/a/b/c:ro']
>>> preprocess_volumes("/a/b:default_ro,/a/b/c:rw", SINGULARITY_CONTAINER_TYPE)
['/a/b', '/a/b/c']
"""
if not volumes_raw_str:
return []
volumes_raw_strs = [v.strip() for v in volumes_raw_str.split(",")]
volumes = []
rw_paths = []
for volume_raw_str in volumes_raw_strs:
volume_parts = volume_raw_str.split(":")
if len(volume_parts) > 3:
raise Exception(f"Unparsable volumes string in configuration [{volumes_raw_str}]")
if len(volume_parts) == 3:
volume_parts = [f"{volume_parts[0]}:{volume_parts[1]}", volume_parts[2]]
if len(volume_parts) == 2 and volume_parts[1] not in ("rw", "ro", "default_ro"):
volume_parts = [f"{volume_parts[0]}:{volume_parts[1]}", "rw"]
if len(volume_parts) == 1:
volume_parts.append("rw")
volumes.append(volume_parts)
if volume_parts[1] == "rw":
rw_paths.append(volume_parts[0])
for volume in volumes:
path = volume[0]
how = volume[1]
if how == "default_ro":
how = "ro"
if container_type == SINGULARITY_CONTAINER_TYPE:
for rw_path in rw_paths:
if in_directory(rw_path, path):
how = "rw"
volume[1] = how
        # For a while Singularity did not allow specifying the bind type rw
        # (which is the default), so we omit this default.
        # See https://github.com/hpcng/singularity/pull/5487
if container_type == SINGULARITY_CONTAINER_TYPE and volume[1] == "rw":
del volume[1]
return [":".join(v) for v in volumes]
class HasDockerLikeVolumes:
"""Mixin to share functionality related to Docker volume handling.
Singularity seems to have a fairly compatible syntax for volume handling.
"""
def _expand_volume_str(self, value):
if not value:
return value
template = string.Template(value)
variables = dict()
def add_var(name, value):
if value:
if not value.startswith("$"):
value = os.path.abspath(value)
variables[name] = value
add_var("working_directory", self.job_info.working_directory)
add_var("tmp_directory", self.job_info.tmp_directory)
add_var("job_directory", self.job_info.job_directory)
add_var("tool_directory", self.job_info.tool_directory)
add_var("home_directory", self.job_info.home_directory)
add_var("galaxy_root", self.app_info.galaxy_root_dir)
add_var("default_file_path", self.app_info.default_file_path)
add_var("library_import_dir", self.app_info.library_import_dir)
add_var("tool_data_path", self.app_info.tool_data_path)
add_var("shed_tool_data_path", self.app_info.shed_tool_data_path)
if self.job_info.job_directory and self.job_info.job_directory_type == "pulsar":
# We have a Pulsar job directory, so everything needed (excluding index
# files) should be available in job_directory...
defaults = "$job_directory:default_ro"
if self.job_info.tool_directory:
defaults += ",$tool_directory:default_ro"
defaults += ",$job_directory/outputs:rw,$working_directory:rw"
else:
defaults = "$galaxy_root:default_ro"
if self.job_info.tool_directory:
defaults += ",$tool_directory:default_ro"
if self.job_info.job_directory:
defaults += ",$job_directory:default_ro,$job_directory/outputs:rw"
if self.tool_info.profile <= 19.09:
defaults += ",$job_directory/configs:rw"
if self.job_info.tmp_directory is not None:
defaults += ",$tmp_directory:rw"
            # If a tool definitely has a temp directory available, map it to /tmp in the
            # container for compatibility with CWL. This is part of that spec and should
            # make it easier to share containers between CWL and Galaxy.
defaults += ",$tmp_directory:/tmp:rw"
else:
defaults += ",$_GALAXY_JOB_TMP_DIR:rw,$TMPDIR:rw,$TMP:rw,$TEMP:rw"
if self.job_info.home_directory is not None:
defaults += ",$home_directory:rw"
if self.app_info.outputs_to_working_directory:
            # Should not need write access to default_file_path (which is of course an
            # estimate given object stores anyway).
defaults += ",$working_directory:rw,$default_file_path:default_ro"
else:
defaults += ",$working_directory:rw,$default_file_path:rw"
if self.app_info.library_import_dir:
defaults += ",$library_import_dir:default_ro"
if self.app_info.tool_data_path:
defaults += ",$tool_data_path:default_ro"
if self.app_info.shed_tool_data_path:
defaults += ",$shed_tool_data_path:default_ro"
        # Define $defaults so it can easily be extended with external library and
        # index data without the deployer worrying about the above details.
variables["defaults"] = string.Template(defaults).safe_substitute(variables)
volumes_str = template.safe_substitute(variables)
# Not all tools have a tool_directory - strip this out if supplied by
# job_conf.
tool_directory_index = volumes_str.find("$tool_directory")
if tool_directory_index > 0:
end_index = volumes_str.find(",", tool_directory_index)
if end_index < 0:
end_index = len(volumes_str)
volumes_str = volumes_str[0:tool_directory_index] + volumes_str[end_index : len(volumes_str)]
return volumes_str
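# A minimal sketch (illustrative only, hypothetical stub objects rather than
# real Galaxy job/app classes) of how "$defaults" expands for a local,
# non-Pulsar job with no tmp or home directory configured:
#
#     >>> from types import SimpleNamespace
#     >>> class _Probe(HasDockerLikeVolumes):
#     ...     job_info = SimpleNamespace(
#     ...         working_directory="/jobs/1/working", tmp_directory=None,
#     ...         job_directory="/jobs/1", job_directory_type="local",
#     ...         tool_directory="/tools/seqtk", home_directory=None)
#     ...     app_info = SimpleNamespace(
#     ...         galaxy_root_dir="/srv/galaxy", default_file_path="/data/files",
#     ...         library_import_dir=None, tool_data_path=None,
#     ...         shed_tool_data_path=None, outputs_to_working_directory=False)
#     ...     tool_info = SimpleNamespace(profile=21.01)
#     >>> _Probe()._expand_volume_str("$defaults")
#     '/srv/galaxy:default_ro,/tools/seqtk:default_ro,/jobs/1:default_ro,/jobs/1/outputs:rw,$_GALAXY_JOB_TMP_DIR:rw,$TMPDIR:rw,$TMP:rw,$TEMP:rw,/jobs/1/working:rw,/data/files:rw'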
class DockerContainer(Container, HasDockerLikeVolumes):
container_type = DOCKER_CONTAINER_TYPE
@property
def docker_host_props(self):
docker_host_props = dict(
docker_cmd=self.prop("cmd", docker_util.DEFAULT_DOCKER_COMMAND),
sudo=asbool(self.prop("sudo", docker_util.DEFAULT_SUDO)),
sudo_cmd=self.prop("sudo_cmd", docker_util.DEFAULT_SUDO_COMMAND),
host=self.prop("host", docker_util.DEFAULT_HOST),
)
return docker_host_props
@property
def connection_configuration(self):
return self.docker_host_props
    def build_pull_command(self):
return docker_util.build_pull_command(self.container_id, **self.docker_host_props)
    def containerize_command(self, command):
env_directives = []
for pass_through_var in self.tool_info.env_pass_through:
env_directives.append(f'"{pass_through_var}=${pass_through_var}"')
        # Allow destinations to explicitly set environment variables just for the
        # docker container. A better approach would be to set them for the destination
        # and then pass through only what the tool needs. (See the TODO in ToolInfo.)
for key, value in self.destination_info.items():
if key.startswith("docker_env_"):
env = key[len("docker_env_") :]
env_directives.append(f'"{env}={value}"')
working_directory = self.job_info.working_directory
if not working_directory:
raise Exception(f"Cannot containerize command [{working_directory}] without defined working directory.")
volumes_raw = self._expand_volume_str(self.destination_info.get("docker_volumes", "$defaults"))
preprocessed_volumes_list = preprocess_volumes(volumes_raw, self.container_type)
# TODO: Remove redundant volumes...
volumes = [DockerVolume.from_str(v) for v in preprocessed_volumes_list]
volumes_from = self.destination_info.get("docker_volumes_from", docker_util.DEFAULT_VOLUMES_FROM)
docker_host_props = self.docker_host_props
cached_image_file = self.__get_cached_image_file()
if not cached_image_file:
# TODO: Add option to cache it once here and create cached_image_file.
cache_command = docker_util.build_docker_cache_command(self.container_id, **docker_host_props)
else:
cache_command = self.__cache_from_file_command(cached_image_file, docker_host_props)
run_command = docker_util.build_docker_run_command(
command,
self.container_id,
volumes=volumes,
volumes_from=volumes_from,
env_directives=env_directives,
working_directory=working_directory,
net=self.prop("net", None), # By default, docker instance has networking disabled
auto_rm=asbool(self.prop("auto_rm", docker_util.DEFAULT_AUTO_REMOVE)),
set_user=self.prop("set_user", docker_util.DEFAULT_SET_USER),
run_extra_arguments=self.prop("run_extra_arguments", docker_util.DEFAULT_RUN_EXTRA_ARGUMENTS),
guest_ports=self.tool_info.guest_ports,
container_name=self.container_name,
**docker_host_props,
)
kill_command = docker_util.build_docker_simple_command(
"kill", container_name=self.container_name, **docker_host_props
)
        # Suppress standard error from the kill command below because its output can cause
        # jobs that would otherwise succeed to fail. Likely, in these cases the container
        # has already been stopped normally and so cannot be stopped again.
        # A less hacky approach might be to check whether the container is running before trying to kill it.
# https://stackoverflow.com/questions/34228864/stop-and-delete-docker-container-if-its-running
# Standard error is:
# Error response from daemon: Cannot kill container: 2b0b961527574ebc873256b481bbe72e: No such container: 2b0b961527574ebc873256b481bbe72e
return f"""
_on_exit() {{
{kill_command} &> /dev/null
}}
{TRAP_KILL_CONTAINER}
{cache_command}
{run_command}"""
def __cache_from_file_command(self, cached_image_file, docker_host_props):
images_cmd = docker_util.build_docker_images_command(truncate=False, **docker_host_props)
load_cmd = docker_util.build_docker_load_command(**docker_host_props)
return string.Template(LOAD_CACHED_IMAGE_COMMAND_TEMPLATE).safe_substitute(
cached_image_file=cached_image_file, images_cmd=images_cmd, load_cmd=load_cmd
)
def __get_cached_image_file(self):
container_id = self.container_id
cache_directory = os.path.abspath(self.__get_destination_overridable_property("container_image_cache_path"))
cache_path = docker_cache_path(cache_directory, container_id)
return cache_path if os.path.exists(cache_path) else None
def __get_destination_overridable_property(self, name):
prop_name = f"docker_{name}"
if prop_name in self.destination_info:
return self.destination_info[prop_name]
else:
return getattr(self.app_info, name)
def docker_cache_path(cache_directory, container_id):
file_container_id = container_id.replace("/", "_slash_")
cache_file_name = f"docker_{file_container_id}.tar"
return os.path.join(cache_directory, cache_file_name)
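# For example (illustrative only, hypothetical image and directory):
#
#     >>> docker_cache_path("/var/cache/containers", "quay.io/biocontainers/seqtk:1.3--h5bf99c6_3")
#     '/var/cache/containers/docker_quay.io_slash_biocontainers_slash_seqtk:1.3--h5bf99c6_3.tar'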
class SingularityContainer(Container, HasDockerLikeVolumes):
container_type = SINGULARITY_CONTAINER_TYPE
    def get_singularity_target_kwds(self):
return dict(
singularity_cmd=self.prop("cmd", singularity_util.DEFAULT_SINGULARITY_COMMAND),
sudo=asbool(self.prop("sudo", singularity_util.DEFAULT_SUDO)),
sudo_cmd=self.prop("sudo_cmd", singularity_util.DEFAULT_SUDO_COMMAND),
)
@property
def connection_configuration(self):
return self.get_singularity_target_kwds()
    def build_mulled_singularity_pull_command(self, cache_directory, namespace="biocontainers"):
return singularity_util.pull_mulled_singularity_command(
docker_image_identifier=self.container_id,
cache_directory=cache_directory,
namespace=namespace,
**self.get_singularity_target_kwds(),
)
    def build_singularity_pull_command(self, cache_path):
return singularity_util.pull_singularity_command(
image_identifier=self.container_id, cache_path=cache_path, **self.get_singularity_target_kwds()
)
    def containerize_command(self, command):
env = []
for pass_through_var in self.tool_info.env_pass_through:
env.append((pass_through_var, f"${pass_through_var}"))
        # Allow destinations to explicitly set environment variables just for the
        # Singularity container. A better approach would be to set them for the destination
        # and then pass through only what the tool needs. (See the TODO in ToolInfo.)
for key, value in self.destination_info.items():
if key.startswith("singularity_env_"):
real_key = key[len("singularity_env_") :]
env.append((real_key, value))
working_directory = self.job_info.working_directory
if not working_directory:
raise Exception(f"Cannot containerize command [{working_directory}] without defined working directory.")
volumes_raw = self._expand_volume_str(self.destination_info.get("singularity_volumes", "$defaults"))
preprocessed_volumes_list = preprocess_volumes(volumes_raw, self.container_type)
volumes = [DockerVolume.from_str(v) for v in preprocessed_volumes_list]
run_command = singularity_util.build_singularity_run_command(
command,
self.container_id,
volumes=volumes,
env=env,
working_directory=working_directory,
run_extra_arguments=self.prop("run_extra_arguments", singularity_util.DEFAULT_RUN_EXTRA_ARGUMENTS),
guest_ports=self.tool_info.guest_ports,
container_name=self.container_name,
cleanenv=asbool(self.prop("cleanenv", singularity_util.DEFAULT_CLEANENV)),
**self.get_singularity_target_kwds(),
)
return run_command
CONTAINER_CLASSES = dict(
docker=DockerContainer,
singularity=SingularityContainer,
)
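# A minimal sketch (illustrative only) of resolving a class from this registry
# by container type string:
#
#     >>> CONTAINER_CLASSES[DOCKER_CONTAINER_TYPE] is DockerContainer
#     True
#     >>> CONTAINER_CLASSES["singularity"].container_type
#     'singularity'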