Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tools.data_fetch
import argparse
import errno
import json
import os
import shutil
import sys
import tempfile
import bdbag.bdbag_api
from six.moves import StringIO
from galaxy.datatypes import sniff
from galaxy.datatypes.registry import Registry
from galaxy.datatypes.upload_util import (
    handle_upload,
    UploadProblemException,
)
from galaxy.util import in_directory
from galaxy.util.compression_utils import CompressedFile
from galaxy.util.hash_util import HASH_NAMES, memory_bound_hexdigest
# Description passed to the argparse parser built by _arg_parser().
DESCRIPTION = """Data Import Script"""
def main(argv=None):
    """Run the data fetch script for one JSON request file.

    Parses the command line, loads the datatypes registry, reads the JSON
    request named by ``--request``, fetches every target it describes and
    writes the outcome to ``galaxy.json`` in the current working directory.

    :param argv: argument list to parse; defaults to ``sys.argv[1:]``.
    :raises AssertionError: if the request file does not exist.
    """
    if argv is None:
        argv = sys.argv[1:]
    args = _arg_parser().parse_args(argv)
    registry = Registry()
    registry.load_datatypes(root_dir=args.galaxy_root, config=args.datatypes_registry)
    request_path = args.request
    assert os.path.exists(request_path), "Request file [%s] does not exist." % request_path
    with open(request_path) as f:
        request = json.load(f)
    upload_config = UploadConfig(request, registry)
    galaxy_json = _request_to_galaxy_json(upload_config, request)
    # The output is written relative to the current working directory.
    with open("galaxy.json", "w") as f:
        json.dump(galaxy_json, f)
def _request_to_galaxy_json(upload_config, request):
    """Fetch every target of ``request`` and wrap the results for ``galaxy.json``.

    Targets are read from the request's ``targets`` key (treated as empty
    when absent) and processed in order with ``_fetch_target``.

    :returns: dict with the fetched targets under ``__unnamed_outputs``.
    """
    fetched = [
        _fetch_target(upload_config, requested_target)
        for requested_target in request.get("targets", [])
    ]
    return {"__unnamed_outputs": fetched}
def _fetch_target(upload_config, target):
    """Fetch all elements of one request target and describe the result.

    Any ``elements_from`` directive found in ``target`` is first expanded
    into an explicit ``elements`` list; every leaf element is then passed
    through ``handle_upload``.

    :param upload_config: object carrying the request-wide options (an
        ``UploadConfig`` when called from ``main``).
    :param target: target dict from the request. It must define
        ``destination`` and must have ``elements`` once expansion is done.
        Expansion modifies this dict in place.
    :returns: dict with ``destination`` and ``elements`` plus, when present
        on the target, ``collection_type`` and ``name``.
    :raises AssertionError: if no destination or no elements are defined.
    """
    destination = target.get("destination", None)
    assert destination, "No destination defined."
    def expand_elements_from(target_or_item):
        """Replace ``elements_from`` on a target or item with listed ``elements``."""
        elements_from = target_or_item.get("elements_from", None)
        items = None
        if elements_from:
            if elements_from == "archive":
                decompressed_directory = _decompress_target(upload_config, target_or_item)
                items = _directory_to_items(decompressed_directory)
            elif elements_from == "bagit":
                _, elements_from_path = _has_src_to_path(upload_config, target_or_item, is_dataset=False)
                items = _bagit_to_items(elements_from_path)
            elif elements_from == "bagit_archive":
                decompressed_directory = _decompress_target(upload_config, target_or_item)
                items = _bagit_to_items(decompressed_directory)
            elif elements_from == "directory":
                _, elements_from_path = _has_src_to_path(upload_config, target_or_item, is_dataset=False)
                items = _directory_to_items(elements_from_path)
            else:
                raise Exception("Unknown elements from type encountered [%s]" % elements_from)
        # Only rewrite the dict when the expansion produced at least one item;
        # an empty listing leaves ``elements_from`` in place.
        if items:
            del target_or_item["elements_from"]
            target_or_item["elements"] = items
    _for_each_src(expand_elements_from, target)
    items = target.get("elements", None)
    assert items is not None, "No element definition found for destination [%s]" % destination
    fetched_target = {}
    fetched_target["destination"] = destination
    if "collection_type" in target:
        fetched_target["collection_type"] = target["collection_type"]
    if "name" in target:
        fetched_target["name"] = target["name"]
    def _resolve_src(item):
        """Fetch one leaf element and return the dict describing the dataset."""
        converted_path = None
        name, path = _has_src_to_path(upload_config, item, is_dataset=True)
        sources = []
        url = item.get("url")
        if url:
            sources.append({"source_uri": url})
        hashes = item.get("hashes", [])
        # Check each declared hash against the fetched file; the helper does
        # nothing unless upload_config.validate_hashes is set.
        for hash_dict in hashes:
            hash_function = hash_dict.get("hash_function")
            hash_value = hash_dict.get("hash_value")
            _handle_hash_validation(upload_config, hash_function, hash_value, path)
        dbkey = item.get("dbkey", "?")
        requested_ext = item.get("ext", "auto")
        info = item.get("info", None)
        created_from_basename = item.get("created_from_basename", None)
        tags = item.get("tags", [])
        object_id = item.get("object_id", None)
        link_data_only = upload_config.link_data_only
        if "link_data_only" in item:
            # Allow overriding this on a per file basis.
            link_data_only = _link_data_only(item)
        # Per-item values win over the request-wide defaults.
        to_posix_lines = upload_config.get_option(item, "to_posix_lines")
        space_to_tab = upload_config.get_option(item, "space_to_tab")
        auto_decompress = upload_config.get_option(item, "auto_decompress")
        in_place = item.get("in_place", False)
        purge_source = item.get("purge_source", True)
        registry = upload_config.registry
        check_content = upload_config.check_content
        # ``stdout`` and ``is_binary`` are not used below. A truthy
        # ``converted_path`` replaces ``path`` when the data is copied.
        stdout, ext, datatype, is_binary, converted_path = handle_upload(
            registry=registry,
            path=path,
            requested_ext=requested_ext,
            name=name,
            tmp_prefix='data_fetch_upload_',
            tmp_dir=".",
            check_content=check_content,
            link_data_only=link_data_only,
            in_place=in_place,
            auto_decompress=auto_decompress,
            convert_to_posix_lines=to_posix_lines,
            convert_spaces_to_tabs=space_to_tab,
        )
        if link_data_only:
            # Never alter a file that will not be copied to Galaxy's local file store.
            if datatype.dataset_content_needs_grooming(path):
                err_msg = 'The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be ' + \
                    '<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed.'
                raise UploadProblemException(err_msg)
        # If this file is not in the workdir make sure it gets there.
        if not link_data_only and converted_path:
            path = upload_config.ensure_in_working_directory(converted_path, purge_source, in_place)
        elif not link_data_only:
            path = upload_config.ensure_in_working_directory(path, purge_source, in_place)
        if not link_data_only and datatype and datatype.dataset_content_needs_grooming(path):
            # Groom the dataset content if necessary
            datatype.groom_dataset_content(path)
        rval = {"name": name, "filename": path, "dbkey": dbkey, "ext": ext, "link_data_only": link_data_only, "sources": sources, "hashes": hashes}
        # Optional metadata is only emitted when it was supplied on the item.
        if info is not None:
            rval["info"] = info
        if object_id is not None:
            rval["object_id"] = object_id
        if tags:
            rval["tags"] = tags
        if created_from_basename:
            rval["created_from_basename"] = created_from_basename
        return rval
    # Resolve every leaf while keeping the nesting of the element tree.
    elements = elements_tree_map(_resolve_src, items)
    fetched_target["elements"] = elements
    return fetched_target
def _bagit_to_items(directory):
    """List the payload of the bag rooted at ``directory`` as element items.

    Calls bdbag's ``resolve_fetch`` and ``validate_bag`` on the directory
    first, then lists the bag's ``data`` sub-directory.
    """
    bag_api = bdbag.bdbag_api
    bag_api.resolve_fetch(directory)
    bag_api.validate_bag(directory)
    payload_directory = os.path.join(directory, "data")
    return _directory_to_items(payload_directory)
def _decompress_target(upload_config, target):
    """Extract the archive referenced by ``target`` into a fresh directory.

    The directory is created under the current working directory with the
    target's resolved name as prefix.

    :returns: whatever ``CompressedFile.extract`` returns for that directory.
    """
    archive_name, archive_path = _has_src_to_path(upload_config, target, is_dataset=False)
    extraction_root = tempfile.mkdtemp(prefix=archive_name, dir=".")
    archive = CompressedFile(archive_path)
    return archive.extract(extraction_root)
def elements_tree_map(f, items):
    """Apply ``f`` to every leaf of an element tree, keeping its structure.

    An item containing an ``elements`` key is treated as a container: it is
    shallow-copied and its ``elements`` are mapped recursively. Any other
    item is a leaf and is replaced by ``f(item)``.

    :param f: callable taking a leaf item and returning its replacement.
    :param items: iterable of item dicts.
    :returns: new list mirroring ``items``; the input is not modified.
    """
    new_items = []
    for item in items:
        if "elements" in item:
            # Copy so the caller's container dict keeps its original children.
            new_item = item.copy()
            new_item["elements"] = elements_tree_map(f, item["elements"])
            new_items.append(new_item)
        else:
            new_items.append(f(item))
    return new_items
def _directory_to_items(directory):
    """Describe the contents of ``directory`` as a nested list of items.

    Each sub-directory becomes ``{"name": ..., "elements": [...]}`` and each
    file becomes ``{"src": "path", "path": ...}``. Within one directory the
    sub-directories come first, then the files, each group sorted by name.
    """
    top_level = []
    # Maps a directory path to the list that collects its children.
    children_by_path = {}
    for current, subdir_names, file_names in os.walk(directory):
        bucket = children_by_path.get(current, top_level)
        for subdir_name in sorted(subdir_names):
            nested = []
            children_by_path[os.path.join(current, subdir_name)] = nested
            bucket.append({"name": subdir_name, "elements": nested})
        bucket.extend(
            {"src": "path", "path": os.path.join(current, file_name)}
            for file_name in sorted(file_names)
        )
    return top_level
def _has_src_to_path(upload_config, item, is_dataset=False):
    """Resolve an item's ``src`` to a ``(name, path)`` pair.

    ``url`` sources are streamed to a file, ``pasted`` sources are written
    from ``paste_content`` and ``path`` sources are used as given. When the
    item carries no ``name``, one is derived from the source: the last URL
    segment, ``"Pasted Entry"`` or the path's basename.

    :raises AssertionError: if ``src`` is missing or not a known kind.
    """
    assert "src" in item, item
    source_kind = item.get("src")
    label = item.get("name")

    if source_kind == "url":
        location = item.get("url")
        local_path = sniff.stream_url_to_file(location)
        if not is_dataset:
            # Actual target dataset will validate and put results in dict
            # that gets passed back to Galaxy.
            for algorithm in HASH_NAMES:
                expected = item.get(algorithm)
                if expected:
                    _handle_hash_validation(upload_config, algorithm, expected, local_path)
        if label is None:
            label = location.split("/")[-1]
        return label, local_path

    if source_kind == "pasted":
        local_path = sniff.stream_to_file(StringIO(item["paste_content"]))
        if label is None:
            label = "Pasted Entry"
        return label, local_path

    assert source_kind == "path"
    local_path = item["path"]
    if label is None:
        label = os.path.basename(local_path)
    return label, local_path
def _handle_hash_validation(upload_config, hash_function, hash_value, path):
    """Check the file at ``path`` against an expected hash, if enabled.

    Does nothing unless ``upload_config.validate_hashes`` is truthy.

    :raises Exception: if the calculated digest differs from ``hash_value``.
    """
    if not upload_config.validate_hashes:
        return
    actual = memory_bound_hexdigest(hash_func_name=hash_function, path=path)
    if actual == hash_value:
        return
    raise Exception("Failed to validate upload with [%s] - expected [%s] got [%s]" % (hash_function, hash_value, actual))
def _arg_parser():
    """Build the command-line parser for this script.

    All four options take a single string value and none is marked required.
    """
    parser = argparse.ArgumentParser(description=DESCRIPTION)
    for flag in ("--galaxy-root", "--datatypes-registry", "--request-version", "--request"):
        parser.add_argument(flag)
    return parser
class UploadConfig(object):
    """Request-wide upload options plus working-directory bookkeeping."""

    def __init__(self, request, registry):
        """Read the request-level defaults from ``request``.

        :param request: parsed request dict; missing options fall back to
            the defaults given below.
        :param registry: datatypes registry handed on to the upload code.
        """
        self.registry = registry
        self.check_content = request.get("check_content", True)
        self.to_posix_lines = request.get("to_posix_lines", False)
        self.space_to_tab = request.get("space_to_tab", False)
        self.auto_decompress = request.get("auto_decompress", False)
        self.validate_hashes = request.get("validate_hashes", False)
        self.link_data_only = _link_data_only(request)
        # The working directory is captured once, at construction time.
        self.__workdir = os.path.abspath(".")
        # Counter used to generate unique dataset file names.
        self.__upload_count = 0

    def get_option(self, item, key):
        """Return item[key] if specified, otherwise the default from UploadConfig.

        The default applies to the whole request, whereas ``item`` holds the
        options of an individual file.
        """
        if key in item:
            return item[key]
        else:
            return getattr(self, key)

    def __new_dataset_path(self):
        """Return the next unused relative ``gxupload_<n>`` file name."""
        path = "gxupload_%d" % self.__upload_count
        self.__upload_count += 1
        return path

    def ensure_in_working_directory(self, path, purge_source, in_place):
        """Return a path for the data that lies inside the working directory.

        A ``path`` already inside the working directory is returned as is.
        Otherwise the file is moved (``purge_source`` true) or copied to a
        newly generated ``gxupload_<n>`` name, and that name is returned.

        :param path: current location of the file.
        :param purge_source: move the file instead of copying it.
        :param in_place: accepted for interface compatibility; not used here.
        """
        if in_directory(path, self.__workdir):
            return path
        new_path = self.__new_dataset_path()
        if purge_source:
            try:
                shutil.move(path, new_path)
            except OSError as e:
                # We may not have permission to remove converted_path
                if e.errno != errno.EACCES:
                    raise
        else:
            shutil.copy(path, new_path)
        return new_path
def _link_data_only(has_config_dict):
    """Return whether data should be linked instead of copied.

    Reads ``link_data_only`` from ``has_config_dict`` (default ``False``).
    Booleans are returned unchanged. For the older string values,
    ``'link_to_files'`` means link (``True``); ``'copy_files'`` and any
    other non-boolean value mean copy (``False``).

    :param has_config_dict: request or item dict that may hold the option.
    :returns: ``True`` to link to the data, ``False`` to copy it.
    """
    link_data_only = has_config_dict.get("link_data_only", False)
    if not isinstance(link_data_only, bool):
        # Allow the older string values of 'copy_files' and 'link_to_files'.
        # Only an explicit request to link may enable linking.
        link_data_only = link_data_only == "link_to_files"
    return link_data_only
def _for_each_src(f, obj):
    """Call ``f`` on every dict containing a ``src`` key within ``obj``.

    Lists and dicts are walked recursively; a dict is visited before the
    values it holds. Anything else is ignored.
    """
    if isinstance(obj, dict):
        if "src" in obj:
            f(obj)
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        return
    for child in children:
        _for_each_src(f, child)
# Run the fetch with the process's command-line arguments when this module is
# executed as a script.
if __name__ == "__main__":
    main()