Source code for galaxy.managers.tools

import logging
from typing import (
    Optional,
    TYPE_CHECKING,
    Union,
)
from uuid import UUID

from sqlalchemy import (
    select,
    sql,
)

from galaxy import (
    exceptions,
    model,
)
from galaxy.exceptions import DuplicatedIdentifierException
from galaxy.model import DynamicTool
from galaxy.tool_util.cwl import tool_proxy
from .base import (
    ModelManager,
    raise_filter_err,
)
from .executables import artifact_class

log = logging.getLogger(__name__)

if TYPE_CHECKING:
    from galaxy.managers.base import OrmFilterParsersType


[docs]class DynamicToolManager(ModelManager): """Manages dynamic tools stored in Galaxy's database.""" model_class = model.DynamicTool
[docs] def get_tool_by_uuid(self, uuid: Optional[Union[UUID, str]]): stmt = select(DynamicTool).where(DynamicTool.uuid == uuid) return self.session().scalars(stmt).one_or_none()
[docs] def get_tool_by_tool_id(self, tool_id): stmt = select(DynamicTool).where(DynamicTool.tool_id == tool_id) return self.session().scalars(stmt).one_or_none()
[docs] def get_tool_by_id(self, object_id): stmt = select(DynamicTool).where(DynamicTool.id == object_id) return self.session().scalars(stmt).one_or_none()
[docs] def create_tool(self, trans, tool_payload, allow_load=True): if not getattr(self.app.config, "enable_beta_tool_formats", False): raise exceptions.ConfigDoesNotAllowException( "Set 'enable_beta_tool_formats' in Galaxy config to create dynamic tools." ) dynamic_tool = None uuid_str = tool_payload.get("uuid") # Convert uuid_str to UUID or generate new if None uuid = model.get_uuid(uuid_str) if uuid_str: # TODO: enforce via DB constraint and catch appropriate # exception. dynamic_tool = self.get_tool_by_uuid(uuid_str) if dynamic_tool: if not allow_load: raise DuplicatedIdentifierException(dynamic_tool.id) assert dynamic_tool.uuid == uuid if not dynamic_tool: src = tool_payload.get("src", "representation") is_path = src == "from_path" if is_path: tool_format, representation, _ = artifact_class(None, tool_payload) else: assert src == "representation" representation = tool_payload.get("representation") if not representation: raise exceptions.ObjectAttributeMissingException("A tool 'representation' is required.") tool_format = representation.get("class") if not tool_format: raise exceptions.ObjectAttributeMissingException("Current tool representations require 'class'.") tool_path = tool_payload.get("path") tool_directory = tool_payload.get("tool_directory") if tool_format == "GalaxyTool": tool_id = representation.get("id") if not tool_id: tool_id = str(uuid) elif tool_format in ("CommandLineTool", "ExpressionTool"): # CWL tools if is_path: proxy = tool_proxy(tool_path=tool_path, uuid=uuid) else: # Build a tool proxy so that we can convert to the persistable # hash. proxy = tool_proxy( tool_object=representation["raw_process_reference"], tool_directory=tool_directory, uuid=uuid, ) tool_id = proxy.galaxy_id() else: raise Exception(f"Unknown tool format [{tool_format}] encountered.") tool_version = representation.get("version") dynamic_tool = self.create( tool_format=tool_format, tool_id=tool_id, tool_version=tool_version, tool_path=tool_path, tool_directory=tool_directory, uuid=uuid, value=representation, ) self.app.toolbox.load_dynamic_tool(dynamic_tool) return dynamic_tool
[docs] def list_tools(self, active=True): stmt = select(DynamicTool).where(DynamicTool.active == active) return self.session().scalars(stmt)
[docs] def deactivate(self, dynamic_tool): self.update(dynamic_tool, {"active": False}) return dynamic_tool
[docs]class ToolFilterMixin: orm_filter_parsers: "OrmFilterParsersType"
[docs] def create_tool_filter(self, attr, op, val): def _create_tool_filter(model_class=None): if op == "eq": cond = model.Job.table.c.tool_id == val elif op == "contains": cond = model.Job.table.c.tool_id.contains(val, autoescape=True) else: raise_filter_err(attr, op, val, "bad op in filter") if model_class is model.HistoryDatasetAssociation: return sql.expression.and_( model.Job.table.c.id == model.JobToOutputDatasetAssociation.table.c.job_id, model.HistoryDatasetAssociation.table.c.id == model.JobToOutputDatasetAssociation.table.c.dataset_id, cond, ) elif model_class is model.HistoryDatasetCollectionAssociation: return sql.expression.and_( model.Job.id == model.JobToOutputDatasetAssociation.job_id, model.JobToOutputDatasetAssociation.dataset_id == model.DatasetCollectionElement.hda_id, model.DatasetCollectionElement.dataset_collection_id == model.HistoryDatasetCollectionAssociation.collection_id, cond, ) else: return True return _create_tool_filter
def _add_parsers(self): self.orm_filter_parsers.update( { "tool_id": self.create_tool_filter, } )