Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.managers.tools

import logging
from typing import (
    Any,
    Dict,
    Optional,
    TYPE_CHECKING,
    Union,
)
from uuid import UUID

from sqlalchemy import (
    exists,
    false,
    select,
    sql,
    true,
    update,
)

from galaxy import (
    exceptions,
    model,
)
from galaxy.exceptions import DuplicatedIdentifierException
from galaxy.model import (
    DynamicTool,
    UserDynamicToolAssociation,
)
from galaxy.tool_util.cwl import tool_proxy
from galaxy.tool_util.parser.yaml import YamlToolSource
from galaxy.tool_util_models.dynamic_tool_models import (
    DynamicToolPayload,
    DynamicUnprivilegedToolCreatePayload,
)
from galaxy.tools import (
    create_tool_from_source,
    Tool,
)
from .base import (
    ModelManager,
    raise_filter_err,
)
from .executables import artifact_class

log = logging.getLogger(__name__)

if TYPE_CHECKING:
    from galaxy.managers.base import OrmFilterParsersType
    from galaxy.managers.context import ProvidesUserContext


[docs] def tool_payload_to_tool(app, tool_dict: Dict[str, Any]) -> Optional[Tool]: tool_source = YamlToolSource(tool_dict) tool = create_tool_from_source(app, tool_source=tool_source, tool_dir=None) return tool
[docs] class DynamicToolManager(ModelManager[model.DynamicTool]): """Manages dynamic tools stored in Galaxy's database.""" model_class = model.DynamicTool
[docs] def ensure_can_use_unprivileged_tool(self, user: model.User): stmt = select( exists().where( model.UserRoleAssociation.user_id == user.id, model.UserRoleAssociation.role_id == model.Role.id, model.Role.type == model.Role.types.USER_TOOL_EXECUTE, model.Role.deleted == false(), ) ) if not self.session().execute(stmt).scalar(): raise exceptions.InsufficientPermissionsException("User is not allowed to run unprivileged tools")
[docs] def get_tool_by_id_or_uuid(self, id_or_uuid: Union[int, str]): if isinstance(id_or_uuid, int): return self.get_tool_by_id(id_or_uuid) else: return self.get_tool_by_uuid(id_or_uuid)
[docs] def get_tool_by_uuid(self, uuid: Optional[Union[UUID, str]]): stmt = select(DynamicTool).where(DynamicTool.uuid == uuid) return self.session().scalars(stmt).one_or_none()
[docs] def get_tool_by_tool_id(self, tool_id): stmt = select(DynamicTool).where(DynamicTool.tool_id == tool_id, DynamicTool.public == true()) return self.session().scalars(stmt).one_or_none()
[docs] def get_unprivileged_tool_by_uuid(self, user: model.User, uuid: Union[UUID, str]): stmt = self.owned_unprivileged_statement(user).where(DynamicTool.uuid == uuid) return self.session().scalars(stmt).one_or_none()
[docs] def get_unprivileged_tool_by_tool_id(self, user: model.User, tool_id: str): stmt = self.owned_unprivileged_statement(user).where(DynamicTool.tool_id == tool_id) return self.session().scalars(stmt).one_or_none()
[docs] def get_tool_by_id(self, object_id): stmt = select(DynamicTool).where(DynamicTool.id == object_id, DynamicTool.public == true()) return self.session().scalars(stmt).one_or_none()
[docs] def create_tool(self, trans: "ProvidesUserContext", tool_payload: DynamicToolPayload, allow_load=True): if not getattr(self.app.config, "enable_beta_tool_formats", False): raise exceptions.ConfigDoesNotAllowException( "Set 'enable_beta_tool_formats' in Galaxy config to create dynamic tools." ) dynamic_tool = None uuid_str = tool_payload.uuid # Convert uuid_str to UUID or generate new if None uuid = model.get_uuid(uuid_str) if uuid_str: # TODO: enforce via DB constraint and catch appropriate # exception. dynamic_tool = self.get_tool_by_uuid(uuid_str) if dynamic_tool: if not allow_load: raise DuplicatedIdentifierException( f"Attempted to create dynamic tool with duplicate UUID '{uuid_str}'" ) assert dynamic_tool.uuid == uuid if not dynamic_tool: if tool_payload.src == "from_path": tool_format, representation, _ = artifact_class(None, tool_payload.model_dump()) else: representation = tool_payload.representation.model_dump(by_alias=True, exclude_unset=True) if not representation: raise exceptions.ObjectAttributeMissingException("A tool 'representation' is required.") tool_format = representation.get("class") if not tool_format: raise exceptions.ObjectAttributeMissingException("Current tool representations require 'class'.") tool_directory: Optional[str] = None tool_path: Optional[str] = None if tool_payload.src == "from_path": tool_directory = tool_payload.tool_directory tool_path = tool_payload.path if tool_format in ("GalaxyTool", "GalaxyUserTool"): tool_id = representation.get("id") if not tool_id: tool_id = str(uuid) elif tool_format in ("CommandLineTool", "ExpressionTool"): # CWL tools if tool_path: proxy = tool_proxy(tool_path=tool_path, uuid=uuid) else: # Build a tool proxy so that we can convert to the persistable # hash. proxy = tool_proxy( tool_object=representation["raw_process_reference"], tool_directory=tool_directory, uuid=uuid, ) tool_id = proxy.galaxy_id() else: raise Exception(f"Unknown tool format [{tool_format}] encountered.") tool_version = representation.get("version") dynamic_tool = self.create( tool_format=tool_format, tool_id=tool_id, tool_version=tool_version, tool_path=tool_path, tool_directory=tool_directory, uuid=uuid, active=tool_payload.active, hidden=tool_payload.hidden, value=representation, public=True, ) self.app.toolbox.load_dynamic_tool(dynamic_tool) return dynamic_tool
[docs] def create_unprivileged_tool( self, user: model.User, tool_payload: DynamicUnprivilegedToolCreatePayload ) -> DynamicTool: if not getattr(self.app.config, "enable_beta_tool_formats", False): raise exceptions.ConfigDoesNotAllowException( "Set 'enable_beta_tool_formats' in Galaxy config to create dynamic tools." ) self.ensure_can_use_unprivileged_tool(user) dynamic_tool = self.create( tool_format=tool_payload.representation.class_, tool_id=tool_payload.representation.id, tool_version=tool_payload.representation.version, active=tool_payload.active, hidden=tool_payload.hidden, value=tool_payload.representation.model_dump(by_alias=True), public=False, flush=True, ) session = self.session() session.add(UserDynamicToolAssociation(user_id=user.id, dynamic_tool_id=dynamic_tool.id)) session.commit() return dynamic_tool
[docs] def list_tools(self, active=True): stmt = select(DynamicTool).where(DynamicTool.active == active) return self.session().scalars(stmt)
[docs] def list_unprivileged_tools(self, user: model.User, active=True): self.ensure_can_use_unprivileged_tool(user) owned_statement = self.owned_unprivileged_statement(user=user) stmt = owned_statement.where( DynamicTool.active == active, UserDynamicToolAssociation.active == active, ) return self.session().scalars(stmt)
[docs] def owned_unprivileged_statement(self, user: model.User): self.ensure_can_use_unprivileged_tool(user) return ( select(DynamicTool) .join(UserDynamicToolAssociation, DynamicTool.id == UserDynamicToolAssociation.dynamic_tool_id) .where( UserDynamicToolAssociation.user_id == user.id, ) .order_by(UserDynamicToolAssociation.id.desc()) )
[docs] def deactivate_unprivileged_tool(self, user: model.User, dynamic_tool: DynamicTool): update_stmt = ( update(UserDynamicToolAssociation) .where( UserDynamicToolAssociation.user_id == user.id, UserDynamicToolAssociation.dynamic_tool_id == dynamic_tool.id, ) .values(active=False) ) session = self.session() session.execute(update_stmt) session.commit()
[docs] def deactivate(self, dynamic_tool): self.update(dynamic_tool, {"active": False}) return dynamic_tool
[docs] class ToolFilterMixin: orm_filter_parsers: "OrmFilterParsersType"
[docs] def create_tool_filter(self, attr, op, val): def _create_tool_filter(model_class=None): if op == "eq": cond = model.Job.table.c.tool_id == val elif op == "contains": cond = model.Job.table.c.tool_id.contains(val, autoescape=True) else: raise_filter_err(attr, op, val, "bad op in filter") if model_class is model.HistoryDatasetAssociation: return sql.expression.and_( model.Job.table.c.id == model.JobToOutputDatasetAssociation.table.c.job_id, model.HistoryDatasetAssociation.table.c.id == model.JobToOutputDatasetAssociation.table.c.dataset_id, cond, ) elif model_class is model.HistoryDatasetCollectionAssociation: return sql.expression.and_( model.Job.id == model.JobToOutputDatasetAssociation.job_id, model.JobToOutputDatasetAssociation.dataset_id == model.DatasetCollectionElement.hda_id, model.DatasetCollectionElement.dataset_collection_id == model.HistoryDatasetCollectionAssociation.collection_id, cond, ) else: return True return _create_tool_filter
def _add_parsers(self): self.orm_filter_parsers.update( { "tool_id": self.create_tool_filter, } )