Source code for galaxy.tool_util.deps.resolvers.lmod

"""
This is a prototype dependency resolver to be able to use the "LMOD environment modules system" from TACC to solve package requirements

LMOD official website: https://www.tacc.utexas.edu/research-development/tacc-projects/lmod

LMOD @ Github: https://github.com/TACC/Lmod

"""

import logging
from io import StringIO
from os import getenv
from os.path import exists
from subprocess import (
    PIPE,
    Popen,
)

from . import (
    Dependency,
    DependencyResolver,
    MappableDependencyResolver,
    NullDependency,
)

log = logging.getLogger(__name__)

DEFAULT_LMOD_PATH = getenv("LMOD_CMD")
DEFAULT_SETTARG_PATH = getenv("LMOD_SETTARG_CMD")
DEFAULT_MODULEPATH = getenv("MODULEPATH")
DEFAULT_MAPPING_FILE = "config/lmod_modules_mapping.yml"
INVALID_LMOD_PATH_MSG = "The following LMOD executable could not be found: %s. Either your LMOD Dependency Resolver is misconfigured or LMOD is improperly installed on your system !"
EMPTY_MODULEPATH_MSG = "No valid LMOD MODULEPATH defined ! Either your LMOD Dependency Resolver is misconfigured or LMOD is improperly installed on your system !"


[docs]class LmodDependencyResolver(DependencyResolver, MappableDependencyResolver): """Dependency resolver based on the LMOD environment modules system""" dict_collection_visible_keys = DependencyResolver.dict_collection_visible_keys + ["base_path", "modulepath"] resolver_type = "lmod"
[docs] def __init__(self, dependency_manager, **kwds): # Mapping file management self._set_default_mapping_file(kwds) self._setup_mapping(dependency_manager, **kwds) # Other attributes self.versionless = _string_as_bool(kwds.get("versionless", "false")) self.lmodexec = kwds.get("lmodexec", DEFAULT_LMOD_PATH) self.settargexec = kwds.get("settargexec", DEFAULT_SETTARG_PATH) self.modulepath = kwds.get("modulepath", DEFAULT_MODULEPATH) self.module_checker = AvailModuleChecker(self, self.modulepath)
def _set_default_mapping_file(self, resolver_attributes): if "mapping_files" not in resolver_attributes: if exists(DEFAULT_MAPPING_FILE): resolver_attributes["mapping_files"] = DEFAULT_MAPPING_FILE
[docs] def resolve(self, requirement, **kwds): requirement = self._expand_mappings(requirement) name, version, type = requirement.name, requirement.version, requirement.type if type != "package": return NullDependency(version=version, name=name) if self.__has_module(name, version): return LmodDependency(self, name, version, exact=True, dependency_resolver=self) elif self.versionless and self.__has_module(name, None): return LmodDependency(self, name, None, exact=False, dependency_resolver=self) return NullDependency(version=version, name=name)
def __has_module(self, name, version): return self.module_checker.has_module(name, version)
class AvailModuleChecker: """Parses the output of Lmod 'module avail' command to get the list of available modules.""" def __init__(self, lmod_dependency_resolver, modulepath): self.lmod_dependency_resolver = lmod_dependency_resolver self.modulepath = modulepath def has_module(self, module, version): # When version is None (No specific version required by the wrapper -or- versionless is set to 'true'), we only get the list of default modules # We get the full list of modules otherwise if version is None: available_modules = self.__get_list_of_available_modules(True) else: available_modules = self.__get_list_of_available_modules(False) # Is the required module in the list of avaialable modules ? for module_name, module_version in available_modules: names_match = module == module_name module_match = names_match and (version is None or module_version == version) if module_match: return True return False def __get_list_of_available_modules(self, default_version_only=False): # Get the results of the "module avail" command in an easy to parse format # Note that since "module" is actually a bash function, we are directy executing the underlying executable instead raw_output = self.__get_module_avail_command_output(default_version_only).decode("utf-8") # Parse the result for line in StringIO(raw_output): # Clean line and discard non-module lines line = line and line.strip() if not line or line.startswith("/"): continue # Split module lines by / to separate the module name from the module version # Module without version are discarded module_parts = line.split("/") if len(module_parts) == 2: yield module_parts[0], module_parts[1] def __get_module_avail_command_output(self, default_version_only=False): # Check if the LMOD executable is available (ie. if both LMOD and the lmod dependency resolver are both setup properly) lmodexec = self.lmod_dependency_resolver.lmodexec if not exists(lmodexec): raise Exception(INVALID_LMOD_PATH_MSG % lmodexec) # Check if the MODULEPATH environment if self.modulepath == "" or self.modulepath is None: raise Exception(EMPTY_MODULEPATH_MSG) # Build command line if default_version_only: module_avail_command = [lmodexec, "-t", "-d", "avail"] else: module_avail_command = [lmodexec, "-t", "avail"] # The list of avaialable modules is actually printed on stderr and not stdout for module commands return Popen( module_avail_command, stdout=PIPE, stderr=PIPE, env={"MODULEPATH": self.modulepath}, close_fds=True ).communicate()[1] class LmodDependency(Dependency): """Prepare the commands required to solve the dependency and add them to the script used to run a tool in Galaxy.""" dict_collection_visible_keys = Dependency.dict_collection_visible_keys + [ "module_name", "module_version", "dependency_resolver", ] dependency_type = "lmod" def __init__( self, lmod_dependency_resolver, module_name, module_version=None, exact=True, dependency_resolver=None ): self.lmod_dependency_resolver = lmod_dependency_resolver self.module_name = module_name self.module_version = module_version self._exact = exact self.dependency_resolver = dependency_resolver @property def name(self): return self.module_name @property def version(self): return self.module_version @property def exact(self): return self._exact def shell_commands(self): # Get the full module name in the form "tool_name/tool_version" module_to_load = self.module_name if self.module_version: module_to_load = f"{self.module_name}/{self.module_version}" # Build the list of command to add to run script # Note that since "module" is actually a bash function, we are directy executing the underlying executable instead # - Set the MODULEPATH environment variable command = f"MODULEPATH={self.lmod_dependency_resolver.modulepath}; " command += "export MODULEPATH; " # - Execute the "module load" command (or rather the "/path/to/lmod load" command) command += f"eval `{self.lmod_dependency_resolver.lmodexec} load {module_to_load}` " # - Execute the "settarg" command in addition if needed if self.lmod_dependency_resolver.settargexec is not None: command += f"&& eval `{self.lmod_dependency_resolver.settargexec} -s sh`" return command def _string_as_bool(value): return str(value).lower() == "true" __all__ = ("LmodDependencyResolver",)