Source code for galaxy.tools.cache
import json
import logging
import os
import shutil
import sqlite3
import tempfile
import zlib
from threading import Lock
from typing import (
    Dict,
    Optional,
)

from sqlitedict import SqliteDict

from galaxy.util import unicodify
from galaxy.util.hash_util import md5_hash_file

log = logging.getLogger(__name__)

CURRENT_TOOL_CACHE_VERSION = 0

class ToolDocumentCache:
    def __init__(self, cache_dir):
        self.cache_dir = cache_dir
        if not os.path.exists(self.cache_dir):
            os.makedirs(self.cache_dir)
        self.cache_file = os.path.join(self.cache_dir, "cache.sqlite")
        self.writeable_cache_file = None
        self._cache = None
        self.disabled = False
        self._get_cache(create_if_necessary=True)

    def _get_cache(self, flag="r", create_if_necessary=False):
        try:
            if create_if_necessary and not os.path.exists(self.cache_file):
                # Create database if necessary using 'c' flag
                self._cache = SqliteDict(self.cache_file, flag="c", encode=encoder, decode=decoder, autocommit=False)
                if flag == "r":
                    self._cache.flag = flag
            else:
                cache_file = self.writeable_cache_file.name if self.writeable_cache_file else self.cache_file
                self._cache = SqliteDict(cache_file, flag=flag, encode=encoder, decode=decoder, autocommit=False)
        except (sqlite3.OperationalError, RuntimeError):
            log.warning("Tool document cache unavailable")
            self._cache = None
            self.disabled = True

    @property
    def cache_file_is_writeable(self):
        return os.access(self.cache_file, os.W_OK)

    def get(self, config_file):
        try:
            tool_document = self._cache.get(config_file)
        except sqlite3.OperationalError:
            log.debug("Tool document cache unavailable")
            return None
        if not tool_document:
            return None
        if tool_document.get("tool_cache_version") != CURRENT_TOOL_CACHE_VERSION:
            return None
        if self.cache_file_is_writeable:
            for path, modtime in tool_document["paths_and_modtimes"].items():
                if os.path.getmtime(path) != modtime:
                    return None
        return tool_document

    def _make_writable(self):
        if not self.writeable_cache_file:
            self.writeable_cache_file = tempfile.NamedTemporaryFile(
                dir=self.cache_dir, suffix="cache.sqlite.tmp", delete=False
            )
            if os.path.exists(self.cache_file):
                shutil.copy(self.cache_file, self.writeable_cache_file.name)
            self._get_cache(flag="c")

    def persist(self):
        if self.writeable_cache_file:
            self._cache.commit()
            os.rename(self.writeable_cache_file.name, self.cache_file)
            self.reopen_ro()
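
    # ``persist`` calls ``self.reopen_ro()``, which is not shown in this listing.
    # A minimal sketch of what it presumably does (reopen the committed cache
    # read-only and drop the temporary writeable copy) is:
    def reopen_ro(self):
        self._get_cache(flag="r")
        self.writeable_cache_file = None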

    def set(self, config_file, tool_source):
        try:
            if self.cache_file_is_writeable:
                self._make_writable()
                to_persist = {
                    "document": tool_source.to_string(),
                    "macro_paths": tool_source.macro_paths,
                    "paths_and_modtimes": tool_source.paths_and_modtimes(),
                    "tool_cache_version": CURRENT_TOOL_CACHE_VERSION,
                }
                try:
                    self._cache[config_file] = to_persist
                except RuntimeError:
                    log.debug("Tool document cache not writeable")
        except sqlite3.OperationalError:
            log.debug("Tool document cache unavailable")

    def delete(self, config_file):
        if self.cache_file_is_writeable:
            self._make_writable()
            try:
                del self._cache[config_file]
            except (KeyError, RuntimeError):
                pass

    def __del__(self):
        if self.writeable_cache_file:
            try:
                os.unlink(self.writeable_cache_file.name)
            except Exception:
                pass
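

# ``_get_cache`` above passes module-level ``encoder``/``decoder`` callables to
# SqliteDict, but they do not appear in this listing. Given the ``json``,
# ``zlib`` and ``sqlite3`` imports, a minimal sketch consistent with that usage
# (an assumption, not necessarily the exact upstream implementation) is:
def encoder(obj):
    # Serialize to JSON, compress, and wrap as a BLOB for sqlite storage.
    return sqlite3.Binary(zlib.compress(json.dumps(obj).encode("utf-8")))


def decoder(obj):
    # Reverse of ``encoder``: decompress the BLOB and parse the JSON payload.
    return json.loads(zlib.decompress(bytes(obj)).decode("utf-8"))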

class ToolCache:
    """
    Cache tool definitions to allow quickly reloading the whole
    toolbox.
    """

    def __init__(self):
        self._lock = Lock()
        self._hash_by_tool_paths: Dict[str, ToolHash] = {}
        self._tools_by_path = {}
        self._tool_paths_by_id = {}
        self._macro_paths_by_id = {}
        self._new_tool_ids = set()
        self._removed_tool_ids = set()
        self._removed_tools_by_path = {}
        self._hashes_initialized = False

    def assert_hashes_initialized(self):
        if not self._hashes_initialized:
            for tool_hash in self._hash_by_tool_paths.values():
                tool_hash.hash  # noqa: B018
            self._hashes_initialized = True

    def cleanup(self):
        """
        Remove uninstalled tools from tool cache if they are not on disk anymore or if their content has changed.

        Returns list of tool_ids that have been removed.
        """
        removed_tool_ids = []
        try:
            with self._lock:
                persist_tool_document_cache = False
                paths_to_cleanup = {
                    (path, tool) for path, tool in self._tools_by_path.items() if self._should_cleanup(path)
                }
                for config_filename, tool in paths_to_cleanup:
                    tool.remove_from_cache()
                    persist_tool_document_cache = True
                    del self._hash_by_tool_paths[config_filename]
                    if os.path.exists(config_filename):
                        # This tool has probably been broken while editing on disk
                        # We record it here, so that we can recover it
                        self._removed_tools_by_path[config_filename] = self._tools_by_path[config_filename]
                    del self._tools_by_path[config_filename]
                    tool_ids = tool.all_ids
                    for tool_id in tool_ids:
                        if tool_id in self._tool_paths_by_id:
                            del self._tool_paths_by_id[tool_id]
                    removed_tool_ids.extend(tool_ids)
                for tool_id in removed_tool_ids:
                    self._removed_tool_ids.add(tool_id)
                    if tool_id in self._new_tool_ids:
                        self._new_tool_ids.remove(tool_id)
                if persist_tool_document_cache:
                    tool.app.toolbox.persist_cache()
        except Exception as e:
            log.debug("Exception while checking tools to remove from cache: %s", unicodify(e))
            # If by chance the file is being removed while calculating the hash or modtime
            # we don't want the thread to die.
        if removed_tool_ids:
            log.debug(f"Removed the following tools from cache: {removed_tool_ids}")
        return removed_tool_ids

    def _should_cleanup(self, config_filename):
        """Return True if `config_filename` does not exist or if its modtime and hash have changed, else return False."""
        try:
            new_mtime = os.path.getmtime(config_filename)
            tool_hash = self._hash_by_tool_paths.get(config_filename)
            if tool_hash and tool_hash.modtime_less_than(new_mtime):
                if not tool_hash.hash_equals(md5_hash_file(config_filename)):
                    return True
                else:
                    # No change of content, so not necessary to calculate the md5 checksum every time
                    tool_hash.modtime = new_mtime
            tool = self._tools_by_path[config_filename]
            for macro_path in tool._macro_paths:
                new_mtime = os.path.getmtime(macro_path)
                if self._hash_by_tool_paths.get(macro_path).modtime < new_mtime:
                    return True
        except FileNotFoundError:
            return True
        return False

    def get_tool(self, config_filename):
        """Get the tool at `config_filename` from the cache if the tool is up to date."""
        return self._tools_by_path.get(config_filename, None)

    def get_removed_tool(self, config_filename):
        return self._removed_tools_by_path.get(config_filename)

    def get_tool_by_id(self, tool_id):
        """Get the tool with the id `tool_id` from the cache if the tool is up to date."""
        return self.get_tool(self._tool_paths_by_id.get(tool_id))

    def expire_tool(self, tool_id):
        with self._lock:
            if tool_id in self._tool_paths_by_id:
                config_filename = self._tool_paths_by_id[tool_id]
                del self._hash_by_tool_paths[config_filename]
                del self._tool_paths_by_id[tool_id]
                del self._tools_by_path[config_filename]
                if tool_id in self._new_tool_ids:
                    self._new_tool_ids.remove(tool_id)

    def cache_tool(self, config_filename, tool):
        tool_id = str(tool.id)
        # We defer hashing of the config file if we haven't called assert_hashes_initialized.
        # This allows startup to occur without having to read in and hash all tool and macro files
        lazy_hash = not self._hashes_initialized
        with self._lock:
            self._hash_by_tool_paths[config_filename] = ToolHash(config_filename, lazy_hash=lazy_hash)
            self._tool_paths_by_id[tool_id] = config_filename
            self._tools_by_path[config_filename] = tool
            self._new_tool_ids.add(tool_id)
            for macro_path in tool._macro_paths:
                self._hash_by_tool_paths[macro_path] = ToolHash(macro_path, lazy_hash=lazy_hash)
                if tool_id not in self._macro_paths_by_id:
                    self._macro_paths_by_id[tool_id] = {macro_path}
                else:
                    self._macro_paths_by_id[tool_id].add(macro_path)

    def reset_status(self):
        """
        Reset tracking of new and newly disabled tools.
        """
        with self._lock:
            self._new_tool_ids = set()
            self._removed_tool_ids = set()
            self._removed_tools_by_path = {}
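

# Hypothetical usage sketch (illustrative only, not part of the module): the
# toolbox is assumed to drive a ToolCache roughly like this during a reload
# cycle; ``tool_cache``, ``config_filename`` and ``tool`` stand in for real
# Galaxy objects.
def _example_reload_cycle(tool_cache, config_filename, tool):
    # Register a freshly parsed tool under its config path.
    tool_cache.cache_tool(config_filename, tool)
    # Subsequent lookups hit the cache as long as modtimes/hashes are unchanged.
    cached = tool_cache.get_tool(config_filename)
    # Drop tools whose files were removed or edited on disk since caching.
    removed_ids = tool_cache.cleanup()
    # Once new/removed tools have been processed, clear the tracking sets.
    tool_cache.reset_status()
    return cached, removed_ids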

class ToolHash:
    def __init__(self, path: str, modtime: Optional[float] = None, lazy_hash: bool = False):
        self.path = path
        self._modtime = modtime or os.path.getmtime(path)
        self._tool_hash = None
        if not lazy_hash:
            self.hash  # noqa: B018

    @property
    def modtime(self) -> float:
        return self._modtime

    @modtime.setter
    def modtime(self, new_value: float):
        self._modtime = new_value

    @property
    def hash(self):
        if self._tool_hash is None:
            self._tool_hash = md5_hash_file(self.path)
        return self._tool_hash
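
    # ``ToolCache._should_cleanup`` above calls ``modtime_less_than`` and
    # ``hash_equals`` on ToolHash objects, but those methods do not appear in
    # this listing. A minimal sketch consistent with that usage (an assumption,
    # not necessarily the exact upstream implementation) is:
    def modtime_less_than(self, other_modtime: float) -> bool:
        # True if the recorded modtime is older than the modtime seen on disk.
        return self._modtime < other_modtime

    def hash_equals(self, other_hash: Optional[str]) -> bool:
        # Compare the (lazily computed) md5 of this path against another digest.
        return self.hash == other_hash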