Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tool_util.deps.container_resolvers.explicit
"""This module describes the :class:`ExplicitContainerResolver` ContainerResolver plugin."""
import copy
import logging
import os
from typing import (
cast,
Container,
Optional,
TYPE_CHECKING,
)
from galaxy.util.commands import shell
from . import ContainerResolver
from .mulled import CliContainerResolver
from ..container_classes import SingularityContainer
from ..requirements import ContainerDescription
if TYPE_CHECKING:
from ..dependencies import (
AppInfo,
ToolInfo,
)
log = logging.getLogger(__name__)
DEFAULT_SHELL = "/bin/bash"
[docs]class ExplicitContainerResolver(ContainerResolver):
"""Find explicit containers referenced in the tool description (e.g. tool XML file) if present."""
resolver_type = "explicit"
[docs] def resolve(
self, enabled_container_types: Container[str], tool_info: "ToolInfo", **kwds
) -> Optional[ContainerDescription]:
"""Find a container explicitly mentioned in tool description.
This ignores the tool requirements and assumes the tool author crafted
a correct container.
"""
for container_description in tool_info.container_descriptions:
if self._container_type_enabled(container_description, enabled_container_types):
container_description.explicit = True
return container_description
return None
[docs]class ExplicitSingularityContainerResolver(ExplicitContainerResolver):
resolver_type = "explicit_singularity"
container_type = "singularity"
[docs] def resolve(
self, enabled_container_types: Container[str], tool_info: "ToolInfo", **kwds
) -> Optional[ContainerDescription]:
"""Find a container explicitly mentioned in tool description.
This ignores the tool requirements and assumes the tool author crafted
a correct container. We use singularity here to fetch docker containers,
hence the container_description hack here.
"""
for container_description in tool_info.container_descriptions:
if container_description.type == "docker":
desc_dict = container_description.to_dict()
desc_dict["type"] = self.container_type
desc_dict["identifier"] = f"docker://{container_description.identifier}"
container_description = container_description.from_dict(desc_dict)
if self._container_type_enabled(container_description, enabled_container_types):
return container_description
return None
# TODO: should this derive from SingularityCliContainerResolver?
[docs]class CachedExplicitSingularityContainerResolver(CliContainerResolver):
resolver_type = "cached_explicit_singularity"
container_type = "singularity"
cli = "singularity"
[docs] def __init__(self, app_info: "AppInfo", **kwargs) -> None:
super().__init__(app_info=app_info, **kwargs)
cache_directory_path = kwargs.get("cache_directory")
if not cache_directory_path:
assert self.app_info.container_image_cache_path
cache_directory_path = os.path.join(self.app_info.container_image_cache_path, "singularity", "explicit")
self.cache_directory_path = cache_directory_path
os.makedirs(self.cache_directory_path, exist_ok=True)
[docs] def resolve(
self, enabled_container_types: Container[str], tool_info: "ToolInfo", install: bool = False, **kwds
) -> Optional[ContainerDescription]:
"""Find a container explicitly mentioned in tool description.
This ignores the tool requirements and assumes the tool author crafted
a correct container. We use singularity here to fetch docker containers,
hence the container_description hack here.
"""
for container_description in tool_info.container_descriptions: # type: ContainerDescription
container_description = copy.copy(container_description)
if container_description.type == "docker":
container_description.type = self.container_type
container_description.identifier = f"docker://{container_description.identifier}"
if not self._container_type_enabled(container_description, enabled_container_types):
return None
if not self.cli_available:
return container_description
image_id = cast(str, container_description.identifier)
cache_path = os.path.normpath(os.path.join(self.cache_directory_path, image_id))
if install and not os.path.exists(cache_path):
destination_info = {}
destination_for_container_type = kwds.get("destination_for_container_type")
if destination_for_container_type:
destination_info = destination_for_container_type(self.container_type)
container = SingularityContainer(
container_id=container_description.identifier,
app_info=self.app_info,
tool_info=tool_info,
destination_info=destination_info,
job_info=None,
container_description=container_description,
)
command = container.build_singularity_pull_command(cache_path=cache_path)
shell(command)
# Point to container in the cache in stead.
container_description.identifier = cache_path
return container_description
else: # No container descriptions found
return None
def __str__(self):
return f"CachedExplicitSingularityContainerResolver[cache_directory={self.cache_directory_path}]"
class BaseAdminConfiguredContainerResolver(ContainerResolver):
def __init__(self, app_info: "AppInfo", shell: str = DEFAULT_SHELL, **kwds) -> None:
super().__init__(app_info=app_info, **kwds)
self.shell = shell
def _container_description(self, identifier: str, container_type: str) -> ContainerDescription:
container_description = ContainerDescription(
identifier,
type=container_type,
shell=self.shell,
)
return container_description
[docs]class FallbackContainerResolver(BaseAdminConfiguredContainerResolver):
"""Specify an explicit, identified container as a Docker container resolver."""
resolver_type = "fallback"
container_type = "docker"
[docs] def __init__(self, app_info: "AppInfo", identifier: str = "", **kwds) -> None:
super().__init__(app_info=app_info, **kwds)
assert identifier, "fallback container resolver must be specified with non-empty identifier"
self.identifier = identifier
def _match(
self,
enabled_container_types: Container[str],
tool_info: "ToolInfo",
container_description: ContainerDescription,
) -> bool:
if self._container_type_enabled(container_description, enabled_container_types):
return True
return False
[docs] def resolve(
self, enabled_container_types: Container[str], tool_info: "ToolInfo", **kwds
) -> Optional[ContainerDescription]:
container_description = self._container_description(self.identifier, self.container_type)
if self._match(enabled_container_types, tool_info, container_description):
return container_description
return None
[docs]class FallbackSingularityContainerResolver(FallbackContainerResolver):
"""Specify an explicit, identified container as a Singularity container resolver."""
resolver_type = "fallback_singularity"
container_type = "singularity"
[docs]class FallbackNoRequirementsContainerResolver(FallbackContainerResolver):
resolver_type = "fallback_no_requirements"
def _match(
self,
enabled_container_types: Container[str],
tool_info: "ToolInfo",
container_description: ContainerDescription,
) -> bool:
type_matches = super()._match(enabled_container_types, tool_info, container_description)
return type_matches and (tool_info.requirements is None or len(tool_info.requirements) == 0)
[docs]class FallbackNoRequirementsSingularityContainerResolver(FallbackNoRequirementsContainerResolver):
resolver_type = "fallback_no_requirements_singularity"
container_type = "singularity"
[docs]class RequiresGalaxyEnvironmentContainerResolver(FallbackContainerResolver):
resolver_type = "requires_galaxy_environment"
def _match(
self,
enabled_container_types: Container[str],
tool_info: "ToolInfo",
container_description: ContainerDescription,
) -> bool:
type_matches = super()._match(enabled_container_types, tool_info, container_description)
return type_matches and tool_info.requires_galaxy_python_environment
[docs]class RequiresGalaxyEnvironmentSingularityContainerResolver(RequiresGalaxyEnvironmentContainerResolver):
resolver_type = "requires_galaxy_environment_singularity"
container_type = "singularity"
[docs]class MappingContainerResolver(BaseAdminConfiguredContainerResolver):
resolver_type = "mapping"
[docs] def __init__(self, app_info: "AppInfo", **kwds) -> None:
super().__init__(app_info=app_info, **kwds)
mappings = self.resolver_kwds["mappings"]
assert isinstance(mappings, list), "mapping container resolver must be specified with mapping list"
self.mappings = mappings
[docs] def resolve(
self, enabled_container_types: Container[str], tool_info: "ToolInfo", **kwds
) -> Optional[ContainerDescription]:
tool_id = tool_info.tool_id
# If resolving against dependencies and not a specific tool, skip over this resolver
if not tool_id:
return None
tool_version = tool_info.tool_version
for mapping in self.mappings:
if mapping.get("tool_id") != tool_id:
continue
mapping_tool_version = mapping.get("tool_version")
if mapping_tool_version is not None and tool_version != mapping_tool_version:
continue
container_description = self._container_description(mapping["identifier"], mapping.get("container_type"))
if not self._container_type_enabled(container_description, enabled_container_types):
continue
return container_description
return None
__all__ = (
"ExplicitContainerResolver",
"ExplicitSingularityContainerResolver",
"CachedExplicitSingularityContainerResolver",
"FallbackContainerResolver",
"FallbackSingularityContainerResolver",
"FallbackNoRequirementsContainerResolver",
"FallbackNoRequirementsSingularityContainerResolver",
"MappingContainerResolver",
"RequiresGalaxyEnvironmentContainerResolver",
"RequiresGalaxyEnvironmentSingularityContainerResolver",
)