Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.managers.tools
import logging
from typing import Optional, TYPE_CHECKING, Union
from uuid import UUID
from sqlalchemy import sql
from galaxy import exceptions
from galaxy import model
from galaxy.exceptions import DuplicatedIdentifierException
from galaxy.tool_util.cwl import tool_proxy
from .base import ModelManager, raise_filter_err
from .executables import artifact_class
log = logging.getLogger(__name__)
if TYPE_CHECKING:
from galaxy.managers.base import OrmFilterParsersType
[docs]class DynamicToolManager(ModelManager):
""" Manages dynamic tools stored in Galaxy's database.
"""
model_class = model.DynamicTool
[docs] def get_tool_by_uuid(self, uuid: Optional[Union[UUID, str]]):
dynamic_tool = self._one_or_none(
self.query().filter(self.model_class.uuid == uuid)
)
return dynamic_tool
[docs] def get_tool_by_tool_id(self, tool_id):
dynamic_tool = self._one_or_none(
self.query().filter(self.model_class.tool_id == tool_id)
)
return dynamic_tool
[docs] def get_tool_by_id(self, object_id):
dynamic_tool = self._one_or_none(
self.query().filter(self.model_class.id == object_id)
)
return dynamic_tool
[docs] def create_tool(self, trans, tool_payload, allow_load=True):
if not getattr(self.app.config, "enable_beta_tool_formats", False):
raise exceptions.ConfigDoesNotAllowException("Set 'enable_beta_tool_formats' in Galaxy config to create dynamic tools.")
dynamic_tool = None
uuid_str = tool_payload.get("uuid")
# Convert uuid_str to UUID or generate new if None
uuid = model.get_uuid(uuid_str)
if uuid_str:
# TODO: enforce via DB constraint and catch appropriate
# exception.
dynamic_tool = self.get_tool_by_uuid(uuid_str)
if dynamic_tool:
if not allow_load:
raise DuplicatedIdentifierException(dynamic_tool.id)
assert dynamic_tool.uuid == uuid
if not dynamic_tool:
src = tool_payload.get("src", "representation")
is_path = src == "from_path"
if is_path:
tool_format, representation, _ = artifact_class(None, tool_payload)
else:
assert src == "representation"
representation = tool_payload.get("representation")
if not representation:
raise exceptions.ObjectAttributeMissingException(
"A tool 'representation' is required."
)
tool_format = representation.get("class")
if not tool_format:
raise exceptions.ObjectAttributeMissingException(
"Current tool representations require 'class'."
)
tool_path = tool_payload.get("path")
tool_directory = tool_payload.get("tool_directory")
if tool_format == "GalaxyTool":
tool_id = representation.get("id")
if not tool_id:
tool_id = str(uuid)
elif tool_format in ("CommandLineTool", "ExpressionTool"):
# CWL tools
if is_path:
proxy = tool_proxy(tool_path=tool_path, uuid=uuid)
else:
# Build a tool proxy so that we can convert to the persistable
# hash.
proxy = tool_proxy(
tool_object=representation["raw_process_reference"],
tool_directory=tool_directory,
uuid=uuid,
)
tool_id = proxy.galaxy_id()
else:
raise Exception(f"Unknown tool format [{tool_format}] encountered.")
tool_version = representation.get("version")
dynamic_tool = self.create(
tool_format=tool_format,
tool_id=tool_id,
tool_version=tool_version,
tool_path=tool_path,
tool_directory=tool_directory,
uuid=uuid,
value=representation,
)
self.app.toolbox.load_dynamic_tool(dynamic_tool)
return dynamic_tool
[docs] def list_tools(self, active=True):
return self.query().filter(self.model_class.active == active)
[docs] def deactivate(self, dynamic_tool):
self.update(dynamic_tool, {"active": False})
return dynamic_tool
[docs]class ToolFilterMixin:
orm_filter_parsers: "OrmFilterParsersType"
[docs] def create_tool_filter(self, attr, op, val):
def _create_tool_filter(model_class=None):
if op == 'eq':
cond = model.Job.table.c.tool_id == val
elif op == 'contains':
cond = model.Job.table.c.tool_id.contains(val, autoescape=True)
else:
raise_filter_err(attr, op, val, 'bad op in filter')
if model_class is model.HistoryDatasetAssociation:
return sql.expression.and_(
model.Job.table.c.id == model.JobToOutputDatasetAssociation.table.c.job_id,
model.HistoryDatasetAssociation.table.c.id == model.JobToOutputDatasetAssociation.table.c.dataset_id,
cond
)
elif model_class is model.HistoryDatasetCollectionAssociation:
return sql.expression.and_(
model.Job.id == model.JobToOutputDatasetAssociation.job_id,
model.JobToOutputDatasetAssociation.dataset_id == model.DatasetCollectionElement.hda_id,
model.DatasetCollectionElement.dataset_collection_id == model.HistoryDatasetCollectionAssociation.collection_id,
cond,
)
else:
return True
return _create_tool_filter
def _add_parsers(self):
self.orm_filter_parsers.update({
'tool_id': self.create_tool_filter,
})