Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tools.data_fetch
import argparse
import errno
import json
import os
import shutil
import sys
import tempfile
import bdbag.bdbag_api
from six.moves import StringIO
from galaxy.datatypes import sniff
from galaxy.datatypes.registry import Registry
from galaxy.datatypes.upload_util import (
handle_upload,
UploadProblemException,
)
from galaxy.util import in_directory
from galaxy.util.compression_utils import CompressedFile
from galaxy.util.hash_util import HASH_NAMES, memory_bound_hexdigest
# Description text handed to argparse.ArgumentParser when the CLI parser is built.
DESCRIPTION = """Data Import Script"""
def main(argv=None):
    """Run a data fetch request and write its result to ``galaxy.json``.

    :param argv: command-line arguments to parse; defaults to ``sys.argv[1:]``
        when ``None``.

    The JSON request named by ``--request`` is loaded, every target in it is
    fetched, and the resulting description is dumped to ``galaxy.json`` in the
    current working directory.
    """
    cli_args = _arg_parser().parse_args(sys.argv[1:] if argv is None else argv)

    datatypes = Registry()
    datatypes.load_datatypes(root_dir=cli_args.galaxy_root, config=cli_args.datatypes_registry)

    request_file = cli_args.request
    assert os.path.exists(request_file)
    with open(request_file) as request_handle:
        request = json.load(request_handle)

    config = UploadConfig(request, datatypes)
    outputs = _request_to_galaxy_json(config, request)
    with open("galaxy.json", "w") as output_handle:
        json.dump(outputs, output_handle)
def _request_to_galaxy_json(upload_config, request):
    """Fetch each target of ``request`` and wrap the results for ``galaxy.json``.

    :param upload_config: the ``UploadConfig`` passed through to ``_fetch_target``.
    :param request: request dictionary; its optional ``"targets"`` list is
        processed in order.
    :returns: ``{"__unnamed_outputs": [...]}`` with one entry per target.
    """
    return {
        "__unnamed_outputs": [
            _fetch_target(upload_config, requested_target)
            for requested_target in request.get("targets", [])
        ]
    }
def _fetch_target(upload_config, target):
    """Fetch all elements of one request target and describe the outcome.

    :param upload_config: ``UploadConfig`` carrying the request-wide defaults,
        the datatypes registry and the working-directory helper.
    :param target: target dictionary from the request. It must define
        ``"destination"`` and, once any ``"elements_from"`` declarations have
        been expanded, ``"elements"``. The dictionary is modified in place
        when ``"elements_from"`` is expanded.
    :returns: dictionary with ``"destination"``, ``"elements"`` and, when
        present on the target, ``"collection_type"`` and ``"name"``.
    :raises AssertionError: if no destination or no elements are defined.
    :raises Exception: if an unknown ``"elements_from"`` value is encountered.
    :raises UploadProblemException: if a file that is only being linked would
        need grooming.
    """
    destination = target.get("destination", None)
    assert destination, "No destination defined."

    def expand_elements_from(target_or_item):
        # Turn an "elements_from" declaration into an explicit "elements" list.
        elements_from = target_or_item.get("elements_from", None)
        items = None
        if elements_from:
            if elements_from == "archive":
                decompressed_directory = _decompress_target(upload_config, target_or_item)
                items = _directory_to_items(decompressed_directory)
            elif elements_from == "bagit":
                _, elements_from_path = _has_src_to_path(upload_config, target_or_item, is_dataset=False)
                items = _bagit_to_items(elements_from_path)
            elif elements_from == "bagit_archive":
                decompressed_directory = _decompress_target(upload_config, target_or_item)
                items = _bagit_to_items(decompressed_directory)
            elif elements_from == "directory":
                _, elements_from_path = _has_src_to_path(upload_config, target_or_item, is_dataset=False)
                items = _directory_to_items(elements_from_path)
            else:
                raise Exception("Unknown elements from type encountered [%s]" % elements_from)

        # Only rewrite the dictionary when the expansion produced something.
        if items:
            del target_or_item["elements_from"]
            target_or_item["elements"] = items

    _for_each_src(expand_elements_from, target)
    items = target.get("elements", None)
    assert items is not None, "No element definition found for destination [%s]" % destination

    fetched_target = {"destination": destination}
    for optional_key in ("collection_type", "name"):
        if optional_key in target:
            fetched_target[optional_key] = target[optional_key]

    def _resolve_src(item):
        # Materialize one leaf element and build the dictionary describing it.
        name, path = _has_src_to_path(upload_config, item, is_dataset=True)

        sources = []
        url = item.get("url")
        if url:
            sources.append({"source_uri": url})

        hashes = item.get("hashes", [])
        for hash_dict in hashes:
            hash_function = hash_dict.get("hash_function")
            hash_value = hash_dict.get("hash_value")
            _handle_hash_validation(upload_config, hash_function, hash_value, path)

        dbkey = item.get("dbkey", "?")
        requested_ext = item.get("ext", "auto")
        info = item.get("info", None)
        tags = item.get("tags", [])
        object_id = item.get("object_id", None)

        link_data_only = upload_config.link_data_only
        if "link_data_only" in item:
            # Allow overriding this on a per file basis.
            link_data_only = _link_data_only(item)
        to_posix_lines = upload_config.get_option(item, "to_posix_lines")
        space_to_tab = upload_config.get_option(item, "space_to_tab")
        auto_decompress = upload_config.get_option(item, "auto_decompress")
        in_place = item.get("in_place", False)
        purge_source = item.get("purge_source", True)

        # The first and fourth results (stdout, is_binary) are not used here.
        _stdout, ext, datatype, _is_binary, converted_path = handle_upload(
            registry=upload_config.registry,
            path=path,
            requested_ext=requested_ext,
            name=name,
            tmp_prefix='data_fetch_upload_',
            tmp_dir=".",
            check_content=upload_config.check_content,
            link_data_only=link_data_only,
            in_place=in_place,
            auto_decompress=auto_decompress,
            convert_to_posix_lines=to_posix_lines,
            convert_spaces_to_tabs=space_to_tab,
        )

        if link_data_only:
            # Never alter a file that will not be copied to Galaxy's local file store.
            # ``datatype`` is guarded against None here exactly as in the grooming
            # step below, so a missing datatype no longer raises AttributeError.
            if datatype and datatype.dataset_content_needs_grooming(path):
                err_msg = 'The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be ' + \
                    '<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed.'
                raise UploadProblemException(err_msg)
        else:
            # If this file is not in the workdir make sure it gets there;
            # a converted copy, when one was produced, takes precedence.
            path = upload_config.ensure_in_working_directory(converted_path or path, purge_source, in_place)
            if datatype and datatype.dataset_content_needs_grooming(path):
                # Groom the dataset content if necessary
                datatype.groom_dataset_content(path)

        rval = {"name": name, "filename": path, "dbkey": dbkey, "ext": ext, "link_data_only": link_data_only, "sources": sources, "hashes": hashes}
        if info is not None:
            rval["info"] = info
        if object_id is not None:
            rval["object_id"] = object_id
        if tags:
            rval["tags"] = tags
        return rval

    fetched_target["elements"] = elements_tree_map(_resolve_src, items)
    return fetched_target
def _bagit_to_items(directory):
    """List the payload of the bag rooted at ``directory`` as element items.

    Calls bdbag's ``resolve_fetch`` and ``validate_bag`` on the directory and
    then converts its ``data`` sub-directory with ``_directory_to_items``.
    """
    bag_api = bdbag.bdbag_api
    bag_api.resolve_fetch(directory)
    bag_api.validate_bag(directory)
    return _directory_to_items(os.path.join(directory, "data"))
def _decompress_target(upload_config, target):
    """Extract the archive referenced by ``target`` into a fresh temp directory.

    The temporary directory is created in the current working directory and
    is prefixed with the resolved name of the source.

    :returns: whatever ``CompressedFile.extract`` returns for that directory.
    """
    archive_name, archive_path = _has_src_to_path(upload_config, target, is_dataset=False)
    extraction_root = tempfile.mkdtemp(prefix=archive_name, dir=".")
    return CompressedFile(archive_path).extract(extraction_root)
def elements_tree_map(f, items):
    """Apply ``f`` to every leaf of a nested element tree.

    An item containing an ``"elements"`` key is treated as a branch: it is
    shallow-copied and its children are mapped recursively. Any other item is
    a leaf and is replaced by ``f(item)``.

    :returns: a new list; the input list and its branch dictionaries are not
        modified.
    """
    def _convert(node):
        if "elements" not in node:
            return f(node)
        branch = node.copy()
        branch["elements"] = elements_tree_map(f, node["elements"])
        return branch

    return [_convert(node) for node in items]
def _directory_to_items(directory):
    """Describe the contents of ``directory`` as a nested list of element items.

    Each sub-directory becomes ``{"name": ..., "elements": [...]}`` and each
    file becomes ``{"src": "path", "path": ...}``. Within one directory the
    sub-directories come first, then the files, both sorted by name.
    """
    top_level = []
    # Maps a sub-directory path to the "elements" list collecting its contents.
    elements_by_path = {}
    for current, subdir_names, file_names in os.walk(directory):
        container = elements_by_path.get(current, top_level)
        for subdir_name in sorted(subdir_names):
            children = []
            elements_by_path[os.path.join(current, subdir_name)] = children
            container.append({"name": subdir_name, "elements": children})
        container.extend(
            {"src": "path", "path": os.path.join(current, file_name)}
            for file_name in sorted(file_names)
        )
    return top_level
def _has_src_to_path(upload_config, item, is_dataset=False):
    """Resolve an item's ``"src"`` to a local file path and a display name.

    :param upload_config: passed through to ``_handle_hash_validation``.
    :param item: dictionary with a ``"src"`` of ``"url"``, ``"pasted"`` or
        ``"path"`` plus the matching ``"url"``, ``"paste_content"`` or
        ``"path"`` entry.
    :param is_dataset: when false, hashes given directly on a URL item
        (keyed by hash function name) are validated here.
    :returns: ``(name, path)``. ``name`` is the item's ``"name"`` when given,
        otherwise the last URL segment, ``"Pasted Entry"`` or the path's
        basename.
    :raises AssertionError: if ``"src"`` is missing or not a known value.
    """
    assert "src" in item, item
    source_kind = item.get("src")
    name = item.get("name")
    unnamed = name is None

    if source_kind == "url":
        url = item.get("url")
        path = sniff.stream_url_to_file(url)
        if not is_dataset:
            # Actual target dataset will validate and put results in dict
            # that gets passed back to Galaxy.
            for hash_function in HASH_NAMES:
                expected = item.get(hash_function)
                if expected:
                    _handle_hash_validation(upload_config, hash_function, expected, path)
        if unnamed:
            name = url.split("/")[-1]
        return name, path

    if source_kind == "pasted":
        path = sniff.stream_to_file(StringIO(item["paste_content"]))
        return ("Pasted Entry" if unnamed else name), path

    assert source_kind == "path"
    path = item["path"]
    return (os.path.basename(path) if unnamed else name), path
def _handle_hash_validation(upload_config, hash_function, hash_value, path):
    """Check the file at ``path`` against an expected hash, if enabled.

    Does nothing unless ``upload_config.validate_hashes`` is truthy.

    :raises Exception: when the calculated digest differs from ``hash_value``.
    """
    if not upload_config.validate_hashes:
        return
    actual = memory_bound_hexdigest(hash_func_name=hash_function, path=path)
    if actual == hash_value:
        return
    raise Exception("Failed to validate upload with [%s] - expected [%s] got [%s]" % (hash_function, hash_value, actual))
def _arg_parser():
    """Build the command-line parser for the data fetch script.

    All four options take a single string value and are optional.
    """
    parser = argparse.ArgumentParser(description=DESCRIPTION)
    for flag in ("--galaxy-root", "--datatypes-registry", "--request-version", "--request"):
        parser.add_argument(flag)
    return parser
class UploadConfig(object):
    """Request-wide upload defaults plus working-directory bookkeeping."""

    def __init__(self, request, registry):
        """Read the request-level options, falling back to built-in defaults.

        :param request: the request dictionary.
        :param registry: datatypes registry stored on the instance.
        """
        self.registry = registry
        option_defaults = (
            ("check_content", True),
            ("to_posix_lines", False),
            ("space_to_tab", False),
            ("auto_decompress", False),
            ("validate_hashes", False),
        )
        for option, default in option_defaults:
            setattr(self, option, request.get(option, default))
        self.link_data_only = _link_data_only(request)
        self.__workdir = os.path.abspath(".")
        self.__upload_count = 0

    def get_option(self, item, key):
        """Return ``item[key]`` if specified, otherwise the request-wide default.

        The attribute of the same name on this object holds the default for
        the whole request; ``item`` holds the per-file override.
        """
        return item[key] if key in item else getattr(self, key)

    def __new_dataset_path(self):
        # Hand out gxupload_0, gxupload_1, ... in call order.
        index = self.__upload_count
        self.__upload_count = index + 1
        return "gxupload_%d" % index

    def ensure_in_working_directory(self, path, purge_source, in_place):
        """Return a path for the file that lies in the working directory.

        A path already inside the working directory is returned unchanged.
        Otherwise the file is moved (``purge_source`` truthy) or copied to a
        newly numbered ``gxupload_N`` path, which is returned. ``in_place``
        is accepted but not used by this method.
        """
        if in_directory(path, self.__workdir):
            return path

        new_path = self.__new_dataset_path()
        if not purge_source:
            shutil.copy(path, new_path)
            return new_path

        try:
            shutil.move(path, new_path)
        except OSError as e:
            # We may not have permission to remove converted_path
            if e.errno != errno.EACCES:
                raise
        return new_path
def _link_data_only(has_config_dict):
    """Return whether data should be linked in place rather than copied.

    :param has_config_dict: a request or item dictionary that may carry a
        ``"link_data_only"`` entry.
    :returns: the entry itself when it is a boolean (``False`` when absent).
        For the older string form, ``True`` only for ``"link_to_files"``;
        ``"copy_files"`` and any other non-boolean value give ``False``.
    """
    link_data_only = has_config_dict.get("link_data_only", False)
    if not isinstance(link_data_only, bool):
        # Allow the older string values of 'copy_files' and 'link_to_files'.
        # Only 'link_to_files' asks for linking; comparing against
        # 'copy_files' here would invert the meaning of both values.
        link_data_only = link_data_only == "link_to_files"
    return link_data_only
def _for_each_src(f, obj):
    """Call ``f`` on every dictionary containing a ``"src"`` key within ``obj``.

    Lists and dictionary values are searched recursively; other values are
    ignored. A dictionary is passed to ``f`` before its values are visited,
    so changes ``f`` makes to it are seen by the traversal.
    """
    if isinstance(obj, list):
        children = obj
    elif isinstance(obj, dict):
        if "src" in obj:
            f(obj)
        children = obj.values()
    else:
        return
    for child in children:
        _for_each_src(f, child)
# Run the fetch with the process's command-line arguments when executed as a script.
if __name__ == "__main__":
    main()