Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.deps.containers

import logging
import os
import string
from abc import (
    ABCMeta,
    abstractmethod
)

import six

from galaxy.containers.docker_model import DockerVolume
from galaxy.util import (
    asbool,
    in_directory,
    plugin_config
)
from . import (
    docker_util,
    singularity_util
)
from .container_resolvers.explicit import ExplicitContainerResolver
from .container_resolvers.mulled import (
    BuildMulledDockerContainerResolver,
    BuildMulledSingularityContainerResolver,
    CachedMulledDockerContainerResolver,
    CachedMulledSingularityContainerResolver,
    MulledDockerContainerResolver,
)
from .requirements import (
    ContainerDescription,
    DEFAULT_CONTAINER_RESOLVE_DEPENDENCIES,
    DEFAULT_CONTAINER_SHELL
)

log = logging.getLogger(__name__)

DOCKER_CONTAINER_TYPE = "docker"
SINGULARITY_CONTAINER_TYPE = "singularity"
DEFAULT_CONTAINER_TYPE = DOCKER_CONTAINER_TYPE
ALL_CONTAINER_TYPES = [DOCKER_CONTAINER_TYPE, SINGULARITY_CONTAINER_TYPE]

LOAD_CACHED_IMAGE_COMMAND_TEMPLATE = '''
python << EOF
from __future__ import print_function

import json
import re
import subprocess
import tarfile

t = tarfile.TarFile("${cached_image_file}")
meta_str = t.extractfile('repositories').read()
meta = json.loads(meta_str)
tag, tag_value = meta.items()[0]
rev, rev_value = tag_value.items()[0]
cmd = "${images_cmd}"
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
stdo, stde = proc.communicate()
found = False
for line in stdo.split("\\n"):
    tmp = re.split(r'\s+', line)
    if tmp[0] == tag and tmp[1] == rev and tmp[2] == rev_value:
        found = True
if not found:
    print("Loading image")
    cmd = "cat ${cached_image_file} | ${load_cmd}"
    subprocess.check_call(cmd, shell=True)
EOF
'''


[docs]class ContainerFinder(object):
[docs] def __init__(self, app_info): self.app_info = app_info self.container_registry = ContainerRegistry(app_info)
def __enabled_container_types(self, destination_info): return [t for t in ALL_CONTAINER_TYPES if self.__container_type_enabled(t, destination_info)]
[docs] def find_best_container_description(self, enabled_container_types, tool_info): """Regardless of destination properties - find best container for tool. Given container types and container.ToolInfo description of the tool.""" container_description = self.container_registry.find_best_container_description(enabled_container_types, tool_info) return container_description
[docs] def find_container(self, tool_info, destination_info, job_info): enabled_container_types = self.__enabled_container_types(destination_info) # Short-cut everything else and just skip checks if no container type is enabled. if not enabled_container_types: return NULL_CONTAINER def __destination_container(container_description=None, container_id=None, container_type=None): if container_description: container_id = container_description.identifier container_type = container_description.type container = self.__destination_container( container_id, container_type, tool_info, destination_info, job_info, container_description, ) return container if "container_override" in destination_info: container_description = ContainerDescription.from_dict(destination_info["container_override"][0]) if container_description: container = __destination_container(container_description) if container: return container # If destination forcing Galaxy to use a particular container do it, # this is likely kind of a corner case. For instance if deployers # do not trust the containers annotated in tools. for container_type in CONTAINER_CLASSES.keys(): container_id = self.__overridden_container_id(container_type, destination_info) if container_id: container = __destination_container(container_type=container_type, container_id=container_id) if container: return container # Otherwise lets see if we can find container for the tool. container_description = self.find_best_container_description(enabled_container_types, tool_info) container = __destination_container(container_description) if container: return container # If we still don't have a container, check to see if any container # types define a default container id and use that. if "container" in destination_info: container_description = ContainerDescription.from_dict(destination_info["container"][0]) if container_description: container = __destination_container(container_description) if container: return container for container_type in CONTAINER_CLASSES.keys(): container_id = self.__default_container_id(container_type, destination_info) if container_id: container = __destination_container(container_type=container_type, container_id=container_id) if container: return container return NULL_CONTAINER
def __overridden_container_id(self, container_type, destination_info): if not self.__container_type_enabled(container_type, destination_info): return None if "%s_container_id_override" % container_type in destination_info: return destination_info.get("%s_container_id_override" % container_type) if "%s_image_override" % container_type in destination_info: return self.__build_container_id_from_parts(container_type, destination_info, mode="override") def __build_container_id_from_parts(self, container_type, destination_info, mode): repo = "" owner = "" repo_key = "%s_repo_%s" % (container_type, mode) owner_key = "%s_owner_%s" % (container_type, mode) if repo_key in destination_info: repo = destination_info[repo_key] + "/" if owner_key in destination_info: owner = destination_info[owner_key] + "/" cont_id = repo + owner + destination_info["%s_image_%s" % (container_type, mode)] tag_key = "%s_tag_%s" % (container_type, mode) if tag_key in destination_info: cont_id += ":" + destination_info[tag_key] return cont_id def __default_container_id(self, container_type, destination_info): if not self.__container_type_enabled(container_type, destination_info): return None key = "%s_default_container_id" % container_type # Also allow docker_image... if key not in destination_info: key = "%s_image" % container_type if key in destination_info: return destination_info.get(key) elif "%s_image_default" in destination_info: return self.__build_container_id_from_parts(container_type, destination_info, mode="default") return None def __destination_container(self, container_id, container_type, tool_info, destination_info, job_info, container_description=None): # TODO: ensure destination_info is dict-like if not self.__container_type_enabled(container_type, destination_info): return NULL_CONTAINER # TODO: Right now this assumes all containers available when a # container type is - there should be more thought put into this. # Checking which are availalbe - settings policies for what can be # auto-fetched, etc.... return CONTAINER_CLASSES[container_type](container_id, self.app_info, tool_info, destination_info, job_info, container_description) def __container_type_enabled(self, container_type, destination_info): return asbool(destination_info.get("%s_enabled" % container_type, False))
[docs]class NullContainerFinder(object):
[docs] def find_container(self, tool_info, destination_info, job_info): return []
[docs]class ContainerRegistry(object): """Loop through enabled ContainerResolver plugins and find first match."""
[docs] def __init__(self, app_info): self.resolver_classes = self.__resolvers_dict() self.enable_beta_mulled_containers = app_info.enable_beta_mulled_containers self.app_info = app_info self.container_resolvers = self.__build_container_resolvers(app_info)
def __build_container_resolvers(self, app_info): conf_file = getattr(app_info, 'containers_resolvers_config_file', None) if not conf_file: return self.__default_containers_resolvers() if not os.path.exists(conf_file): log.debug("Unable to find config file '%s'", conf_file) return self.__default_containers_resolvers() plugin_source = plugin_config.plugin_source_from_path(conf_file) return self.__parse_resolver_conf_xml(plugin_source) def __parse_resolver_conf_xml(self, plugin_source): extra_kwds = { 'app_info': self.app_info } return plugin_config.load_plugins(self.resolver_classes, plugin_source, extra_kwds) def __default_containers_resolvers(self): default_resolvers = [ ExplicitContainerResolver(self.app_info), ] if self.enable_beta_mulled_containers: default_resolvers.extend([ CachedMulledDockerContainerResolver(self.app_info, namespace="biocontainers"), MulledDockerContainerResolver(self.app_info, namespace="biocontainers"), BuildMulledDockerContainerResolver(self.app_info), CachedMulledSingularityContainerResolver(self.app_info), BuildMulledSingularityContainerResolver(self.app_info), ]) return default_resolvers def __resolvers_dict(self): import galaxy.tools.deps.container_resolvers return plugin_config.plugins_dict(galaxy.tools.deps.container_resolvers, 'resolver_type')
[docs] def find_best_container_description(self, enabled_container_types, tool_info): """Yield best container description of supplied types matching tool info.""" for container_resolver in self.container_resolvers: if hasattr(container_resolver, "container_type"): if container_resolver.container_type not in enabled_container_types: continue container_description = container_resolver.resolve(enabled_container_types, tool_info) log.info("Checking with container resolver [%s] found description [%s]" % (container_resolver, container_description)) if container_description: assert container_description.type in enabled_container_types return container_description return None
[docs]class AppInfo(object):
[docs] def __init__( self, galaxy_root_dir=None, default_file_path=None, outputs_to_working_directory=False, container_image_cache_path=None, library_import_dir=None, enable_beta_mulled_containers=False, containers_resolvers_config_file=None, involucro_path=None, involucro_auto_init=True, ): self.galaxy_root_dir = galaxy_root_dir self.default_file_path = default_file_path # TODO: Vary default value for docker_volumes based on this... self.outputs_to_working_directory = outputs_to_working_directory self.container_image_cache_path = container_image_cache_path self.library_import_dir = library_import_dir self.enable_beta_mulled_containers = enable_beta_mulled_containers self.containers_resolvers_config_file = containers_resolvers_config_file self.involucro_path = involucro_path self.involucro_auto_init = involucro_auto_init
[docs]class ToolInfo(object): # TODO: Introduce tool XML syntax to annotate the optional environment # variables they can consume (e.g. JVM options, license keys, etc..) # and add these to env_path_through
[docs] def __init__(self, container_descriptions=[], requirements=[], requires_galaxy_python_environment=False, env_pass_through=["GALAXY_SLOTS"]): self.container_descriptions = container_descriptions self.requirements = requirements self.requires_galaxy_python_environment = requires_galaxy_python_environment self.env_pass_through = env_pass_through
[docs]class JobInfo(object):
[docs] def __init__( self, working_directory, tool_directory, job_directory, tmp_directory, job_directory_type ): self.working_directory = working_directory self.job_directory = job_directory # Tool files may be remote staged - so this is unintuitively a property # of the job not of the tool. self.tool_directory = tool_directory self.tmp_directory = tmp_directory self.job_directory_type = job_directory_type # "galaxy" or "pulsar"
[docs]@six.add_metaclass(ABCMeta) class Container(object):
[docs] def __init__(self, container_id, app_info, tool_info, destination_info, job_info, container_description): self.container_id = container_id self.app_info = app_info self.tool_info = tool_info self.destination_info = destination_info self.job_info = job_info self.container_description = container_description
@property def resolve_dependencies(self): return DEFAULT_CONTAINER_RESOLVE_DEPENDENCIES if not self.container_description else self.container_description.resolve_dependencies @property def shell(self): return DEFAULT_CONTAINER_SHELL if not self.container_description else self.container_description.shell
[docs] @abstractmethod def containerize_command(self, command): """ Use destination supplied container configuration parameters, container_id, and command to build a new command that runs input command in container. """
[docs]def preprocess_volumes(volumes_raw_str, container_type): """Process Galaxy volume specification string to either Docker or Singularity specification. Galaxy allows the mount try "default_ro" which translates to ro for Docker and ro for Singularity iff no subdirectories are rw (Singularity does not allow ro parent directories with rw subdirectories). >>> preprocess_volumes("/a/b", DOCKER_CONTAINER_TYPE) ['/a/b:rw'] >>> preprocess_volumes("/a/b:ro,/a/b/c:rw", DOCKER_CONTAINER_TYPE) ['/a/b:ro', '/a/b/c:rw'] >>> preprocess_volumes("/a/b:default_ro,/a/b/c:rw", DOCKER_CONTAINER_TYPE) ['/a/b:ro', '/a/b/c:rw'] >>> preprocess_volumes("/a/b:default_ro,/a/b/c:rw", SINGULARITY_CONTAINER_TYPE) ['/a/b:rw', '/a/b/c:rw'] """ volumes_raw_strs = [v.strip() for v in volumes_raw_str.split(",")] volumes = [] rw_paths = [] for volume_raw_str in volumes_raw_strs: volume_parts = volume_raw_str.split(":") if len(volume_parts) > 2: raise Exception("Unparsable volumes string in configuration [%s]" % volumes_raw_str) if len(volume_parts) == 1: volume_parts.append("rw") volumes.append(volume_parts) if volume_parts[1] == "rw": rw_paths.append(volume_parts[0]) for volume in volumes: path = volume[0] how = volume[1] if how == "default_ro": how = "ro" if container_type == SINGULARITY_CONTAINER_TYPE: for rw_path in rw_paths: if in_directory(rw_path, path): how = "rw" volume[1] = how return [":".join(v) for v in volumes]
[docs]class HasDockerLikeVolumes(object): """Mixin to share functionality related to Docker volume handling. Singularity seems to have a fairly compatible syntax for volume handling. """ def _expand_volume_str(self, value): if not value: return value template = string.Template(value) variables = dict() def add_var(name, value): if value: if not value.startswith("$"): value = os.path.abspath(value) variables[name] = value add_var("working_directory", self.job_info.working_directory) add_var("tmp_directory", self.job_info.tmp_directory) add_var("job_directory", self.job_info.job_directory) add_var("tool_directory", self.job_info.tool_directory) add_var("galaxy_root", self.app_info.galaxy_root_dir) add_var("default_file_path", self.app_info.default_file_path) add_var("library_import_dir", self.app_info.library_import_dir) if self.job_info.job_directory and self.job_info.job_directory_type == "pulsar": # We have a Pulsar job directory, so everything needed (excluding index # files) should be available in job_directory... defaults = "$job_directory:default_ro,$tool_directory:default_ro,$job_directory/outputs:rw,$working_directory:rw" else: defaults = "$galaxy_root:default_ro,$tool_directory:default_ro" if self.job_info.job_directory: defaults += ",$job_directory:default_ro" if self.job_info.tmp_directory is not None: defaults += ",$tmp_directory:rw" if self.app_info.outputs_to_working_directory: # Should need default_file_path (which is of course an estimate given # object stores anyway). defaults += ",$working_directory:rw,$default_file_path:default_ro" else: defaults += ",$working_directory:rw,$default_file_path:rw" if self.app_info.library_import_dir: defaults += ",$library_import_dir:default_ro" # Define $defaults that can easily be extended with external library and # index data without deployer worrying about above details. variables["defaults"] = string.Template(defaults).safe_substitute(variables) return template.safe_substitute(variables)
[docs]class DockerContainer(Container, HasDockerLikeVolumes): container_type = DOCKER_CONTAINER_TYPE
[docs] def containerize_command(self, command): def prop(name, default): destination_name = "docker_%s" % name return self.destination_info.get(destination_name, default) env_directives = [] for pass_through_var in self.tool_info.env_pass_through: env_directives.append('"%s=$%s"' % (pass_through_var, pass_through_var)) # Allow destinations to explicitly set environment variables just for # docker container. Better approach is to set for destination and then # pass through only what tool needs however. (See todo in ToolInfo.) for key, value in six.iteritems(self.destination_info): if key.startswith("docker_env_"): env = key[len("docker_env_"):] env_directives.append('"%s=%s"' % (env, value)) working_directory = self.job_info.working_directory if not working_directory: raise Exception("Cannot containerize command [%s] without defined working directory." % working_directory) volumes_raw = self._expand_volume_str(self.destination_info.get("docker_volumes", "$defaults")) preprocessed_volumes_list = preprocess_volumes(volumes_raw, self.container_type) # TODO: Remove redundant volumes... volumes = [DockerVolume.from_str(v) for v in preprocessed_volumes_list] # If a tool definitely has a temp directory available set it to /tmp in container for compat. # with CWL. This is part of that spec and should make it easier to share containers between CWL # and Galaxy. if self.job_info.tmp_directory is not None: volumes.append(DockerVolume.from_str("%s:/tmp:rw" % self.job_info.tmp_directory)) volumes_from = self.destination_info.get("docker_volumes_from", docker_util.DEFAULT_VOLUMES_FROM) docker_host_props = dict( docker_cmd=prop("cmd", docker_util.DEFAULT_DOCKER_COMMAND), sudo=asbool(prop("sudo", docker_util.DEFAULT_SUDO)), sudo_cmd=prop("sudo_cmd", docker_util.DEFAULT_SUDO_COMMAND), host=prop("host", docker_util.DEFAULT_HOST), ) cached_image_file = self.__get_cached_image_file() if not cached_image_file: # TODO: Add option to cache it once here and create cached_image_file. cache_command = docker_util.build_docker_cache_command(self.container_id, **docker_host_props) else: cache_command = self.__cache_from_file_command(cached_image_file, docker_host_props) run_command = docker_util.build_docker_run_command( command, self.container_id, volumes=volumes, volumes_from=volumes_from, env_directives=env_directives, working_directory=working_directory, net=prop("net", "none"), # By default, docker instance has networking disabled auto_rm=asbool(prop("auto_rm", docker_util.DEFAULT_AUTO_REMOVE)), set_user=prop("set_user", docker_util.DEFAULT_SET_USER), run_extra_arguments=prop("run_extra_arguments", docker_util.DEFAULT_RUN_EXTRA_ARGUMENTS), **docker_host_props ) return "%s\n%s" % (cache_command, run_command)
def __cache_from_file_command(self, cached_image_file, docker_host_props): images_cmd = docker_util.build_docker_images_command(truncate=False, **docker_host_props) load_cmd = docker_util.build_docker_load_command(**docker_host_props) return string.Template(LOAD_CACHED_IMAGE_COMMAND_TEMPLATE).safe_substitute( cached_image_file=cached_image_file, images_cmd=images_cmd, load_cmd=load_cmd ) def __get_cached_image_file(self): container_id = self.container_id cache_directory = os.path.abspath(self.__get_destination_overridable_property("container_image_cache_path")) cache_path = docker_cache_path(cache_directory, container_id) return cache_path if os.path.exists(cache_path) else None def __get_destination_overridable_property(self, name): prop_name = "docker_%s" % name if prop_name in self.destination_info: return self.destination_info[prop_name] else: return getattr(self.app_info, name)
[docs]def docker_cache_path(cache_directory, container_id): file_container_id = container_id.replace("/", "_slash_") cache_file_name = "docker_%s.tar" % file_container_id return os.path.join(cache_directory, cache_file_name)
[docs]class SingularityContainer(Container, HasDockerLikeVolumes): container_type = SINGULARITY_CONTAINER_TYPE
[docs] def containerize_command(self, command): def prop(name, default): destination_name = "singularity_%s" % name return self.destination_info.get(destination_name, default) env = [] for pass_through_var in self.tool_info.env_pass_through: env.append((pass_through_var, "$%s" % pass_through_var)) # Allow destinations to explicitly set environment variables just for # docker container. Better approach is to set for destination and then # pass through only what tool needs however. (See todo in ToolInfo.) for key, value in six.iteritems(self.destination_info): if key.startswith("singularity_env_"): real_key = key[len("singularity_env_"):] env.append((real_key, value)) working_directory = self.job_info.working_directory if not working_directory: raise Exception("Cannot containerize command [%s] without defined working directory." % working_directory) volumes_raw = self._expand_volume_str(self.destination_info.get("singularity_volumes", "$defaults")) preprocessed_volumes_list = preprocess_volumes(volumes_raw, self.container_type) volumes = [DockerVolume.from_str(v) for v in preprocessed_volumes_list] singularity_target_kwds = dict( singularity_cmd=prop("cmd", singularity_util.DEFAULT_SINGULARITY_COMMAND), sudo=asbool(prop("sudo", singularity_util.DEFAULT_SUDO)), sudo_cmd=prop("sudo_cmd", singularity_util.DEFAULT_SUDO_COMMAND), ) run_command = singularity_util.build_singularity_run_command( command, self.container_id, volumes=volumes, env=env, working_directory=working_directory, run_extra_arguments=prop("run_extra_arguments", singularity_util.DEFAULT_RUN_EXTRA_ARGUMENTS), **singularity_target_kwds ) return run_command
CONTAINER_CLASSES = dict( docker=DockerContainer, singularity=SingularityContainer, )
[docs]class NullContainer(object):
[docs] def __init__(self): pass
def __bool__(self): return False __nonzero__ = __bool__
NULL_CONTAINER = NullContainer()