Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tool_util.deps.installable
"""Abstractions for installing local software managed and required by Galaxy/galaxy-lib."""
import abc
import logging
import os
from galaxy.util.filelock import (
FileLock,
FileLockException,
)
log = logging.getLogger(__name__)
[docs]class InstallableContext(metaclass=abc.ABCMeta):
"""Represent a directory/configuration of something that can be installed."""
[docs] @abc.abstractmethod
def is_installed(self):
"""Return bool indicating if the configured software is installed."""
@property
@abc.abstractmethod
def installable_description(self):
"""Short description of thing being installed for log statements."""
@property
@abc.abstractmethod
def parent_path(self):
"""Return parent path of the location the installable will be created within."""
[docs]def ensure_installed(installable_context, install_func, auto_init):
"""Make sure target is installed - handle multiple processes potentially attempting installation."""
parent_path = installable_context.parent_path
desc = installable_context.installable_description
def _check():
if not installable_context.is_installed():
if auto_init:
if installable_context.can_install():
if install_func(installable_context):
installed = False
log.warning(f"{desc} installation requested and failed.")
else:
installed = installable_context.is_installed()
if not installed:
log.warning(f"{desc} installation requested, seemed to succeed, but not found.")
else:
installed = False
else:
installed = False
log.warning("%s not installed and auto-installation disabled.", desc)
else:
installed = True
return installed
if not os.path.lexists(parent_path):
os.mkdir(parent_path)
try:
if auto_init and os.access(parent_path, os.W_OK):
with FileLock(os.path.join(parent_path, desc.lower()), timeout=300):
return _check()
else:
return _check()
except FileLockException:
raise Exception(f"Failed to get file lock for {os.path.join(parent_path, desc.lower())}")