Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.data_manager.manager

import errno
import logging
import os
from typing import (
    Dict,
    List,
    Optional,
)

from typing_extensions import Protocol

from galaxy import util
from galaxy.structured_app import StructuredApp
from galaxy.tool_util.data import (
    BundleProcessingOptions,
    OutputDataset,
)
from galaxy.tool_util.data.bundles.models import (
    convert_data_tables_xml,
    RepoInfo,
)
from galaxy.util import Element

log = logging.getLogger(__name__)


class DataManagers:
    """Registry of the data managers loaded from the configured XML files.

    Keeps two indexes: ``data_managers`` maps a data manager id to its
    ``DataManager`` and ``managed_data_tables`` maps a data table name to the
    list of data managers that declare that table.
    """

    # Loaded data managers keyed by ``DataManager.id``.
    data_managers: Dict[str, "DataManager"]
    # Data table name -> every registered data manager that lists the table.
    managed_data_tables: Dict[str, List["DataManager"]]

    def __init__(self, app: "StructuredApp", xml_filename=None):
        """Load every data manager referenced by the configuration.

        :param app: application object; its ``config`` and ``toolbox`` are used.
        :param xml_filename: configuration file(s) to load instead of
            ``app.config.data_manager_config_file``; passed through
            ``util.listify``.
        :raises OSError: if the shed data manager config cannot be read, unless
            the file is merely missing and the option was not explicitly set.
        """
        self.app = app
        self.data_managers = {}
        self.managed_data_tables = {}
        self.tool_path = None
        self._reload_count = 0
        self.filename = xml_filename or self.app.config.data_manager_config_file
        for filename in util.listify(self.filename):
            if not filename:
                continue
            self.load_from_xml(filename)
        if self.app.config.shed_data_manager_config_file:
            try:
                self.load_from_xml(self.app.config.shed_data_manager_config_file, store_tool_path=True)
            except OSError as exc:
                # A missing file is only tolerated for the default (unset) option.
                if exc.errno != errno.ENOENT or self.app.config.is_set("shed_data_manager_config_file"):
                    raise

    def load_from_xml(self, xml_filename, store_tool_path=True) -> None:
        """Parse a ``<data_managers>`` file and register each ``<data_manager>``.

        :param xml_filename: path of the configuration file to parse.
        :param store_tool_path: when true, record the root element's
            ``tool_path`` (falling back to ``app.config.tool_path``, then
            ``"."``) on ``self.tool_path`` before loading the managers.
        :raises OSError: if the file cannot be read, unless it is merely
            missing and ``data_manager_config_file`` was not explicitly set.
        """
        try:
            tree = util.parse_xml(xml_filename)
        except OSError as e:
            if e.errno != errno.ENOENT or self.app.config.is_set("data_manager_config_file"):
                raise
            return  # default config option and it doesn't exist, which is fine
        except Exception as e:
            log.error(f'There was an error parsing your Data Manager config file "{xml_filename}": {e}')
            return  # we are not able to load any data managers
        root = tree.getroot()
        if root.tag != "data_managers":
            log.error(
                f'A data managers configuration must have a "data_managers" tag as the root. "{root.tag}" is present'
            )
            return
        if store_tool_path:
            tool_path = root.get("tool_path", None)
            if tool_path is None:
                tool_path = self.app.config.tool_path
            if not tool_path:
                tool_path = "."
            self.tool_path = tool_path
        for data_manager_elem in root.findall("data_manager"):
            if not self.load_manager_from_elem(data_manager_elem, tool_path=self.tool_path):
                # Wasn't able to load manager, could happen when galaxy is managed by planemo.
                # Fall back to loading relative to the data_manager_conf.xml file
                tool_path = os.path.dirname(xml_filename)
                self.load_manager_from_elem(data_manager_elem, tool_path=tool_path)

    def load_manager_from_elem(self, data_manager_elem, tool_path=None, add_manager=True) -> Optional["DataManager"]:
        """Build a ``DataManager`` from one ``<data_manager>`` element.

        :param data_manager_elem: the XML element describing the data manager.
        :param tool_path: root directory used to resolve the tool file.
        :param add_manager: register the new manager with ``add_manager``.
        :returns: the loaded data manager, or ``None`` if loading failed.
        """
        try:
            data_manager = DataManager(self, data_manager_elem, tool_path=tool_path)
        except OSError as e:
            # A missing file is skipped quietly. Any other OSError used to fall
            # out of this handler and hit an unbound ``data_manager`` below, so
            # it is now reported like every other load failure.
            if e.errno != errno.ENOENT:
                log.exception("Error loading data_manager")
            return None
        except Exception:
            log.exception("Error loading data_manager")
            return None
        if add_manager:
            self.add_manager(data_manager)
        log.debug(f"Loaded Data Manager: {data_manager.id}")
        return data_manager

    def add_manager(self, data_manager):
        """Register ``data_manager`` under its id and under each of its data tables.

        A manager whose id is already registered replaces the previous entry in
        ``data_managers`` after a warning is logged.
        """
        if data_manager.id in self.data_managers:
            log.warning(f"A data manager has been defined twice: {data_manager.id} ")
        self.data_managers[data_manager.id] = data_manager
        for data_table_name in data_manager.data_table_names:
            self.managed_data_tables.setdefault(data_table_name, []).append(data_manager)

    def get_manager(self, *args, **kwds):
        """Look up a data manager by id; arguments are those of ``dict.get``."""
        return self.data_managers.get(*args, **kwds)

    def remove_manager(self, manager_ids):
        """Unregister one data manager id or a list of ids.

        Unknown ids are ignored. The manager's tool, if any, is removed from
        the toolbox. A data table that no remaining manager declares is dropped
        from ``managed_data_tables``; otherwise only the removed manager is
        taken out of that table's list.
        """
        if not isinstance(manager_ids, list):
            manager_ids = [manager_ids]
        for manager_id in manager_ids:
            data_manager = self.get_manager(manager_id, None)
            if data_manager is None:
                continue
            del self.data_managers[manager_id]
            # remove tool from toolbox
            if data_manager.tool:
                self.app.toolbox.remove_tool_by_id(data_manager.tool.id)
            # determine if any data_tables are no longer tracked
            for data_table_name in data_manager.data_table_names:
                still_tracked = any(
                    data_table_name in other.data_table_names for other in self.data_managers.values()
                )
                if not still_tracked:
                    self.managed_data_tables.pop(data_table_name, None)
                    continue
                managers = self.managed_data_tables.get(data_table_name)
                if managers is not None:
                    # Drop the stale reference to the manager just removed.
                    managers[:] = [m for m in managers if m is not data_manager]
class Tool(Protocol):
    """Structural type for the tool object held by a ``DataManager``.

    Declares the three string attributes that ``DataManager`` falls back to
    when its XML element does not provide a name, description or version.
    """

    name: str
    description: str
    version: str
class DataManager:
    """One data manager: a hidden tool plus the data-table description parsed from its XML.

    Instances are built from a ``<data_manager>`` element; the owning
    ``DataManagers`` registry provides access to the application object.
    """

    GUID_TYPE = "data_manager"
    DEFAULT_VERSION = "0.0.1"
    # The hidden tool loaded for this data manager; ``None`` until loaded.
    tool: Optional[Tool]

    def __init__(self, data_managers: DataManagers, elem: Element = None, tool_path: Optional[str] = None):
        """Set defaults and, when ``elem`` is given, populate the manager from it.

        :param data_managers: the owning registry.
        :param elem: optional ``<data_manager>`` element to load from.
        :param tool_path: tool root directory; the registry's ``tool_path`` is
            used when this is empty.
        """
        self.data_managers = data_managers
        self.declared_id = None
        self.name = None
        self.description = None
        self.version = self.DEFAULT_VERSION
        self.guid = None
        self.tool = None
        self.tool_shed_repository_info: Optional[RepoInfo] = None
        self.undeclared_tables = False
        if elem is None:
            return
        effective_tool_path = tool_path or self.data_managers.tool_path
        self._load_from_element(elem, effective_tool_path)

    def _load_from_element(self, elem: Element, tool_path: Optional[str]) -> None:
        """Read ids, locate and load the tool, then parse the data-table description.

        :param elem: a ``<data_manager>`` element.
        :param tool_path: directory against which the tool file is resolved.
        :raises AssertionError: if the element has the wrong tag, no tool file
            can be determined, no tool root path is available, or the tool was
            not loaded.
        """
        assert (
            elem.tag == "data_manager"
        ), f'A data manager configuration must have a "data_manager" tag as the root. "{elem.tag}" is present'
        self.declared_id = elem.get("id")
        self.guid = elem.get("guid")
        tool_file = elem.get("tool_file")
        repository = None
        tool_guid = None
        if tool_file is None:
            # No tool_file attribute: the tool is described by a nested <tool> element.
            tool_elem = elem.find("tool")
            assert (
                tool_elem is not None
            ), f"Error loading tool for data manager. Make sure that a tool_file attribute or a tool tag set has been defined:\n{util.xml_to_string(elem)}"
            tool_file = tool_elem.get("file")
            tool_guid = tool_elem.get("guid")
            toolbox = self.data_managers.app.toolbox
            # need to determine repository info so that dependencies will work correctly
            repository = toolbox.get_tool_repository_from_xml_item(tool_elem, tool_file)
            self.tool_shed_repository_info = RepoInfo(
                tool_shed=repository.tool_shed,
                name=repository.name,
                owner=repository.owner,
                installed_changeset_revision=repository.installed_changeset_revision,
            )
            # use shed_conf_file to determine tool_path
            shed_conf_file = elem.get("shed_conf_file")
            if shed_conf_file:
                shed_conf = toolbox.get_shed_config_dict_by_filename(shed_conf_file)
                if shed_conf:
                    tool_path = shed_conf.get("tool_path", tool_path)
        assert tool_file is not None, f"A tool file path could not be determined:\n{util.xml_to_string(elem)}"
        assert tool_path, "A tool root path is required"
        tool_filename = os.path.join(tool_path, tool_file)
        self._load_tool(
            tool_filename,
            guid=tool_guid,
            data_manager_id=self.id,
            tool_shed_repository=repository,
        )
        assert self.tool
        loaded_tool = self.tool
        # Attributes on the element win; the tool's own values are the fallback.
        self.name = elem.get("name", loaded_tool.name)
        self.description = elem.get("description", loaded_tool.description)
        self.version = elem.get("version", loaded_tool.version)
        self.processor_description = convert_data_tables_xml(elem)

    @property
    def id(self):
        """Identifier of this data manager: the guid when set, else the declared id."""
        # if we have a guid, we will use that as the data_manager id
        return self.guid or self.declared_id

    @property
    def data_table_names(self):
        """Data table names taken from the parsed processor description."""
        return self.processor_description.data_table_names

    def _load_tool(
        self, tool_filename, guid=None, data_manager_id=None, tool_shed_repository_id=None, tool_shed_repository=None
    ):
        """Load the tool as a hidden tool, record it in the toolbox and on ``self.tool``.

        :returns: the loaded tool.
        """
        toolbox = self.data_managers.app.toolbox
        hidden_tool = toolbox.load_hidden_tool(
            tool_filename,
            guid=guid,
            data_manager_id=data_manager_id,
            repository_id=tool_shed_repository_id,
            tool_shed_repository=tool_shed_repository,
            use_cached=True,
        )
        toolbox.data_manager_tools[hidden_tool.id] = hidden_tool
        self.tool = hidden_tool
        return hidden_tool

    def process_result(self, out_data: Dict[str, OutputDataset]) -> None:
        """Process the output bundle and request a reload of every updated data table.

        :param out_data: output datasets keyed by name.
        """
        bundle_options = BundleProcessingOptions(
            what=f"data manager '{self.id}'",
            data_manager_path=self._data_manager_path,
            target_config_file=self.data_managers.filename,
        )
        table_registry = self.data_managers.app.tool_data_tables
        changed_tables = table_registry.process_bundle(
            out_data,
            self.processor_description,
            self.repo_info,
            bundle_options,
        )
        for table_name in changed_tables:
            self._reload(table_name)

    def write_bundle(
        self,
        out_data: Dict[str, OutputDataset],
    ):
        """Delegate to the tool data tables' ``write_bundle`` and return its result.

        :param out_data: output datasets keyed by name.
        """
        table_registry = self.data_managers.app.tool_data_tables
        return table_registry.write_bundle(
            out_data,
            self.processor_description,
            self.repo_info,
        )

    @property
    def _data_manager_path(self) -> str:
        """The ``galaxy_data_manager_data_path`` value from the app configuration."""
        config = self.data_managers.app.config
        return config.galaxy_data_manager_data_path

    def _reload(self, data_table_name: str) -> None:
        """Send a ``reload_tool_data_tables`` control task for ``data_table_name``."""
        queue_worker = self.data_managers.app.queue_worker
        queue_worker.send_control_task(
            "reload_tool_data_tables", noop_self=True, kwargs={"table_name": data_table_name}
        )

    @property
    def repo_info(self) -> Optional[RepoInfo]:
        """Tool shed repository info recorded while loading, or ``None``."""
        return self.tool_shed_repository_info

    # legacy stuff because tool shed code calls this...
    # data manager manual integration test provides coverage
    def get_tool_shed_repository_info_dict(self) -> Optional[dict]:
        """Return ``repo_info`` as a dict, or ``None`` when there is no repo info."""
        info = self.repo_info
        if not info:
            return None
        return info.dict()