Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tools.data_manager.manager
import errno
import logging
import os
from typing import (
Dict,
List,
Optional,
Union,
)
from typing_extensions import Protocol
from galaxy import util
from galaxy.structured_app import StructuredApp
from galaxy.tool_shed.galaxy_install.client import DataManagersInterface
from galaxy.tool_util.data import (
BundleProcessingOptions,
OutputDataset,
)
from galaxy.tool_util.data.bundles.models import (
convert_data_tables_xml,
RepoInfo,
)
from galaxy.util import Element
log = logging.getLogger(__name__)
[docs]class DataManagers(DataManagersInterface):
data_managers: Dict[str, "DataManager"]
managed_data_tables: Dict[str, "DataManager"]
__reload_count: int
[docs] def __init__(self, app: StructuredApp, xml_filename=None, reload_count: Optional[int] = None):
self.app = app
self.data_managers = {}
self.managed_data_tables = {}
self.tool_path: Optional[str] = None
self.__reload_count = reload_count or 0
self.filename = xml_filename or self.app.config.data_manager_config_file
for filename in util.listify(self.filename):
if not filename:
continue
self.load_from_xml(filename)
if self.app.config.shed_data_manager_config_file:
try:
self.load_from_xml(self.app.config.shed_data_manager_config_file, store_tool_path=True)
except OSError as exc:
if exc.errno != errno.ENOENT or self.app.config.is_set("shed_data_manager_config_file"):
raise
@property
def _reload_count(self) -> int:
return self.__reload_count
[docs] def load_from_xml(self, xml_filename, store_tool_path=True) -> None:
try:
tree = util.parse_xml(xml_filename)
except OSError as e:
if e.errno != errno.ENOENT or self.app.config.is_set("data_manager_config_file"):
raise
return # default config option and it doesn't exist, which is fine
except Exception as e:
log.error(f'There was an error parsing your Data Manager config file "{xml_filename}": {e}')
return # we are not able to load any data managers
root = tree.getroot()
if root.tag != "data_managers":
log.error(
f'A data managers configuration must have a "data_managers" tag as the root. "{root.tag}" is present'
)
return
if store_tool_path:
tool_path = root.get("tool_path", None)
if tool_path is None:
tool_path = self.app.config.tool_path
if not tool_path:
tool_path = "."
self.tool_path = tool_path
for data_manager_elem in root.findall("data_manager"):
if not self.load_manager_from_elem(data_manager_elem, tool_path=self.tool_path):
# Wasn't able to load manager, could happen when galaxy is managed by planemo.
# Fall back to loading relative to the data_manager_conf.xml file
tool_path = os.path.dirname(xml_filename)
self.load_manager_from_elem(data_manager_elem, tool_path=tool_path)
[docs] def load_manager_from_elem(self, data_manager_elem, tool_path=None, add_manager=True) -> Optional["DataManager"]:
try:
data_manager = DataManager(self, data_manager_elem, tool_path=tool_path)
except OSError as e:
if e.errno == errno.ENOENT:
# File does not exist
return None
except Exception:
log.exception("Error loading data_manager")
return None
if add_manager:
self.add_manager(data_manager)
log.debug(f"Loaded Data Manager: {data_manager.id}")
return data_manager
[docs] def add_manager(self, data_manager):
if data_manager.id in self.data_managers:
log.warning(f"A data manager has been defined twice: {data_manager.id} ")
self.data_managers[data_manager.id] = data_manager
for data_table_name in data_manager.data_table_names:
if data_table_name not in self.managed_data_tables:
self.managed_data_tables[data_table_name] = []
self.managed_data_tables[data_table_name].append(data_manager)
[docs] def remove_manager(self, manager_ids: Union[str, List[str]]) -> None:
if not isinstance(manager_ids, list):
manager_ids = [manager_ids]
for manager_id in manager_ids:
data_manager = self.get_manager(manager_id, None)
if data_manager is not None:
del self.data_managers[manager_id]
# remove tool from toolbox
if data_manager.tool:
self.app.toolbox.remove_tool_by_id(data_manager.tool.id)
# determine if any data_tables are no longer tracked
for data_table_name in data_manager.data_table_names:
remove_data_table_tracking = True
for other_data_manager in self.data_managers.values():
if data_table_name in other_data_manager.data_table_names:
remove_data_table_tracking = False
break
if remove_data_table_tracking and data_table_name in self.managed_data_tables:
del self.managed_data_tables[data_table_name]
[docs]class DataManager:
GUID_TYPE = "data_manager"
DEFAULT_VERSION = "0.0.1"
tool: Optional[Tool]
[docs] def __init__(self, data_managers: DataManagers, elem: Optional[Element] = None, tool_path: Optional[str] = None):
self.data_managers = data_managers
self.declared_id: Optional[str] = None
self.name: Optional[str] = None
self.description: Optional[str] = None
self.version = self.DEFAULT_VERSION
self.guid: Optional[str] = None
self.tool = None
self.tool_shed_repository_info: Optional[RepoInfo] = None
self.undeclared_tables = False
if elem is not None:
self._load_from_element(elem, tool_path or self.data_managers.tool_path)
def _load_from_element(self, elem: Element, tool_path: Optional[str]) -> None:
assert (
elem.tag == "data_manager"
), f'A data manager configuration must have a "data_manager" tag as the root. "{elem.tag}" is present'
self.declared_id = elem.get("id")
self.guid = elem.get("guid")
path = elem.get("tool_file")
tool_shed_repository = None
tool_guid = None
if path is None:
tool_elem = elem.find("tool")
assert (
tool_elem is not None
), f"Error loading tool for data manager. Make sure that a tool_file attribute or a tool tag set has been defined:\n{util.xml_to_string(elem)}"
path = tool_elem.get("file")
tool_guid = tool_elem.get("guid")
# need to determine repository info so that dependencies will work correctly
tool_shed_repository = self.data_managers.app.toolbox.get_tool_repository_from_xml_item(tool_elem, path)
self.tool_shed_repository_info = RepoInfo(
tool_shed=tool_shed_repository.tool_shed,
name=tool_shed_repository.name,
owner=tool_shed_repository.owner,
installed_changeset_revision=tool_shed_repository.installed_changeset_revision,
)
# use shed_conf_file to determine tool_path
shed_conf_file = elem.get("shed_conf_file")
if shed_conf_file:
shed_conf = self.data_managers.app.toolbox.get_shed_config_dict_by_filename(shed_conf_file)
if shed_conf:
tool_path = shed_conf.get("tool_path", tool_path)
assert path is not None, f"A tool file path could not be determined:\n{util.xml_to_string(elem)}"
assert tool_path, "A tool root path is required"
self._load_tool(
os.path.join(tool_path, path),
guid=tool_guid,
data_manager_id=self.id,
tool_shed_repository=tool_shed_repository,
)
assert self.tool
self.name = elem.get("name", self.tool.name)
self.description = elem.get("description", self.tool.description)
self.version = elem.get("version", self.tool.version)
self.processor_description = convert_data_tables_xml(elem)
@property
def id(self):
return self.guid or self.declared_id # if we have a guid, we will use that as the data_manager id
@property
def data_table_names(self):
return self.processor_description.data_table_names
def _load_tool(
self, tool_filename, guid=None, data_manager_id=None, tool_shed_repository_id=None, tool_shed_repository=None
):
toolbox = self.data_managers.app.toolbox
tool = toolbox.load_hidden_tool(
tool_filename,
guid=guid,
data_manager_id=data_manager_id,
repository_id=tool_shed_repository_id,
tool_shed_repository=tool_shed_repository,
use_cached=True,
)
self.data_managers.app.toolbox.data_manager_tools[tool.id] = tool
self.tool = tool
return tool
[docs] def process_result(self, out_data: Dict[str, OutputDataset]) -> None:
tool_data_tables = self.data_managers.app.tool_data_tables
options = BundleProcessingOptions(
what=f"data manager '{self.id}'",
data_manager_path=self._data_manager_path,
target_config_file=self.data_managers.filename,
)
updated_data_tables = tool_data_tables.process_bundle(
out_data,
self.processor_description,
self.repo_info,
options,
)
for data_table_name in updated_data_tables:
self._reload(data_table_name)
[docs] def write_bundle(
self,
out_data: Dict[str, OutputDataset],
) -> Dict[str, OutputDataset]:
tool_data_tables = self.data_managers.app.tool_data_tables
return tool_data_tables.write_bundle(
out_data,
self.processor_description,
self.repo_info,
)
@property
def _data_manager_path(self) -> str:
return self.data_managers.app.config.galaxy_data_manager_data_path
def _reload(self, data_table_name: str) -> None:
self.data_managers.app.queue_worker.send_control_task(
"reload_tool_data_tables", noop_self=True, kwargs={"table_name": data_table_name}
)
@property
def repo_info(self) -> Optional[RepoInfo]:
return self.tool_shed_repository_info
# legacy stuff because tool shed code calls this...
# data manager manual integration test provides coverage
[docs] def get_tool_shed_repository_info_dict(self) -> Optional[dict]:
repo_info = self.repo_info
return repo_info.model_dump(mode="json") if repo_info else None