Source code for galaxy.webapps.galaxy.controllers.async

Upload class

import logging
from urllib.parse import urlencode

import requests

from galaxy import web
from galaxy.model.base import transaction
from galaxy.util import (
from galaxy.util.hash_util import hmac_new
from galaxy.webapps.base.controller import BaseUIController

log = logging.getLogger(__name__)

[docs]class ASync(BaseUIController):
[docs] @web.expose def default(self, trans, tool_id=None, data_id=None, data_secret=None, **kwd): """Catches the tool id and redirects as needed""" return self.index(trans, tool_id=tool_id, data_id=data_id, data_secret=data_secret, **kwd)
[docs] @web.expose def index(self, trans, tool_id=None, data_secret=None, **kwd): """Manages ascynchronous connections""" if tool_id is None: return "tool_id argument is required" tool_id = str(tool_id) # redirect to main when getting no parameters if not kwd: return trans.response.send_redirect("/index") params = Params(kwd, sanitize=False) STATUS = params.STATUS URL = params.URL data_id = params.data_id log.debug(f"async dataid -> {data_id}") trans.log_event(f"Async dataid -> {str(data_id)}") # initialize the tool toolbox = self.get_toolbox() tool = toolbox.get_tool(tool_id) if not tool: return f"Tool with id {tool_id} not found" # # we have an incoming data_id # if data_id: if not URL: return f"No URL parameter was submitted for data {data_id}" data = trans.sa_session.query(trans.model.HistoryDatasetAssociation).get(data_id) if not data: return f"Data {data_id} does not exist or has already been deleted" if STATUS == "OK": key = hmac_new(, "%d:%d" % (, data.history_id)) if key != data_secret: return f"You do not have permission to alter data {data_id}." # push the job into the queue data.state = data.blurb = data.states.RUNNING log.debug(f"executing tool {}") trans.log_event(f"Async executing tool {}", galaxy_url = f"{trans.request.url_path}/async/{tool_id}/{}/{key}" galaxy_url = params.get("GALAXY_URL", galaxy_url) params = dict( URL=URL, GALAXY_URL=galaxy_url,,, dbkey=data.dbkey, data_type=data.ext ) # Assume there is exactly one output file possible TOOL_OUTPUT_TYPE = None for key, obj in tool.outputs.items(): try: TOOL_OUTPUT_TYPE = obj.format params[key] = break except Exception: # exclude outputs different from ToolOutput (e.g. collections) from the previous assumption continue if TOOL_OUTPUT_TYPE is None: raise Exception("Error: ToolOutput object not found") original_history = trans.sa_session.query( job, *_ = tool.execute(trans, incoming=params, history=original_history), tool=tool) else: log.debug(f"async error -> {STATUS}") trans.log_event(f"Async error -> {STATUS}") data.state = data.blurb = "error" = f"Error -> {STATUS}" with transaction(trans.sa_session): trans.sa_session.commit() return f"Data {data_id} with status {STATUS} received. OK" else: # # no data_id must be parameter submission # GALAXY_TYPE = None if params.data_type: GALAXY_TYPE = params.data_type elif params.galaxyFileFormat == "wig": # this is an undocumented legacy special case GALAXY_TYPE = "wig" elif params.GALAXY_TYPE: GALAXY_TYPE = params.GALAXY_TYPE else: # Assume there is exactly one output outputs_count = 0 for obj in tool.outputs.values(): try: GALAXY_TYPE = obj.format outputs_count += 1 except Exception: # exclude outputs different from ToolOutput (e.g. collections) from the previous assumption # a collection object does not have the 'format' attribute, so it will throw an exception continue if outputs_count > 1: raise Exception("Error: the tool should have just one output") if GALAXY_TYPE is None: raise Exception("Error: ToolOutput object not found") GALAXY_NAME = or params.GALAXY_NAME or f"{} query" GALAXY_INFO = or params.GALAXY_INFO or params.galaxyDescription or "" GALAXY_BUILD = params.dbkey or params.GALAXY_BUILD or params.galaxyFreeze or "?" # data = datatypes.factory(ext=GALAXY_TYPE)() # data.ext = GALAXY_TYPE # = GALAXY_NAME # = GALAXY_INFO # data.dbkey = GALAXY_BUILD # data.state = jobs.JOB_OK # history.datasets.add_dataset( data ) data = create_dataset=True, sa_session=trans.sa_session, extension=GALAXY_TYPE ) data.dataset, ) = GALAXY_NAME data.dbkey = GALAXY_BUILD = GALAXY_INFO trans.sa_session.add( data ) # Need to add data to session before setting state (setting state requires that the data object is in the session, but this may change) data.state = data.states.NEW trans.history.add_dataset(data, genome_build=GALAXY_BUILD) trans.sa_session.add(trans.history) with transaction(trans.sa_session): trans.sa_session.commit() # Need to explicitly create the file data.dataset.object_store.create(data.dataset) trans.log_event("Added dataset %d to history %d" % (,, tool_id=tool_id) try: key = hmac_new(, "%d:%d" % (, data.history_id)) galaxy_url = f"{trans.request.url_path}/async/{tool_id}/{}/{key}" params.update({"GALAXY_URL": galaxy_url}) params.update({"data_id":}) # Use provided URL or fallback to tool action url = URL or tool.action # Does url already have query params? if "?" in url: url_join_char = "&" else: url_join_char = "?" url = f"{url}{url_join_char}{urlencode(params.flatten())}" log.debug(f"connecting to -> {url}") trans.log_event(f"Async connecting to -> {url}") text = requests.get(url, timeout=DEFAULT_SOCKET_TIMEOUT).text.strip() if not text.endswith("OK"): raise Exception(text) data.state = data.blurb = data.states.RUNNING except Exception as e: = unicodify(e) data.state = data.blurb = data.states.ERROR with transaction(trans.sa_session): trans.sa_session.commit() return trans.fill_template("root/tool_runner.mako", out_data={}, num_jobs=1, job_errors=[])