Source code for galaxy.tool_util.toolbox.watcher

import logging
import os.path
import threading

try:
    from watchdog.events import FileSystemEventHandler
    from watchdog.observers import Observer
    from watchdog.observers.polling import PollingObserver

    can_watch = True
except ImportError:
    Observer = None
    FileSystemEventHandler = object
    PollingObserver = None
    can_watch = False

from galaxy.util.hash_util import md5_hash_file
from galaxy.util.watcher import (
    BaseWatcher,
    get_observer_class,
    NullWatcher,
)

log = logging.getLogger(__name__)


[docs]def get_tool_conf_watcher(reload_callback, tool_cache=None): return ToolConfWatcher(reload_callback=reload_callback, tool_cache=tool_cache)
[docs]def get_tool_watcher(toolbox, config): config_name = "watch_tools" config_value = getattr(config, config_name, None) observer_class = get_observer_class(config_name, config_value, default="False", monitor_what_str="tools") if observer_class is not None: return ToolWatcher(observer_class=observer_class, event_handler_class=ToolFileEventHandler, toolbox=toolbox) else: return NullWatcher()
[docs]class ToolFileEventHandler(FileSystemEventHandler):
[docs] def __init__(self, tool_watcher): self.tool_watcher = tool_watcher
[docs] def on_any_event(self, event): self._handle(event)
def _handle(self, event): # modified events will only have src path, move events will # have dest_path and src_path but we only care about dest. So # look at dest if it exists else use src. path = getattr(event, "dest_path", None) or event.src_path path = os.path.abspath(path) tool_id = self.tool_watcher.tool_file_ids.get(path, None) if tool_id: try: self.tool_watcher.toolbox.reload_tool_by_id(tool_id) except Exception: pass elif path.endswith(".xml"): directory = os.path.dirname(path) dir_callback = self.tool_watcher.tool_dir_callbacks.get(directory, None) if dir_callback: tool_file = event.src_path tool_id = dir_callback(tool_file) if tool_id: self.tool_watcher.tool_file_ids[tool_file] = tool_id
[docs]class ToolConfWatcher:
[docs] def __init__(self, reload_callback, tool_cache=None): self.paths = {} self.cache = tool_cache self._active = False self._lock = threading.Lock() self.thread = None self.reload_callback = reload_callback
[docs] def start(self): if not self._active: self._active = True if self.thread is None: self.exit = threading.Event() self.thread = threading.Thread(target=self.check) self.thread.daemon = True self.thread.start()
[docs] def shutdown(self): if self._active: self._active = False if self.thread.is_alive(): self.exit.set() self.thread.join() self.thread = None self.exit = None
[docs] def check(self): """Check for changes in self.paths or self.cache and call the event handler.""" hashes = {} if self.cache: self.cache.assert_hashes_initialized() while self._active and not self.exit.is_set(): do_reload = False drop_on_next_loop = set() drop_now = set() with self._lock: paths = list(self.paths.keys()) for path in paths: try: if not os.path.exists(path): continue mod_time = self.paths[path] if not hashes.get(path, None): hash = md5_hash_file(path) if hash: hashes[path] = md5_hash_file(path) else: continue new_mod_time = os.path.getmtime(path) # mod_time can be None if a non-required config was just created if not mod_time: self.paths[path] = new_mod_time log.debug("The file '%s' has been created.", path) do_reload = True elif new_mod_time > mod_time: new_hash = md5_hash_file(path) if hashes[path] != new_hash: self.paths[path] = new_mod_time hashes[path] = new_hash log.debug("The file '%s' has changes.", path) do_reload = True except OSError: # in rare cases `path` may be deleted between `os.path.exists` calls # and reading the file from the filesystem. We do not want the watcher # thread to die in these cases. if path in drop_now: log.warning("'%s' could not be read, removing from watched files", path) del self.paths[path] if path in hashes: del hashes[path] else: log.debug("'%s could not be read", path) drop_on_next_loop.add(path) if self.cache: self.cache.cleanup() do_reload = True if not do_reload and self.cache: removed_ids = self.cache.cleanup() if removed_ids: do_reload = True if do_reload: self.reload_callback() drop_now = drop_on_next_loop drop_on_next_loop = set() self.exit.wait(1)
[docs] def monitor(self, path): mod_time = None if os.path.exists(path): mod_time = os.path.getmtime(path) with self._lock: self.paths[path] = mod_time
[docs] def watch_file(self, tool_conf_file): self.monitor(tool_conf_file)
[docs]class ToolWatcher(BaseWatcher):
[docs] def __init__(self, observer_class, event_handler_class, toolbox): super().__init__(observer_class, event_handler_class) self.toolbox = toolbox self.tool_file_ids = {} self.tool_dir_callbacks = {}
[docs] def watch_file(self, tool_file, tool_id): tool_file = os.path.abspath(tool_file) self.tool_file_ids[tool_file] = tool_id tool_dir = os.path.dirname(tool_file) if tool_dir not in self.monitored_dirs: self.monitor(tool_dir)
[docs] def watch_directory(self, tool_dir, callback): tool_dir = os.path.abspath(tool_dir) self.tool_dir_callbacks[tool_dir] = callback if tool_dir not in self.monitored_dirs: self.monitor(tool_dir)