Source code for galaxy.tool_util.deps.container_resolvers.explicit

"""This module describes the :class:`ExplicitContainerResolver` ContainerResolver plugin."""
import copy
import logging
import os
from typing import (
    cast,
    Container,
    Optional,
    TYPE_CHECKING,
)

from galaxy.util.commands import shell
from . import ContainerResolver
from .mulled import CliContainerResolver
from ..container_classes import SingularityContainer
from ..requirements import ContainerDescription

if TYPE_CHECKING:
    from ..dependencies import (
        AppInfo,
        ToolInfo,
    )

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Shell path used as the default ``shell`` argument of
# ``BaseAdminConfiguredContainerResolver.__init__``.
DEFAULT_SHELL = "/bin/bash"


class ExplicitContainerResolver(ContainerResolver):
    """Resolve to a container named explicitly in the tool description (e.g. the tool XML file), if any."""

    resolver_type = "explicit"

    def resolve(
        self, enabled_container_types: Container[str], tool_info: "ToolInfo", **kwds
    ) -> Optional[ContainerDescription]:
        """Return the first explicitly declared container whose type is enabled.

        Tool requirements are not consulted; the tool author is trusted to
        have declared a working container.

        :param enabled_container_types: container types that may be used.
        :param tool_info: tool whose ``container_descriptions`` are searched.
        :returns: the matching description, flagged with ``explicit = True``,
            or ``None`` when no declared container has an enabled type.
        """
        enabled_descriptions = (
            description
            for description in tool_info.container_descriptions
            if self._container_type_enabled(description, enabled_container_types)
        )
        # Only the first enabled description is ever consumed from the generator.
        for chosen in enabled_descriptions:
            chosen.explicit = True
            return chosen
        return None
class ExplicitSingularityContainerResolver(ExplicitContainerResolver):
    """Resolve explicitly declared containers as Singularity containers.

    Docker declarations are rewritten to ``docker://`` identifiers of type
    ``container_type``.
    """

    resolver_type = "explicit_singularity"
    container_type = "singularity"

    def resolve(
        self, enabled_container_types: Container[str], tool_info: "ToolInfo", **kwds
    ) -> Optional[ContainerDescription]:
        """Return the first declared container usable as a Singularity container.

        Tool requirements are not consulted; the tool author is trusted to
        have declared a working container. Singularity is used to fetch
        Docker containers, so each ``docker`` description is converted into
        a new description built from the original's dictionary form.

        :param enabled_container_types: container types that may be used.
        :param tool_info: tool whose ``container_descriptions`` are searched.
        :returns: the first (possibly converted) description whose type is
            enabled, or ``None`` if there is none.
        """
        for declared in tool_info.container_descriptions:
            candidate = declared
            if declared.type == "docker":
                # Build a converted copy; the declared description is left untouched.
                converted = declared.to_dict()
                converted.update(
                    type=self.container_type,
                    identifier=f"docker://{declared.identifier}",
                )
                candidate = declared.from_dict(converted)
            if self._container_type_enabled(candidate, enabled_container_types):
                return candidate
        return None
# TODO: should this derive from SingularityCliContainerResolver?
class CachedExplicitSingularityContainerResolver(CliContainerResolver):
    """Resolve explicitly declared containers to Singularity images in a cache directory.

    Docker declarations are rewritten to ``docker://`` identifiers. When
    installation is requested and the image is not yet cached, it is pulled
    into ``cache_directory_path``.
    """

    resolver_type = "cached_explicit_singularity"
    container_type = "singularity"
    cli = "singularity"

    def __init__(self, app_info: "AppInfo", **kwargs) -> None:
        """Set up the resolver and make sure its cache directory exists.

        :param app_info: application information handed to the base class.
        :param kwargs: resolver options; ``cache_directory`` overrides the
            default cache location of
            ``<app_info.container_image_cache_path>/singularity/explicit``.
        """
        super().__init__(app_info=app_info, **kwargs)
        cache_directory_path = kwargs.get("cache_directory")
        if not cache_directory_path:
            assert self.app_info.container_image_cache_path
            cache_directory_path = os.path.join(self.app_info.container_image_cache_path, "singularity", "explicit")
        self.cache_directory_path = cache_directory_path
        os.makedirs(self.cache_directory_path, exist_ok=True)

    def resolve(
        self, enabled_container_types: Container[str], tool_info: "ToolInfo", install: bool = False, **kwds
    ) -> Optional[ContainerDescription]:
        """Resolve the tool's first declared container to a path in the cache.

        Tool requirements are not consulted; the tool author is trusted to
        have declared a working container. Singularity is used to fetch
        Docker containers, so a ``docker`` description is converted (on a
        copy) to a ``docker://`` identifier of type ``container_type``.

        Only the first entry of ``tool_info.container_descriptions`` is
        considered, because every path through the loop body returns.

        :param enabled_container_types: container types that may be used.
        :param tool_info: tool whose ``container_descriptions`` are inspected.
        :param install: when true and the image is not in the cache yet, run
            the pull command to fetch it.
        :param kwds: may carry ``destination_for_container_type``, a callable
            mapping a container type to destination info for the pull.
        :returns: ``None`` when there are no descriptions or the first one has
            no enabled type; the converted description with its original
            identifier when the CLI is unavailable; otherwise the description
            with its identifier replaced by the cache path.
        """
        for container_description in tool_info.container_descriptions:  # type: ContainerDescription
            # Work on a copy so the tool's own description is never mutated.
            container_description = copy.copy(container_description)
            if container_description.type == "docker":
                container_description.type = self.container_type
                container_description.identifier = f"docker://{container_description.identifier}"
            if not self._container_type_enabled(container_description, enabled_container_types):
                return None
            if not self.cli_available:
                return container_description
            image_id = cast(str, container_description.identifier)
            cache_path = os.path.normpath(os.path.join(self.cache_directory_path, image_id))
            if install and not os.path.exists(cache_path):
                destination_info = {}
                destination_for_container_type = kwds.get("destination_for_container_type")
                if destination_for_container_type:
                    destination_info = destination_for_container_type(self.container_type)
                container = SingularityContainer(
                    container_id=container_description.identifier,
                    app_info=self.app_info,
                    tool_info=tool_info,
                    destination_info=destination_info,
                    job_info=None,
                    container_description=container_description,
                )
                command = container.build_singularity_pull_command(cache_path=cache_path)
                exit_code = shell(command)
                if exit_code:
                    # The exit status used to be discarded. Report a failed
                    # pull so a missing cached image can be traced back; the
                    # return value below is deliberately left unchanged.
                    log.warning(
                        "Pulling container [%s] to cache path [%s] exited with code [%s]",
                        container_description.identifier,
                        cache_path,
                        exit_code,
                    )
            # Point to the container in the cache instead.
            container_description.identifier = cache_path
            return container_description
        else:  # No container descriptions found
            return None

    def __str__(self) -> str:
        """Return a short description that includes the cache directory."""
        return f"CachedExplicitSingularityContainerResolver[cache_directory={self.cache_directory_path}]"
class BaseAdminConfiguredContainerResolver(ContainerResolver):
    """Base for resolvers that build container descriptions from configured identifiers."""

    def __init__(self, app_info: "AppInfo", shell: str = DEFAULT_SHELL, **kwds) -> None:
        """Store the shell to attach to every container description built.

        :param app_info: application information handed to the base class.
        :param shell: shell recorded on created descriptions; defaults to
            ``DEFAULT_SHELL``.
        :param kwds: remaining options, passed through to the base class.
        """
        super().__init__(app_info=app_info, **kwds)
        self.shell = shell

    def _container_description(self, identifier: str, container_type: str) -> ContainerDescription:
        """Build a description for ``identifier`` of ``container_type`` using this resolver's shell."""
        return ContainerDescription(identifier, type=container_type, shell=self.shell)
class FallbackContainerResolver(BaseAdminConfiguredContainerResolver):
    """Resolve to one explicitly configured container identifier, as a Docker container."""

    resolver_type = "fallback"
    container_type = "docker"

    def __init__(self, app_info: "AppInfo", identifier: str = "", **kwds) -> None:
        """Record the container identifier this resolver always proposes.

        :param app_info: application information handed to the base class.
        :param identifier: container identifier; must be non-empty.
        :param kwds: remaining options, passed through to the base class.
        """
        super().__init__(app_info=app_info, **kwds)
        assert identifier, "fallback container resolver must be specified with non-empty identifier"
        self.identifier = identifier

    def _match(
        self,
        enabled_container_types: Container[str],
        tool_info: "ToolInfo",
        container_description: ContainerDescription,
    ) -> bool:
        """Tell whether the description's container type is enabled.

        ``tool_info`` is unused here; it is part of the signature so that
        subclasses can add tool-specific conditions.
        """
        return bool(self._container_type_enabled(container_description, enabled_container_types))

    def resolve(
        self, enabled_container_types: Container[str], tool_info: "ToolInfo", **kwds
    ) -> Optional[ContainerDescription]:
        """Return a description of the configured container if ``_match`` accepts it, else ``None``."""
        candidate = self._container_description(self.identifier, self.container_type)
        return candidate if self._match(enabled_container_types, tool_info, candidate) else None
class FallbackSingularityContainerResolver(FallbackContainerResolver):
    """Resolve to one explicitly configured container identifier, as a Singularity container."""

    container_type = "singularity"
    resolver_type = "fallback_singularity"
class FallbackNoRequirementsContainerResolver(FallbackContainerResolver):
    """Fallback resolver that only applies to tools declaring no requirements."""

    resolver_type = "fallback_no_requirements"

    def _match(
        self,
        enabled_container_types: Container[str],
        tool_info: "ToolInfo",
        container_description: ContainerDescription,
    ) -> bool:
        """Match when the parent check passes and the tool has no requirements.

        "No requirements" means ``tool_info.requirements`` is ``None`` or has
        length zero.
        """
        if not super()._match(enabled_container_types, tool_info, container_description):
            return False
        requirements = tool_info.requirements
        return requirements is None or len(requirements) == 0
class FallbackNoRequirementsSingularityContainerResolver(FallbackNoRequirementsContainerResolver):
    """Singularity variant of the no-requirements fallback resolver."""

    container_type = "singularity"
    resolver_type = "fallback_no_requirements_singularity"
class RequiresGalaxyEnvironmentContainerResolver(FallbackContainerResolver):
    """Fallback resolver that only applies to tools flagged ``requires_galaxy_python_environment``."""

    resolver_type = "requires_galaxy_environment"

    def _match(
        self,
        enabled_container_types: Container[str],
        tool_info: "ToolInfo",
        container_description: ContainerDescription,
    ) -> bool:
        """Match when the parent check passes and the tool requires Galaxy's Python environment."""
        if not super()._match(enabled_container_types, tool_info, container_description):
            return False
        return tool_info.requires_galaxy_python_environment
class RequiresGalaxyEnvironmentSingularityContainerResolver(RequiresGalaxyEnvironmentContainerResolver):
    """Singularity variant of the requires-Galaxy-environment resolver."""

    container_type = "singularity"
    resolver_type = "requires_galaxy_environment_singularity"
class MappingContainerResolver(BaseAdminConfiguredContainerResolver):
    """Resolve containers from a configured list mapping tool ids (and optionally versions) to identifiers."""

    resolver_type = "mapping"

    def __init__(self, app_info: "AppInfo", **kwds) -> None:
        """Read the ``mappings`` list from the resolver options.

        :param app_info: application information handed to the base class.
        :param kwds: resolver options, passed through to the base class; the
            ``mappings`` entry is then read from ``self.resolver_kwds`` and
            must be a list.
        """
        super().__init__(app_info=app_info, **kwds)
        configured = self.resolver_kwds["mappings"]
        assert isinstance(configured, list), "mapping container resolver must be specified with mapping list"
        self.mappings = configured

    @staticmethod
    def _mapping_applies(mapping, tool_id, tool_version) -> bool:
        """Tell whether ``mapping`` targets this tool id and, if it pins one, this tool version."""
        if mapping.get("tool_id") != tool_id:
            return False
        pinned_version = mapping.get("tool_version")
        return pinned_version is None or tool_version == pinned_version

    def resolve(
        self, enabled_container_types: Container[str], tool_info: "ToolInfo", **kwds
    ) -> Optional[ContainerDescription]:
        """Return the container of the first applicable mapping with an enabled type.

        A mapping applies when its ``tool_id`` equals the tool's id and its
        ``tool_version`` is either absent or equal to the tool's version.

        :param enabled_container_types: container types that may be used.
        :param tool_info: tool to look up in the mappings.
        :returns: a description built from the mapping's ``identifier`` and
            ``container_type``, or ``None`` when nothing applies.
        """
        tool_id = tool_info.tool_id
        if not tool_id:
            # Resolving against dependencies rather than a specific tool: skip this resolver.
            return None

        tool_version = tool_info.tool_version
        for entry in self.mappings:
            if not self._mapping_applies(entry, tool_id, tool_version):
                continue
            candidate = self._container_description(entry["identifier"], entry.get("container_type"))
            if self._container_type_enabled(candidate, enabled_container_types):
                return candidate
        return None
# Public API of this module: the container resolver plugins it provides.
__all__ = (
    "ExplicitContainerResolver",
    "ExplicitSingularityContainerResolver",
    "CachedExplicitSingularityContainerResolver",
    "FallbackContainerResolver",
    "FallbackSingularityContainerResolver",
    "FallbackNoRequirementsContainerResolver",
    "FallbackNoRequirementsSingularityContainerResolver",
    "MappingContainerResolver",
    "RequiresGalaxyEnvironmentContainerResolver",
    "RequiresGalaxyEnvironmentSingularityContainerResolver",
)