Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.data_fetch

import argparse
import errno
import json
import os
import shutil
import sys
import tempfile

import bdbag.bdbag_api
from six.moves import StringIO

from galaxy.datatypes import sniff
from galaxy.datatypes.registry import Registry
from galaxy.datatypes.upload_util import (
    handle_upload,
    UploadProblemException,
)
from galaxy.util import in_directory, safe_makedirs
from galaxy.util.bunch import Bunch
from galaxy.util.compression_utils import CompressedFile
from galaxy.util.hash_util import HASH_NAMES, memory_bound_hexdigest

DESCRIPTION = """Data Import Script"""


[docs]def main(argv=None): if argv is None: argv = sys.argv[1:] args = _arg_parser().parse_args(argv) registry = Registry() registry.load_datatypes(root_dir=args.galaxy_root, config=args.datatypes_registry) request_path = args.request assert os.path.exists(request_path) with open(request_path) as f: request = json.load(f) working_directory = args.working_directory or os.getcwd() allow_failed_collections = request.get("allow_failed_collections", False) upload_config = UploadConfig(request, registry, working_directory, allow_failed_collections) galaxy_json = _request_to_galaxy_json(upload_config, request) galaxy_json_path = os.path.join(working_directory, "galaxy.json") with open(galaxy_json_path, "w") as f: json.dump(galaxy_json, f)
def _request_to_galaxy_json(upload_config, request): targets = request.get("targets", []) fetched_targets = [] for target in targets: fetched_target = _fetch_target(upload_config, target) fetched_targets.append(fetched_target) return {"__unnamed_outputs": fetched_targets} def _fetch_target(upload_config, target): destination = target.get("destination", None) assert destination, "No destination defined." def expand_elements_from(target_or_item): elements_from = target_or_item.get("elements_from", None) items = None if elements_from: if elements_from == "archive": decompressed_directory = _decompress_target(upload_config, target_or_item) items = _directory_to_items(decompressed_directory) elif elements_from == "bagit": _, elements_from_path = _has_src_to_path(upload_config, target_or_item, is_dataset=False) items = _bagit_to_items(elements_from_path) elif elements_from == "bagit_archive": decompressed_directory = _decompress_target(upload_config, target_or_item) items = _bagit_to_items(decompressed_directory) elif elements_from == "directory": _, elements_from_path = _has_src_to_path(upload_config, target_or_item, is_dataset=False) items = _directory_to_items(elements_from_path) else: raise Exception("Unknown elements from type encountered [%s]" % elements_from) if items: del target_or_item["elements_from"] target_or_item["elements"] = items expansion_error = None try: _for_each_src(expand_elements_from, target) except Exception as e: expansion_error = "Error expanding elements/items for upload destination. %s" % str(e) if expansion_error is None: items = target.get("elements", None) assert items is not None, "No element definition found for destination [%s]" % destination else: items = [] fetched_target = {} fetched_target["destination"] = destination destination_type = destination["type"] is_collection = destination_type == "hdca" failed_elements = [] if "collection_type" in target: fetched_target["collection_type"] = target["collection_type"] if "name" in target: fetched_target["name"] = target["name"] def _copy_and_validate_simple_attributes(src_item, target_metadata): info = src_item.get("info", None) created_from_basename = src_item.get("created_from_basename", None) tags = src_item.get("tags", []) object_id = src_item.get("object_id", None) if info is not None: target_metadata["info"] = info if object_id is not None: target_metadata["object_id"] = object_id if tags: target_metadata["tags"] = tags if created_from_basename: target_metadata["created_from_basename"] = created_from_basename if "error_message" in src_item: target_metadata["error_message"] = src_item["error_message"] return target_metadata def _resolve_item(item): # Might be a dataset or a composite upload. requested_ext = item.get("ext", None) registry = upload_config.registry datatype = registry.get_datatype_by_extension(requested_ext) composite = item.pop("composite", None) if datatype and datatype.composite_type: composite_type = datatype.composite_type writable_files = datatype.writable_files assert composite_type == "auto_primary_file", "basic composite uploads not yet implemented" # get_composite_dataset_name finds dataset name from basename of contents # and such but we're not implementing that here yet. yagni? # also need name... dataset_bunch = Bunch() name = item.get("name") or 'Composite Dataset' dataset_bunch.name = name primary_file = sniff.stream_to_file(StringIO(datatype.generate_primary_file(dataset_bunch)), prefix='upload_auto_primary_file', dir=".") extra_files_path = primary_file + "_extra" os.mkdir(extra_files_path) rval = { "name": name, "filename": primary_file, "ext": requested_ext, "link_data_only": False, "sources": [], "hashes": [], "extra_files": extra_files_path, } _copy_and_validate_simple_attributes(item, rval) composite_items = composite.get("elements", []) keys = [value.name for value in writable_files.values()] composite_item_idx = 0 for composite_item in composite_items: if composite_item_idx >= len(keys): # raise exception - too many files? pass key = keys[composite_item_idx] writable_file = writable_files[key] _, src_target = _has_src_to_path(upload_config, composite_item) # do the writing sniff.handle_composite_file( datatype, src_target, extra_files_path, key, writable_file.is_binary, ".", os.path.basename(extra_files_path) + "_", composite_item, ) composite_item_idx += 1 writable_files_idx = composite_item_idx while writable_files_idx < len(keys): key = keys[writable_files_idx] writable_file = writable_files[key] if not writable_file.optional: # raise Exception, non-optional file missing pass writable_files_idx += 1 return rval else: if composite: raise Exception("Non-composite datatype [%s] attempting to be created with composite data." % datatype) return _resolve_item_with_primary(item) def _resolve_item_with_primary(item): error_message = None converted_path = None name, path = _has_src_to_path(upload_config, item, is_dataset=True) sources = [] url = item.get("url") if url: sources.append({"source_uri": url}) hashes = item.get("hashes", []) for hash_dict in hashes: hash_function = hash_dict.get("hash_function") hash_value = hash_dict.get("hash_value") try: _handle_hash_validation(upload_config, hash_function, hash_value, path) except Exception as e: error_message = str(e) item["error_message"] = error_message dbkey = item.get("dbkey", "?") link_data_only = upload_config.link_data_only if "link_data_only" in item: # Allow overriding this on a per file basis. link_data_only = _link_data_only(item) ext = "data" staged_extra_files = None if not error_message: requested_ext = item.get("ext", "auto") to_posix_lines = upload_config.get_option(item, "to_posix_lines") space_to_tab = upload_config.get_option(item, "space_to_tab") auto_decompress = upload_config.get_option(item, "auto_decompress") in_place = item.get("in_place", False) purge_source = item.get("purge_source", True) registry = upload_config.registry check_content = upload_config.check_content stdout, ext, datatype, is_binary, converted_path = handle_upload( registry=registry, path=path, requested_ext=requested_ext, name=name, tmp_prefix='data_fetch_upload_', tmp_dir=".", check_content=check_content, link_data_only=link_data_only, in_place=in_place, auto_decompress=auto_decompress, convert_to_posix_lines=to_posix_lines, convert_spaces_to_tabs=space_to_tab, ) if link_data_only: # Never alter a file that will not be copied to Galaxy's local file store. if datatype.dataset_content_needs_grooming(path): err_msg = 'The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be ' + \ '<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed.' raise UploadProblemException(err_msg) # If this file is not in the workdir make sure it gets there. if not link_data_only and converted_path: path = upload_config.ensure_in_working_directory(converted_path, purge_source, in_place) elif not link_data_only: path = upload_config.ensure_in_working_directory(path, purge_source, in_place) extra_files = item.get("extra_files") if extra_files: # TODO: optimize to just copy the whole directory to extra files instead. assert not upload_config.link_data_only, "linking composite dataset files not yet implemented" extra_files_path = path + "_extra" staged_extra_files = extra_files_path os.mkdir(extra_files_path) def walk_extra_files(items, prefix=""): for item in items: if "elements" in item: name = item.get("name") if not prefix: item_prefix = name else: item_prefix = os.path.join(prefix, name) walk_extra_files(item.get("elements"), prefix=item_prefix) else: name, src_path = _has_src_to_path(upload_config, item) if prefix: rel_path = os.path.join(prefix, name) else: rel_path = name file_output_path = os.path.join(staged_extra_files, rel_path) parent_dir = os.path.dirname(file_output_path) if not os.path.exists(parent_dir): safe_makedirs(parent_dir) shutil.move(src_path, file_output_path) walk_extra_files(extra_files.get("elements", [])) # TODO: # in galaxy json add 'extra_files' and point at target derived from extra_files: if not link_data_only and datatype and datatype.dataset_content_needs_grooming(path): # Groom the dataset content if necessary datatype.groom_dataset_content(path) rval = {"name": name, "filename": path, "dbkey": dbkey, "ext": ext, "link_data_only": link_data_only, "sources": sources, "hashes": hashes} if staged_extra_files: rval["extra_files"] = os.path.abspath(staged_extra_files) return _copy_and_validate_simple_attributes(item, rval) def _resolve_item_capture_error(item): try: return _resolve_item(item) except Exception as e: rval = {"error_message": str(e)} rval = _copy_and_validate_simple_attributes(item, rval) failed_elements.append(rval) return rval if expansion_error is None: elements = elements_tree_map(_resolve_item_capture_error, items) if is_collection and not upload_config.allow_failed_collections and len(failed_elements) > 0: element_error = "Failed to fetch collection element(s):\n" for failed_element in failed_elements: element_error += "\n- %s" % failed_element["error_message"] fetched_target["error_message"] = element_error fetched_target["elements"] = None else: fetched_target["elements"] = elements else: fetched_target["elements"] = [] fetched_target["error_message"] = expansion_error return fetched_target def _bagit_to_items(directory): bdbag.bdbag_api.resolve_fetch(directory) bdbag.bdbag_api.validate_bag(directory) items = _directory_to_items(os.path.join(directory, "data")) return items def _decompress_target(upload_config, target): elements_from_name, elements_from_path = _has_src_to_path(upload_config, target, is_dataset=False) # by default Galaxy will check for a directory with a single file and interpret that # as the new root for expansion, this is a good user experience for uploading single # files in a archive but not great from an API perspective. Allow disabling by setting # fuzzy_root to False to literally interpret the target. fuzzy_root = target.get("fuzzy_root", True) temp_directory = os.path.abspath(tempfile.mkdtemp(prefix=elements_from_name, dir=".")) cf = CompressedFile(elements_from_path) result = cf.extract(temp_directory) return result if fuzzy_root else temp_directory
[docs]def elements_tree_map(f, items): new_items = [] for item in items: if "elements" in item: new_item = item.copy() new_item["elements"] = elements_tree_map(f, item["elements"]) new_items.append(new_item) else: new_items.append(f(item)) return new_items
def _directory_to_items(directory): items = [] dir_elements = {} for root, dirs, files in os.walk(directory): if root in dir_elements: target = dir_elements[root] else: target = items for dir in sorted(dirs): dir_dict = {"name": dir, "elements": []} dir_elements[os.path.join(root, dir)] = dir_dict["elements"] target.append(dir_dict) for file in sorted(files): target.append({"src": "path", "path": os.path.join(root, file)}) return items def _has_src_to_path(upload_config, item, is_dataset=False): assert "src" in item, item src = item.get("src") name = item.get("name") if src == "url": url = item.get("url") try: path = sniff.stream_url_to_file(url, file_sources=get_file_sources(upload_config.working_directory)) except Exception as e: raise Exception("Failed to fetch url %s. %s" % (url, str(e))) if not is_dataset: # Actual target dataset will validate and put results in dict # that gets passed back to Galaxy. for hash_function in HASH_NAMES: hash_value = item.get(hash_function) if hash_value: _handle_hash_validation(upload_config, hash_function, hash_value, path) if name is None: name = url.split("/")[-1] elif src == "pasted": path = sniff.stream_to_file(StringIO(item["paste_content"])) if name is None: name = "Pasted Entry" else: assert src == "path" path = item["path"] if name is None: name = os.path.basename(path) return name, path def _handle_hash_validation(upload_config, hash_function, hash_value, path): if upload_config.validate_hashes: calculated_hash_value = memory_bound_hexdigest(hash_func_name=hash_function, path=path) if calculated_hash_value != hash_value: raise Exception("Failed to validate upload with [{}] - expected [{}] got [{}]".format(hash_function, hash_value, calculated_hash_value)) def _arg_parser(): parser = argparse.ArgumentParser(description=DESCRIPTION) parser.add_argument("--galaxy-root") parser.add_argument("--datatypes-registry") parser.add_argument("--request-version") parser.add_argument("--request") parser.add_argument("--working-directory") return parser _file_sources = None
[docs]def get_file_sources(working_directory): global _file_sources if _file_sources is None: from galaxy.files import ConfiguredFileSources file_sources = None file_sources_path = os.path.join(working_directory, "file_sources.json") if os.path.exists(file_sources_path): file_sources_as_dict = None with open(file_sources_path) as f: file_sources_as_dict = json.load(f) if file_sources_as_dict is not None: file_sources = ConfiguredFileSources.from_dict(file_sources_as_dict) if file_sources is None: ConfiguredFileSources.from_dict(None) _file_sources = file_sources return _file_sources
[docs]class UploadConfig:
[docs] def __init__(self, request, registry, working_directory, allow_failed_collections): self.registry = registry self.working_directory = working_directory self.allow_failed_collections = allow_failed_collections self.check_content = request.get("check_content" , True) self.to_posix_lines = request.get("to_posix_lines", False) self.space_to_tab = request.get("space_to_tab", False) self.auto_decompress = request.get("auto_decompress", False) self.validate_hashes = request.get("validate_hashes", False) self.link_data_only = _link_data_only(request) self.__workdir = os.path.abspath(".") self.__upload_count = 0
[docs] def get_option(self, item, key): """Return item[key] if specified otherwise use default from UploadConfig. This default represents the default for the whole request instead item which is the option for individual files. """ if key in item: return item[key] else: return getattr(self, key)
def __new_dataset_path(self): path = "gxupload_%d" % self.__upload_count self.__upload_count += 1 return path
[docs] def ensure_in_working_directory(self, path, purge_source, in_place): if in_directory(path, self.__workdir): return path new_path = self.__new_dataset_path() if purge_source: try: shutil.move(path, new_path) except OSError as e: # We may not have permission to remove converted_path if e.errno != errno.EACCES: raise else: shutil.copy(path, new_path) return new_path
def _link_data_only(has_config_dict): link_data_only = has_config_dict.get("link_data_only", False) if not isinstance(link_data_only, bool): # Allow the older string values of 'copy_files' and 'link_to_files' link_data_only = link_data_only == "copy_files" return link_data_only def _for_each_src(f, obj): if isinstance(obj, list): for item in obj: _for_each_src(f, item) if isinstance(obj, dict): if "src" in obj: f(obj) for key, value in obj.items(): _for_each_src(f, value) if __name__ == "__main__": main()