Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release, if it exists there, or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.services.tools

import logging
import os
import shutil
import tempfile
from json import dumps
from typing import (
    Any,
    cast,
    get_args,
    Optional,
    Union,
)

from starlette.datastructures import UploadFile

from galaxy import (
    exceptions,
    util,
)
from galaxy.config import GalaxyAppConfiguration
from galaxy.exceptions import RequestParameterInvalidException
from galaxy.exceptions.utils import api_error_to_dict
from galaxy.managers.collections_util import dictify_dataset_collection_instance
from galaxy.managers.context import (
    ProvidesHistoryContext,
    ProvidesUserContext,
)
from galaxy.managers.histories import HistoryManager
from galaxy.managers.tools import (
    get_tool_from_trans,
    ToolRunReference,
)
from galaxy.model import (
    LibraryDatasetDatasetAssociation,
    PostJobAction,
    User,
)
from galaxy.schema.credentials import CredentialsContext
from galaxy.schema.fetch_data import (
    CreateDataLandingPayload,
    CreateFileLandingPayload,
    DataElementsTarget,
    FetchDataFormPayload,
    FetchDataPayload,
    FilesPayload,
    HdaDestination,
    HdcaDataItemsTarget,
    HdcaDestination,
    NestedElement,
    TargetsAdapter,
    UrlDataElement,
)
from galaxy.schema.schema import CreateToolLandingRequestPayload
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.tool_util.parameters import ToolParameterT
from galaxy.tool_util_models.parameters import (
    CollectionElementCollectionRequestUri,
    CollectionElementDataRequestUri,
    DataRequestCollectionUri,
    DataRequestUri,
    FileRequestUri,
)
from galaxy.tools import Tool
from galaxy.tools._types import InputFormatT
from galaxy.tools.search import ToolBoxSearch
from galaxy.util.path import safe_contains
from galaxy.webapps.galaxy.services._fetch_util import validate_and_normalize_targets
from galaxy.webapps.galaxy.services.base import ServiceBase

log = logging.getLogger(__name__)

ToolRunPayload = dict[str, Any]
JobCreateResponse = dict[str, Any]


def get_tool(trans: ProvidesHistoryContext, tool_ref: ToolRunReference) -> Tool:
    """Resolve *tool_ref* to a ``Tool`` via the application toolbox.

    When a tool_uuid and an authenticated user are supplied, user-scoped
    (unprivileged) tools are consulted first; otherwise the regular
    toolbox lookup by id/uuid/version is used.

    :raises exceptions.ToolMissingException: when no tool matches the reference.
    """
    toolbox = trans.app.toolbox
    resolved: Optional[Tool] = None
    # A uuid together with a logged-in user may refer to a user-scoped tool.
    if tool_ref.tool_uuid and trans.user:
        resolved = toolbox.get_unprivileged_tool_or_none(trans.user, tool_uuid=tool_ref.tool_uuid)
    if not resolved:
        resolved = toolbox.get_tool(
            tool_id=tool_ref.tool_id,
            tool_uuid=tool_ref.tool_uuid,
            tool_version=tool_ref.tool_version,
        )
    if not resolved:
        log.debug(f"Not found tool with kwds [{tool_ref}]")
        raise exceptions.ToolMissingException("Tool not found.")
    return resolved
def validate_tool_for_running(trans: ProvidesHistoryContext, tool_ref: ToolRunReference) -> Tool:
    """Look up the referenced tool and verify this request may execute it.

    :raises exceptions.RealUserRequiredException: for bootstrap-admin sessions.
    :raises exceptions.RequestParameterMissingException: when neither tool_id
        nor tool_uuid is supplied.
    :raises exceptions.ItemAccessibilityException: when the current user may
        not access the tool.
    """
    # The bootstrap admin is a pseudo-user; it must never run jobs.
    if trans.user_is_bootstrap_admin:
        raise exceptions.RealUserRequiredException("Only real users can execute tools or run jobs.")

    # At least one way of addressing the tool is required.
    if tool_ref.tool_id is None and tool_ref.tool_uuid is None:
        raise exceptions.RequestParameterMissingException("Must specify a valid tool_id to use this endpoint.")

    resolved_tool = get_tool_from_trans(trans, tool_ref)
    if not resolved_tool.allow_user_access(trans.user):
        raise exceptions.ItemAccessibilityException("Tool not accessible.")
    return resolved_tool
def file_landing_payload_to_fetch_targets(data_landing_payload: CreateFileLandingPayload):
    """Convert a CreateFileLandingPayload's data/collection requests to fetch-API "targets".

    This function transforms data/collection requests (used in workflow landing and data
    request payloads) into the fetch API's target format. Single file/URL requests become
    ``DataElementsTarget`` entries; collection requests become ``HdcaDataItemsTarget``
    entries with their (possibly nested) elements converted recursively.

    :returns: a list of plain-dict targets (``model_dump(mode="json", exclude_unset=True)``)
        after a final round-trip through ``TargetsAdapter``.
    :raises RequestParameterInvalidException: if sample-sheet metadata is supplied
        for a non-sample-sheet collection type.
    """
    # Validate sample sheet metadata before conversion: column_definitions/rows are
    # only meaningful for 'sample_sheet' (or 'sample_sheet:<type>') collections.
    for request_item in data_landing_payload.request_state:
        if isinstance(request_item, DataRequestCollectionUri):
            has_sample_sheet_metadata = request_item.column_definitions is not None or request_item.rows is not None
            if has_sample_sheet_metadata:
                collection_type = request_item.collection_type
                if not collection_type.startswith("sample_sheet"):
                    raise RequestParameterInvalidException(
                        f"Sample sheet metadata (column_definitions, rows) can only be used with collection_type 'sample_sheet' or 'sample_sheet:<type>', not '{collection_type}'"
                    )

    targets: list[Union[DataElementsTarget, HdcaDataItemsTarget]] = []

    for request_item in data_landing_payload.request_state:
        if isinstance(request_item, (DataRequestUri, FileRequestUri)):
            # Convert single file/URL request to a DataElementsTarget
            element = UrlDataElement(
                src="url",
                url=str(request_item.url),
                ext=request_item.ext,
                dbkey=request_item.dbkey,
                name=request_item.name,
                deferred=request_item.deferred,
                info=request_item.info,
                tags=request_item.tags,
                space_to_tab=request_item.space_to_tab,
                to_posix_lines=request_item.to_posix_lines,
                created_from_basename=request_item.created_from_basename,
            )
            targets.append(
                DataElementsTarget(
                    destination=HdaDestination(type="hdas"),
                    elements=[element],
                )
            )
        elif isinstance(request_item, DataRequestCollectionUri):
            # Convert collection request to HdcaDataItemsTarget

            def convert_collection_element(elem):
                """Convert a collection element (file or nested collection) recursively."""
                if isinstance(elem, CollectionElementDataRequestUri):
                    # This is a file element; the element identifier becomes the name.
                    return UrlDataElement(
                        src="url",
                        url=str(elem.url),
                        ext=elem.ext,
                        dbkey=elem.dbkey,
                        name=elem.identifier,
                        deferred=elem.deferred,
                        info=elem.info,
                        tags=elem.tags,
                        space_to_tab=elem.space_to_tab,
                        to_posix_lines=elem.to_posix_lines,
                        created_from_basename=elem.created_from_basename,
                    )
                elif isinstance(elem, CollectionElementCollectionRequestUri):
                    # This is a nested collection element
                    # Recursively convert its elements
                    nested_elements = [convert_collection_element(nested_elem) for nested_elem in elem.elements]
                    return NestedElement(
                        name=elem.identifier,
                        elements=nested_elements,
                        collection_type=elem.collection_type,
                    )
                else:
                    raise ValueError(f"Unknown collection element type: {type(elem)}")

            elements = [convert_collection_element(elem) for elem in request_item.elements]
            targets.append(
                HdcaDataItemsTarget(
                    destination=HdcaDestination(type="hdca"),
                    elements=elements,
                    collection_type=request_item.collection_type,
                    name=request_item.name,
                    column_definitions=request_item.column_definitions,
                    rows=request_item.rows,
                )
            )

    # Re-validate through the adapter so the output matches the canonical target schema.
    return [target.model_dump(mode="json", exclude_unset=True) for target in TargetsAdapter.validate_python(targets)]
class ToolsService(ServiceBase):
    """Service-layer logic backing the Galaxy tools API endpoints.

    Bundles the application configuration, the toolbox search index, the
    id-encoding helper and the history manager needed to run tools and to
    create fetch / landing requests.
    """

    def __init__(
        self,
        config: GalaxyAppConfiguration,
        toolbox_search: ToolBoxSearch,
        security: IdEncodingHelper,
        history_manager: HistoryManager,
    ):
        """Store collaborators; the security helper is handed to ServiceBase."""
        super().__init__(security)
        # Application configuration (upload checks, default panel view, ...).
        self.config = config
        # Search index used by _search().
        self.toolbox_search = toolbox_search
        # Used to resolve/mutate target histories for job creation.
        self.history_manager = history_manager
def file_landing_to_tool_landing(
    self,
    trans: ProvidesUserContext,
    file_landing_payload: CreateFileLandingPayload,
) -> CreateToolLandingRequestPayload:
    """Translate a file-landing payload into a ``__DATA_FETCH__`` tool landing request.

    The file requests are first converted to fetch-API targets and normalized
    (without setting internal fields), then embedded in the landing request state.
    """
    fetch_payload = {"targets": file_landing_payload_to_fetch_targets(file_landing_payload)}
    # Normalization mutates fetch_payload in place.
    validate_and_normalize_targets(trans, fetch_payload, set_internal_fields=False)
    return CreateToolLandingRequestPayload(
        tool_id="__DATA_FETCH__",
        tool_version=None,
        request_state={
            "request_version": "1",
            "request_json": {
                "targets": fetch_payload["targets"],
            },
            "file_count": "0",
        },
        client_secret=file_landing_payload.client_secret,
        public=file_landing_payload.public,
    )
def data_landing_to_tool_landing(
    self,
    trans: ProvidesUserContext,
    data_landing_payload: CreateDataLandingPayload,
) -> CreateToolLandingRequestPayload:
    """Translate a data-landing payload into a ``__DATA_FETCH__`` tool landing request."""
    state = data_landing_payload.model_dump(exclude_unset=True)["request_state"]
    # Normalization mutates the extracted request state in place.
    validate_and_normalize_targets(trans, state, set_internal_fields=False)
    # Round-trip through the targets adapter so the stored JSON matches the schema.
    as_models = TargetsAdapter.validate_python(state["targets"])
    normalized_targets = TargetsAdapter.dump_python(as_models, exclude_unset=True)
    return CreateToolLandingRequestPayload(
        tool_id="__DATA_FETCH__",
        tool_version=None,
        request_state={
            "request_version": "1",
            "request_json": {"targets": normalized_targets},
            "file_count": "0",
        },
        client_secret=data_landing_payload.client_secret,
        public=data_landing_payload.public,
    )
def inputs(
    self,
    trans: ProvidesHistoryContext,
    tool_ref: ToolRunReference,
) -> list[ToolParameterT]:
    """Return the typed parameter models of the referenced tool."""
    return get_tool(trans, tool_ref).parameters
def create_fetch(
    self,
    trans: ProvidesHistoryContext,
    fetch_payload: Union[FetchDataFormPayload, FetchDataPayload],
    files: Optional[list[UploadFile]] = None,
) -> JobCreateResponse:
    """Stage uploaded files and run the ``__DATA_FETCH__`` tool for a fetch request.

    Multipart uploads are copied to files under the app's ``new_file_path`` and
    referenced as ``files_<i>|file_data`` inputs; remaining payload keys are split
    between file-related inputs and the JSON request body.

    :param trans: request context providing app config and history.
    :param fetch_payload: validated fetch payload (form or JSON variant).
    :param files: optional multipart uploads accompanying the request.
    :returns: the job-create API response from :meth:`_create`.
    """
    payload = fetch_payload.model_dump(exclude_unset=True)
    request_version = "1"
    history_id = payload.pop("history_id")
    clean_payload = {}
    files_payload = {}
    if files:
        for i, upload_file in enumerate(files):
            # delete=False: the staged file must outlive this request so the
            # upload tool can read it later.
            with tempfile.NamedTemporaryFile(
                dir=trans.app.config.new_file_path, prefix="upload_file_data_", delete=False
            ) as dest:
                shutil.copyfileobj(upload_file.file, dest)
            # Make the staged file readable per the configured umask.
            util.umask_fix_perms(dest.name, trans.app.config.umask, 0o0666)
            upload_file.file.close()
            files_payload[f"files_{i}|file_data"] = FilesPayload(
                filename=upload_file.filename, local_filename=dest.name
            )
    for key, value in payload.items():
        if key == "key":
            # Drop any API-key parameter from the request body.
            continue
        if key.startswith("files_") or key.startswith("__files_"):
            # File-related keys go alongside the multipart inputs, not into the JSON request.
            files_payload[key] = value
            continue
        clean_payload[key] = value
    clean_payload["check_content"] = self.config.check_upload_content
    # Normalizes/validates targets in place before serialization.
    validate_and_normalize_targets(trans, clean_payload)
    request = dumps(clean_payload)
    create_payload: ToolRunPayload = {
        "tool_id": "__DATA_FETCH__",
        "history_id": history_id,
        "inputs": {
            "request_version": request_version,
            "request_json": request,
            "file_count": str(len(files_payload)),
        },
    }
    create_payload.update(files_payload)
    return self._create(trans, create_payload)
def _create(self, trans: ProvidesHistoryContext, payload: ToolRunPayload, **kwd) -> JobCreateResponse:
    """Validate a tool-run payload, execute the tool, and build the API response.

    :raises Exception: for the deprecated 'rerun' action.
    :raises exceptions.RequestParameterInvalidException: for malformed inputs
        or an unknown input_format.
    :raises exceptions.ToolExecutionError: when an anonymous user requests
        email notification.
    """
    action = payload.get("action")
    if action == "rerun":
        raise Exception("'rerun' action has been deprecated")

    tool_run_reference = ToolRunReference(
        payload.get("tool_id"), payload.get("tool_uuid"), payload.get("tool_version")
    )
    tool = validate_tool_for_running(trans, tool_run_reference)

    # Set running history from payload parameters.
    # History not set correctly as part of this API call for
    # dataset upload.
    if history_id := payload.get("history_id"):
        # history_id may arrive encoded (str) or already decoded (int).
        history_id = trans.security.decode_id(history_id) if isinstance(history_id, str) else history_id
        target_history = self.history_manager.get_mutable(history_id, trans.user, current_history=trans.history)
    else:
        target_history = None

    # Set up inputs.
    inputs = payload.get("inputs", {})
    if not isinstance(inputs, dict):
        raise exceptions.RequestParameterInvalidException(f"inputs invalid {inputs}")

    # Find files coming in as multipart file data and add to inputs.
    for k, v in payload.items():
        if k.startswith("files_") or k.startswith("__files_"):
            inputs[k] = v

    # for inputs that are coming from the Library, copy them into the history
    self._patch_library_inputs(trans, inputs, target_history)

    # TODO: encode data ids and decode ids.
    # TODO: handle dbkeys
    params = util.Params(inputs, sanitize=False)
    incoming = params.__dict__

    # use_cached_job can be passed in via the top-level payload or among the tool inputs.
    # I think it should be a top-level parameter, but because the selector is implemented
    # as a regular tool parameter we accept both.
    use_cached_job = payload.get("use_cached_job", False) or util.string_as_bool(
        inputs.get("use_cached_job", "false")
    )
    preferred_object_store_id = payload.get("preferred_object_store_id")
    credentials_context = payload.get("credentials_context")

    input_format = str(payload.get("input_format", "legacy"))
    if input_format not in get_args(InputFormatT):
        raise exceptions.RequestParameterInvalidException(f"input_format invalid {input_format}")
    input_format = cast(InputFormatT, input_format)  # https://github.com/python/mypy/issues/15106

    if "data_manager_mode" in payload:
        incoming["__data_manager_mode"] = payload["data_manager_mode"]
    tags = payload.get("__tags")
    vars = tool.handle_input(
        trans,
        incoming,
        history=target_history,
        use_cached_job=use_cached_job,
        input_format=input_format,
        preferred_object_store_id=preferred_object_store_id,
        credentials_context=CredentialsContext(root=credentials_context) if credentials_context else None,
        tags=tags,
    )

    new_pja_flush = False
    for job in vars.get("jobs", []):
        if inputs.get("send_email_notification", False):
            # Unless an anonymous user is invoking this via the API it
            # should never be an option, but check and enforce that here
            if trans.user is None:
                raise exceptions.ToolExecutionError("Anonymously run jobs cannot send an email notification.")
            else:
                job_email_action = PostJobAction("EmailAction")
                job.add_post_job_action(job_email_action)
                new_pja_flush = True
    if new_pja_flush:
        # Persist the newly attached post-job actions.
        trans.sa_session.commit()

    return self._handle_inputs_output_to_api_response(trans, tool, target_history, vars)

def _handle_inputs_output_to_api_response(self, trans, tool, target_history, vars) -> JobCreateResponse:
    """Convert ``tool.handle_input`` results into the job-create API response dict."""
    # TODO: check for errors and ensure that output dataset(s) are available.
    output_datasets = vars.get("out_data", [])
    rval: dict[str, Any] = {"outputs": [], "output_collections": [], "jobs": [], "implicit_collections": []}
    rval["produces_entry_points"] = tool.produces_entry_points
    if job_errors := vars.get("job_errors", []):
        # If we are here - some jobs were successfully executed but some failed.
        # TODO: We should probably alter the response status code and have a component that knows
        # how to template in things like src and id, so we don't have to rely just on a textual error message.
        execution_errors = [
            (
                trans.security.encode_all_ids(api_error_to_dict(exception=e))
                if isinstance(e, exceptions.MessageException)
                else e
            )
            for e in job_errors
        ]
        rval["errors"] = execution_errors

    outputs = rval["outputs"]
    # TODO:?? poss. only return ids?
    for output_name, output in output_datasets:
        output_dict = output.to_dict()
        # add the output name back into the output data structure
        # so it's possible to figure out which newly created elements
        # correspond with which tool file outputs
        output_dict["output_name"] = output_name
        outputs.append(output_dict)

    for job in vars.get("jobs", []):
        rval["jobs"].append(job.to_dict(view="collection"))

    for output_name, collection_instance in vars.get("output_collections", []):
        history = target_history or trans.history
        output_dict = dictify_dataset_collection_instance(
            collection_instance,
            security=trans.security,
            url_builder=trans.url_builder,
            parent=history,
        )
        output_dict["output_name"] = output_name
        rval["output_collections"].append(output_dict)

    for output_name, collection_instance in vars.get("implicit_collections", {}).items():
        history = target_history or trans.history
        output_dict = dictify_dataset_collection_instance(
            collection_instance,
            security=trans.security,
            url_builder=trans.url_builder,
            parent=history,
        )
        output_dict["output_name"] = output_name
        rval["implicit_collections"].append(output_dict)

    # Encode all database ids in the response in place.
    trans.security.encode_all_ids(rval, recursive=True)
    return rval

def _search(self, q: str, view: Optional[str]) -> list[str]:
    """
    Perform the search on the given query.
    Boosts and number of results are configurable in galaxy.ini file.

    :param q: the query to search with
    :type  q: str
    :param view: optional tool panel view; falls back to the configured default.

    :return:      list containing the tools' ids of the best hits.
    :return type: list
    """
    panel_view = view or self.config.default_panel_view

    results = self.toolbox_search.search(
        q=q,
        panel_view=panel_view,
        config=self.config,
    )
    return results

def _patch_library_inputs(self, trans: ProvidesHistoryContext, inputs, target_history):
    """
    Transform inputs from the data library to history items.

    Mutates ``inputs`` in place: ldda references (top-level or inside a
    ``values`` list) are replaced by history dataset associations.
    """
    for k, v in inputs.items():
        new_value = self._patch_library_dataset(trans, v, target_history)
        if new_value:
            v = new_value
        elif isinstance(v, dict) and "values" in v:
            for index, value in enumerate(v["values"]):
                patched = self._patch_library_dataset(trans, value, target_history)
                if patched:
                    v["values"][index] = patched
        inputs[k] = v

def _patch_library_dataset(self, trans: ProvidesHistoryContext, v, target_history):
    """Copy an ``ldda``-sourced input into *target_history*; return the new HDA or None.

    Returns None (implicitly) when *v* is not an ldda reference or when the
    user lacks access to the dataset.

    :raises exceptions.ObjectNotFound: when the referenced ldda does not exist.
    """
    if isinstance(v, dict) and "id" in v and v.get("src") == "ldda":
        ldda = trans.sa_session.get(LibraryDatasetDatasetAssociation, self.decode_id(v["id"]))
        if not ldda:
            raise exceptions.ObjectNotFound("Could not find library dataset dataset association.")
        if trans.user_is_admin or trans.app.security_agent.can_access_dataset(
            trans.get_current_user_roles(), ldda.dataset
        ):
            return ldda.to_history_dataset_association(target_history, add_to_history=True)

#
# -- Helper methods --
#
def _get_tool(
    self, trans: ProvidesUserContext, id, tool_version=None, tool_uuid=None, user: Optional[User] = None
) -> Tool:
    """Fetch a tool by id (falling back to user-scoped lookup by uuid) and check access.

    :raises exceptions.ObjectNotFound: when no tool matches.
    :raises exceptions.AuthenticationFailed: when *user* may not access the tool.
    """
    tool = trans.app.toolbox.get_tool(id, tool_version)
    if not tool:
        if user:
            # FIXME: id as tool_uuid is for raw_tool_source endpoint, port to fastapi and fix
            if id == tool_uuid:
                id = None
            tool = trans.app.toolbox.get_tool(user=user, tool_id=id, tool_uuid=tool_uuid)
            if tool:
                return tool
        raise exceptions.ObjectNotFound(f"Could not find tool with id '{id or tool_uuid}'.")
    if not tool.allow_user_access(user):
        raise exceptions.AuthenticationFailed(f"Access denied, please login for tool with id '{id}'.")
    return tool

def _detect(self, trans: ProvidesUserContext, tool_id):
    """
    Detect whether the tool with the given id is installed.

    :param tool_id: exact id of the tool
    :type tool_id:  str

    :return:      list with available versions
    :return type: list
    """
    tools = trans.app.toolbox.get_tool(tool_id, get_all_versions=True)
    detected_versions = []
    if tools:
        for tool in tools:
            # Only report versions the current user is allowed to access.
            if tool and tool.allow_user_access(trans.user):
                detected_versions.append(tool.version)
    return detected_versions
def get_tool_icon_path(self, trans, tool_id, tool_version=None) -> Optional[str]:
    """Return the filesystem path of the tool's icon, or None when unavailable.

    :raises Exception: if the configured icon path escapes the tool's directory.
    """
    tool = self._get_tool(trans, tool_id, tool_version)
    # No tool, no icon setting, or no tool directory -> nothing to serve.
    if not tool or not tool.icon or not tool.tool_dir:
        return None
    icon_path = tool.icon
    # Prevent any path traversal attacks. The icon_src must be in the tool's directory.
    if not safe_contains(tool.tool_dir, icon_path):
        raise Exception(
            f"Invalid icon path for tool '{tool_id}'. Path must be within the tool's directory."
        )
    resolved = os.path.join(tool.tool_dir, icon_path)
    return resolved if os.path.exists(resolved) else None