Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tool_util.deps.installable

"""Abstractions for installing local software managed and required by Galaxy/galaxy-lib."""

import abc
import logging
import os

from galaxy.util.filelock import (
    FileLock,
    FileLockException,
)

log = logging.getLogger(__name__)


[docs]class InstallableContext(metaclass=abc.ABCMeta): """Represent a directory/configuration of something that can be installed."""
[docs] @abc.abstractmethod def is_installed(self): """Return bool indicating if the configured software is installed."""
[docs] @abc.abstractmethod def can_install(self): """Check preconditions for installation."""
@property @abc.abstractmethod def installable_description(self): """Short description of thing being installed for log statements.""" @property @abc.abstractmethod def parent_path(self): """Return parent path of the location the installable will be created within."""
[docs]def ensure_installed(installable_context, install_func, auto_init): """Make sure target is installed - handle multiple processes potentially attempting installation.""" parent_path = installable_context.parent_path desc = installable_context.installable_description def _check(): if not installable_context.is_installed(): if auto_init: if installable_context.can_install(): if install_func(installable_context): installed = False log.warning(f"{desc} installation requested and failed.") else: installed = installable_context.is_installed() if not installed: log.warning(f"{desc} installation requested, seemed to succeed, but not found.") else: installed = False else: installed = False log.warning("%s not installed and auto-installation disabled.", desc) else: installed = True return installed if not os.path.lexists(parent_path): os.mkdir(parent_path) try: if auto_init and os.access(parent_path, os.W_OK): with FileLock(os.path.join(parent_path, desc.lower()), timeout=300): return _check() else: return _check() except FileLockException: raise Exception(f"Failed to get file lock for {os.path.join(parent_path, desc.lower())}")