Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.controllers.async

"""
Controller to handle communication of tools of type data_source_async
"""

import hmac
import logging
from urllib.parse import urlencode

import requests

from galaxy import web
from galaxy.model.base import transaction
from galaxy.util import (
    DEFAULT_SOCKET_TIMEOUT,
    Params,
    unicodify,
)
from galaxy.util.hash_util import hmac_new
from galaxy.webapps.base.controller import BaseUIController

log = logging.getLogger(__name__)


class ASync(BaseUIController):
    """Controller handling requests for tools of type ``data_source_async``."""

    @web.expose
    def default(self, trans, tool_id=None, data_id=None, data_secret=None, **kwd):
        """Catch the tool id and hand the request over to :meth:`index`."""
        return self.index(trans, tool_id=tool_id, data_id=data_id, data_secret=data_secret, **kwd)

    @web.expose
    def index(self, trans, tool_id=None, data_secret=None, **kwd):
        """Manage asynchronous connections for a data source tool.

        Two kinds of requests are handled, distinguished by the presence of a
        ``data_id`` among the request parameters:

        * with ``data_id``: the remote server reports back on an existing
          dataset; on ``STATUS == "OK"`` the tool is executed and enqueued,
          otherwise the dataset is put into the error state;
        * without ``data_id``: a new dataset is created in the current
          history and a request for data is sent to the remote server.

        :param trans: the current web transaction
        :param tool_id: id of the data source tool (required)
        :param data_secret: secret compared against the HMAC of the dataset
            and history ids before a dataset may be altered
        :param kwd: all remaining request parameters
        :returns: a plain-text message for callbacks and errors, a redirect to
            ``/index`` when no extra parameters were passed, or the rendered
            ``root/tool_runner.mako`` template after a parameter submission
        :raises Exception: if no tool output exposing a ``format`` attribute is
            found, or (for submissions) if more than one is found
        """
        if tool_id is None:
            return "tool_id argument is required"
        tool_id = str(tool_id)

        # redirect to main when getting no parameters
        if not kwd:
            return trans.response.send_redirect("/index")

        params = Params(kwd, sanitize=False)
        data_id = params.data_id

        log.debug("async dataid -> %s", data_id)
        trans.log_event(f"Async dataid -> {data_id}")

        # initialize the tool
        toolbox = self.get_toolbox()
        tool = toolbox.get_tool(tool_id)
        if not tool:
            return f"Tool with id {tool_id} not found"

        if data_id:
            # we have an incoming data_id
            return self._receive_remote_status(trans, tool, tool_id, data_id, data_secret, params)

        # no data_id: must be a parameter submission
        self._request_remote_data(trans, tool, tool_id, params)
        return trans.fill_template("root/tool_runner.mako", out_data={}, num_jobs=1, job_errors=[])

    def _receive_remote_status(self, trans, tool, tool_id, data_id, data_secret, params):
        """Process the remote server's status report for the existing dataset ``data_id``."""
        data = trans.sa_session.query(trans.model.HistoryDatasetAssociation).get(data_id)

        if not data:
            return f"Data {data_id} does not exist or has already been deleted"

        if data.state in data.dataset.terminal_states:
            message = f"Tool {tool.id}: execution stopped as data {data_id} has entered terminal state prematurely"
            log.debug(message)
            trans.log_event(message)
            return f"Data {data_id} has finished processing before job could be completed"

        # map params from the tool's <request_param_translation> section;
        # ignore any other params that may have been passed by the remote
        # server with the exception of STATUS and URL;
        # if name, info, dbkey and data_type are not handled via incoming params,
        # use the metadata from the already existing dataset;
        # preserve original params under nested dict
        request_params = dict(
            STATUS=params.STATUS,
            URL=params.URL,
            name=data.name,
            info=data.info,
            dbkey=data.dbkey,
            data_type=data.ext,
            incoming_request_params=params.__dict__.copy(),
        )
        if tool.input_translator:
            tool.input_translator.translate(params)
            tool_declared_params = {
                translator.galaxy_name for translator in tool.input_translator.param_trans_dict.values()
            }
            for param in params:
                if param in tool_declared_params or not tool.wants_params_cleaned:
                    request_params[param] = params.get(param, None)
        params = request_params

        if not params.get("URL"):
            return f"No URL parameter was submitted for data {data_id}"

        status = params.get("STATUS")

        if status == "OK":
            expected_secret = hmac_new(trans.app.config.tool_secret, "%d:%d" % (data.id, data.history_id))
            # Compare in constant time so the response timing does not leak
            # how much of a guessed secret was correct; bytes are compared
            # because compare_digest rejects non-ASCII str arguments.
            if data_secret is None or not hmac.compare_digest(
                expected_secret.encode("utf-8"), str(data_secret).encode("utf-8")
            ):
                return f"You do not have permission to alter data {data_id}."
            if not params.get("GALAXY_URL"):
                # provide a fallback for GALAXY_URL
                params["GALAXY_URL"] = f"{trans.request.url_path}/async/{tool_id}/{data.id}/{expected_secret}"

            # push the job into the queue
            data.state = data.blurb = data.states.RUNNING
            log.debug("executing tool %s", tool.id)
            trans.log_event(f"Async executing tool {tool.id}", tool_id=tool.id)

            # Assume there is exactly one output file possible
            tool_output_type = None
            for output_name, output in tool.outputs.items():
                try:
                    tool_output_type = output.format
                except AttributeError:
                    # exclude outputs different from ToolOutput (e.g. collections) from the previous assumption
                    continue
                params[output_name] = data.id
                break

            if tool_output_type is None:
                raise Exception("Error: ToolOutput object not found")

            original_history = trans.sa_session.query(trans.app.model.History).get(data.history_id)
            job, *_ = tool.execute(trans, incoming=params, history=original_history)
            trans.app.job_manager.enqueue(job, tool=tool)
        else:
            # NOTE(review): data_secret is only verified on the "OK" path, so
            # this branch marks the dataset as errored without checking the
            # secret - confirm whether that is intended.
            log.debug("async error -> %s", status)
            trans.log_event(f"Async error -> {status}")
            data.state = data.blurb = data.states.ERROR
            data.info = f"Error -> {status}"

        with transaction(trans.sa_session):
            trans.sa_session.commit()

        return f"Data {data_id} with status {status} received. OK"

    def _request_remote_data(self, trans, tool, tool_id, params):
        """Create a new dataset in the current history and ask the remote server for its data."""
        # create new dataset, put it into running state,
        # send request for data to remote server and see if the response
        # ends in ok;
        # the request that's getting sent goes to the URL found in
        # params.URL or, in its absence, to the one found as the value of
        # the "action" attribute of the data source tool's "inputs" tag.
        # Included in the request are the parameters:
        # - data_id, which indicates to the remote server that Galaxy is
        #   ready to accept data
        # - GALAXY_URL, which takes the form:
        #   {base_url}/async/{tool_id}/{data_id}/{data_secret}, and which
        #   when used by the remote server to send a data download link,
        #   will trigger the data_id branch of index().
        galaxy_type = None
        if params.data_type:
            galaxy_type = params.data_type
        elif params.galaxyFileFormat == "wig":  # this is an undocumented legacy special case
            galaxy_type = "wig"
        elif params.GALAXY_TYPE:
            galaxy_type = params.GALAXY_TYPE
        else:
            # Assume there is exactly one output
            outputs_count = 0
            for output in tool.outputs.values():
                try:
                    galaxy_type = output.format
                except AttributeError:
                    # exclude outputs different from ToolOutput (e.g. collections) from the previous assumption;
                    # a collection object does not have the 'format' attribute
                    continue
                outputs_count += 1
            if outputs_count > 1:
                raise Exception("Error: the tool should have just one output")

        if galaxy_type is None:
            raise Exception("Error: ToolOutput object not found")

        galaxy_name = params.name or params.GALAXY_NAME or f"{tool.name} query"
        galaxy_info = params.info or params.GALAXY_INFO or params.galaxyDescription or ""
        galaxy_build = params.dbkey or params.GALAXY_BUILD or params.galaxyFreeze or "?"

        data = trans.app.model.HistoryDatasetAssociation(
            create_dataset=True, sa_session=trans.sa_session, extension=galaxy_type
        )
        trans.app.security_agent.set_all_dataset_permissions(
            data.dataset, trans.app.security_agent.history_get_default_permissions(trans.history)
        )
        data.name = galaxy_name
        data.dbkey = galaxy_build
        data.info = galaxy_info
        # Need to add data to session before setting state (setting state requires
        # that the data object is in the session, but this may change)
        trans.sa_session.add(data)
        data.state = data.states.NEW
        trans.history.add_dataset(data, genome_build=galaxy_build)
        trans.sa_session.add(trans.history)
        with transaction(trans.sa_session):
            trans.sa_session.commit()
        # Need to explicitly create the file
        data.dataset.object_store.create(data.dataset)
        trans.log_event("Added dataset %d to history %d" % (data.id, trans.history.id), tool_id=tool_id)

        try:
            key = hmac_new(trans.app.config.tool_secret, "%d:%d" % (data.id, data.history_id))
            galaxy_url = f"{trans.request.url_path}/async/{tool_id}/{data.id}/{key}"
            params.update({"GALAXY_URL": galaxy_url, "data_id": data.id})

            # Use provided URL or fallback to tool action
            url = params.URL or tool.action
            # Does url already have query params?
            url_join_char = "&" if "?" in url else "?"
            url = f"{url}{url_join_char}{urlencode(params.flatten())}"
            log.debug("connecting to -> %s", url)
            trans.log_event(f"Async connecting to -> {url}")
            # NOTE(review): url may come straight from the request's URL
            # parameter - confirm the target is validated elsewhere.
            text = requests.get(url, timeout=DEFAULT_SOCKET_TIMEOUT).text.strip()
            if not text.endswith("OK"):
                raise Exception(text)
            data.state = data.blurb = data.states.RUNNING
        except Exception as e:
            # Any failure while contacting the remote server is recorded on
            # the dataset itself rather than propagated to the caller.
            data.info = unicodify(e)
            data.state = data.blurb = data.states.ERROR

        with transaction(trans.sa_session):
            trans.sa_session.commit()