Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.services.tools

import logging
import shutil
import tempfile
from json import dumps
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Union,
)

from starlette.datastructures import UploadFile

from galaxy import (
    exceptions,
    util,
)
from galaxy.config import GalaxyAppConfiguration
from galaxy.managers.collections_util import dictify_dataset_collection_instance
from galaxy.managers.context import (
    ProvidesHistoryContext,
    ProvidesUserContext,
)
from galaxy.managers.histories import HistoryManager
from galaxy.model import (
    LibraryDatasetDatasetAssociation,
    PostJobAction,
)
from galaxy.model.base import transaction
from galaxy.schema.fetch_data import (
    FetchDataFormPayload,
    FetchDataPayload,
    FilesPayload,
)
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.tools import Tool
from galaxy.tools.search import ToolBoxSearch
from galaxy.webapps.galaxy.services._fetch_util import validate_and_normalize_targets
from galaxy.webapps.galaxy.services.base import ServiceBase

# Module-level logger named after this module; used below for tool-lookup and
# account-activation diagnostics.
log = logging.getLogger(__name__)


class ToolsService(ServiceBase):
    """Service object that executes tools, submits data-fetch requests and looks tools up."""

    def __init__(
        self,
        config: GalaxyAppConfiguration,
        toolbox_search: ToolBoxSearch,
        security: IdEncodingHelper,
        history_manager: HistoryManager,
    ):
        """Store the collaborators used by the service methods.

        :param config: application configuration (read for ``check_upload_content``,
            ``user_activation_on`` and ``default_panel_view``)
        :param toolbox_search: search backend used by :meth:`_search`
        :param security: id encoding helper handed to the ``ServiceBase`` constructor
        :param history_manager: used to resolve the target history in :meth:`_create`
        """
        super().__init__(security)
        self.config = config
        self.toolbox_search = toolbox_search
        self.history_manager = history_manager

    def create_fetch(
        self,
        trans: ProvidesHistoryContext,
        fetch_payload: Union[FetchDataFormPayload, FetchDataPayload],
        files: Optional[List[UploadFile]] = None,
    ):
        """Build a ``__DATA_FETCH__`` tool request from a fetch payload and execute it.

        Uploaded ``files`` are copied into temporary files under
        ``trans.app.config.new_file_path`` and registered as ``files_<i>|file_data``
        entries. Payload keys starting with ``files_`` / ``__files_`` are forwarded
        as file entries, the ``key`` entry is dropped, and everything else is
        serialized to JSON as the fetch request.

        :param trans: request context
        :param fetch_payload: the fetch request (JSON or form flavour)
        :param files: optional multipart uploads accompanying the request
        :return: the API response dictionary produced by :meth:`_create`
        """
        payload = fetch_payload.model_dump(exclude_unset=True)
        request_version = "1"
        history_id = payload.pop("history_id")
        clean_payload = {}
        files_payload = {}
        if files:
            for i, upload_file in enumerate(files):
                try:
                    # delete=False: the temporary file has to outlive this request
                    # handler, it is referenced by path in the tool payload below.
                    with tempfile.NamedTemporaryFile(
                        dir=trans.app.config.new_file_path, prefix="upload_file_data_", delete=False
                    ) as dest:
                        shutil.copyfileobj(upload_file.file, dest)  # type: ignore[misc]  # https://github.com/python/mypy/issues/15031
                finally:
                    # Release the upload handle even when the copy fails.
                    upload_file.file.close()
                files_payload[f"files_{i}|file_data"] = FilesPayload(
                    filename=upload_file.filename, local_filename=dest.name
                )
        for key, value in payload.items():
            if key == "key":
                continue
            if key.startswith(("files_", "__files_")):
                files_payload[key] = value
                continue
            clean_payload[key] = value
        clean_payload["check_content"] = self.config.check_upload_content
        validate_and_normalize_targets(trans, clean_payload)
        request = dumps(clean_payload)
        create_payload = {
            "tool_id": "__DATA_FETCH__",
            "history_id": history_id,
            "inputs": {
                "request_version": request_version,
                "request_json": request,
                "file_count": str(len(files_payload)),
            },
        }
        create_payload.update(files_payload)
        return self._create(trans, create_payload)

    def _create(self, trans: ProvidesHistoryContext, payload, **kwd):
        """Execute the tool described by ``payload`` and return the API response.

        :param trans: request context
        :param payload: dictionary with ``tool_id`` or ``tool_uuid`` and optionally
            ``tool_version``, ``history_id``, ``inputs``, ``use_cached_job``,
            ``preferred_object_store_id``, ``input_format``, ``data_manager_mode``
            and ``files_*`` / ``__files_*`` entries
        :param kwd: accepted for interface compatibility; not used here
        :return: dictionary built by :meth:`_handle_inputs_output_to_api_response`
        :raises exceptions.RealUserRequiredException: for the bootstrap admin
        :raises exceptions.RequestParameterMissingException: neither ``tool_id``
            nor ``tool_uuid`` given
        :raises exceptions.ToolMissingException: the tool cannot be found
        :raises exceptions.ItemAccessibilityException: the user may not use the tool
        :raises exceptions.RequestParameterInvalidException: ``inputs`` is not a dict
        :raises exceptions.ToolExecutionError: an anonymous user requested an
            email notification
        """
        if trans.user_is_bootstrap_admin:
            raise exceptions.RealUserRequiredException("Only real users can execute tools or run jobs.")
        action = payload.get("action")
        if action == "rerun":
            raise Exception("'rerun' action has been deprecated")

        # Get tool.
        tool_version = payload.get("tool_version")
        tool_id = payload.get("tool_id")
        tool_uuid = payload.get("tool_uuid")
        get_kwds = dict(
            tool_id=tool_id,
            tool_uuid=tool_uuid,
            tool_version=tool_version,
        )
        if tool_id is None and tool_uuid is None:
            raise exceptions.RequestParameterMissingException("Must specify either a tool_id or a tool_uuid.")

        tool = trans.app.toolbox.get_tool(**get_kwds)
        if not tool:
            log.debug("Not found tool with kwds [%s]", get_kwds)
            raise exceptions.ToolMissingException("Tool not found.")
        if not tool.allow_user_access(trans.user):
            raise exceptions.ItemAccessibilityException("Tool not accessible.")
        if self.config.user_activation_on:
            # Only logged: execution is not blocked here for inactive/anonymous users.
            if not trans.user:
                log.warning("Anonymous user attempts to execute tool, but account activation is turned on.")
            elif not trans.user.active:
                log.warning(
                    'User "%s" attempts to execute tool, but account activation is turned on and user account is not active.',
                    trans.user.email,
                )

        # Set running history from payload parameters.
        # History not set correctly as part of this API call for
        # dataset upload.
        if history_id := payload.get("history_id"):
            # Encoded (str) ids are decoded; already-decoded ids are used as-is.
            history_id = trans.security.decode_id(history_id) if isinstance(history_id, str) else history_id
            target_history = self.history_manager.get_mutable(history_id, trans.user, current_history=trans.history)
        else:
            target_history = None

        # Set up inputs.
        inputs = payload.get("inputs", {})
        if not isinstance(inputs, dict):
            raise exceptions.RequestParameterInvalidException(f"inputs invalid {inputs}")

        # Find files coming in as multipart file data and add to inputs.
        for k, v in payload.items():
            if k.startswith(("files_", "__files_")):
                inputs[k] = v

        # for inputs that are coming from the Library, copy them into the history
        self._patch_library_inputs(trans, inputs, target_history)

        # TODO: encode data ids and decode ids.
        # TODO: handle dbkeys
        params = util.Params(inputs, sanitize=False)
        incoming = params.__dict__

        # use_cached_job can be passed in via the top-level payload or among the tool inputs.
        # I think it should be a top-level parameter, but because the selector is implemented
        # as a regular tool parameter we accept both.
        use_cached_job = payload.get("use_cached_job", False) or util.string_as_bool(
            inputs.get("use_cached_job", "false")
        )
        preferred_object_store_id = payload.get("preferred_object_store_id")
        input_format = str(payload.get("input_format", "legacy"))
        if "data_manager_mode" in payload:
            incoming["__data_manager_mode"] = payload["data_manager_mode"]
        vars = tool.handle_input(
            trans,
            incoming,
            history=target_history,
            use_cached_job=use_cached_job,
            input_format=input_format,
            preferred_object_store_id=preferred_object_store_id,
        )

        new_pja_flush = False
        if inputs.get("send_email_notification", False):
            for job in vars.get("jobs", []):
                # Unless an anonymous user is invoking this via the API it
                # should never be an option, but check and enforce that here
                if trans.user is None:
                    raise exceptions.ToolExecutionError("Anonymously run jobs cannot send an email notification.")
                job_email_action = PostJobAction("EmailAction")
                job.add_post_job_action(job_email_action)
                new_pja_flush = True

        if new_pja_flush:
            # Persist the post job actions that were just attached.
            with transaction(trans.sa_session):
                trans.sa_session.commit()

        return self._handle_inputs_output_to_api_response(trans, tool, target_history, vars)

    def _handle_inputs_output_to_api_response(self, trans, tool, target_history, vars):
        """Convert the result of ``tool.handle_input`` into the API response dictionary.

        :param trans: request context (provides ``security``, ``url_builder``, ``history``)
        :param tool: the executed tool; ``produces_entry_points`` is copied to the response
        :param target_history: history the tool ran in, or ``None`` to fall back to
            ``trans.history`` when serializing collections
        :param vars: mapping that may contain ``out_data``, ``jobs``, ``job_errors``,
            ``output_collections`` and ``implicit_collections``
        :return: dictionary with ``outputs``, ``output_collections``, ``jobs``,
            ``implicit_collections``, ``produces_entry_points`` and, when some jobs
            failed, ``errors``; ids are encoded before returning
        """
        # TODO: check for errors and ensure that output dataset(s) are available.
        output_datasets = vars.get("out_data", [])
        rval: Dict[str, Any] = {"outputs": [], "output_collections": [], "jobs": [], "implicit_collections": []}
        rval["produces_entry_points"] = tool.produces_entry_points
        if job_errors := vars.get("job_errors", []):
            # If we are here - some jobs were successfully executed but some failed.
            rval["errors"] = job_errors

        outputs = rval["outputs"]
        # TODO:?? poss. only return ids?
        for output_name, output in output_datasets:
            output_dict = output.to_dict()
            # add the output name back into the output data structure
            # so it's possible to figure out which newly created elements
            # correspond with which tool file outputs
            output_dict["output_name"] = output_name
            outputs.append(output_dict)

        for job in vars.get("jobs", []):
            rval["jobs"].append(job.to_dict(view="collection"))

        for output_name, collection_instance in vars.get("output_collections", []):
            history = target_history or trans.history
            output_dict = dictify_dataset_collection_instance(
                collection_instance,
                security=trans.security,
                url_builder=trans.url_builder,
                parent=history,
            )
            output_dict["output_name"] = output_name
            rval["output_collections"].append(output_dict)

        for output_name, collection_instance in vars.get("implicit_collections", {}).items():
            history = target_history or trans.history
            output_dict = dictify_dataset_collection_instance(
                collection_instance,
                security=trans.security,
                url_builder=trans.url_builder,
                parent=history,
            )
            output_dict["output_name"] = output_name
            rval["implicit_collections"].append(output_dict)

        trans.security.encode_all_ids(rval, recursive=True)
        return rval

    def _search(self, q, view):
        """
        Perform the search on the given query.
        Boosts and number of results are configurable in galaxy.ini file.

        :param q: the query to search with
        :type  q: str
        :param view: panel view to search in; falls back to the configured
            ``default_panel_view`` when falsy

        :return: Dictionary containing the tools' ids of the best hits.
        :rtype: dict
        """
        panel_view = view or self.config.default_panel_view

        results = self.toolbox_search.search(
            q=q,
            panel_view=panel_view,
            config=self.config,
        )
        return results

    def _patch_library_inputs(self, trans: ProvidesHistoryContext, inputs, target_history):
        """
        Transform inputs from the data library to history items.

        Each top-level value, and each element of a value's ``"values"`` list,
        that references a library dataset is replaced in place by the object
        returned from :meth:`_patch_library_dataset`.
        """
        for k, v in inputs.items():
            new_value = self._patch_library_dataset(trans, v, target_history)
            if new_value:
                v = new_value
            elif isinstance(v, dict) and "values" in v:
                for index, value in enumerate(v["values"]):
                    patched = self._patch_library_dataset(trans, value, target_history)
                    if patched:
                        v["values"][index] = patched
            # Re-assigning an existing key does not resize the dict, so this is
            # safe while iterating over ``inputs.items()``.
            inputs[k] = v

    def _patch_library_dataset(self, trans: ProvidesHistoryContext, v, target_history):
        """Copy a library dataset reference into ``target_history``.

        :param v: candidate input value; only dictionaries with an ``id`` and
            ``src == "ldda"`` are handled
        :param target_history: history passed to ``to_history_dataset_association``
        :return: the result of ``ldda.to_history_dataset_association(...)``, or
            ``None`` when ``v`` is not a library dataset reference or the user
            may not access the dataset
        :raises exceptions.ObjectNotFound: the referenced library dataset does not exist
        """
        if isinstance(v, dict) and "id" in v and v.get("src") == "ldda":
            ldda = trans.sa_session.get(LibraryDatasetDatasetAssociation, self.decode_id(v["id"]))
            if ldda is None:
                # The session lookup yields None for unknown primary keys; fail
                # with a proper API error instead of an AttributeError below.
                raise exceptions.ObjectNotFound("Could not find library dataset dataset association.")
            if trans.user_is_admin or trans.app.security_agent.can_access_dataset(
                trans.get_current_user_roles(), ldda.dataset
            ):
                return ldda.to_history_dataset_association(target_history, add_to_history=True)
        return None

    #
    # -- Helper methods --
    #
    def _get_tool(self, trans, id, tool_version=None, user=None) -> Tool:
        """Return the tool with the given id, enforcing existence and user access.

        :param id: tool id handed to the toolbox lookup
        :param tool_version: optional tool version
        :param user: user the access check is performed for
        :raises exceptions.ObjectNotFound: no such tool
        :raises exceptions.AuthenticationFailed: ``user`` is not allowed to use the tool
        """
        tool = trans.app.toolbox.get_tool(id, tool_version)
        if not tool:
            raise exceptions.ObjectNotFound(f"Could not find tool with id '{id}'.")
        if not tool.allow_user_access(user):
            raise exceptions.AuthenticationFailed(f"Access denied, please login for tool with id '{id}'.")
        return tool

    def _detect(self, trans: ProvidesUserContext, tool_id):
        """
        Detect whether the tool with the given id is installed.

        :param tool_id: exact id of the tool
        :type tool_id:  str

        :return:      list with available versions
        :rtype: list
        """
        tools = trans.app.toolbox.get_tool(tool_id, get_all_versions=True)
        detected_versions = []
        if tools:
            for tool in tools:
                if tool and tool.allow_user_access(trans.user):
                    detected_versions.append(tool.version)
        return detected_versions