Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tool_util.deps.resolvers

"""The module defines the abstract interface for dealing with tool dependency resolution plugins."""
import errno
import os.path
from abc import (
    ABCMeta,
    abstractmethod,
    abstractproperty,
)
from typing import Any, Dict

import yaml

from galaxy.util import listify
from galaxy.util.dictifiable import Dictifiable
from ..requirements import ToolRequirement


class DependencyResolver(Dictifiable, metaclass=ABCMeta):
    """Abstract description of a technique for resolving tool dependencies.

    Concrete resolvers implement :meth:`resolve`. Installation support is
    opt-in: a resolver that sets ``read_only`` to ``False`` is expected to
    override :meth:`_install_dependency`.
    """

    # Keys for dictification.
    dict_collection_visible_keys = [
        'resolver_type',
        'resolves_simple_dependencies',
        'can_uninstall_dependencies',
        'read_only',
    ]
    # A "simple" dependency is one that does not depend on the tool
    # resolving the dependency. Classic tool shed dependencies are non-simple
    # because the repository install context is used in dependency resolution,
    # so the same requirement tags in different tools will have very different
    # resolution.
    disabled = False
    resolves_simple_dependencies = True
    config_options: Dict[str, Any] = {}
    read_only = True

    @abstractmethod
    def resolve(self, requirement, **kwds):
        """Given inputs describing a dependency in the abstract, yield a Dependency object.

        The Dependency object describes various attributes (script, bin,
        version) used to build scripts with the dependency available. Here
        script is the env.sh file to source before running a job; if that is
        not found the bin directory will be appended to the path (if it is
        not ``None``). Finally, version is the resolved tool dependency
        version (which may differ from the requested version, for instance if
        the requested version is 'default').
        """

    def install_dependency(self, name, version, type, **kwds):
        """Install the dependency unless this resolver is read-only.

        Returns ``False`` without attempting anything when ``read_only`` is
        set; otherwise returns whatever :meth:`_install_dependency` returns.
        """
        if not self.read_only:
            return self._install_dependency(name, version, type, **kwds)
        return False

    def _install_dependency(self, name, version, type, **kwds):
        """Attempt to install this dependency if a recipe to do so has been
        registered in some way.

        The base implementation installs nothing and returns ``False``.
        """
        return False

    @property
    def can_uninstall_dependencies(self):
        """``True`` exactly when the resolver is not read-only."""
        return not self.read_only
class MultipleDependencyResolver:
    """Variant of DependencyResolver that can optionally resolve multiple
    dependencies together.

    This class does not declare an ABC metaclass itself, so the abstract
    marker on :meth:`resolve_all` is only enforced when it is mixed into a
    class whose metaclass is ``ABCMeta``.
    """

    @abstractmethod
    def resolve_all(self, requirements, **kwds):
        """Resolve several requirements as a group.

        Yields a list of Dependency objects if and only if all of the given
        requirements may be resolved together. Unsuccessful attempts should
        return an empty list.

        :param requirements: list of tool requirements
        :type requirements: [ToolRequirement] or ToolRequirements

        :returns: list of resolved dependencies
        :rtype: [Dependency]
        """
class ListableDependencyResolver(metaclass=ABCMeta):
    """Mixin for a ``DependencyResolver`` that can enumerate its dependencies.

    Implement :meth:`list_dependencies` to indicate that the resolver can
    iterate over its dependencies and generate requirements for them.
    """

    @abstractmethod
    def list_dependencies(self):
        """List the "simple" requirements that may be resolved "exact"-ly by
        this dependency resolver.
        """

    def _to_requirement(self, name, version=None):
        """Wrap ``name`` and optional ``version`` in a ``package``-type ToolRequirement."""
        return ToolRequirement(name=name, version=version, type="package")
class MappableDependencyResolver:
    """Mix this into a ``DependencyResolver`` to allow mapping files.

    Mapping files allow adapting generic requirements to specific local
    implementations.
    """

    def _setup_mapping(self, dependency_manager, **kwds):
        """Load every configured mapping file into ``self._mappings``.

        The ``mapping_files`` resolver option is read from
        ``dependency_manager`` (with ``kwds`` as explicit overrides). Absolute
        paths are used as-is; relative paths are searched for in the current
        working directory and then in ``dependency_manager.default_base_path``
        (when that is a string). For each configured file only the first
        existing candidate is loaded; files that are found nowhere are
        skipped silently.
        """
        mapping_files = dependency_manager.get_resolver_option(self, "mapping_files", explicit_resolver_options=kwds)
        mappings = []
        if mapping_files:
            search_dirs = [os.getcwd()]
            if isinstance(dependency_manager.default_base_path, str):
                search_dirs.append(dependency_manager.default_base_path)

            def candidates(path):
                # Absolute paths are never re-rooted under a search directory.
                if os.path.isabs(path):
                    yield path
                else:
                    for search_dir in search_dirs:
                        yield os.path.join(search_dir, path)

            for mapping_file in listify(mapping_files):
                for full_path in candidates(mapping_file):
                    if os.path.exists(full_path):
                        mappings.extend(MappableDependencyResolver._mapping_file_to_list(full_path))
                        break
        self._mappings = mappings

    @staticmethod
    def _mapping_file_to_list(mapping_file):
        """Parse a YAML mapping file into a list of ``RequirementMapping``.

        A file that does not exist yields an empty list; any other
        ``OSError`` is re-raised. An empty (or comment-only) YAML document
        also yields an empty list.
        """
        raw_mapping = None
        try:
            with open(mapping_file) as f:
                raw_mapping = yaml.safe_load(f)
        except OSError as exc:
            # The file may vanish between the caller's existence check and
            # the open; treat that the same as "no mappings".
            if exc.errno != errno.ENOENT:
                raise
        # yaml.safe_load returns None for an empty document, which must not
        # be iterated.
        if not raw_mapping:
            return []
        return [RequirementMapping.from_dict(entry) for entry in raw_mapping]

    def _expand_mappings(self, requirement):
        """Return ``requirement`` rewritten by the first mapping that matches.

        The requirement is returned unchanged when no mapping matches.
        """
        for mapping in self._mappings:
            if mapping.matches_requirement(requirement):
                requirement = mapping.apply(requirement)
                break
        return requirement
# Sentinel stored as ``from_version`` when a mapping should only match
# requirements that carry no version at all.
FROM_UNVERSIONED = object()


class RequirementMapping:
    """Rule that rewrites a matching requirement's name and/or version."""

    def __init__(self, from_name, from_version, to_name, to_version):
        """Store the match criteria (``from_*``) and the replacement values (``to_*``).

        ``from_version`` may be ``None`` (match any version), the
        ``FROM_UNVERSIONED`` sentinel (match only unversioned requirements) or
        an exact version. ``to_version`` of ``None`` leaves the requirement's
        version untouched.
        """
        self.from_name = from_name
        self.from_version = from_version
        self.to_name = to_name
        self.to_version = to_version

    def matches_requirement(self, requirement):
        """Check if supplied ToolRequirement matches this mapping description.

        For it to match - the names must match. Additionally if the
        requirement is created with a version or with unversioned being set to
        True additional checks are needed. If a version is specified, it must
        match the supplied version exactly. If ``unversioned`` is True, then
        the supplied requirement must be unversioned (i.e. its version must be
        set to ``None``).
        """
        if requirement.name != self.from_name:
            return False
        elif self.from_version is None:
            return True
        elif self.from_version is FROM_UNVERSIONED:
            return requirement.version is None
        else:
            return requirement.version == self.from_version

    def apply(self, requirement):
        """Return a copy of ``requirement`` with this mapping's target applied.

        The name is always replaced; the version is replaced only when the
        mapping defines a target version.
        """
        requirement = requirement.copy()
        requirement.name = self.to_name
        if self.to_version is not None:
            requirement.version = self.to_version
        return requirement

    @staticmethod
    def from_dict(raw_mapping):
        """Build a ``RequirementMapping`` from one parsed mapping-file entry.

        ``raw_mapping["from"]`` and ``raw_mapping["to"]`` may each be either a
        bare name or a dict. A ``from`` dict may carry ``name``, ``version``
        and ``unversioned``; a ``to`` dict may carry ``name`` (defaulting to
        the source name) and ``version``. Versions that are present are
        normalized to strings.

        :raises Exception: if ``from`` sets both a version and ``unversioned``.
        """
        from_raw = raw_mapping.get("from")
        if isinstance(from_raw, dict):
            from_name = from_raw.get("name")
            raw_version = from_raw.get("version", None)
            unversioned = from_raw.get("unversioned", False)
            if unversioned and raw_version:
                raise Exception("Cannot define both version and set unversioned to True.")
            if unversioned:
                from_version = FROM_UNVERSIONED
            else:
                from_version = str(raw_version) if raw_version is not None else raw_version
        else:
            from_name = from_raw
            from_version = None

        to_raw = raw_mapping.get("to")
        if isinstance(to_raw, dict):
            to_name = to_raw.get("name", from_name)
            raw_to_version = to_raw.get("version")
            # Only stringify a version that is actually present; str(None)
            # would produce the literal "None" and apply() would then write
            # it into the requirement.
            to_version = str(raw_to_version) if raw_to_version is not None else None
        else:
            to_name = to_raw
            to_version = None

        return RequirementMapping(from_name, from_version, to_name, to_version)
class SpecificationAwareDependencyResolver(metaclass=ABCMeta):
    """Mix this into a :class:`DependencyResolver` to implement URI specification matching.

    Allows adapting generic requirements to more specific URIs - to tailor
    name or version to the specified resolution system.
    """

    @abstractmethod
    def _expand_specs(self, requirement):
        """Find the closest matching specification for the discovered resolver
        and return a new concrete requirement.
        """
class SpecificationPatternDependencyResolver(SpecificationAwareDependencyResolver):
    """Implement the :class:`SpecificationAwareDependencyResolver` with a regex pattern."""

    @abstractproperty
    def _specification_pattern(self):
        """Pattern of URI to match against."""

    def _find_specification(self, specs):
        """Return the first spec whose ``uri`` matches the pattern, or ``None``."""
        pattern = self._specification_pattern
        return next((candidate for candidate in specs if pattern.match(candidate.uri)), None)

    def _expand_specs(self, requirement):
        """Return a copy of ``requirement`` adapted to its matching specification.

        When a specification matches, the copy takes the spec's
        ``short_name`` and the spec's version (falling back to the
        requirement's own version if the spec has none). Otherwise the copy
        keeps the original name and version.
        """
        matched = self._find_specification(requirement.specs)
        if matched is None:
            new_name = requirement.name
            new_version = requirement.version
        else:
            new_name = matched.short_name
            new_version = matched.version or requirement.version

        expanded = requirement.copy()
        expanded.name = new_name
        expanded.version = new_version
        return expanded
class Dependency(Dictifiable, metaclass=ABCMeta):
    """Abstract result of resolving a single requirement."""

    # Keys for dictification.
    dict_collection_visible_keys = [
        'dependency_type',
        'exact',
        'name',
        'version',
        'cacheable',
    ]
    cacheable = False

    @abstractmethod
    def shell_commands(self):
        """Return shell commands to enable this dependency."""

    @abstractproperty
    def exact(self):
        """Return true if version information wasn't discarded to resolve
        the dependency.
        """

    @property
    def resolver_msg(self):
        """Return a message describing this dependency."""
        return f"Using dependency {self.name} version {self.version} of type {self.dependency_type}"
class ContainerDependency(Dependency):
    """Dependency satisfied by a container described by ``container_description``."""

    dict_collection_visible_keys = Dependency.dict_collection_visible_keys + [
        'environment_path',
        'container_description',
        'container_resolver',
    ]

    def __init__(self, container_description, name=None, version=None, container_resolver=None):
        """Record the container description along with the requirement it satisfies.

        ``dependency_type`` and ``environment_path`` are taken from the
        description's ``type`` and ``identifier`` attributes respectively.
        """
        self.container_description = container_description
        self.container_resolver = container_resolver
        self.dependency_type = container_description.type
        self.environment_path = container_description.identifier
        self._name = name
        self._version = version

    @property
    def name(self):
        """Name supplied at construction time."""
        return self._name

    @property
    def version(self):
        """Version supplied at construction time."""
        return self._version

    @property
    def exact(self):
        """Always ``True`` for container dependencies."""
        return True

    # NOTE(review): this is a property here while the base class declares
    # ``shell_commands`` as a method - confirm callers never invoke
    # ``shell_commands()`` on a ContainerDependency.
    @property
    def shell_commands(self):
        """Always ``None``: no shell commands are provided."""
        return None
class NullDependency(Dependency):
    """Placeholder Dependency used when a requirement could not be resolved."""

    dependency_type = None
    exact = True

    def __init__(self, version=None, name=None):
        """Remember the ``name`` and ``version`` that were not found."""
        self.name = name
        self.version = version

    @property
    def resolver_msg(self):
        """Return a message describing this dependency."""
        return f"Dependency {self.name} not found."

    def shell_commands(self):
        """Return ``None``: there is nothing to enable."""
        return None
class DependencyException(Exception):
    """Exception type for dependency resolution errors."""