Source code for galaxy.tools.data_fetch

import argparse
import errno
import json
import os
import shutil
import sys
import tempfile
from io import StringIO
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Tuple,
)

import bdbag.bdbag_api

from galaxy.datatypes import sniff
from galaxy.datatypes.registry import Registry
from galaxy.datatypes.upload_util import (
    handle_upload,
    UploadProblemException,
)
from galaxy.files.uris import (
    stream_to_file,
    stream_url_to_file,
)
from galaxy.util import (
    in_directory,
    safe_makedirs,
)
from galaxy.util.bunch import Bunch
from galaxy.util.compression_utils import CompressedFile
from galaxy.util.hash_util import (
    HASH_NAMES,
    memory_bound_hexdigest,
)

DESCRIPTION = """Data Import Script"""


[docs]def main(argv=None): if argv is None: argv = sys.argv[1:] args = _arg_parser().parse_args(argv) registry = Registry() registry.load_datatypes(root_dir=args.galaxy_root, config=args.datatypes_registry) do_fetch(args.request, working_directory=args.working_directory or os.getcwd(), registry=registry)
[docs]def do_fetch( request_path: str, working_directory: str, registry: Registry, file_sources_dict: Optional[Dict] = None, ): assert os.path.exists(request_path) with open(request_path) as f: request = json.load(f) allow_failed_collections = request.get("allow_failed_collections", False) upload_config = UploadConfig( request, registry, working_directory, allow_failed_collections, file_sources_dict, ) galaxy_json = _request_to_galaxy_json(upload_config, request) galaxy_json_path = os.path.join(working_directory, "galaxy.json") with open(galaxy_json_path, "w") as f: json.dump(galaxy_json, f) return working_directory
def _request_to_galaxy_json(upload_config: "UploadConfig", request): targets = request.get("targets", []) fetched_targets = [] for target in targets: fetched_target = _fetch_target(upload_config, target) fetched_targets.append(fetched_target) return {"__unnamed_outputs": fetched_targets} def _fetch_target(upload_config: "UploadConfig", target): destination = target.get("destination", None) assert destination, "No destination defined." def expand_elements_from(target_or_item): items = None if elements_from := target_or_item.get("elements_from", None): if elements_from == "archive": decompressed_directory = _decompress_target(upload_config, target_or_item) items = _directory_to_items(decompressed_directory) elif elements_from == "bagit": _, elements_from_path = _has_src_to_path(upload_config, target_or_item, is_dataset=False) items = _bagit_to_items(elements_from_path) elif elements_from == "bagit_archive": decompressed_directory = _decompress_target(upload_config, target_or_item) items = _bagit_to_items(decompressed_directory) elif elements_from == "directory": _, elements_from_path = _has_src_to_path(upload_config, target_or_item, is_dataset=False) items = _directory_to_items(elements_from_path) else: raise Exception(f"Unknown elements from type encountered [{elements_from}]") if items: del target_or_item["elements_from"] target_or_item["elements"] = items expansion_error = None try: _for_each_src(expand_elements_from, target) except Exception as e: expansion_error = f"Error expanding elements/items for upload destination. {str(e)}" if expansion_error is None: items = target.get("elements", None) assert items is not None, f"No element definition found for destination [{destination}]" else: items = [] fetched_target = {} fetched_target["destination"] = destination destination_type = destination["type"] is_collection = destination_type == "hdca" failed_elements = [] if "collection_type" in target: fetched_target["collection_type"] = target["collection_type"] if "name" in target: fetched_target["name"] = target["name"] def _copy_and_validate_simple_attributes(src_item, target_metadata): info = src_item.get("info", None) created_from_basename = src_item.get("created_from_basename", None) tags = src_item.get("tags", []) if info is not None: target_metadata["info"] = info if (object_id := src_item.get("object_id", None)) is not None: target_metadata["object_id"] = object_id if tags: target_metadata["tags"] = tags if created_from_basename: target_metadata["created_from_basename"] = created_from_basename if "error_message" in src_item: target_metadata["error_message"] = src_item["error_message"] return target_metadata def _resolve_item(item): # Might be a dataset or a composite upload. requested_ext = item.get("ext", None) registry = upload_config.registry datatype = registry.get_datatype_by_extension(requested_ext) composite = item.pop("composite", None) if datatype and datatype.composite_type: composite_type = datatype.composite_type assert composite_type == "auto_primary_file", "basic composite uploads not yet implemented" # get_composite_dataset_name finds dataset name from basename of contents # and such but we're not implementing that here yet. yagni? # also need name... metadata = { composite_file.substitute_name_with_metadata: datatype.metadata_spec[ composite_file.substitute_name_with_metadata ].default for composite_file in datatype.composite_files.values() if composite_file.substitute_name_with_metadata } name = item.get("name") or "Composite Dataset" metadata["base_name"] = name dataset = Bunch( name=name, metadata=metadata, ) writable_files = datatype.get_writable_files_for_dataset(dataset) primary_file = stream_to_file( StringIO(datatype.generate_primary_file(dataset)), prefix="upload_auto_primary_file", dir=upload_config.working_directory, ) extra_files_path = f"{primary_file}_extra" os.mkdir(extra_files_path) rval: Dict[str, Any] = { "name": name, "filename": primary_file, "ext": requested_ext, "link_data_only": False, "sources": [], "hashes": [], "extra_files": extra_files_path, } _copy_and_validate_simple_attributes(item, rval) composite_items = composite.get("elements", []) keys = list(writable_files.keys()) composite_item_idx = 0 for composite_item in composite_items: if composite_item_idx >= len(keys): # raise exception - too many files? pass key = keys[composite_item_idx] writable_file = writable_files[key] _, src_target = _has_src_to_path(upload_config, composite_item) # do the writing sniff.handle_composite_file( datatype, src_target, extra_files_path, key, writable_file.is_binary, upload_config.working_directory, f"{os.path.basename(extra_files_path)}_", composite_item, ) composite_item_idx += 1 writable_files_idx = composite_item_idx while writable_files_idx < len(keys): key = keys[writable_files_idx] writable_file = writable_files[key] if not writable_file.optional: # raise Exception, non-optional file missing pass writable_files_idx += 1 return rval else: if composite: raise Exception(f"Non-composite datatype [{datatype}] attempting to be created with composite data.") return _resolve_item_with_primary(item) def _resolve_item_with_primary(item): error_message = None converted_path = None deferred = upload_config.get_option(item, "deferred") name: str path: Optional[str] if not deferred: name, path = _has_src_to_path(upload_config, item, is_dataset=True) else: name, path = _has_src_to_name(item) or "Deferred Dataset", None sources = [] url = item.get("url") source_dict = {"source_uri": url} if url: sources.append(source_dict) hashes = item.get("hashes", []) for hash_dict in hashes: hash_function = hash_dict.get("hash_function") hash_value = hash_dict.get("hash_value") try: _handle_hash_validation(upload_config, hash_function, hash_value, path) except Exception as e: error_message = str(e) item["error_message"] = error_message dbkey = item.get("dbkey", "?") link_data_only = upload_config.link_data_only if "link_data_only" in item: # Allow overriding this on a per file basis. link_data_only = _link_data_only(item) ext = "data" staged_extra_files = None requested_ext = item.get("ext", "auto") to_posix_lines = upload_config.get_option(item, "to_posix_lines") space_to_tab = upload_config.get_option(item, "space_to_tab") auto_decompress = upload_config.get_option(item, "auto_decompress") effective_state = "ok" if not deferred and not error_message: in_place = item.get("in_place", False) purge_source = item.get("purge_source", True) registry = upload_config.registry check_content = upload_config.check_content assert path # if deferred won't be in this branch. stdout, ext, datatype, is_binary, converted_path, converted_newlines, converted_spaces = handle_upload( registry=registry, path=path, requested_ext=requested_ext, name=name, tmp_prefix="data_fetch_upload_", tmp_dir=upload_config.working_directory, check_content=check_content, link_data_only=link_data_only, in_place=in_place, auto_decompress=auto_decompress, convert_to_posix_lines=to_posix_lines, convert_spaces_to_tabs=space_to_tab, ) transform = [] if converted_newlines: transform.append({"action": "to_posix_lines"}) if converted_spaces: transform.append({"action": "spaces_to_tabs"}) if link_data_only: # Never alter a file that will not be copied to Galaxy's local file store. if datatype.dataset_content_needs_grooming(path): err_msg = ( "The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be " "<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed." ) raise UploadProblemException(err_msg) # If this file is not in the workdir make sure it gets there. if not link_data_only and converted_path: path = upload_config.ensure_in_working_directory(converted_path, purge_source, in_place) elif not link_data_only: path = upload_config.ensure_in_working_directory(path, purge_source, in_place) extra_files = item.get("extra_files") if extra_files: # TODO: optimize to just copy the whole directory to extra files instead. assert not upload_config.link_data_only, "linking composite dataset files not yet implemented" extra_files_path = f"{path}_extra" staged_extra_files = extra_files_path os.mkdir(extra_files_path) def walk_extra_files(items, prefix=""): for item in items: if "elements" in item: name = item.get("name") if not prefix: item_prefix = name else: item_prefix = os.path.join(prefix, name) walk_extra_files(item.get("elements"), prefix=item_prefix) else: src_name, src_path = _has_src_to_path(upload_config, item) if prefix: rel_path = os.path.join(prefix, src_name) else: rel_path = src_name file_output_path = os.path.join(extra_files_path, rel_path) parent_dir = os.path.dirname(file_output_path) if not os.path.exists(parent_dir): safe_makedirs(parent_dir) shutil.move(src_path, file_output_path) walk_extra_files(extra_files.get("elements", [])) # TODO: # in galaxy json add 'extra_files' and point at target derived from extra_files: needs_grooming = not link_data_only and datatype and datatype.dataset_content_needs_grooming(path) # type: ignore[arg-type] if needs_grooming: # Groom the dataset content if necessary transform.append( {"action": "datatype_groom", "datatype_ext": ext, "datatype_class": datatype.__class__.__name__} ) assert path datatype.groom_dataset_content(path) if len(transform) > 0: source_dict["transform"] = transform elif not error_message: transform = [] if to_posix_lines: transform.append({"action": "to_posix_lines"}) if space_to_tab: transform.append({"action": "spaces_to_tabs"}) effective_state = "deferred" registry = upload_config.registry ext = sniff.guess_ext_from_file_name(name, registry=registry, requested_ext=requested_ext) rval = { "name": name, "dbkey": dbkey, "ext": ext, "link_data_only": link_data_only, "sources": sources, "hashes": hashes, "info": f"uploaded {ext} file", "state": effective_state, } if path: rval["filename"] = path if staged_extra_files: rval["extra_files"] = os.path.abspath(staged_extra_files) return _copy_and_validate_simple_attributes(item, rval) def _resolve_item_capture_error(item): try: return _resolve_item(item) except Exception as e: rval = {"error_message": str(e)} rval = _copy_and_validate_simple_attributes(item, rval) failed_elements.append(rval) return rval if expansion_error is None: elements = elements_tree_map(_resolve_item_capture_error, items) if is_collection and not upload_config.allow_failed_collections and len(failed_elements) > 0: element_error = "Failed to fetch collection element(s):\n" for failed_element in failed_elements: element_error += f"\n- {failed_element['error_message']}" fetched_target["error_message"] = element_error fetched_target["elements"] = None else: fetched_target["elements"] = elements else: fetched_target["elements"] = [] fetched_target["error_message"] = expansion_error return fetched_target def _bagit_to_items(directory): bdbag.bdbag_api.resolve_fetch(directory) bdbag.bdbag_api.validate_bag(directory) items = _directory_to_items(os.path.join(directory, "data")) return items def _decompress_target(upload_config: "UploadConfig", target): elements_from_name, elements_from_path = _has_src_to_path(upload_config, target, is_dataset=False) # by default Galaxy will check for a directory with a single file and interpret that # as the new root for expansion, this is a good user experience for uploading single # files in a archive but not great from an API perspective. Allow disabling by setting # fuzzy_root to False to literally interpret the target. fuzzy_root = target.get("fuzzy_root", True) temp_directory = os.path.abspath(tempfile.mkdtemp(prefix=elements_from_name, dir=upload_config.working_directory)) cf = CompressedFile(elements_from_path) result = cf.extract(temp_directory) return result if fuzzy_root else temp_directory
[docs]def elements_tree_map(f, items): new_items = [] for item in items: if "elements" in item: new_item = item.copy() new_item["elements"] = elements_tree_map(f, item["elements"]) new_items.append(new_item) else: new_items.append(f(item)) return new_items
def _directory_to_items(directory): items: List[Dict[str, Any]] = [] dir_elements: Dict[str, Any] = {} for root, dirs, files in os.walk(directory): if root in dir_elements: target = dir_elements[root] else: target = items for dir in sorted(dirs): dir_dict = {"name": dir, "elements": []} dir_elements[os.path.join(root, dir)] = dir_dict["elements"] target.append(dir_dict) for file in sorted(files): target.append({"src": "path", "path": os.path.join(root, file)}) return items def _has_src_to_name(item) -> Optional[str]: # Logic should broadly match logic of _has_src_to_path but not resolve the item # into a path. name = item.get("name") src = item.get("src") if src == "url": url = item.get("url") if name is None: name = url.split("/")[-1] elif src == "path": path = item["path"] if name is None: name = os.path.basename(path) return name def _has_src_to_path(upload_config, item, is_dataset=False) -> Tuple[str, str]: assert "src" in item, item src = item.get("src") name = item.get("name") if src == "url": url = item.get("url") try: path = stream_url_to_file(url, file_sources=upload_config.file_sources, dir=upload_config.working_directory) except Exception as e: raise Exception(f"Failed to fetch url {url}. {str(e)}") if not is_dataset: # Actual target dataset will validate and put results in dict # that gets passed back to Galaxy. for hash_function in HASH_NAMES: hash_value = item.get(hash_function) if hash_value: _handle_hash_validation(upload_config, hash_function, hash_value, path) if name is None: name = url.split("/")[-1] elif src == "pasted": path = stream_to_file(StringIO(item["paste_content"]), dir=upload_config.working_directory) if name is None: name = "Pasted Entry" else: assert src == "path" path = item["path"] if name is None: name = os.path.basename(path) return name, path def _handle_hash_validation(upload_config, hash_function, hash_value, path): if upload_config.validate_hashes: calculated_hash_value = memory_bound_hexdigest(hash_func_name=hash_function, path=path) if calculated_hash_value != hash_value: raise Exception( f"Failed to validate upload with [{hash_function}] - expected [{hash_value}] got [{calculated_hash_value}]" ) def _arg_parser(): parser = argparse.ArgumentParser(description=DESCRIPTION) parser.add_argument("--galaxy-root") parser.add_argument("--datatypes-registry") parser.add_argument("--request-version") parser.add_argument("--request") parser.add_argument("--working-directory") return parser
[docs]def get_file_sources(working_directory, file_sources_as_dict=None): from galaxy.files import ConfiguredFileSources file_sources = None if file_sources_as_dict is None: file_sources_path = os.path.join(working_directory, "file_sources.json") if os.path.exists(file_sources_path): file_sources_as_dict = None with open(file_sources_path) as f: file_sources_as_dict = json.load(f) if file_sources_as_dict is not None: file_sources = ConfiguredFileSources.from_dict(file_sources_as_dict) if file_sources is None: ConfiguredFileSources.from_dict(None) return file_sources
[docs]class UploadConfig:
[docs] def __init__( self, request, registry, working_directory, allow_failed_collections, file_sources_dict=None, ): self.registry = registry self.working_directory = working_directory self.allow_failed_collections = allow_failed_collections self.check_content = request.get("check_content", True) self.to_posix_lines = request.get("to_posix_lines", False) self.space_to_tab = request.get("space_to_tab", False) self.auto_decompress = request.get("auto_decompress", False) self.validate_hashes = request.get("validate_hashes", False) self.deferred = request.get("deferred", False) self.link_data_only = _link_data_only(request) self.file_sources_dict = file_sources_dict self._file_sources = None self.__workdir = os.path.abspath(working_directory) self.__upload_count = 0
@property def file_sources(self): if self._file_sources is None: self._file_sources = get_file_sources(self.working_directory, file_sources_as_dict=self.file_sources_dict) return self._file_sources
[docs] def get_option(self, item, key): """Return item[key] if specified otherwise use default from UploadConfig. This default represents the default for the whole request instead item which is the option for individual files. """ if key in item: return item[key] else: return getattr(self, key)
def __new_dataset_path(self): path = os.path.join(self.working_directory, f"gxupload_{self.__upload_count}") self.__upload_count += 1 return path
[docs] def ensure_in_working_directory(self, path, purge_source, in_place): if in_directory(path, self.__workdir): return path new_path = self.__new_dataset_path() if purge_source: try: shutil.move(path, new_path) # Drop .info file if it exists try: os.remove(f"{path}.info") except FileNotFoundError: pass except OSError as e: # We may not have permission to remove converted_path if e.errno != errno.EACCES: raise else: shutil.copy(path, new_path) return new_path
def _link_data_only(has_config_dict): link_data_only = has_config_dict.get("link_data_only", False) if not isinstance(link_data_only, bool): # Allow the older string values of 'copy_files' and 'link_to_files' link_data_only = link_data_only == "copy_files" return link_data_only def _for_each_src(f, obj): if isinstance(obj, list): for item in obj: _for_each_src(f, item) if isinstance(obj, dict): if "src" in obj: f(obj) for value in obj.values(): _for_each_src(f, value) if __name__ == "__main__": main()