Source code for galaxy.tools.actions.upload

import json
import logging
import os

from galaxy.exceptions import RequestParameterMissingException
from galaxy.model.base import transaction
from galaxy.model.dataset_collections.structure import UninitializedTree
from galaxy.tools.actions import upload_common
from galaxy.util import ExecutionTimer
from galaxy.util.bunch import Bunch
from . import ToolAction

log = logging.getLogger(__name__)


class BaseUploadToolAction(ToolAction):
    produces_real_jobs = True

    def execute(self, tool, trans, incoming=None, history=None, **kwargs):
        trans.check_user_activation()
        incoming = incoming or {}
        dataset_upload_inputs = []
        for input in tool.inputs.values():
            if input.type == "upload_dataset":
                dataset_upload_inputs.append(input)
        assert dataset_upload_inputs, Exception("No dataset upload groups were found.")

        persisting_uploads_timer = ExecutionTimer()
        incoming = upload_common.persist_uploads(incoming, trans)
        log.debug(f"Persisted uploads {persisting_uploads_timer}")
        rval = self._setup_job(tool, trans, incoming, dataset_upload_inputs, history)
        return rval

    def _setup_job(self, tool, trans, incoming, dataset_upload_inputs, history):
        """Take persisted uploads and create a job for given tool."""

    def _create_job(self, *args, **kwds):
        """Wrapper around upload_common.create_job with a timer."""
        create_job_timer = ExecutionTimer()
        rval = upload_common.create_job(*args, **kwds)
        log.debug(f"Created upload job {create_job_timer}")
        return rval


class UploadToolAction(BaseUploadToolAction):
    def _setup_job(self, tool, trans, incoming, dataset_upload_inputs, history):
        check_timer = ExecutionTimer()
        uploaded_datasets = upload_common.get_uploaded_datasets(
            trans, "", incoming, dataset_upload_inputs, history=history
        )

        if not uploaded_datasets:
            return None, "No data was entered in the upload form, please go back and choose data to upload."

        json_file_path = upload_common.create_paramfile(trans, uploaded_datasets)
        data_list = [ud.data for ud in uploaded_datasets]
        log.debug(f"Checked uploads {check_timer}")
        return self._create_job(trans, incoming, tool, json_file_path, data_list, history=history)


class FetchUploadToolAction(BaseUploadToolAction):
    def _setup_job(self, tool, trans, incoming, dataset_upload_inputs, history):
        # Now replace references in requests with these.
        files = incoming.get("files", [])
        files_iter = iter(files)
        request = json.loads(incoming.get("request_json"))

        def replace_file_srcs(request_part):
            if isinstance(request_part, dict):
                if request_part.get("src", None) == "files":
                    try:
                        path_def = next(files_iter)
                    except StopIteration:
                        path_def = None
                    if path_def is None or path_def["file_data"] is None:
                        raise RequestParameterMissingException(
                            "Failed to find uploaded file matching target with src='files'"
                        )
                    request_part["path"] = path_def["file_data"]["local_filename"]
                    if "name" not in request_part:
                        request_part["name"] = path_def["file_data"]["filename"]
                    request_part["src"] = "path"
                else:
                    for value in request_part.values():
                        replace_file_srcs(value)
            elif isinstance(request_part, list):
                for value in request_part:
                    replace_file_srcs(value)

        replace_file_srcs(request)

        outputs = []
        for target in request.get("targets", []):
            destination = target.get("destination")
            destination_type = destination.get("type")
            # Start by just pre-creating HDAs.
            if destination_type == "hdas":
                if target.get("elements_from"):
                    # Dynamic collection required I think.
                    continue
                _precreate_fetched_hdas(trans, history, target, outputs)

            if destination_type == "hdca":
                _precreate_fetched_collection_instance(trans, history, target, outputs)

        incoming["request_json"] = json.dumps(request)
        return self._create_job(trans, incoming, tool, None, outputs, history=history)
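
For orientation, the rewrite that replace_file_srcs performs can be shown with a small standalone sketch. The payload below is invented for illustration; only the key names it uses (files entries carrying file_data with local_filename and filename, and targets/elements carrying src) mirror what the method above reads, and nothing here is a real Galaxy request.

    # Standalone sketch: how src="files" references are rewritten to src="path".
    # The payload and filenames are illustrative only.
    import json

    files = [
        {"file_data": {"local_filename": "/tmp/upload_abc123", "filename": "reads.fastq"}},
    ]
    files_iter = iter(files)

    request = {
        "targets": [
            {
                "destination": {"type": "hdas"},
                "elements": [{"src": "files"}],
            }
        ]
    }

    def replace_file_srcs(request_part):
        # Same recursion as above: walk nested dicts/lists and swap each
        # src="files" entry for the matching uploaded file's local path.
        if isinstance(request_part, dict):
            if request_part.get("src") == "files":
                path_def = next(files_iter)
                request_part["path"] = path_def["file_data"]["local_filename"]
                request_part.setdefault("name", path_def["file_data"]["filename"])
                request_part["src"] = "path"
            else:
                for value in request_part.values():
                    replace_file_srcs(value)
        elif isinstance(request_part, list):
            for value in request_part:
                replace_file_srcs(value)

    replace_file_srcs(request)
    print(json.dumps(request, indent=2))
    # The single element now reads:
    #   {"src": "path", "path": "/tmp/upload_abc123", "name": "reads.fastq"}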


def _precreate_fetched_hdas(trans, history, target, outputs):
    for item in target.get("elements", []):
        name = item.get("name", None)
        if name is None:
            src = item.get("src", None)
            if src == "url":
                url = item.get("url")
                if name is None:
                    name = url.split("/")[-1]
            elif src == "path":
                path = item["path"]
                if name is None:
                    name = os.path.basename(path)
        file_type = item.get("ext", "auto")
        dbkey = item.get("dbkey", "?")
        uploaded_dataset = Bunch(type="file", name=name, file_type=file_type, dbkey=dbkey)
        tag_list = item.get("tags", [])
        data = upload_common.new_upload(
            trans, "", uploaded_dataset, library_bunch=None, history=history, tag_list=tag_list
        )
        outputs.append(data)
        item["object_id"] = data.id


def _precreate_fetched_collection_instance(trans, history, target, outputs):
    collection_type = target.get("collection_type")
    if not collection_type:
        # Can't precreate collections of unknown type at this time.
        return

    name = target.get("name")
    if not name:
        return

    tags = target.get("tags", [])
    collections_manager = trans.app.dataset_collection_manager
    collection_type_description = collections_manager.collection_type_descriptions.for_collection_type(
        collection_type
    )
    structure = UninitializedTree(collection_type_description)
    hdca = collections_manager.precreate_dataset_collection_instance(
        trans, history, name, structure=structure, tags=tags
    )
    outputs.append(hdca)
    # Following flush needed for an ID.
    with transaction(trans.sa_session):
        trans.sa_session.commit()
    target["destination"]["object_id"] = hdca.id
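
The two module-level helpers above read only a handful of keys from each fetch target. As a minimal standalone sketch (using invented example elements, not real requests), the name/ext/dbkey defaults applied by _precreate_fetched_hdas work out like this:

    # Standalone sketch of the per-element defaults used when pre-creating HDAs.
    # The example elements are illustrative only.
    import os

    def describe_element(item):
        name = item.get("name")
        if name is None:
            src = item.get("src")
            if src == "url":
                # Fall back to the last path segment of the URL.
                name = item.get("url", "").split("/")[-1]
            elif src == "path":
                # Fall back to the basename of the server-side path.
                name = os.path.basename(item["path"])
        return {
            "name": name,
            "file_type": item.get("ext", "auto"),  # datatype defaults to auto-detection
            "dbkey": item.get("dbkey", "?"),  # genome build defaults to unknown
        }

    print(describe_element({"src": "url", "url": "https://example.org/data/reads.fastq"}))
    # {'name': 'reads.fastq', 'file_type': 'auto', 'dbkey': '?'}
    print(describe_element({"src": "path", "path": "/data/sample1.bed", "ext": "bed"}))
    # {'name': 'sample1.bed', 'file_type': 'bed', 'dbkey': '?'}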