Source code for galaxy.webapps.galaxy.services.tools

import logging
import shutil
import tempfile
from json import dumps
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Union,
)

from starlette.datastructures import UploadFile

from galaxy import (
    exceptions,
    util,
)
from galaxy.config import GalaxyAppConfiguration
from galaxy.managers.collections_util import dictify_dataset_collection_instance
from galaxy.managers.context import (
    ProvidesHistoryContext,
    ProvidesUserContext,
)
from galaxy.managers.histories import HistoryManager
from galaxy.model import (
    LibraryDatasetDatasetAssociation,
    PostJobAction,
)
from galaxy.model.base import transaction
from galaxy.schema.fetch_data import (
    FetchDataFormPayload,
    FetchDataPayload,
    FilesPayload,
)
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.tools import Tool
from galaxy.tools.search import ToolBoxSearch
from galaxy.webapps.galaxy.services._fetch_util import validate_and_normalize_targets
from galaxy.webapps.galaxy.services.base import ServiceBase

log = logging.getLogger(__name__)


class ToolsService(ServiceBase):
    def __init__(
        self,
        config: GalaxyAppConfiguration,
        toolbox_search: ToolBoxSearch,
        security: IdEncodingHelper,
        history_manager: HistoryManager,
    ):
        super().__init__(security)
        self.config = config
        self.toolbox_search = toolbox_search
        self.history_manager = history_manager
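
    # In a running Galaxy this service is wired up by the application's
    # dependency-injection machinery rather than constructed by hand. A
    # minimal sketch, assuming an ``app`` object exposing these singletons
    # (the attribute names here are illustrative assumptions, not part of
    # this module):
    #
    #   service = ToolsService(
    #       config=app.config,
    #       toolbox_search=app.toolbox_search,
    #       security=app.security,
    #       history_manager=app.history_manager,
    #   )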
    def create_fetch(
        self,
        trans: ProvidesHistoryContext,
        fetch_payload: Union[FetchDataFormPayload, FetchDataPayload],
        files: Optional[List[UploadFile]] = None,
    ):
        payload = fetch_payload.model_dump(exclude_unset=True)
        request_version = "1"
        history_id = payload.pop("history_id")
        clean_payload = {}
        files_payload = {}
        if files:
            for i, upload_file in enumerate(files):
                with tempfile.NamedTemporaryFile(
                    dir=trans.app.config.new_file_path, prefix="upload_file_data_", delete=False
                ) as dest:
                    shutil.copyfileobj(upload_file.file, dest)  # type: ignore[misc]  # https://github.com/python/mypy/issues/15031
                upload_file.file.close()
                files_payload[f"files_{i}|file_data"] = FilesPayload(
                    filename=upload_file.filename, local_filename=dest.name
                )
        for key, value in payload.items():
            if key == "key":
                continue
            if key.startswith("files_") or key.startswith("__files_"):
                files_payload[key] = value
                continue
            clean_payload[key] = value
        clean_payload["check_content"] = self.config.check_upload_content
        validate_and_normalize_targets(trans, clean_payload)
        request = dumps(clean_payload)
        create_payload = {
            "tool_id": "__DATA_FETCH__",
            "history_id": history_id,
            "inputs": {
                "request_version": request_version,
                "request_json": request,
                "file_count": str(len(files_payload)),
            },
        }
        create_payload.update(files_payload)
        return self._create(trans, create_payload)
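
    # For reference, create_fetch() above hands _create() a payload shaped
    # like the sketch below: the normalized fetch request is serialized into
    # a single JSON string for the special __DATA_FETCH__ tool, and uploaded
    # files ride along as ``files_<i>|file_data`` entries. The concrete
    # values are hypothetical examples, not taken from this module:
    #
    #   {
    #       "tool_id": "__DATA_FETCH__",
    #       "history_id": history_id,
    #       "inputs": {
    #           "request_version": "1",
    #           "request_json": '{"targets": [...], "check_content": true}',
    #           "file_count": "1",
    #       },
    #       "files_0|file_data": FilesPayload(filename="data.tsv", local_filename="/tmp/upload_file_data_x"),
    #   }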
    def _create(self, trans: ProvidesHistoryContext, payload, **kwd):
        if trans.user_is_bootstrap_admin:
            raise exceptions.RealUserRequiredException("Only real users can execute tools or run jobs.")
        action = payload.get("action")
        if action == "rerun":
            raise Exception("'rerun' action has been deprecated")

        # Get tool.
        tool_version = payload.get("tool_version")
        tool_id = payload.get("tool_id")
        tool_uuid = payload.get("tool_uuid")
        get_kwds = dict(
            tool_id=tool_id,
            tool_uuid=tool_uuid,
            tool_version=tool_version,
        )
        if tool_id is None and tool_uuid is None:
            raise exceptions.RequestParameterMissingException("Must specify either a tool_id or a tool_uuid.")

        tool = trans.app.toolbox.get_tool(**get_kwds)
        if not tool:
            log.debug(f"Could not find tool with kwds [{get_kwds}]")
            raise exceptions.ToolMissingException("Tool not found.")
        if not tool.allow_user_access(trans.user):
            raise exceptions.ItemAccessibilityException("Tool not accessible.")
        if self.config.user_activation_on:
            if not trans.user:
                log.warning("Anonymous user attempted to execute a tool, but account activation is turned on.")
            elif not trans.user.active:
                log.warning(
                    f'User "{trans.user.email}" attempted to execute a tool, but account activation is turned on and the user account is not active.'
                )

        # Set the running history from payload parameters.
        # History is not set correctly as part of this API call for
        # dataset upload.
        if history_id := payload.get("history_id"):
            history_id = trans.security.decode_id(history_id) if isinstance(history_id, str) else history_id
            target_history = self.history_manager.get_mutable(history_id, trans.user, current_history=trans.history)
        else:
            target_history = None

        # Set up inputs.
        inputs = payload.get("inputs", {})
        if not isinstance(inputs, dict):
            raise exceptions.RequestParameterInvalidException(f"inputs invalid {inputs}")

        # Find files coming in as multipart file data and add them to the inputs.
        for k, v in payload.items():
            if k.startswith("files_") or k.startswith("__files_"):
                inputs[k] = v

        # For inputs that come from a data library, copy them into the history.
        self._patch_library_inputs(trans, inputs, target_history)

        # TODO: encode data ids and decode ids.
        # TODO: handle dbkeys
        params = util.Params(inputs, sanitize=False)
        incoming = params.__dict__

        # use_cached_job can be passed in via the top-level payload or among the tool inputs.
        # I think it should be a top-level parameter, but because the selector is implemented
        # as a regular tool parameter we accept both.
        use_cached_job = payload.get("use_cached_job", False) or util.string_as_bool(
            inputs.get("use_cached_job", "false")
        )
        preferred_object_store_id = payload.get("preferred_object_store_id")
        input_format = str(payload.get("input_format", "legacy"))
        if "data_manager_mode" in payload:
            incoming["__data_manager_mode"] = payload["data_manager_mode"]
        vars = tool.handle_input(
            trans,
            incoming,
            history=target_history,
            use_cached_job=use_cached_job,
            input_format=input_format,
            preferred_object_store_id=preferred_object_store_id,
        )

        new_pja_flush = False
        for job in vars.get("jobs", []):
            if inputs.get("send_email_notification", False):
                # Unless an anonymous user is invoking this via the API it
                # should never be an option, but check and enforce that here.
                if trans.user is None:
                    raise exceptions.ToolExecutionError("Anonymously run jobs cannot send an email notification.")
                else:
                    job_email_action = PostJobAction("EmailAction")
                    job.add_post_job_action(job_email_action)
                    new_pja_flush = True

        if new_pja_flush:
            with transaction(trans.sa_session):
                trans.sa_session.commit()

        return self._handle_inputs_output_to_api_response(trans, tool, target_history, vars)

    def _handle_inputs_output_to_api_response(self, trans, tool, target_history, vars):
        # TODO: check for errors and ensure that output dataset(s) are available.
        output_datasets = vars.get("out_data", [])
        rval: Dict[str, Any] = {"outputs": [], "output_collections": [], "jobs": [], "implicit_collections": []}
        rval["produces_entry_points"] = tool.produces_entry_points
        if job_errors := vars.get("job_errors", []):
            # If we are here, some jobs were executed successfully but some failed.
            rval["errors"] = job_errors

        outputs = rval["outputs"]
        # TODO: ?? Possibly only return ids?
        for output_name, output in output_datasets:
            output_dict = output.to_dict()
            # Add the output name back into the output data structure so it is
            # possible to figure out which newly created elements correspond
            # with which tool file outputs.
            output_dict["output_name"] = output_name
            outputs.append(output_dict)

        for job in vars.get("jobs", []):
            rval["jobs"].append(job.to_dict(view="collection"))

        for output_name, collection_instance in vars.get("output_collections", []):
            history = target_history or trans.history
            output_dict = dictify_dataset_collection_instance(
                collection_instance,
                security=trans.security,
                url_builder=trans.url_builder,
                parent=history,
            )
            output_dict["output_name"] = output_name
            rval["output_collections"].append(output_dict)

        for output_name, collection_instance in vars.get("implicit_collections", {}).items():
            history = target_history or trans.history
            output_dict = dictify_dataset_collection_instance(
                collection_instance,
                security=trans.security,
                url_builder=trans.url_builder,
                parent=history,
            )
            output_dict["output_name"] = output_name
            rval["implicit_collections"].append(output_dict)

        trans.security.encode_all_ids(rval, recursive=True)
        return rval

    def _search(self, q, view):
        """
        Perform a search with the given query.

        Boosts and the number of results are configurable in the galaxy.ini file.

        :param q: the query to search with
        :type  q: str

        :return: dictionary containing the tool ids of the best hits
        :rtype:  dict
        """
        panel_view = view or self.config.default_panel_view

        results = self.toolbox_search.search(
            q=q,
            panel_view=panel_view,
            config=self.config,
        )
        return results
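
    # A minimal sketch of the payload _create() above accepts for an ordinary
    # tool execution, assuming the usual {"src": ..., "id": ...} encoding for
    # data inputs; the tool id and encoded ids are hypothetical examples:
    #
    #   {
    #       "tool_id": "cat1",
    #       "history_id": "<encoded history id>",
    #       "inputs": {
    #           "input1": {"src": "hda", "id": "<encoded dataset id>"},
    #       },
    #       "use_cached_job": False,
    #   }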
""" for k, v in inputs.items(): new_value = self._patch_library_dataset(trans, v, target_history) if new_value: v = new_value elif isinstance(v, dict) and "values" in v: for index, value in enumerate(v["values"]): patched = self._patch_library_dataset(trans, value, target_history) if patched: v["values"][index] = patched inputs[k] = v def _patch_library_dataset(self, trans: ProvidesHistoryContext, v, target_history): if isinstance(v, dict) and "id" in v and v.get("src") == "ldda": ldda = trans.sa_session.get(LibraryDatasetDatasetAssociation, self.decode_id(v["id"])) if trans.user_is_admin or trans.app.security_agent.can_access_dataset( trans.get_current_user_roles(), ldda.dataset ): return ldda.to_history_dataset_association(target_history, add_to_history=True) # # -- Helper methods -- # def _get_tool(self, trans, id, tool_version=None, user=None) -> Tool: tool = trans.app.toolbox.get_tool(id, tool_version) if not tool: raise exceptions.ObjectNotFound(f"Could not find tool with id '{id}'.") if not tool.allow_user_access(user): raise exceptions.AuthenticationFailed(f"Access denied, please login for tool with id '{id}'.") return tool def _detect(self, trans: ProvidesUserContext, tool_id): """ Detect whether the tool with the given id is installed. :param tool_id: exact id of the tool :type tool_id: str :return: list with available versions "return type: list """ tools = trans.app.toolbox.get_tool(tool_id, get_all_versions=True) detected_versions = [] if tools: for tool in tools: if tool and tool.allow_user_access(trans.user): detected_versions.append(tool.version) return detected_versions