Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.services.tools
import logging
import shutil
import tempfile
from json import dumps
from typing import (
Any,
Dict,
List,
Optional,
Union,
)
from starlette.datastructures import UploadFile
from galaxy import (
exceptions,
util,
)
from galaxy.config import GalaxyAppConfiguration
from galaxy.managers.collections_util import dictify_dataset_collection_instance
from galaxy.managers.context import (
ProvidesHistoryContext,
ProvidesUserContext,
)
from galaxy.managers.histories import HistoryManager
from galaxy.model import (
LibraryDatasetDatasetAssociation,
PostJobAction,
)
from galaxy.model.base import transaction
from galaxy.schema.fetch_data import (
FetchDataFormPayload,
FetchDataPayload,
FilesPayload,
)
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.tools import Tool
from galaxy.tools.search import ToolBoxSearch
from galaxy.webapps.galaxy.services._fetch_util import validate_and_normalize_targets
from galaxy.webapps.galaxy.services.base import ServiceBase
log = logging.getLogger(__name__)
[docs]class ToolsService(ServiceBase):
[docs] def __init__(
self,
config: GalaxyAppConfiguration,
toolbox_search: ToolBoxSearch,
security: IdEncodingHelper,
history_manager: HistoryManager,
):
super().__init__(security)
self.config = config
self.toolbox_search = toolbox_search
self.history_manager = history_manager
[docs] def create_fetch(
self,
trans: ProvidesHistoryContext,
fetch_payload: Union[FetchDataFormPayload, FetchDataPayload],
files: Optional[List[UploadFile]] = None,
):
payload = fetch_payload.model_dump(exclude_unset=True)
request_version = "1"
history_id = payload.pop("history_id")
clean_payload = {}
files_payload = {}
if files:
for i, upload_file in enumerate(files):
with tempfile.NamedTemporaryFile(
dir=trans.app.config.new_file_path, prefix="upload_file_data_", delete=False
) as dest:
shutil.copyfileobj(upload_file.file, dest) # type: ignore[misc] # https://github.com/python/mypy/issues/15031
upload_file.file.close()
files_payload[f"files_{i}|file_data"] = FilesPayload(
filename=upload_file.filename, local_filename=dest.name
)
for key, value in payload.items():
if key == "key":
continue
if key.startswith("files_") or key.startswith("__files_"):
files_payload[key] = value
continue
clean_payload[key] = value
clean_payload["check_content"] = self.config.check_upload_content
validate_and_normalize_targets(trans, clean_payload)
request = dumps(clean_payload)
create_payload = {
"tool_id": "__DATA_FETCH__",
"history_id": history_id,
"inputs": {
"request_version": request_version,
"request_json": request,
"file_count": str(len(files_payload)),
},
}
create_payload.update(files_payload)
return self._create(trans, create_payload)
def _create(self, trans: ProvidesHistoryContext, payload, **kwd):
if trans.user_is_bootstrap_admin:
raise exceptions.RealUserRequiredException("Only real users can execute tools or run jobs.")
action = payload.get("action")
if action == "rerun":
raise Exception("'rerun' action has been deprecated")
# Get tool.
tool_version = payload.get("tool_version")
tool_id = payload.get("tool_id")
tool_uuid = payload.get("tool_uuid")
get_kwds = dict(
tool_id=tool_id,
tool_uuid=tool_uuid,
tool_version=tool_version,
)
if tool_id is None and tool_uuid is None:
raise exceptions.RequestParameterMissingException("Must specify either a tool_id or a tool_uuid.")
tool = trans.app.toolbox.get_tool(**get_kwds)
if not tool:
log.debug(f"Not found tool with kwds [{get_kwds}]")
raise exceptions.ToolMissingException("Tool not found.")
if not tool.allow_user_access(trans.user):
raise exceptions.ItemAccessibilityException("Tool not accessible.")
if self.config.user_activation_on:
if not trans.user:
log.warning("Anonymous user attempts to execute tool, but account activation is turned on.")
elif not trans.user.active:
log.warning(
f'User "{trans.user.email}" attempts to execute tool, but account activation is turned on and user account is not active.'
)
# Set running history from payload parameters.
# History not set correctly as part of this API call for
# dataset upload.
if history_id := payload.get("history_id"):
history_id = trans.security.decode_id(history_id) if isinstance(history_id, str) else history_id
target_history = self.history_manager.get_mutable(history_id, trans.user, current_history=trans.history)
else:
target_history = None
# Set up inputs.
inputs = payload.get("inputs", {})
if not isinstance(inputs, dict):
raise exceptions.RequestParameterInvalidException(f"inputs invalid {inputs}")
# Find files coming in as multipart file data and add to inputs.
for k, v in payload.items():
if k.startswith("files_") or k.startswith("__files_"):
inputs[k] = v
# for inputs that are coming from the Library, copy them into the history
self._patch_library_inputs(trans, inputs, target_history)
# TODO: encode data ids and decode ids.
# TODO: handle dbkeys
params = util.Params(inputs, sanitize=False)
incoming = params.__dict__
# use_cached_job can be passed in via the top-level payload or among the tool inputs.
# I think it should be a top-level parameter, but because the selector is implemented
# as a regular tool parameter we accept both.
use_cached_job = payload.get("use_cached_job", False) or util.string_as_bool(
inputs.get("use_cached_job", "false")
)
preferred_object_store_id = payload.get("preferred_object_store_id")
input_format = str(payload.get("input_format", "legacy"))
if "data_manager_mode" in payload:
incoming["__data_manager_mode"] = payload["data_manager_mode"]
vars = tool.handle_input(
trans,
incoming,
history=target_history,
use_cached_job=use_cached_job,
input_format=input_format,
preferred_object_store_id=preferred_object_store_id,
)
new_pja_flush = False
for job in vars.get("jobs", []):
if inputs.get("send_email_notification", False):
# Unless an anonymous user is invoking this via the API it
# should never be an option, but check and enforce that here
if trans.user is None:
raise exceptions.ToolExecutionError("Anonymously run jobs cannot send an email notification.")
else:
job_email_action = PostJobAction("EmailAction")
job.add_post_job_action(job_email_action)
new_pja_flush = True
if new_pja_flush:
with transaction(trans.sa_session):
trans.sa_session.commit()
return self._handle_inputs_output_to_api_response(trans, tool, target_history, vars)
def _handle_inputs_output_to_api_response(self, trans, tool, target_history, vars):
# TODO: check for errors and ensure that output dataset(s) are available.
output_datasets = vars.get("out_data", [])
rval: Dict[str, Any] = {"outputs": [], "output_collections": [], "jobs": [], "implicit_collections": []}
rval["produces_entry_points"] = tool.produces_entry_points
if job_errors := vars.get("job_errors", []):
# If we are here - some jobs were successfully executed but some failed.
rval["errors"] = job_errors
outputs = rval["outputs"]
# TODO:?? poss. only return ids?
for output_name, output in output_datasets:
output_dict = output.to_dict()
# add the output name back into the output data structure
# so it's possible to figure out which newly created elements
# correspond with which tool file outputs
output_dict["output_name"] = output_name
outputs.append(output_dict)
for job in vars.get("jobs", []):
rval["jobs"].append(job.to_dict(view="collection"))
for output_name, collection_instance in vars.get("output_collections", []):
history = target_history or trans.history
output_dict = dictify_dataset_collection_instance(
collection_instance,
security=trans.security,
url_builder=trans.url_builder,
parent=history,
)
output_dict["output_name"] = output_name
rval["output_collections"].append(output_dict)
for output_name, collection_instance in vars.get("implicit_collections", {}).items():
history = target_history or trans.history
output_dict = dictify_dataset_collection_instance(
collection_instance,
security=trans.security,
url_builder=trans.url_builder,
parent=history,
)
output_dict["output_name"] = output_name
rval["implicit_collections"].append(output_dict)
trans.security.encode_all_ids(rval, recursive=True)
return rval
def _search(self, q, view):
"""
Perform the search on the given query.
Boosts and numer of results are configurable in galaxy.ini file.
:param q: the query to search with
:type q: str
:return: Dictionary containing the tools' ids of the best hits.
:return type: dict
"""
panel_view = view or self.config.default_panel_view
results = self.toolbox_search.search(
q=q,
panel_view=panel_view,
config=self.config,
)
return results
def _patch_library_inputs(self, trans: ProvidesHistoryContext, inputs, target_history):
"""
Transform inputs from the data library to history items.
"""
for k, v in inputs.items():
new_value = self._patch_library_dataset(trans, v, target_history)
if new_value:
v = new_value
elif isinstance(v, dict) and "values" in v:
for index, value in enumerate(v["values"]):
patched = self._patch_library_dataset(trans, value, target_history)
if patched:
v["values"][index] = patched
inputs[k] = v
def _patch_library_dataset(self, trans: ProvidesHistoryContext, v, target_history):
if isinstance(v, dict) and "id" in v and v.get("src") == "ldda":
ldda = trans.sa_session.get(LibraryDatasetDatasetAssociation, self.decode_id(v["id"]))
if trans.user_is_admin or trans.app.security_agent.can_access_dataset(
trans.get_current_user_roles(), ldda.dataset
):
return ldda.to_history_dataset_association(target_history, add_to_history=True)
#
# -- Helper methods --
#
def _get_tool(self, trans, id, tool_version=None, user=None) -> Tool:
tool = trans.app.toolbox.get_tool(id, tool_version)
if not tool:
raise exceptions.ObjectNotFound(f"Could not find tool with id '{id}'.")
if not tool.allow_user_access(user):
raise exceptions.AuthenticationFailed(f"Access denied, please login for tool with id '{id}'.")
return tool
def _detect(self, trans: ProvidesUserContext, tool_id):
"""
Detect whether the tool with the given id is installed.
:param tool_id: exact id of the tool
:type tool_id: str
:return: list with available versions
"return type: list
"""
tools = trans.app.toolbox.get_tool(tool_id, get_all_versions=True)
detected_versions = []
if tools:
for tool in tools:
if tool and tool.allow_user_access(trans.user):
detected_versions.append(tool.version)
return detected_versions