Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.config_watchers

import logging
from os.path import dirname

from galaxy.queue_worker import job_rule_modules
from galaxy.structured_app import StructuredApp
from galaxy.tool_util.toolbox.watcher import (
    get_tool_conf_watcher,
    get_tool_watcher,
)
from galaxy.util.watcher import get_watcher

log = logging.getLogger(__name__)


[docs]class ConfigWatchers: """Contains ToolConfWatcher, ToolWatcher and ToolDataWatcher objects."""
[docs] def __init__(self, app: StructuredApp): self.app = app self.active = False # ToolConfWatcher objects will watch the tool_cache if the tool_cache is passed into get_tool_conf_watcher. # Watching the tool_cache means removing outdated items from the tool_cache. # Only the reload_toolbox callback will re-populate the cache, so we pass the tool_cache only to the ToolConfWatcher that # watches regular tools. # If there are multiple ToolConfWatcher objects for the same handler or web process a race condition occurs between the two cache_cleanup functions. # If the reload_data_managers callback wins, the cache will miss the tools that had been removed from the cache # and will be blind to further changes in these tools. def reload_toolbox(): save_integrated_tool_panel = False try: # Run and wait for toolbox reload on the process that watches the config files. # The toolbox reload will update the integrated_tool_panel_file self.app.queue_worker.send_local_control_task("reload_toolbox", get_response=True), except Exception: save_integrated_tool_panel = True log.exception("Exception occured while reloading toolbox") self.app.queue_worker.send_control_task( "reload_toolbox", noop_self=True, kwargs={"save_integrated_tool_panel": save_integrated_tool_panel} ), self.tool_config_watcher = get_tool_conf_watcher( reload_callback=reload_toolbox, tool_cache=self.app.tool_cache, ) self.data_manager_config_watcher = get_tool_conf_watcher( reload_callback=lambda: self.app.queue_worker.send_control_task("reload_data_managers"), ) self.tool_data_watcher = get_watcher(self.app.config, "watch_tool_data_dir", monitor_what_str="data tables") self.tool_watcher = get_tool_watcher(self, app.config) if getattr(self.app, "is_job_handler", False): self.job_rule_watcher = get_watcher(app.config, "watch_job_rules", monitor_what_str="job rules") else: self.job_rule_watcher = get_watcher(app.config, "__invalid__") self.core_config_watcher = get_watcher(app.config, "watch_core_config", monitor_what_str="core config file") self.tour_watcher = get_watcher(app.config, "watch_tours", monitor_what_str="tours")
@property def watchers(self): return ( self.tool_watcher, self.tool_config_watcher, self.data_manager_config_watcher, self.tool_data_watcher, self.tool_watcher, self.job_rule_watcher, self.core_config_watcher, self.tour_watcher, )
[docs] def change_state(self, active): if active: self.start() elif self.active: self.shutdown()
[docs] def start(self): for watcher in self.watchers: watcher.start() [self.tool_config_watcher.watch_file(config) for config in self.tool_config_paths] [self.data_manager_config_watcher.watch_file(config) for config in self.data_manager_configs] for tool_data_path in self.tool_data_paths: self.tool_data_watcher.watch_directory( tool_data_path, callback=lambda path: self.app.queue_worker.send_control_task( "reload_tool_data_tables", kwargs={"path": path} ), require_extensions=(".loc",), recursive=True, ) for job_rules_directory in self.job_rules_paths: self.job_rule_watcher.watch_directory( job_rules_directory, callback=lambda: self.app.queue_worker.send_control_task("reload_job_rules"), recursive=True, ignore_extensions=(".pyc", ".pyo", ".pyd"), ) if self.app.config.config_file: self.core_config_watcher.watch_file( self.app.config.config_file, callback=lambda path: self.app.queue_worker.send_control_task("reload_core_config"), ) self.tour_watcher.watch_directory( self.app.config.tour_config_dir, callback=lambda path: self.app.queue_worker.send_control_task("reload_tour", kwargs={"path": path}), ) self.active = True
[docs] def shutdown(self): for watcher in self.watchers: watcher.shutdown() self.active = False
[docs] def update_watch_data_table_paths(self): if hasattr(self.tool_data_watcher, "monitored_dirs"): for tool_data_table_path in self.tool_data_paths: if tool_data_table_path not in self.tool_data_watcher.monitored_dirs: self.tool_data_watcher.watch_directory(tool_data_table_path)
@property def data_manager_configs(self): data_manager_configs = [] if hasattr(self.app.config, "data_manager_config_file"): data_manager_configs.append(self.app.config.data_manager_config_file) if hasattr(self.app.config, "shed_data_manager_config_file"): data_manager_configs.append(self.app.config.shed_data_manager_config_file) return data_manager_configs @property def tool_data_paths(self): tool_data_paths = [] if hasattr(self.app.config, "tool_data_path"): tool_data_paths.append(self.app.config.tool_data_path) if hasattr(self.app.config, "shed_tool_data_path"): tool_data_paths.append(self.app.config.shed_tool_data_path) return tool_data_paths @property def tool_config_paths(self): tool_config_paths = [] if hasattr(self.app.config, "tool_configs"): tool_config_paths = self.app.config.tool_configs return tool_config_paths @property def job_rules_paths(self): job_rules_paths = [] for rules_module in job_rule_modules(self.app): job_rules_dir = dirname(rules_module.__file__) job_rules_paths.append(job_rules_dir) return job_rules_paths