Source code for galaxy.tool_util.deps.container_resolvers.mulled

"""This module describes the :class:`MulledContainerResolver` ContainerResolver plugin."""

import logging
import os
import subprocess
from abc import (
    ABCMeta,
    abstractmethod,
)
from typing import (
    Callable,
    Container as TypingContainer,
    Dict,
    List,
    NamedTuple,
    Optional,
    Type,
    TYPE_CHECKING,
    Union,
)

from requests import Session
from typing_extensions import Literal

from galaxy.util import (
    safe_makedirs,
    string_as_bool,
    unicodify,
    which,
)
from galaxy.util.commands import shell
from . import (
    ContainerResolver,
    ResolutionCache,
)
from ..conda_util import CondaTarget
from ..container_classes import (
    Container,
    CONTAINER_CLASSES,
    DockerContainer,
    SingularityContainer,
)
from ..docker_util import build_docker_images_command
from ..mulled.mulled_build import (
    DEFAULT_CHANNELS,
    ensure_installed,
    InvolucroContext,
    mull_targets,
)
from ..mulled.mulled_build_tool import requirements_to_mulled_targets
from ..mulled.util import (
    default_mulled_conda_channels_from_env,
    mulled_tags_for,
    split_tag,
    v1_image_name,
    v2_image_name,
    version_sorted,
)
from ..requirements import (
    ContainerDescription,
    DEFAULT_CONTAINER_SHELL,
)

if TYPE_CHECKING:
    from ..dependencies import (
        AppInfo,
        ToolInfo,
    )

log = logging.getLogger(__name__)


class CachedMulledImageSingleTarget(NamedTuple):
    package_name: str
    version: Optional[str]
    build: Optional[str]
    image_identifier: str


class CachedV1MulledImageMultiTarget(NamedTuple):
    hash: str
    build: Optional[str]
    image_identifier: str


class CachedV2MulledImageMultiTarget(NamedTuple):
    image_name: str
    version_hash: Optional[str]
    build: Optional[str]
    image_identifier: str

    @property
    def package_hash(self) -> str:
        # Make this work for Singularity file name or fully qualified Docker repository
        # image names.
        image_name = self.image_name
        if "/" not in image_name:
            return image_name
        else:
            return image_name.rsplit("/")[-1]


CachedTarget = Union[CachedMulledImageSingleTarget, CachedV1MulledImageMultiTarget, CachedV2MulledImageMultiTarget]


class CacheDirectory(metaclass=ABCMeta):
    def __init__(self, path: str, hash_func: Literal["v1", "v2"] = "v2") -> None:
        self.path = path
        self.hash_func = hash_func

    def _list_cached_mulled_images_from_path(self) -> List[CachedTarget]:
        contents = os.listdir(self.path)
        sorted_images = version_sorted(contents)
        raw_images = map(lambda name: identifier_to_cached_target(name, self.hash_func), sorted_images)
        return list(i for i in raw_images if i is not None)

    @abstractmethod
    def list_cached_mulled_images_from_path(self) -> List[CachedTarget]:
        """Generate a list of cached, mulled images in the cache."""

    @abstractmethod
    def invalidate_cache(self) -> None:
        """Invalidate the cache."""


class UncachedCacheDirectory(CacheDirectory):
    cacher_type = "uncached"

    def list_cached_mulled_images_from_path(
        self,
    ) -> List[CachedTarget]:
        return self._list_cached_mulled_images_from_path()

    def invalidate_cache(self) -> None:
        pass


class DirMtimeCacheDirectory(CacheDirectory):
    cacher_type = "dir_mtime"

    def __init__(self, path: str, **kwargs):
        super().__init__(path, **kwargs)
        self.invalidate_cache()

    def __get_mtime(self) -> float:
        return os.stat(self.path).st_mtime

    def __cache(self) -> None:
        self.__contents = self._list_cached_mulled_images_from_path()
        self.__mtime = self.__get_mtime()
        log.debug(f"Cached images in path {self.path} at directory mtime {self.__mtime}")

    def list_cached_mulled_images_from_path(
        self,
    ) -> List[CachedTarget]:
        mtime = self.__get_mtime()
        if mtime != self.__mtime:
            if mtime < self.__mtime:
                log.warning(
                    f"Modification time '{mtime}' of cache directory '{self.path}' is older than previous "
                    f"modification time '{self.__mtime}'! Cache directory will be recached"
                )
            self.__cache()
        return self.__contents

    def invalidate_cache(self) -> None:
        self.__mtime = -1.0
        self.__contents = []


def get_cache_directory_cacher(cacher_type: Optional[str]) -> Type[CacheDirectory]:
    # these can become a separate module and use plugin_config if we need more
    cachers: Dict[str, Type[CacheDirectory]] = {
        UncachedCacheDirectory.cacher_type: UncachedCacheDirectory,
        DirMtimeCacheDirectory.cacher_type: DirMtimeCacheDirectory,
    }
    cacher_type = cacher_type or "uncached"
    return cachers[cacher_type]


def list_docker_cached_mulled_images(
    namespace: Optional[str] = None,
    hash_func: Literal["v1", "v2"] = "v2",
    resolution_cache: Optional[ResolutionCache] = None,
) -> List[CachedTarget]:
    cache_key = "galaxy.tool_util.deps.container_resolvers.mulled:cached_images"
    if resolution_cache is not None and cache_key in resolution_cache:
        images_and_versions = resolution_cache.get(cache_key)
    else:
        command = build_docker_images_command(truncate=True, sudo=False, to_str=False)
        try:
            images_and_versions = unicodify(subprocess.check_output(command)).strip().splitlines()
        except subprocess.CalledProcessError:
            log.info("Call to `docker images` failed, configured container resolution may be broken")
            return []
        images_and_versions = [":".join(line.split()[0:2]) for line in images_and_versions[1:]]
        if resolution_cache is not None:
            resolution_cache[cache_key] = images_and_versions

    def output_line_to_image(line: str) -> Optional[CachedTarget]:
        image = identifier_to_cached_target(line, hash_func, namespace=namespace)
        return image

    name_filter = get_filter(namespace)
    sorted_images = version_sorted([_ for _ in filter(name_filter, images_and_versions)])
    raw_images = (output_line_to_image(_) for _ in sorted_images)
    return [i for i in raw_images if i is not None]


def identifier_to_cached_target(
    identifier: str, hash_func: Literal["v1", "v2"], namespace: Optional[str] = None
) -> Optional[CachedTarget]:
    if ":" in identifier:
        image_name, version = identifier.rsplit(":", 1)
    else:
        image_name = identifier
        version = None

    if not version or version == "latest":
        version = None

    image: Optional[CachedTarget] = None
    prefix = ""
    if namespace is not None:
        prefix = f"quay.io/{namespace}/"
    if image_name.startswith(f"{prefix}mulled-v1-"):
        if hash_func == "v2":
            return None

        hash = image_name
        build = None
        if version and version.isdigit():
            build = version
        image = CachedV1MulledImageMultiTarget(hash, build, identifier)
    elif image_name.startswith(f"{prefix}mulled-v2-"):
        if hash_func == "v1":
            return None

        version_hash = None
        build = None
        if version:
            if "-" in version:
                version_hash, build = version.rsplit("-", 1)
            elif version.isdigit():
                version_hash, build = None, version
            else:
                log.debug(f"Unparsable mulled image tag encountered [{version}]")

        image = CachedV2MulledImageMultiTarget(image_name, version_hash, build, identifier)
    else:
        build = None
        if version and "--" in version:
            version, build = split_tag(version)
        if prefix and image_name.startswith(prefix):
            image_name = image_name[len(prefix) :]
        image = CachedMulledImageSingleTarget(image_name, version, build, identifier)
    return image


def get_filter(namespace: Optional[str]) -> Callable[[str], bool]:
    prefix = "quay.io/" if namespace is None else f"quay.io/{namespace}"
    return lambda name: name.startswith(prefix) and name.count("/") == 2


def find_best_matching_cached_image(
    targets: List[CondaTarget], cached_images: List[CachedTarget], hash_func: Literal["v1", "v2"]
) -> Optional[CachedTarget]:
    if len(targets) == 0:
        return None

    image: Optional[CachedTarget] = None
    cached_image: CachedTarget
    if len(targets) == 1:
        target = targets[0]
        for cached_image in cached_images:
            if not isinstance(cached_image, CachedMulledImageSingleTarget):
                continue
            if not cached_image.package_name == target.package:
                continue
            if not target.version or target.version == cached_image.version:
                image = cached_image
                break
    elif hash_func == "v2":
        name = v2_image_name(targets)
        if ":" in name:
            package_hash, version_hash = name.split(":", 2)
        else:
            package_hash, version_hash = name, None

        for cached_image in cached_images:
            if not isinstance(cached_image, CachedV2MulledImageMultiTarget):
                continue

            if version_hash is None:
                # Just match on package hash...
                if package_hash == cached_image.package_hash:
                    image = cached_image
                    break
            else:
                # Match on package and version hash...
                if package_hash == cached_image.package_hash and version_hash == cached_image.version_hash:
                    image = cached_image
                    break

    elif hash_func == "v1":
        name = v1_image_name(targets)
        for cached_image in cached_images:
            if not isinstance(cached_image, CachedV1MulledImageMultiTarget):
                continue

            if name == cached_image.hash:
                image = cached_image
                break
    return image


def docker_cached_container_description(
    targets: List[CondaTarget],
    namespace: str,
    hash_func: Literal["v1", "v2"] = "v2",
    shell: str = DEFAULT_CONTAINER_SHELL,
    resolution_cache: Optional[ResolutionCache] = None,
) -> Optional[ContainerDescription]:
    if len(targets) == 0:
        return None

    cached_images = list_docker_cached_mulled_images(namespace, hash_func=hash_func, resolution_cache=resolution_cache)
    image = find_best_matching_cached_image(targets, cached_images, hash_func)

    container = None
    if image:
        container = ContainerDescription(
            image.image_identifier,
            type="docker",
            shell=shell,
        )

    return container


def singularity_cached_container_description(
    targets: List[CondaTarget],
    cache_directory: CacheDirectory,
    hash_func: Literal["v1", "v2"] = "v2",
    shell: str = DEFAULT_CONTAINER_SHELL,
) -> Optional[ContainerDescription]:
    if len(targets) == 0:
        return None

    if not os.path.exists(cache_directory.path):
        return None

    cached_images = cache_directory.list_cached_mulled_images_from_path()
    image = find_best_matching_cached_image(targets, cached_images, hash_func)

    container = None
    if image:
        container = ContainerDescription(
            os.path.abspath(os.path.join(cache_directory.path, image.image_identifier)),
            type="singularity",
            shell=shell,
        )
    return container


def targets_to_mulled_name(
    targets: List[CondaTarget],
    hash_func: Literal["v1", "v2"],
    namespace: str,
    resolution_cache: Optional[ResolutionCache] = None,
    session: Optional[Session] = None,
) -> Optional[str]:
    unresolved_cache_key = "galaxy.tool_util.deps.container_resolvers.mulled:unresolved"
    if resolution_cache is not None:
        if unresolved_cache_key not in resolution_cache:
            resolution_cache[unresolved_cache_key] = set()
        unresolved_cache = resolution_cache.get(unresolved_cache_key)
    else:
        unresolved_cache = set()

    mulled_resolution_cache = None
    if resolution_cache and resolution_cache.mulled_resolution_cache:
        mulled_resolution_cache = resolution_cache.mulled_resolution_cache

    name = None

    def cached_name(cache_key: str) -> Optional[str]:
        if mulled_resolution_cache:
            try:
                return resolution_cache.get(cache_key)  # type: ignore[union-attr] # mulled_resolution_cache not None implies resolution_cache not None
            except KeyError:
                return None
        return None

    if len(targets) == 1:
        target = targets[0]
        target_version = target.version
        cache_key = f"ns[{namespace}]__single__{target.package}__@__{target_version}"
        if cache_key in unresolved_cache:
            return None
        name = cached_name(cache_key)
        if name:
            return name

        tags = mulled_tags_for(namespace, target.package, resolution_cache=resolution_cache, session=session)

        if tags:
            for tag in tags:
                if "--" in tag:
                    version, _ = split_tag(tag)
                else:
                    version = tag
                if target_version and version == target_version:
                    name = f"{target.package}:{tag}"
                    break

    else:

        def first_tag_if_available(image_name):
            if ":" in image_name:
                repo_name, tag_prefix = image_name.split(":", 2)
            else:
                repo_name = image_name
                tag_prefix = None
            tags = mulled_tags_for(
                namespace, repo_name, tag_prefix=tag_prefix, resolution_cache=resolution_cache, session=session
            )
            return tags[0] if tags else None

        if hash_func == "v2":
            base_image_name = v2_image_name(targets)
        elif hash_func == "v1":
            base_image_name = v1_image_name(targets)
        else:
            raise Exception(f"Unimplemented mulled hash_func [{hash_func}]")

        cache_key = f"ns[{namespace}]__{hash_func}__{base_image_name}"
        if cache_key in unresolved_cache:
            return None
        name = cached_name(cache_key)
        if name:
            return name

        tag = first_tag_if_available(base_image_name)
        if tag:
            if ":" in base_image_name:
                assert hash_func != "v1"
                # base_image_name of form <package_hash>:<version_hash>, expand tag
                # to include build number in tag.
                name = f"{base_image_name.split(':')[0]}:{tag}"
            else:
                # base_image_name of form <package_hash>, simply add build number
                # as tag to fully qualify image.
                name = f"{base_image_name}:{tag}"

    if name and mulled_resolution_cache:
        mulled_resolution_cache.put(cache_key, name)

    if name is None:
        unresolved_cache.add(name)

    return name


class CliContainerResolver(ContainerResolver):
    container_type = "docker"
    cli = "docker"

    def __init__(self, app_info: "AppInfo", **kwargs) -> None:
        super().__init__(app_info=app_info, **kwargs)
        self._cli_available = bool(which(self.cli))

    @property
    def cli_available(self) -> bool:
        return self._cli_available

    @cli_available.setter
    def cli_available(self, value: bool) -> None:
        if not value:
            log.info(
                f"{self.cli} CLI not available, cannot list or pull images in Galaxy process. Does not impact kubernetes."
            )
        self._cli_available = value


class SingularityCliContainerResolver(CliContainerResolver):
    container_type = "singularity"
    cli = "singularity"

    def __init__(self, app_info: "AppInfo", hash_func: Literal["v1", "v2"] = "v2", **kwargs) -> None:
        super().__init__(app_info=app_info, **kwargs)
        self.hash_func = hash_func
        self.cache_directory_cacher_type = kwargs.get("cache_directory_cacher_type")
        cacher_class = get_cache_directory_cacher(self.cache_directory_cacher_type)
        cache_directory_path = kwargs.get("cache_directory")
        if not cache_directory_path:
            assert self.app_info.container_image_cache_path
            cache_directory_path = os.path.join(self.app_info.container_image_cache_path, "singularity", "mulled")
        self.cache_directory = cacher_class(cache_directory_path, hash_func=self.hash_func)
        safe_makedirs(self.cache_directory.path)


[docs]class CachedMulledDockerContainerResolver(CliContainerResolver): resolver_type = "cached_mulled" shell = "/bin/bash"
[docs] def __init__( self, app_info: "AppInfo", namespace: str = "biocontainers", hash_func: Literal["v1", "v2"] = "v2", **kwds ): super().__init__(app_info=app_info, **kwds) self.namespace = namespace self.hash_func = hash_func
[docs] def resolve( self, enabled_container_types: TypingContainer[str], tool_info: "ToolInfo", **kwds ) -> Optional[ContainerDescription]: if ( not self.cli_available or tool_info.requires_galaxy_python_environment or self.container_type not in enabled_container_types ): return None targets = mulled_targets(tool_info) log.debug(f"Image name for tool {tool_info.tool_id}: {image_name(targets, self.hash_func)}") resolution_cache = kwds.get("resolution_cache") return docker_cached_container_description( targets, self.namespace, hash_func=self.hash_func, shell=self.shell, resolution_cache=resolution_cache )
def __str__(self): return f"CachedMulledDockerContainerResolver[namespace={self.namespace}]"
[docs]class CachedMulledSingularityContainerResolver(SingularityCliContainerResolver): resolver_type = "cached_mulled_singularity" shell = "/bin/bash"
[docs] def resolve( self, enabled_container_types: TypingContainer[str], tool_info: "ToolInfo", **kwds ) -> Optional[ContainerDescription]: if tool_info.requires_galaxy_python_environment or self.container_type not in enabled_container_types: return None targets = mulled_targets(tool_info) log.debug(f"Image name for tool {tool_info.tool_id}: {image_name(targets, self.hash_func)}") return singularity_cached_container_description( targets, self.cache_directory, hash_func=self.hash_func, shell=self.shell )
def __str__(self) -> str: return f"CachedMulledSingularityContainerResolver[cache_directory={self.cache_directory.path}]"
[docs]class MulledDockerContainerResolver(CliContainerResolver): """Look for mulled images matching tool dependencies.""" resolver_type = "mulled" shell = "/bin/bash" protocol: Optional[str] = None
[docs] def __init__( self, app_info: "AppInfo", namespace: str = "biocontainers", hash_func: Literal["v1", "v2"] = "v2", auto_install: bool = True, **kwds, ) -> None: super().__init__(app_info=app_info, **kwds) self.namespace = namespace self.hash_func = hash_func self.auto_install = string_as_bool(auto_install)
[docs] def cached_container_description( self, targets: List[CondaTarget], namespace: str, hash_func: Literal["v1", "v2"], resolution_cache: Optional[ResolutionCache] = None, ) -> Optional[ContainerDescription]: try: return docker_cached_container_description( targets, namespace, hash_func=hash_func, resolution_cache=resolution_cache ) except subprocess.CalledProcessError: # We should only get here if a docker binary is available, but command quits with a non-zero exit code, # e.g if the docker daemon is not available log.exception("An error occured while listing cached docker image. Docker daemon may need to be restarted.") return None
[docs] def pull(self, container: Container) -> None: if self.cli_available: assert isinstance(container, DockerContainer) command = container.build_pull_command() shell(command)
@property def can_list_containers(self) -> bool: return self.cli_available
[docs] def resolve( self, enabled_container_types: TypingContainer[str], tool_info: "ToolInfo", install: bool = False, session: Optional[Session] = None, **kwds, ) -> Optional[ContainerDescription]: resolution_cache = kwds.get("resolution_cache") if tool_info.requires_galaxy_python_environment or self.container_type not in enabled_container_types: return None targets = mulled_targets(tool_info) log.debug(f"Image name for tool {tool_info.tool_id}: {image_name(targets, self.hash_func)}") if len(targets) == 0: return None name = targets_to_mulled_name( targets=targets, hash_func=self.hash_func, namespace=self.namespace, resolution_cache=resolution_cache, session=session, ) if not name: return None container_id = f"quay.io/{self.namespace}/{name}" if self.protocol: container_id = f"{self.protocol}{container_id}" container_description = ContainerDescription( container_id, type=self.container_type, shell=self.shell, ) if self.can_list_containers: if install and not self.cached_container_description( targets, namespace=self.namespace, hash_func=self.hash_func, resolution_cache=resolution_cache, ): destination_info = {} destination_for_container_type = kwds.get("destination_for_container_type") if destination_for_container_type: destination_info = destination_for_container_type(self.container_type) container = CONTAINER_CLASSES[self.container_type]( container_description.identifier, self.app_info, tool_info, destination_info, None, container_description, ) self.pull(container) if not self.auto_install: container_description = ( self.cached_container_description( targets, namespace=self.namespace, hash_func=self.hash_func, resolution_cache=resolution_cache, ) or container_description ) return container_description
def __str__(self) -> str: return f"MulledDockerContainerResolver[namespace={self.namespace}]"
[docs]class MulledSingularityContainerResolver(SingularityCliContainerResolver, MulledDockerContainerResolver): resolver_type = "mulled_singularity" protocol = "docker://"
[docs] def __init__( self, app_info: "AppInfo", hash_func: Literal["v1", "v2"] = "v2", namespace: str = "biocontainers", auto_install: bool = True, **kwds, ) -> None: super().__init__(app_info=app_info, hash_func=hash_func, **kwds) self.namespace = namespace self.auto_install = string_as_bool(auto_install)
[docs] def cached_container_description( self, targets: List[CondaTarget], namespace: str, hash_func: Literal["v1", "v2"], resolution_cache: Optional[ResolutionCache] = None, ) -> Optional[ContainerDescription]: return singularity_cached_container_description( targets, cache_directory=self.cache_directory, hash_func=hash_func )
@property def can_list_containers(self) -> bool: # Only needs access to path, doesn't require CLI return True
[docs] def pull(self, container: Container) -> None: if self.cli_available: assert isinstance(container, SingularityContainer) cmds = container.build_mulled_singularity_pull_command( cache_directory=self.cache_directory.path, namespace=self.namespace ) shell(cmds=cmds) self.cache_directory.invalidate_cache()
def __str__(self) -> str: return f"MulledSingularityContainerResolver[namespace={self.namespace}]"
[docs]class BuildMulledDockerContainerResolver(CliContainerResolver): """Build for Docker mulled images matching tool dependencies.""" resolver_type = "build_mulled" shell = "/bin/bash" builds_on_resolution = True
[docs] def __init__( self, app_info: "AppInfo", namespace: str = "local", hash_func: Literal["v1", "v2"] = "v2", auto_install: bool = True, **kwds, ) -> None: super().__init__(app_info=app_info, **kwds) self._involucro_context_kwds = {"involucro_bin": self._get_config_option("involucro_path", None)} self.namespace = namespace self.hash_func = hash_func self.auto_install = string_as_bool(auto_install) self._mulled_kwds = { "namespace": namespace, "hash_func": self.hash_func, "command": "build-and-test", "use_mamba": True, } self._mulled_kwds["channels"] = default_mulled_conda_channels_from_env() or self._get_config_option( "mulled_channels", DEFAULT_CHANNELS ) self.involucro_context = InvolucroContext(**self._involucro_context_kwds) auto_init = self._get_config_option("involucro_auto_init", True) self.enabled = ensure_installed(self.involucro_context, auto_init)
[docs] def resolve( self, enabled_container_types: TypingContainer[str], tool_info: "ToolInfo", install: bool = False, **kwds ) -> Optional[ContainerDescription]: if tool_info.requires_galaxy_python_environment or self.container_type not in enabled_container_types: return None targets = mulled_targets(tool_info) log.debug(f"Image name for tool {tool_info.tool_id}: {image_name(targets, self.hash_func)}") if len(targets) == 0: return None if self.auto_install or install: mull_targets(targets, involucro_context=self.involucro_context, **self._mulled_kwds) return docker_cached_container_description(targets, self.namespace, hash_func=self.hash_func, shell=self.shell)
def __str__(self) -> str: return f"BuildDockerContainerResolver[namespace={self.namespace}]"
[docs]class BuildMulledSingularityContainerResolver(SingularityCliContainerResolver): """Build for Singularity mulled images matching tool dependencies.""" resolver_type = "build_mulled_singularity" shell = "/bin/bash" builds_on_resolution = True
[docs] def __init__( self, app_info: "AppInfo", hash_func: Literal["v1", "v2"] = "v2", auto_install: bool = True, **kwds, ) -> None: super().__init__(app_info=app_info, hash_func=hash_func, **kwds) self._involucro_context_kwds = {"involucro_bin": self._get_config_option("involucro_path", None)} self.auto_install = string_as_bool(auto_install) self._mulled_kwds = { "channels": self._get_config_option("mulled_channels", DEFAULT_CHANNELS), "hash_func": self.hash_func, "command": "build-and-test", "singularity": True, "singularity_image_dir": self.cache_directory.path, "use_mamba": True, } self.involucro_context = InvolucroContext(**self._involucro_context_kwds) auto_init = self._get_config_option("involucro_auto_init", True) self.enabled = ensure_installed(self.involucro_context, auto_init)
[docs] def resolve( self, enabled_container_types: TypingContainer[str], tool_info: "ToolInfo", install: bool = False, **kwds ) -> Optional[ContainerDescription]: if tool_info.requires_galaxy_python_environment or self.container_type not in enabled_container_types: return None targets = mulled_targets(tool_info) log.debug(f"Image name for tool {tool_info.tool_id}: {image_name(targets, self.hash_func)}") if len(targets) == 0: return None if self.auto_install or install: mull_targets(targets, involucro_context=self.involucro_context, **self._mulled_kwds) return singularity_cached_container_description( targets, self.cache_directory, hash_func=self.hash_func, shell=self.shell )
def __str__(self) -> str: return f"BuildSingularityContainerResolver[cache_directory={self.cache_directory.path}]"
def mulled_targets(tool_info: "ToolInfo") -> List[CondaTarget]: return requirements_to_mulled_targets(tool_info.requirements) def image_name(targets: List[CondaTarget], hash_func: Literal["v1", "v2"]) -> str: if len(targets) == 0: return "no targets" elif hash_func == "v2": return v2_image_name(targets) else: return v1_image_name(targets) __all__ = ( "CachedMulledDockerContainerResolver", "CachedMulledSingularityContainerResolver", "MulledDockerContainerResolver", "MulledSingularityContainerResolver", "BuildMulledDockerContainerResolver", "BuildMulledSingularityContainerResolver", )