Source code for galaxy.managers.tools
import logging
from typing import (
Any,
Dict,
Optional,
TYPE_CHECKING,
Union,
)
from uuid import UUID
from sqlalchemy import (
exists,
false,
select,
sql,
true,
update,
)
from galaxy import (
exceptions,
model,
)
from galaxy.exceptions import DuplicatedIdentifierException
from galaxy.model import (
DynamicTool,
UserDynamicToolAssociation,
)
from galaxy.tool_util.cwl import tool_proxy
from galaxy.tool_util.parser.yaml import YamlToolSource
from galaxy.tool_util_models.dynamic_tool_models import (
DynamicToolPayload,
DynamicUnprivilegedToolCreatePayload,
)
from galaxy.tools import (
create_tool_from_source,
Tool,
)
from .base import (
ModelManager,
raise_filter_err,
)
from .executables import artifact_class
log = logging.getLogger(__name__)
if TYPE_CHECKING:
from galaxy.managers.base import OrmFilterParsersType
from galaxy.managers.context import ProvidesUserContext
[docs]
def tool_payload_to_tool(app, tool_dict: Dict[str, Any]) -> Optional[Tool]:
tool_source = YamlToolSource(tool_dict)
tool = create_tool_from_source(app, tool_source=tool_source, tool_dir=None)
return tool
[docs]
class DynamicToolManager(ModelManager[model.DynamicTool]):
"""Manages dynamic tools stored in Galaxy's database."""
model_class = model.DynamicTool
[docs]
def ensure_can_use_unprivileged_tool(self, user: model.User):
stmt = select(
exists().where(
model.UserRoleAssociation.user_id == user.id,
model.UserRoleAssociation.role_id == model.Role.id,
model.Role.type == model.Role.types.USER_TOOL_EXECUTE,
model.Role.deleted == false(),
)
)
if not self.session().execute(stmt).scalar():
raise exceptions.InsufficientPermissionsException("User is not allowed to run unprivileged tools")
[docs]
def get_tool_by_id_or_uuid(self, id_or_uuid: Union[int, str]):
if isinstance(id_or_uuid, int):
return self.get_tool_by_id(id_or_uuid)
else:
return self.get_tool_by_uuid(id_or_uuid)
[docs]
def get_tool_by_uuid(self, uuid: Optional[Union[UUID, str]]):
stmt = select(DynamicTool).where(DynamicTool.uuid == uuid)
return self.session().scalars(stmt).one_or_none()
[docs]
def get_tool_by_tool_id(self, tool_id):
stmt = select(DynamicTool).where(DynamicTool.tool_id == tool_id, DynamicTool.public == true())
return self.session().scalars(stmt).one_or_none()
[docs]
def get_unprivileged_tool_by_uuid(self, user: model.User, uuid: Union[UUID, str]):
stmt = self.owned_unprivileged_statement(user).where(DynamicTool.uuid == uuid)
return self.session().scalars(stmt).one_or_none()
[docs]
def get_unprivileged_tool_by_tool_id(self, user: model.User, tool_id: str):
stmt = self.owned_unprivileged_statement(user).where(DynamicTool.tool_id == tool_id)
return self.session().scalars(stmt).one_or_none()
[docs]
def get_tool_by_id(self, object_id):
stmt = select(DynamicTool).where(DynamicTool.id == object_id, DynamicTool.public == true())
return self.session().scalars(stmt).one_or_none()
[docs]
def create_tool(self, trans: "ProvidesUserContext", tool_payload: DynamicToolPayload, allow_load=True):
if not getattr(self.app.config, "enable_beta_tool_formats", False):
raise exceptions.ConfigDoesNotAllowException(
"Set 'enable_beta_tool_formats' in Galaxy config to create dynamic tools."
)
dynamic_tool = None
uuid_str = tool_payload.uuid
# Convert uuid_str to UUID or generate new if None
uuid = model.get_uuid(uuid_str)
if uuid_str:
# TODO: enforce via DB constraint and catch appropriate
# exception.
dynamic_tool = self.get_tool_by_uuid(uuid_str)
if dynamic_tool:
if not allow_load:
raise DuplicatedIdentifierException(
f"Attempted to create dynamic tool with duplicate UUID '{uuid_str}'"
)
assert dynamic_tool.uuid == uuid
if not dynamic_tool:
if tool_payload.src == "from_path":
tool_format, representation, _ = artifact_class(None, tool_payload.model_dump())
else:
representation = tool_payload.representation.model_dump(by_alias=True, exclude_unset=True)
if not representation:
raise exceptions.ObjectAttributeMissingException("A tool 'representation' is required.")
tool_format = representation.get("class")
if not tool_format:
raise exceptions.ObjectAttributeMissingException("Current tool representations require 'class'.")
tool_directory: Optional[str] = None
tool_path: Optional[str] = None
if tool_payload.src == "from_path":
tool_directory = tool_payload.tool_directory
tool_path = tool_payload.path
if tool_format in ("GalaxyTool", "GalaxyUserTool"):
tool_id = representation.get("id")
if not tool_id:
tool_id = str(uuid)
elif tool_format in ("CommandLineTool", "ExpressionTool"):
# CWL tools
if tool_path:
proxy = tool_proxy(tool_path=tool_path, uuid=uuid)
else:
# Build a tool proxy so that we can convert to the persistable
# hash.
proxy = tool_proxy(
tool_object=representation["raw_process_reference"],
tool_directory=tool_directory,
uuid=uuid,
)
tool_id = proxy.galaxy_id()
else:
raise Exception(f"Unknown tool format [{tool_format}] encountered.")
tool_version = representation.get("version")
dynamic_tool = self.create(
tool_format=tool_format,
tool_id=tool_id,
tool_version=tool_version,
tool_path=tool_path,
tool_directory=tool_directory,
uuid=uuid,
active=tool_payload.active,
hidden=tool_payload.hidden,
value=representation,
public=True,
)
self.app.toolbox.load_dynamic_tool(dynamic_tool)
return dynamic_tool
[docs]
def create_unprivileged_tool(
self, user: model.User, tool_payload: DynamicUnprivilegedToolCreatePayload
) -> DynamicTool:
if not getattr(self.app.config, "enable_beta_tool_formats", False):
raise exceptions.ConfigDoesNotAllowException(
"Set 'enable_beta_tool_formats' in Galaxy config to create dynamic tools."
)
self.ensure_can_use_unprivileged_tool(user)
dynamic_tool = self.create(
tool_format=tool_payload.representation.class_,
tool_id=tool_payload.representation.id,
tool_version=tool_payload.representation.version,
active=tool_payload.active,
hidden=tool_payload.hidden,
value=tool_payload.representation.model_dump(by_alias=True),
public=False,
flush=True,
)
session = self.session()
session.add(UserDynamicToolAssociation(user_id=user.id, dynamic_tool_id=dynamic_tool.id))
session.commit()
return dynamic_tool
[docs]
def list_tools(self, active=True):
stmt = select(DynamicTool).where(DynamicTool.active == active)
return self.session().scalars(stmt)
[docs]
def list_unprivileged_tools(self, user: model.User, active=True):
self.ensure_can_use_unprivileged_tool(user)
owned_statement = self.owned_unprivileged_statement(user=user)
stmt = owned_statement.where(
DynamicTool.active == active,
UserDynamicToolAssociation.active == active,
)
return self.session().scalars(stmt)
[docs]
def owned_unprivileged_statement(self, user: model.User):
self.ensure_can_use_unprivileged_tool(user)
return (
select(DynamicTool)
.join(UserDynamicToolAssociation, DynamicTool.id == UserDynamicToolAssociation.dynamic_tool_id)
.where(
UserDynamicToolAssociation.user_id == user.id,
)
.order_by(UserDynamicToolAssociation.id.desc())
)
[docs]
def deactivate_unprivileged_tool(self, user: model.User, dynamic_tool: DynamicTool):
update_stmt = (
update(UserDynamicToolAssociation)
.where(
UserDynamicToolAssociation.user_id == user.id,
UserDynamicToolAssociation.dynamic_tool_id == dynamic_tool.id,
)
.values(active=False)
)
session = self.session()
session.execute(update_stmt)
session.commit()
[docs]
def deactivate(self, dynamic_tool):
self.update(dynamic_tool, {"active": False})
return dynamic_tool
[docs]
class ToolFilterMixin:
orm_filter_parsers: "OrmFilterParsersType"
[docs]
def create_tool_filter(self, attr, op, val):
def _create_tool_filter(model_class=None):
if op == "eq":
cond = model.Job.table.c.tool_id == val
elif op == "contains":
cond = model.Job.table.c.tool_id.contains(val, autoescape=True)
else:
raise_filter_err(attr, op, val, "bad op in filter")
if model_class is model.HistoryDatasetAssociation:
return sql.expression.and_(
model.Job.table.c.id == model.JobToOutputDatasetAssociation.table.c.job_id,
model.HistoryDatasetAssociation.table.c.id
== model.JobToOutputDatasetAssociation.table.c.dataset_id,
cond,
)
elif model_class is model.HistoryDatasetCollectionAssociation:
return sql.expression.and_(
model.Job.id == model.JobToOutputDatasetAssociation.job_id,
model.JobToOutputDatasetAssociation.dataset_id == model.DatasetCollectionElement.hda_id,
model.DatasetCollectionElement.dataset_collection_id
== model.HistoryDatasetCollectionAssociation.collection_id,
cond,
)
else:
return True
return _create_tool_filter
def _add_parsers(self):
self.orm_filter_parsers.update(
{
"tool_id": self.create_tool_filter,
}
)