# Source code for galaxy.tools.cache
import logging
import os
from threading import Lock
from typing import (
Optional,
TYPE_CHECKING,
Union,
)
from galaxy.util import unicodify
from galaxy.util.hash_util import md5_hash_file
if TYPE_CHECKING:
from galaxy.tools import Tool
from galaxy.util.path import StrPath
log = logging.getLogger(__name__)
class ToolCache:
    """
    Cache tool definitions to allow quickly reloading the whole
    toolbox.
    """

    def __init__(self) -> None:
        # Guards every mutation of the bookkeeping structures below.
        self._lock = Lock()
        # file path -> ToolHash for tool configs and their macro files
        self._hash_by_tool_paths: dict[str, ToolHash] = {}
        # tool config path -> parsed Tool
        self._tools_by_path: dict[str, Tool] = {}
        # tool id -> tool config path
        self._tool_paths_by_id: dict[str, StrPath] = {}
        # ids added since the last reset_status()
        self._new_tool_ids: set[str] = set()
        # ids removed since the last reset_status()
        self._removed_tool_ids: set[str] = set()
        # tools evicted by cleanup() whose file still exists, kept for recovery
        self._removed_tools_by_path: dict[str, Tool] = {}
        # True once every cached ToolHash has actually computed its digest
        self._hashes_initialized = False

    def assert_hashes_initialized(self) -> None:
        """Force computation of any file digests that were deferred at startup."""
        if self._hashes_initialized:
            return
        for deferred_hash in self._hash_by_tool_paths.values():
            deferred_hash.hash  # noqa: B018
        self._hashes_initialized = True

    def cleanup(self) -> list[str]:
        """
        Remove uninstalled tools from tool cache if they are not on disk anymore or if their content has changed.

        Returns list of tool_ids that have been removed.
        """
        removed_tool_ids: list[str] = []
        try:
            with self._lock:
                stale = {
                    (path, cached_tool)
                    for path, cached_tool in self._tools_by_path.items()
                    if self._should_cleanup(path)
                }
                for path, cached_tool in stale:
                    del self._hash_by_tool_paths[path]
                    if os.path.exists(path):
                        # This tool has probably been broken while editing on disk
                        # We record it here, so that we can recover it
                        self._removed_tools_by_path[path] = cached_tool
                    del self._tools_by_path[path]
                    ids_for_tool = cached_tool.all_ids
                    for stale_id in ids_for_tool:
                        self._tool_paths_by_id.pop(stale_id, None)
                    removed_tool_ids.extend(ids_for_tool)
                self._removed_tool_ids.update(removed_tool_ids)
                self._new_tool_ids.difference_update(removed_tool_ids)
        except Exception as e:
            log.debug("Exception while checking tools to remove from cache: %s", unicodify(e))
            # If by chance the file is being removed while calculating the hash or modtime
            # we don't want the thread to die.
        if removed_tool_ids:
            log.debug(f"Removed the following tools from cache: {removed_tool_ids}")
        return removed_tool_ids

    def _should_cleanup(self, config_filename: str) -> bool:
        """Return True if `config_filename` does not exist or if modtime and hash have changes, else return False."""
        try:
            disk_mtime = os.path.getmtime(config_filename)
            recorded = self._hash_by_tool_paths.get(config_filename)
            if recorded is not None and recorded.modtime < disk_mtime:
                if recorded.hash != md5_hash_file(config_filename):
                    return True
                # No change of content, so not necessary to calculate the md5 checksum every time
                recorded.modtime = disk_mtime
            tool = self._tools_by_path[config_filename]
            for macro_path in tool._macro_paths:
                macro_mtime = os.path.getmtime(macro_path)
                macro_hash = self._hash_by_tool_paths.get(str(macro_path))
                if macro_hash is not None and macro_hash.modtime < macro_mtime:
                    return True
        except FileNotFoundError:
            # The config (or one of its macro files) vanished from disk.
            return True
        return False

    def get_tool(self, config_filename: "StrPath") -> Union["Tool", None]:
        """Get the tool at `config_filename` from the cache if the tool is up to date."""
        return self._tools_by_path.get(str(config_filename))

    def get_removed_tool(self, config_filename: "StrPath") -> Union["Tool", None]:
        """Return the tool previously evicted for `config_filename`, if any was recorded."""
        return self._removed_tools_by_path.get(str(config_filename))

    def get_tool_by_id(self, tool_id: str) -> Union["Tool", None]:
        """Get the tool with the id `tool_id` from the cache if the tool is up to date."""
        tool_path = self._tool_paths_by_id.get(tool_id)
        return self.get_tool(tool_path) if tool_path else None

    def expire_tool(self, tool_id: str) -> None:
        """Drop `tool_id`, its cached tool and its config-file hash from the cache."""
        with self._lock:
            if tool_id not in self._tool_paths_by_id:
                return
            path = str(self._tool_paths_by_id[tool_id])
            del self._hash_by_tool_paths[path]
            del self._tool_paths_by_id[tool_id]
            del self._tools_by_path[path]
            self._new_tool_ids.discard(tool_id)

    def cache_tool(self, config_filename: "StrPath", tool: "Tool") -> None:
        """Register `tool` under `config_filename` and record hashes for its config and macro files."""
        tool_id = str(tool.id)
        # We defer hashing of the config file if we haven't called assert_hashes_initialized.
        # This allows startup to occur without having to read in and hash all tool and macro files
        defer_hashing = not self._hashes_initialized
        with self._lock:
            path_key = str(config_filename)
            self._hash_by_tool_paths[path_key] = ToolHash(config_filename, lazy_hash=defer_hashing)
            self._tool_paths_by_id[tool_id] = config_filename
            self._tools_by_path[path_key] = tool
            self._new_tool_ids.add(tool_id)
            for macro_path in tool._macro_paths:
                self._hash_by_tool_paths[str(macro_path)] = ToolHash(macro_path, lazy_hash=defer_hashing)

    def reset_status(self) -> None:
        """
        Reset tracking of new and newly disabled tools.
        """
        with self._lock:
            self._new_tool_ids = set()
            self._removed_tool_ids = set()
            self._removed_tools_by_path = {}
class ToolHash:
    """Track the modification time and lazily-computed md5 digest of a file."""

    def __init__(self, path: "StrPath", modtime: Optional[float] = None, lazy_hash: bool = False) -> None:
        self.path = path
        # Fall back to the file's current mtime when no (truthy) modtime is supplied.
        self.modtime = modtime if modtime else os.path.getmtime(path)
        self._tool_hash: Optional[str] = None
        if not lazy_hash:
            # Eagerly compute and cache the digest.
            self.hash  # noqa: B018

    @property
    def hash(self) -> Union[str, None]:
        """md5 digest of the file, computed on first access and cached thereafter."""
        if self._tool_hash is not None:
            return self._tool_hash
        self._tool_hash = md5_hash_file(self.path)
        return self._tool_hash