Source code for galaxy.tool_util.deps.containers

import collections
import logging
import os
from typing import (
    Any,
    Container as TypingContainer,
    Dict,
    List,
    Optional,
    Type,
    TYPE_CHECKING,
)

from requests import Session
from typing_extensions import Literal

from galaxy.util import (
    asbool,
    plugin_config,
)
from .container_classes import (
    Container,
    CONTAINER_CLASSES,
    DOCKER_CONTAINER_TYPE,
    SINGULARITY_CONTAINER_TYPE,
)
from .container_resolvers import ResolutionCache
from .container_resolvers.explicit import (
    ExplicitContainerResolver,
    ExplicitSingularityContainerResolver,
)
from .container_resolvers.mulled import (
    BuildMulledDockerContainerResolver,
    BuildMulledSingularityContainerResolver,
    CachedMulledDockerContainerResolver,
    CachedMulledSingularityContainerResolver,
    MulledDockerContainerResolver,
    MulledSingularityContainerResolver,
)
from .requirements import ContainerDescription

if TYPE_CHECKING:
    from beaker.cache import Cache

    from galaxy.util.plugin_config import PluginConfigSource
    from .container_resolvers import ContainerResolver
    from .dependencies import (
        AppInfo,
        JobInfo,
        ToolInfo,
    )

log = logging.getLogger(__name__)


DEFAULT_CONTAINER_TYPE = DOCKER_CONTAINER_TYPE
ALL_CONTAINER_TYPES = [DOCKER_CONTAINER_TYPE, SINGULARITY_CONTAINER_TYPE]

ResolvedContainerDescription = collections.namedtuple(
    "ResolvedContainerDescription", ["container_resolver", "container_description"]
)


[docs]class ContainerFinder:
[docs] def __init__(self, app_info: "AppInfo", mulled_resolution_cache: Optional["Cache"] = None) -> None: self.app_info = app_info self.mulled_resolution_cache = mulled_resolution_cache self.default_container_registry = ContainerRegistry(app_info, mulled_resolution_cache=mulled_resolution_cache) self.destination_container_registeries: Dict[str, ContainerRegistry] = {}
def _enabled_container_types(self, destination_info: Dict[str, Any]) -> List[str]: return [t for t in ALL_CONTAINER_TYPES if self.__container_type_enabled(t, destination_info)]
[docs] def find_best_container_description( self, enabled_container_types: TypingContainer[str], tool_info: "ToolInfo", **kwds ) -> Optional[ContainerDescription]: """Regardless of destination properties - find best container for tool. Given container types and container.ToolInfo description of the tool.""" return self.default_container_registry.find_best_container_description( enabled_container_types, tool_info, **kwds )
[docs] def resolve( self, enabled_container_types: TypingContainer[str], tool_info: "ToolInfo", **kwds ) -> Optional[ResolvedContainerDescription]: """Regardless of destination properties - find ResolvedContainerDescription for tool.""" return self.default_container_registry.resolve(enabled_container_types, tool_info, **kwds)
def _container_registry_for_destination(self, destination_info: Dict[str, Any]) -> "ContainerRegistry": destination_id = destination_info.get("id") # Probably not the way to get the ID? destination_container_registry = None if destination_id and destination_id not in self.destination_container_registeries: if "container_resolvers" in destination_info or "container_resolvers_config_file" in destination_info: destination_container_registry = ContainerRegistry( self.app_info, destination_info=destination_info, mulled_resolution_cache=self.mulled_resolution_cache, ) self.destination_container_registeries[destination_id] = destination_container_registry elif not destination_id and ( "container_resolvers" in destination_info or "container_resolvers_config_file" in destination_info ): destination_container_registry = ContainerRegistry( self.app_info, destination_info=destination_info, mulled_resolution_cache=self.mulled_resolution_cache ) if ( destination_container_registry is None and destination_id and destination_id in self.destination_container_registeries ): destination_container_registry = self.destination_container_registeries[destination_id] return destination_container_registry or self.default_container_registry
[docs] def find_container( self, tool_info: "ToolInfo", destination_info: Dict[str, Any], job_info: "JobInfo" ) -> Optional[Container]: enabled_container_types = self._enabled_container_types(destination_info) # Short-cut everything else and just skip checks if no container type is enabled. if not enabled_container_types: return None def __destination_container( container_description: Optional[ContainerDescription] = None, container_id: Optional[str] = None, container_type: Optional[str] = None, ) -> Optional[Container]: """ either container_description or container_id and container_type must me given """ if container_description: container_id = container_description.identifier container_type = container_description.type if container_type is None: return None else: assert container_id container = self.__destination_container( container_id, container_type, tool_info, destination_info, job_info, container_description, ) return container def container_from_description_from_dicts( destination_container_dicts: List[Dict[str, Any]] ) -> Optional[Container]: for destination_container_dict in destination_container_dicts: container_description = ContainerDescription.from_dict(destination_container_dict) if container_description: container = __destination_container(container_description) if container: return container return None if "container_override" in destination_info: container = container_from_description_from_dicts(destination_info["container_override"]) if container: return container # If destination forcing Galaxy to use a particular container do it, # this is likely kind of a corner case. For instance if deployers # do not trust the containers annotated in tools. for container_type in CONTAINER_CLASSES.keys(): container_id = self.__overridden_container_id(container_type, destination_info) if container_id: container = __destination_container(container_type=container_type, container_id=container_id) if container: return container # Otherwise lets see if we can find container for the tool. container_registry = self._container_registry_for_destination(destination_info) container_description = container_registry.find_best_container_description(enabled_container_types, tool_info) container = __destination_container(container_description) if container: return container # If we still don't have a container, check to see if any container # types define a default container id and use that. if "container" in destination_info: container = container_from_description_from_dicts(destination_info["container"]) if container: return container for container_type in CONTAINER_CLASSES.keys(): container_id = self.__default_container_id(container_type, destination_info) if container_id: container = __destination_container(container_type=container_type, container_id=container_id) if container: return container return None
[docs] def resolution_cache(self) -> ResolutionCache: return self.default_container_registry.get_resolution_cache()
def __overridden_container_id(self, container_type: str, destination_info: Dict[str, Any]) -> Optional[str]: if not self.__container_type_enabled(container_type, destination_info): return None if f"{container_type}_container_id_override" in destination_info: return destination_info.get(f"{container_type}_container_id_override") if f"{container_type}_image_override" in destination_info: return self.__build_container_id_from_parts(container_type, destination_info, mode="override") return None def __build_container_id_from_parts( self, container_type: str, destination_info: Dict[str, Any], mode: Literal["default", "override"] ) -> str: repo = "" owner = "" repo_key = f"{container_type}_repo_{mode}" owner_key = f"{container_type}_owner_{mode}" if repo_key in destination_info: repo = f"{destination_info[repo_key]}/" if owner_key in destination_info: owner = f"{destination_info[owner_key]}/" cont_id = repo + owner + destination_info[f"{container_type}_image_{mode}"] tag_key = f"{container_type}_tag_{mode}" if tag_key in destination_info: cont_id += f":{destination_info[tag_key]}" return cont_id def __default_container_id(self, container_type: str, destination_info: Dict[str, Any]) -> Optional[str]: if not self.__container_type_enabled(container_type, destination_info): return None key = f"{container_type}_default_container_id" # Also allow docker_image... if key not in destination_info: key = f"{container_type}_image" if key in destination_info: return destination_info.get(key) elif f"{container_type}_image_default" in destination_info: return self.__build_container_id_from_parts(container_type, destination_info, mode="default") return None def __destination_container( self, container_id: str, container_type: str, tool_info: "ToolInfo", destination_info: Dict[str, Any], job_info: "JobInfo", container_description: Optional[ContainerDescription] = None, ) -> Optional[Container]: # TODO: ensure destination_info is dict-like if not self.__container_type_enabled(container_type, destination_info): return None # TODO: Right now this assumes all containers available when a # container type is - there should be more thought put into this. # Checking which are available - settings policies for what can be # auto-fetched, etc.... return CONTAINER_CLASSES[container_type]( container_id, self.app_info, tool_info, destination_info, job_info, container_description ) def __container_type_enabled(self, container_type: str, destination_info: Dict[str, Any]) -> bool: return asbool(destination_info.get(f"{container_type}_enabled", False))
[docs]class NullContainerFinder:
[docs] def find_container(self, tool_info: "ToolInfo", destination_info: Dict[str, Any], job_info: "JobInfo") -> None: return None
[docs]class ContainerRegistry: """Loop through enabled ContainerResolver plugins and find first match."""
[docs] def __init__( self, app_info: "AppInfo", destination_info: Optional[Dict[str, Any]] = None, mulled_resolution_cache: Optional["Cache"] = None, ) -> None: self.resolver_classes = self.__resolvers_dict() self.enable_mulled_containers = app_info.enable_mulled_containers self.app_info = app_info self.container_resolvers = self.__build_container_resolvers(app_info, destination_info) self.mulled_resolution_cache = mulled_resolution_cache
def __build_container_resolvers( self, app_info: "AppInfo", destination_info: Optional[Dict[str, Any]] ) -> List["ContainerResolver"]: app_conf_file = getattr(app_info, "container_resolvers_config_file", None) app_conf_dict = getattr(app_info, "container_resolvers_config_dict", None) if destination_info is not None and ( "container_resolvers" in destination_info or "container_resolvers_config_file" in destination_info ): conf_file = destination_info.get("container_resolvers_config_file") conf_dict = destination_info.get("container_resolvers") else: conf_file = app_conf_file conf_dict = app_conf_dict plugin_source = None if conf_dict: log.debug("Loading container resolution config inline from Galaxy or job configuration file") plugin_source = plugin_config.plugin_source_from_dict(conf_dict) elif conf_file and not os.path.exists(conf_file): log.warning(f"Unable to find config file '{conf_file}'") elif conf_file: log.debug(f"Loading container resolution config from file '{conf_file}'") plugin_source = plugin_config.plugin_source_from_path(conf_file) if plugin_source: return self._parse_resolver_conf(plugin_source) return self.__default_container_resolvers() def _parse_resolver_conf(self, plugin_source: "PluginConfigSource") -> List["ContainerResolver"]: extra_kwds = {"app_info": self.app_info} return plugin_config.load_plugins(self.resolver_classes, plugin_source, extra_kwds) def __default_container_resolvers(self) -> List["ContainerResolver"]: default_resolvers: List[ContainerResolver] = [ ExplicitContainerResolver(self.app_info), ExplicitSingularityContainerResolver(self.app_info), ] if self.enable_mulled_containers: default_resolvers.extend( [ CachedMulledDockerContainerResolver(self.app_info, namespace="biocontainers"), CachedMulledDockerContainerResolver(self.app_info, namespace="local"), CachedMulledSingularityContainerResolver(self.app_info, namespace="biocontainers"), CachedMulledSingularityContainerResolver(self.app_info, namespace="local"), MulledDockerContainerResolver(self.app_info, namespace="biocontainers"), MulledSingularityContainerResolver(self.app_info, namespace="biocontainers"), ] ) # BuildMulledDockerContainerResolver and BuildMulledSingularityContainerResolver both need the docker daemon to build images. # If docker is not available, we don't load them. build_mulled_docker_container_resolver = BuildMulledDockerContainerResolver(self.app_info) if build_mulled_docker_container_resolver.cli_available: default_resolvers.extend( [ build_mulled_docker_container_resolver, BuildMulledSingularityContainerResolver(self.app_info), ] ) return default_resolvers def __resolvers_dict(self) -> Dict[str, Type["ContainerResolver"]]: import galaxy.tool_util.deps.container_resolvers return plugin_config.plugins_dict(galaxy.tool_util.deps.container_resolvers, "resolver_type")
[docs] def get_resolution_cache(self) -> ResolutionCache: cache = ResolutionCache() if self.mulled_resolution_cache is not None: cache.mulled_resolution_cache = self.mulled_resolution_cache return cache
[docs] def find_best_container_description( self, enabled_container_types: TypingContainer[str], tool_info: "ToolInfo", **kwds: Any ) -> Optional[ContainerDescription]: """Yield best container description of supplied types matching tool info.""" try: resolved_container_description = self.resolve(enabled_container_types, tool_info, **kwds) except Exception: log.exception("Could not get container description for tool '%s'", tool_info.tool_id) return None return None if resolved_container_description is None else resolved_container_description.container_description
[docs] def resolve( self, enabled_container_types: TypingContainer[str], tool_info: "ToolInfo", index: Optional[int] = None, resolver_type: Optional[str] = None, install: bool = True, resolution_cache: Optional[ResolutionCache] = None, session: Optional[Session] = None, ) -> Optional[ResolvedContainerDescription]: resolution_cache = resolution_cache or self.get_resolution_cache() for i, container_resolver in enumerate(self.container_resolvers): if index is not None and i != index: continue if resolver_type is not None and resolver_type != container_resolver.resolver_type: continue if hasattr(container_resolver, "container_type"): if container_resolver.container_type not in enabled_container_types: continue if not install and container_resolver.builds_on_resolution: continue container_description = container_resolver.resolve( enabled_container_types, tool_info, install=install, resolution_cache=resolution_cache, session=session ) log.info( f"Checking with container resolver [{container_resolver}] found description [{container_description}]" ) if container_description: assert container_resolver._container_type_enabled(container_description, enabled_container_types) return ResolvedContainerDescription(container_resolver, container_description) return None