Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.tools.data_fetch

import argparse
import errno
import json
import os
import shutil
import sys
import tempfile
from io import StringIO
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Tuple,
)

import bdbag.bdbag_api

from galaxy.datatypes import sniff
from galaxy.datatypes.registry import Registry
from galaxy.datatypes.upload_util import (
    handle_upload,
    UploadProblemException,
)
from galaxy.files.uris import (
    stream_to_file,
    stream_url_to_file,
)
from galaxy.util import (
    in_directory,
    safe_makedirs,
)
from galaxy.util.bunch import Bunch
from galaxy.util.compression_utils import CompressedFile
from galaxy.util.hash_util import (
    HASH_NAMES,
    memory_bound_hexdigest,
)

DESCRIPTION = """Data Import Script"""


[docs]def main(argv=None): if argv is None: argv = sys.argv[1:] args = _arg_parser().parse_args(argv) registry = Registry() registry.load_datatypes(root_dir=args.galaxy_root, config=args.datatypes_registry) do_fetch(args.request, working_directory=args.working_directory or os.getcwd(), registry=registry)
[docs]def do_fetch( request_path: str, working_directory: str, registry: Registry, file_sources_dict: Optional[Dict] = None, ): assert os.path.exists(request_path) with open(request_path) as f: request = json.load(f) allow_failed_collections = request.get("allow_failed_collections", False) upload_config = UploadConfig( request, registry, working_directory, allow_failed_collections, file_sources_dict, ) galaxy_json = _request_to_galaxy_json(upload_config, request) galaxy_json_path = os.path.join(working_directory, "galaxy.json") with open(galaxy_json_path, "w") as f: json.dump(galaxy_json, f) return working_directory
def _request_to_galaxy_json(upload_config: "UploadConfig", request): targets = request.get("targets", []) fetched_targets = [] for target in targets: fetched_target = _fetch_target(upload_config, target) fetched_targets.append(fetched_target) return {"__unnamed_outputs": fetched_targets} def _fetch_target(upload_config: "UploadConfig", target): destination = target.get("destination", None) assert destination, "No destination defined." def expand_elements_from(target_or_item): items = None if elements_from := target_or_item.get("elements_from", None): if elements_from == "archive": decompressed_directory = _decompress_target(upload_config, target_or_item) items = _directory_to_items(decompressed_directory) elif elements_from == "bagit": _, elements_from_path = _has_src_to_path(upload_config, target_or_item, is_dataset=False) items = _bagit_to_items(elements_from_path) elif elements_from == "bagit_archive": decompressed_directory = _decompress_target(upload_config, target_or_item) items = _bagit_to_items(decompressed_directory) elif elements_from == "directory": _, elements_from_path = _has_src_to_path(upload_config, target_or_item, is_dataset=False) items = _directory_to_items(elements_from_path) else: raise Exception(f"Unknown elements from type encountered [{elements_from}]") if items: del target_or_item["elements_from"] target_or_item["elements"] = items expansion_error = None try: _for_each_src(expand_elements_from, target) except Exception as e: expansion_error = f"Error expanding elements/items for upload destination. {str(e)}" if expansion_error is None: items = target.get("elements", None) assert items is not None, f"No element definition found for destination [{destination}]" else: items = [] fetched_target = {} fetched_target["destination"] = destination destination_type = destination["type"] is_collection = destination_type == "hdca" failed_elements = [] if "collection_type" in target: fetched_target["collection_type"] = target["collection_type"] if "name" in target: fetched_target["name"] = target["name"] def _copy_and_validate_simple_attributes(src_item, target_metadata): info = src_item.get("info", None) created_from_basename = src_item.get("created_from_basename", None) tags = src_item.get("tags", []) if info is not None: target_metadata["info"] = info if (object_id := src_item.get("object_id", None)) is not None: target_metadata["object_id"] = object_id if tags: target_metadata["tags"] = tags if created_from_basename: target_metadata["created_from_basename"] = created_from_basename if "error_message" in src_item: target_metadata["error_message"] = src_item["error_message"] return target_metadata def _resolve_item(item): # Might be a dataset or a composite upload. requested_ext = item.get("ext", None) registry = upload_config.registry datatype = registry.get_datatype_by_extension(requested_ext) composite = item.pop("composite", None) if datatype and datatype.composite_type: composite_type = datatype.composite_type assert composite_type == "auto_primary_file", "basic composite uploads not yet implemented" # get_composite_dataset_name finds dataset name from basename of contents # and such but we're not implementing that here yet. yagni? # also need name... metadata = { composite_file.substitute_name_with_metadata: datatype.metadata_spec[ composite_file.substitute_name_with_metadata ].default for composite_file in datatype.composite_files.values() if composite_file.substitute_name_with_metadata } name = item.get("name") or "Composite Dataset" metadata["base_name"] = name dataset = Bunch( name=name, metadata=metadata, ) writable_files = datatype.get_writable_files_for_dataset(dataset) primary_file = stream_to_file( StringIO(datatype.generate_primary_file(dataset)), prefix="upload_auto_primary_file", dir=upload_config.working_directory, ) extra_files_path = f"{primary_file}_extra" os.mkdir(extra_files_path) rval: Dict[str, Any] = { "name": name, "filename": primary_file, "ext": requested_ext, "link_data_only": False, "sources": [], "hashes": [], "extra_files": extra_files_path, } _copy_and_validate_simple_attributes(item, rval) composite_items = composite.get("elements", []) keys = list(writable_files.keys()) composite_item_idx = 0 for composite_item in composite_items: if composite_item_idx >= len(keys): # raise exception - too many files? pass key = keys[composite_item_idx] writable_file = writable_files[key] _, src_target = _has_src_to_path(upload_config, composite_item) # do the writing sniff.handle_composite_file( datatype, src_target, extra_files_path, key, writable_file.is_binary, upload_config.working_directory, f"{os.path.basename(extra_files_path)}_", composite_item, ) composite_item_idx += 1 writable_files_idx = composite_item_idx while writable_files_idx < len(keys): key = keys[writable_files_idx] writable_file = writable_files[key] if not writable_file.optional: # raise Exception, non-optional file missing pass writable_files_idx += 1 return rval else: if composite: raise Exception(f"Non-composite datatype [{datatype}] attempting to be created with composite data.") return _resolve_item_with_primary(item) def _resolve_item_with_primary(item): error_message = None converted_path = None deferred = upload_config.get_option(item, "deferred") name: str path: Optional[str] if not deferred: name, path = _has_src_to_path(upload_config, item, is_dataset=True) else: name, path = _has_src_to_name(item) or "Deferred Dataset", None sources = [] url = item.get("url") source_dict = {"source_uri": url} if url: sources.append(source_dict) hashes = item.get("hashes", []) for hash_dict in hashes: hash_function = hash_dict.get("hash_function") hash_value = hash_dict.get("hash_value") try: _handle_hash_validation(upload_config, hash_function, hash_value, path) except Exception as e: error_message = str(e) item["error_message"] = error_message dbkey = item.get("dbkey", "?") link_data_only = upload_config.link_data_only if "link_data_only" in item: # Allow overriding this on a per file basis. link_data_only = _link_data_only(item) ext = "data" staged_extra_files = None requested_ext = item.get("ext", "auto") to_posix_lines = upload_config.get_option(item, "to_posix_lines") space_to_tab = upload_config.get_option(item, "space_to_tab") auto_decompress = upload_config.get_option(item, "auto_decompress") effective_state = "ok" if not deferred and not error_message: in_place = item.get("in_place", False) purge_source = item.get("purge_source", True) registry = upload_config.registry check_content = upload_config.check_content assert path # if deferred won't be in this branch. stdout, ext, datatype, is_binary, converted_path, converted_newlines, converted_spaces = handle_upload( registry=registry, path=path, requested_ext=requested_ext, name=name, tmp_prefix="data_fetch_upload_", tmp_dir=upload_config.working_directory, check_content=check_content, link_data_only=link_data_only, in_place=in_place, auto_decompress=auto_decompress, convert_to_posix_lines=to_posix_lines, convert_spaces_to_tabs=space_to_tab, ) transform = [] if converted_newlines: transform.append({"action": "to_posix_lines"}) if converted_spaces: transform.append({"action": "spaces_to_tabs"}) if link_data_only: # Never alter a file that will not be copied to Galaxy's local file store. if datatype.dataset_content_needs_grooming(path): err_msg = ( "The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be " + "<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed." ) raise UploadProblemException(err_msg) # If this file is not in the workdir make sure it gets there. if not link_data_only and converted_path: path = upload_config.ensure_in_working_directory(converted_path, purge_source, in_place) elif not link_data_only: path = upload_config.ensure_in_working_directory(path, purge_source, in_place) extra_files = item.get("extra_files") if extra_files: # TODO: optimize to just copy the whole directory to extra files instead. assert not upload_config.link_data_only, "linking composite dataset files not yet implemented" extra_files_path = f"{path}_extra" staged_extra_files = extra_files_path os.mkdir(extra_files_path) def walk_extra_files(items, prefix=""): for item in items: if "elements" in item: name = item.get("name") if not prefix: item_prefix = name else: item_prefix = os.path.join(prefix, name) walk_extra_files(item.get("elements"), prefix=item_prefix) else: src_name, src_path = _has_src_to_path(upload_config, item) if prefix: rel_path = os.path.join(prefix, src_name) else: rel_path = src_name file_output_path = os.path.join(extra_files_path, rel_path) parent_dir = os.path.dirname(file_output_path) if not os.path.exists(parent_dir): safe_makedirs(parent_dir) shutil.move(src_path, file_output_path) walk_extra_files(extra_files.get("elements", [])) # TODO: # in galaxy json add 'extra_files' and point at target derived from extra_files: needs_grooming = not link_data_only and datatype and datatype.dataset_content_needs_grooming(path) # type: ignore[arg-type] if needs_grooming: # Groom the dataset content if necessary transform.append( {"action": "datatype_groom", "datatype_ext": ext, "datatype_class": datatype.__class__.__name__} ) assert path datatype.groom_dataset_content(path) if len(transform) > 0: source_dict["transform"] = transform elif not error_message: transform = [] if to_posix_lines: transform.append({"action": "to_posix_lines"}) if space_to_tab: transform.append({"action": "spaces_to_tabs"}) effective_state = "deferred" registry = upload_config.registry ext = sniff.guess_ext_from_file_name(name, registry=registry, requested_ext=requested_ext) rval = { "name": name, "dbkey": dbkey, "ext": ext, "link_data_only": link_data_only, "sources": sources, "hashes": hashes, "info": f"uploaded {ext} file", "state": effective_state, } if path: rval["filename"] = path if staged_extra_files: rval["extra_files"] = os.path.abspath(staged_extra_files) return _copy_and_validate_simple_attributes(item, rval) def _resolve_item_capture_error(item): try: return _resolve_item(item) except Exception as e: rval = {"error_message": str(e)} rval = _copy_and_validate_simple_attributes(item, rval) failed_elements.append(rval) return rval if expansion_error is None: elements = elements_tree_map(_resolve_item_capture_error, items) if is_collection and not upload_config.allow_failed_collections and len(failed_elements) > 0: element_error = "Failed to fetch collection element(s):\n" for failed_element in failed_elements: element_error += f"\n- {failed_element['error_message']}" fetched_target["error_message"] = element_error fetched_target["elements"] = None else: fetched_target["elements"] = elements else: fetched_target["elements"] = [] fetched_target["error_message"] = expansion_error return fetched_target def _bagit_to_items(directory): bdbag.bdbag_api.resolve_fetch(directory) bdbag.bdbag_api.validate_bag(directory) items = _directory_to_items(os.path.join(directory, "data")) return items def _decompress_target(upload_config: "UploadConfig", target): elements_from_name, elements_from_path = _has_src_to_path(upload_config, target, is_dataset=False) # by default Galaxy will check for a directory with a single file and interpret that # as the new root for expansion, this is a good user experience for uploading single # files in a archive but not great from an API perspective. Allow disabling by setting # fuzzy_root to False to literally interpret the target. fuzzy_root = target.get("fuzzy_root", True) temp_directory = os.path.abspath(tempfile.mkdtemp(prefix=elements_from_name, dir=upload_config.working_directory)) cf = CompressedFile(elements_from_path) result = cf.extract(temp_directory) return result if fuzzy_root else temp_directory
[docs]def elements_tree_map(f, items): new_items = [] for item in items: if "elements" in item: new_item = item.copy() new_item["elements"] = elements_tree_map(f, item["elements"]) new_items.append(new_item) else: new_items.append(f(item)) return new_items
def _directory_to_items(directory): items: List[Dict[str, Any]] = [] dir_elements: Dict[str, Any] = {} for root, dirs, files in os.walk(directory): if root in dir_elements: target = dir_elements[root] else: target = items for dir in sorted(dirs): dir_dict = {"name": dir, "elements": []} dir_elements[os.path.join(root, dir)] = dir_dict["elements"] target.append(dir_dict) for file in sorted(files): target.append({"src": "path", "path": os.path.join(root, file)}) return items def _has_src_to_name(item) -> Optional[str]: # Logic should broadly match logic of _has_src_to_path but not resolve the item # into a path. name = item.get("name") src = item.get("src") if src == "url": url = item.get("url") if name is None: name = url.split("/")[-1] elif src == "path": path = item["path"] if name is None: name = os.path.basename(path) return name def _has_src_to_path(upload_config, item, is_dataset=False) -> Tuple[str, str]: assert "src" in item, item src = item.get("src") name = item.get("name") if src == "url": url = item.get("url") try: path = stream_url_to_file(url, file_sources=upload_config.file_sources, dir=upload_config.working_directory) except Exception as e: raise Exception(f"Failed to fetch url {url}. {str(e)}") if not is_dataset: # Actual target dataset will validate and put results in dict # that gets passed back to Galaxy. for hash_function in HASH_NAMES: hash_value = item.get(hash_function) if hash_value: _handle_hash_validation(upload_config, hash_function, hash_value, path) if name is None: name = url.split("/")[-1] elif src == "pasted": path = stream_to_file(StringIO(item["paste_content"]), dir=upload_config.working_directory) if name is None: name = "Pasted Entry" else: assert src == "path" path = item["path"] if name is None: name = os.path.basename(path) return name, path def _handle_hash_validation(upload_config, hash_function, hash_value, path): if upload_config.validate_hashes: calculated_hash_value = memory_bound_hexdigest(hash_func_name=hash_function, path=path) if calculated_hash_value != hash_value: raise Exception( f"Failed to validate upload with [{hash_function}] - expected [{hash_value}] got [{calculated_hash_value}]" ) def _arg_parser(): parser = argparse.ArgumentParser(description=DESCRIPTION) parser.add_argument("--galaxy-root") parser.add_argument("--datatypes-registry") parser.add_argument("--request-version") parser.add_argument("--request") parser.add_argument("--working-directory") return parser
[docs]def get_file_sources(working_directory, file_sources_as_dict=None): from galaxy.files import ConfiguredFileSources file_sources = None if file_sources_as_dict is None: file_sources_path = os.path.join(working_directory, "file_sources.json") if os.path.exists(file_sources_path): file_sources_as_dict = None with open(file_sources_path) as f: file_sources_as_dict = json.load(f) if file_sources_as_dict is not None: file_sources = ConfiguredFileSources.from_dict(file_sources_as_dict) if file_sources is None: ConfiguredFileSources.from_dict(None) return file_sources
[docs]class UploadConfig:
[docs] def __init__( self, request, registry, working_directory, allow_failed_collections, file_sources_dict=None, ): self.registry = registry self.working_directory = working_directory self.allow_failed_collections = allow_failed_collections self.check_content = request.get("check_content", True) self.to_posix_lines = request.get("to_posix_lines", False) self.space_to_tab = request.get("space_to_tab", False) self.auto_decompress = request.get("auto_decompress", False) self.validate_hashes = request.get("validate_hashes", False) self.deferred = request.get("deferred", False) self.link_data_only = _link_data_only(request) self.file_sources_dict = file_sources_dict self._file_sources = None self.__workdir = os.path.abspath(working_directory) self.__upload_count = 0
@property def file_sources(self): if self._file_sources is None: self._file_sources = get_file_sources(self.working_directory, file_sources_as_dict=self.file_sources_dict) return self._file_sources
[docs] def get_option(self, item, key): """Return item[key] if specified otherwise use default from UploadConfig. This default represents the default for the whole request instead item which is the option for individual files. """ if key in item: return item[key] else: return getattr(self, key)
def __new_dataset_path(self): path = os.path.join(self.working_directory, f"gxupload_{self.__upload_count}") self.__upload_count += 1 return path
[docs] def ensure_in_working_directory(self, path, purge_source, in_place): if in_directory(path, self.__workdir): return path new_path = self.__new_dataset_path() if purge_source: try: shutil.move(path, new_path) # Drop .info file if it exists try: os.remove(f"{path}.info") except FileNotFoundError: pass except OSError as e: # We may not have permission to remove converted_path if e.errno != errno.EACCES: raise else: shutil.copy(path, new_path) return new_path
def _link_data_only(has_config_dict): link_data_only = has_config_dict.get("link_data_only", False) if not isinstance(link_data_only, bool): # Allow the older string values of 'copy_files' and 'link_to_files' link_data_only = link_data_only == "copy_files" return link_data_only def _for_each_src(f, obj): if isinstance(obj, list): for item in obj: _for_each_src(f, item) if isinstance(obj, dict): if "src" in obj: f(obj) for value in obj.values(): _for_each_src(f, value) if __name__ == "__main__": main()