Source code for galaxy.tools.actions.upload

import json
import logging
import os

from galaxy.exceptions import RequestParameterMissingException
from galaxy.model.base import transaction
from galaxy.model.dataset_collections.structure import UninitializedTree
from galaxy.tools.actions import upload_common
from galaxy.util import ExecutionTimer
from galaxy.util.bunch import Bunch
from . import ToolAction

log = logging.getLogger(__name__)


class BaseUploadToolAction(ToolAction):
    produces_real_jobs = True

    def execute(self, tool, trans, incoming=None, history=None, **kwargs):
        trans.check_user_activation()
        incoming = incoming or {}
        dataset_upload_inputs = []
        for input in tool.inputs.values():
            if input.type == "upload_dataset":
                dataset_upload_inputs.append(input)
        assert dataset_upload_inputs, Exception("No dataset upload groups were found.")

        persisting_uploads_timer = ExecutionTimer()
        incoming = upload_common.persist_uploads(incoming, trans)
        log.debug(f"Persisted uploads {persisting_uploads_timer}")
        rval = self._setup_job(tool, trans, incoming, dataset_upload_inputs, history)
        return rval

    def _setup_job(self, tool, trans, incoming, dataset_upload_inputs, history):
        """Take persisted uploads and create a job for given tool."""

    def _create_job(self, *args, **kwds):
        """Wrapper around upload_common.create_job with a timer."""
        create_job_timer = ExecutionTimer()
        rval = upload_common.create_job(*args, **kwds)
        log.debug(f"Created upload job {create_job_timer}")
        return rval
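BaseUploadToolAction is a template: execute() persists the raw uploads and then delegates to _setup_job(), which each subclass overrides to turn the persisted inputs into a job via the shared _create_job() wrapper. The following is a minimal, illustrative sketch of that contract; the subclass name is hypothetical and not part of this module.

# Illustrative sketch, not part of galaxy.tools.actions.upload:
class MyUploadAction(BaseUploadToolAction):  # hypothetical subclass for illustration
    def _setup_job(self, tool, trans, incoming, dataset_upload_inputs, history):
        # Build whatever per-dataset outputs the tool needs, then hand off to the
        # timed job-creation wrapper defined on the base class.
        outputs = []
        return self._create_job(trans, incoming, tool, None, outputs, history=history)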
class UploadToolAction(BaseUploadToolAction):
    def _setup_job(self, tool, trans, incoming, dataset_upload_inputs, history):
        check_timer = ExecutionTimer()
        uploaded_datasets = upload_common.get_uploaded_datasets(
            trans, "", incoming, dataset_upload_inputs, history=history
        )

        if not uploaded_datasets:
            return None, "No data was entered in the upload form, please go back and choose data to upload."

        json_file_path = upload_common.create_paramfile(trans, uploaded_datasets)
        data_list = [ud.data for ud in uploaded_datasets]
        log.debug(f"Checked uploads {check_timer}")
        return self._create_job(trans, incoming, tool, json_file_path, data_list, history=history)
class FetchUploadToolAction(BaseUploadToolAction):
    def _setup_job(self, tool, trans, incoming, dataset_upload_inputs, history):
        # Replace references to uploaded files in the request with their persisted paths.
        files = incoming.get("files", [])
        files_iter = iter(files)
        request = json.loads(incoming.get("request_json"))

        def replace_file_srcs(request_part):
            if isinstance(request_part, dict):
                if request_part.get("src", None) == "files":
                    try:
                        path_def = next(files_iter)
                    except StopIteration:
                        path_def = None
                    if path_def is None or path_def["file_data"] is None:
                        raise RequestParameterMissingException(
                            "Failed to find uploaded file matching target with src='files'"
                        )
                    request_part["path"] = path_def["file_data"]["local_filename"]
                    if "name" not in request_part:
                        request_part["name"] = path_def["file_data"]["filename"]
                    request_part["src"] = "path"
                else:
                    for value in request_part.values():
                        replace_file_srcs(value)
            elif isinstance(request_part, list):
                for value in request_part:
                    replace_file_srcs(value)

        replace_file_srcs(request)

        outputs = []
        for target in request.get("targets", []):
            destination = target.get("destination")
            destination_type = destination.get("type")
            # Start by just pre-creating HDAs.
            if destination_type == "hdas":
                if target.get("elements_from"):
                    # Dynamic collection required I think.
                    continue
                _precreate_fetched_hdas(trans, history, target, outputs)

            if destination_type == "hdca":
                _precreate_fetched_collection_instance(trans, history, target, outputs)

        incoming["request_json"] = json.dumps(request)
        return self._create_job(trans, incoming, tool, None, outputs, history=history)
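The replace_file_srcs() helper walks the decoded request_json and rewrites every element with src="files" to point at the persisted upload on disk. The sketch below illustrates that rewrite; the concrete payload values are assumptions for illustration only.

# Illustrative sketch, not part of galaxy.tools.actions.upload:
files = [{"file_data": {"local_filename": "/tmp/upload_abc", "filename": "reads.fastq"}}]
request = {
    "targets": [
        {
            "destination": {"type": "hdas"},
            "elements": [{"src": "files", "ext": "fastqsanger"}],
        }
    ]
}
# After replace_file_srcs(request), the element above would become:
# {"src": "path", "path": "/tmp/upload_abc", "name": "reads.fastq", "ext": "fastqsanger"}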
def _precreate_fetched_hdas(trans, history, target, outputs):
    for item in target.get("elements", []):
        name = item.get("name", None)
        if name is None:
            src = item.get("src", None)
            if src == "url":
                url = item.get("url")
                if name is None:
                    name = url.split("/")[-1]
            elif src == "path":
                path = item["path"]
                if name is None:
                    name = os.path.basename(path)

        file_type = item.get("ext", "auto")
        dbkey = item.get("dbkey", "?")
        uploaded_dataset = Bunch(type="file", name=name, file_type=file_type, dbkey=dbkey)
        tag_list = item.get("tags", [])
        data = upload_common.new_upload(
            trans, "", uploaded_dataset, library_bunch=None, history=history, tag_list=tag_list
        )
        outputs.append(data)
        item["object_id"] = data.id


def _precreate_fetched_collection_instance(trans, history, target, outputs):
    collection_type = target.get("collection_type")
    if not collection_type:
        # Can't precreate collections of unknown type at this time.
        return

    name = target.get("name")
    if not name:
        return

    tags = target.get("tags", [])
    collections_manager = trans.app.dataset_collection_manager
    collection_type_description = collections_manager.collection_type_descriptions.for_collection_type(
        collection_type
    )
    structure = UninitializedTree(collection_type_description)
    hdca = collections_manager.precreate_dataset_collection_instance(
        trans, history, name, structure=structure, tags=tags
    )
    outputs.append(hdca)
    # Following flush needed for an ID.
    with transaction(trans.sa_session):
        trans.sa_session.commit()
    target["destination"]["object_id"] = hdca.id
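For "hdca" destinations, _precreate_fetched_collection_instance() creates an empty collection instance up front, commits so it has a database id, and records that id back into the target. A hypothetical target that would take this path is sketched below; the field values are illustrative assumptions.

# Illustrative sketch, not part of galaxy.tools.actions.upload:
target = {
    "destination": {"type": "hdca"},
    "name": "my paired reads",
    "collection_type": "list:paired",
    "tags": ["name:run1"],
    "elements": [],
}
# _precreate_fetched_collection_instance(trans, history, target, outputs) would look up
# the "list:paired" type description, pre-create the HDCA with an UninitializedTree
# structure, commit, and set target["destination"]["object_id"] to the new HDCA's id.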