Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.controllers.async

"""
Upload class
"""

import logging
from urllib.parse import urlencode

import requests

from galaxy import web
from galaxy.model.base import transaction
from galaxy.util import (
    DEFAULT_SOCKET_TIMEOUT,
    Params,
    unicodify,
)
from galaxy.util.hash_util import hmac_new
from galaxy.webapps.base.controller import BaseUIController

log = logging.getLogger(__name__)


[docs]class ASync(BaseUIController):
[docs] @web.expose def default(self, trans, tool_id=None, data_id=None, data_secret=None, **kwd): """Catches the tool id and redirects as needed""" return self.index(trans, tool_id=tool_id, data_id=data_id, data_secret=data_secret, **kwd)
[docs] @web.expose def index(self, trans, tool_id=None, data_secret=None, **kwd): """Manages ascynchronous connections""" if tool_id is None: return "tool_id argument is required" tool_id = str(tool_id) # redirect to main when getting no parameters if not kwd: return trans.response.send_redirect("/index") params = Params(kwd, sanitize=False) STATUS = params.STATUS URL = params.URL data_id = params.data_id log.debug(f"async dataid -> {data_id}") trans.log_event(f"Async dataid -> {str(data_id)}") # initialize the tool toolbox = self.get_toolbox() tool = toolbox.get_tool(tool_id) if not tool: return f"Tool with id {tool_id} not found" # # we have an incoming data_id # if data_id: if not URL: return f"No URL parameter was submitted for data {data_id}" data = trans.sa_session.query(trans.model.HistoryDatasetAssociation).get(data_id) if not data: return f"Data {data_id} does not exist or has already been deleted" if STATUS == "OK": key = hmac_new(trans.app.config.tool_secret, "%d:%d" % (data.id, data.history_id)) if key != data_secret: return f"You do not have permission to alter data {data_id}." # push the job into the queue data.state = data.blurb = data.states.RUNNING log.debug(f"executing tool {tool.id}") trans.log_event(f"Async executing tool {tool.id}", tool_id=tool.id) galaxy_url = f"{trans.request.url_path}/async/{tool_id}/{data.id}/{key}" galaxy_url = params.get("GALAXY_URL", galaxy_url) params = dict( URL=URL, GALAXY_URL=galaxy_url, name=data.name, info=data.info, dbkey=data.dbkey, data_type=data.ext ) # Assume there is exactly one output file possible TOOL_OUTPUT_TYPE = None for key, obj in tool.outputs.items(): try: TOOL_OUTPUT_TYPE = obj.format params[key] = data.id break except Exception: # exclude outputs different from ToolOutput (e.g. collections) from the previous assumption continue if TOOL_OUTPUT_TYPE is None: raise Exception("Error: ToolOutput object not found") original_history = trans.sa_session.query(trans.app.model.History).get(data.history_id) job, *_ = tool.execute(trans, incoming=params, history=original_history) trans.app.job_manager.enqueue(job, tool=tool) else: log.debug(f"async error -> {STATUS}") trans.log_event(f"Async error -> {STATUS}") data.state = data.blurb = "error" data.info = f"Error -> {STATUS}" with transaction(trans.sa_session): trans.sa_session.commit() return f"Data {data_id} with status {STATUS} received. OK" else: # # no data_id must be parameter submission # GALAXY_TYPE = None if params.data_type: GALAXY_TYPE = params.data_type elif params.galaxyFileFormat == "wig": # this is an undocumented legacy special case GALAXY_TYPE = "wig" elif params.GALAXY_TYPE: GALAXY_TYPE = params.GALAXY_TYPE else: # Assume there is exactly one output outputs_count = 0 for obj in tool.outputs.values(): try: GALAXY_TYPE = obj.format outputs_count += 1 except Exception: # exclude outputs different from ToolOutput (e.g. collections) from the previous assumption # a collection object does not have the 'format' attribute, so it will throw an exception continue if outputs_count > 1: raise Exception("Error: the tool should have just one output") if GALAXY_TYPE is None: raise Exception("Error: ToolOutput object not found") GALAXY_NAME = params.name or params.GALAXY_NAME or f"{tool.name} query" GALAXY_INFO = params.info or params.GALAXY_INFO or params.galaxyDescription or "" GALAXY_BUILD = params.dbkey or params.GALAXY_BUILD or params.galaxyFreeze or "?" # data = datatypes.factory(ext=GALAXY_TYPE)() # data.ext = GALAXY_TYPE # data.name = GALAXY_NAME # data.info = GALAXY_INFO # data.dbkey = GALAXY_BUILD # data.state = jobs.JOB_OK # history.datasets.add_dataset( data ) data = trans.app.model.HistoryDatasetAssociation( create_dataset=True, sa_session=trans.sa_session, extension=GALAXY_TYPE ) trans.app.security_agent.set_all_dataset_permissions( data.dataset, trans.app.security_agent.history_get_default_permissions(trans.history) ) data.name = GALAXY_NAME data.dbkey = GALAXY_BUILD data.info = GALAXY_INFO trans.sa_session.add( data ) # Need to add data to session before setting state (setting state requires that the data object is in the session, but this may change) data.state = data.states.NEW trans.history.add_dataset(data, genome_build=GALAXY_BUILD) trans.sa_session.add(trans.history) with transaction(trans.sa_session): trans.sa_session.commit() # Need to explicitly create the file data.dataset.object_store.create(data.dataset) trans.log_event("Added dataset %d to history %d" % (data.id, trans.history.id), tool_id=tool_id) try: key = hmac_new(trans.app.config.tool_secret, "%d:%d" % (data.id, data.history_id)) galaxy_url = f"{trans.request.url_path}/async/{tool_id}/{data.id}/{key}" params.update({"GALAXY_URL": galaxy_url}) params.update({"data_id": data.id}) # Use provided URL or fallback to tool action url = URL or tool.action # Does url already have query params? if "?" in url: url_join_char = "&" else: url_join_char = "?" url = f"{url}{url_join_char}{urlencode(params.flatten())}" log.debug(f"connecting to -> {url}") trans.log_event(f"Async connecting to -> {url}") text = requests.get(url, timeout=DEFAULT_SOCKET_TIMEOUT).text.strip() if not text.endswith("OK"): raise Exception(text) data.state = data.blurb = data.states.RUNNING except Exception as e: data.info = unicodify(e) data.state = data.blurb = data.states.ERROR with transaction(trans.sa_session): trans.sa_session.commit() return trans.fill_template("root/tool_runner.mako", out_data={}, num_jobs=1, job_errors=[])