Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.managers.tools

import logging
from typing import Optional, TYPE_CHECKING, Union
from uuid import UUID

from sqlalchemy import sql

from galaxy import exceptions
from galaxy import model
from galaxy.exceptions import DuplicatedIdentifierException
from galaxy.tool_util.cwl import tool_proxy
from .base import ModelManager, raise_filter_err
from .executables import artifact_class

log = logging.getLogger(__name__)

if TYPE_CHECKING:
    from galaxy.managers.base import OrmFilterParsersType


[docs]class DynamicToolManager(ModelManager): """ Manages dynamic tools stored in Galaxy's database. """ model_class = model.DynamicTool
[docs] def get_tool_by_uuid(self, uuid: Optional[Union[UUID, str]]): dynamic_tool = self._one_or_none( self.query().filter(self.model_class.uuid == uuid) ) return dynamic_tool
[docs] def get_tool_by_tool_id(self, tool_id): dynamic_tool = self._one_or_none( self.query().filter(self.model_class.tool_id == tool_id) ) return dynamic_tool
[docs] def get_tool_by_id(self, object_id): dynamic_tool = self._one_or_none( self.query().filter(self.model_class.id == object_id) ) return dynamic_tool
[docs] def create_tool(self, trans, tool_payload, allow_load=True): if not getattr(self.app.config, "enable_beta_tool_formats", False): raise exceptions.ConfigDoesNotAllowException("Set 'enable_beta_tool_formats' in Galaxy config to create dynamic tools.") dynamic_tool = None uuid_str = tool_payload.get("uuid") # Convert uuid_str to UUID or generate new if None uuid = model.get_uuid(uuid_str) if uuid_str: # TODO: enforce via DB constraint and catch appropriate # exception. dynamic_tool = self.get_tool_by_uuid(uuid_str) if dynamic_tool: if not allow_load: raise DuplicatedIdentifierException(dynamic_tool.id) assert dynamic_tool.uuid == uuid if not dynamic_tool: src = tool_payload.get("src", "representation") is_path = src == "from_path" if is_path: tool_format, representation, _ = artifact_class(None, tool_payload) else: assert src == "representation" representation = tool_payload.get("representation") if not representation: raise exceptions.ObjectAttributeMissingException( "A tool 'representation' is required." ) tool_format = representation.get("class") if not tool_format: raise exceptions.ObjectAttributeMissingException( "Current tool representations require 'class'." ) tool_path = tool_payload.get("path") tool_directory = tool_payload.get("tool_directory") if tool_format == "GalaxyTool": tool_id = representation.get("id") if not tool_id: tool_id = str(uuid) elif tool_format in ("CommandLineTool", "ExpressionTool"): # CWL tools if is_path: proxy = tool_proxy(tool_path=tool_path, uuid=uuid) else: # Build a tool proxy so that we can convert to the persistable # hash. proxy = tool_proxy( tool_object=representation["raw_process_reference"], tool_directory=tool_directory, uuid=uuid, ) tool_id = proxy.galaxy_id() else: raise Exception(f"Unknown tool format [{tool_format}] encountered.") tool_version = representation.get("version") dynamic_tool = self.create( tool_format=tool_format, tool_id=tool_id, tool_version=tool_version, tool_path=tool_path, tool_directory=tool_directory, uuid=uuid, value=representation, ) self.app.toolbox.load_dynamic_tool(dynamic_tool) return dynamic_tool
[docs] def list_tools(self, active=True): return self.query().filter(self.model_class.active == active)
[docs] def deactivate(self, dynamic_tool): self.update(dynamic_tool, {"active": False}) return dynamic_tool
[docs]class ToolFilterMixin: orm_filter_parsers: "OrmFilterParsersType"
[docs] def create_tool_filter(self, attr, op, val): def _create_tool_filter(model_class=None): if op == 'eq': cond = model.Job.table.c.tool_id == val elif op == 'contains': cond = model.Job.table.c.tool_id.contains(val, autoescape=True) else: raise_filter_err(attr, op, val, 'bad op in filter') if model_class is model.HistoryDatasetAssociation: return sql.expression.and_( model.Job.table.c.id == model.JobToOutputDatasetAssociation.table.c.job_id, model.HistoryDatasetAssociation.table.c.id == model.JobToOutputDatasetAssociation.table.c.dataset_id, cond ) elif model_class is model.HistoryDatasetCollectionAssociation: return sql.expression.and_( model.Job.id == model.JobToOutputDatasetAssociation.job_id, model.JobToOutputDatasetAssociation.dataset_id == model.DatasetCollectionElement.hda_id, model.DatasetCollectionElement.dataset_collection_id == model.HistoryDatasetCollectionAssociation.collection_id, cond, ) else: return True return _create_tool_filter
def _add_parsers(self): self.orm_filter_parsers.update({ 'tool_id': self.create_tool_filter, })