Source code for galaxy.tools.data_fetch

import argparse
import errno
import json
import os
import shutil
import sys
import tempfile

import bdbag.bdbag_api
from six.moves import StringIO

from galaxy.datatypes import sniff
from galaxy.datatypes.registry import Registry
from galaxy.datatypes.upload_util import (
    handle_upload,
    UploadProblemException,
)
from galaxy.util import in_directory
from galaxy.util.compression_utils import CompressedFile

DESCRIPTION = """Data Import Script"""


def main(argv=None):
    """Entry point: load the datatypes registry, read the JSON request file,
    fetch each target, and write the results to a galaxy.json file."""
    if argv is None:
        argv = sys.argv[1:]
    args = _arg_parser().parse_args(argv)

    registry = Registry()
    registry.load_datatypes(root_dir=args.galaxy_root, config=args.datatypes_registry)

    request_path = args.request
    assert os.path.exists(request_path)
    with open(request_path) as f:
        request = json.load(f)

    upload_config = UploadConfig(request, registry)
    galaxy_json = _request_to_galaxy_json(upload_config, request)
    with open("galaxy.json", "w") as f:
        json.dump(galaxy_json, f)
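

# A minimal sketch of a request file this script consumes (illustrative only;
# the concrete paths and the destination value are assumptions, not defined by
# this module). "targets", "destination", "elements", and "src" are the keys
# read by _fetch_target and _has_src_to_path below.
#
#     request = {
#         "targets": [{
#             "destination": {"type": "hdas"},
#             "elements": [{"src": "path", "path": "/tmp/example.tsv", "ext": "tabular"}],
#         }],
#     }
#     with open("request.json", "w") as f:
#         json.dump(request, f)
#     main(["--request", "request.json", "--galaxy-root", "/path/to/galaxy"])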


def _request_to_galaxy_json(upload_config, request):
    targets = request.get("targets", [])
    fetched_targets = []

    for target in targets:
        fetched_target = _fetch_target(upload_config, target)
        fetched_targets.append(fetched_target)

    return {"__unnamed_outputs": fetched_targets}


def _fetch_target(upload_config, target):
    destination = target.get("destination", None)
    assert destination, "No destination defined."

    def expand_elements_from(target_or_item):
        # Replace an "elements_from" directive (archive, bagit, bagit_archive,
        # or directory) with the concrete list of elements it describes.
        elements_from = target_or_item.get("elements_from", None)
        items = None
        if elements_from:
            if elements_from == "archive":
                decompressed_directory = _decompress_target(target_or_item)
                items = _directory_to_items(decompressed_directory)
            elif elements_from == "bagit":
                _, elements_from_path = _has_src_to_path(target_or_item)
                items = _bagit_to_items(elements_from_path)
            elif elements_from == "bagit_archive":
                decompressed_directory = _decompress_target(target_or_item)
                items = _bagit_to_items(decompressed_directory)
            elif elements_from == "directory":
                _, elements_from_path = _has_src_to_path(target_or_item)
                items = _directory_to_items(elements_from_path)
            else:
                raise Exception("Unknown elements from type encountered [%s]" % elements_from)

        if items:
            del target_or_item["elements_from"]
            target_or_item["elements"] = items

    _for_each_src(expand_elements_from, target)

    items = target.get("elements", None)
    assert items is not None, "No element definition found for destination [%s]" % destination

    fetched_target = {}
    fetched_target["destination"] = destination
    if "collection_type" in target:
        fetched_target["collection_type"] = target["collection_type"]
    if "name" in target:
        fetched_target["name"] = target["name"]

    def _resolve_src(item):
        converted_path = None

        name, path = _has_src_to_path(item)
        dbkey = item.get("dbkey", "?")
        requested_ext = item.get("ext", "auto")
        info = item.get("info", None)
        tags = item.get("tags", [])
        object_id = item.get("object_id", None)
        link_data_only = upload_config.link_data_only
        if "link_data_only" in item:
            # Allow overriding this on a per file basis.
            link_data_only = _link_data_only(item)
        to_posix_lines = upload_config.get_option(item, "to_posix_lines")
        space_to_tab = upload_config.get_option(item, "space_to_tab")
        auto_decompress = upload_config.get_option(item, "auto_decompress")
        in_place = item.get("in_place", False)
        purge_source = item.get("purge_source", True)

        registry = upload_config.registry
        check_content = upload_config.check_content

        stdout, ext, datatype, is_binary, converted_path = handle_upload(
            registry=registry,
            path=path,
            requested_ext=requested_ext,
            name=name,
            tmp_prefix='data_fetch_upload_',
            tmp_dir=".",
            check_content=check_content,
            link_data_only=link_data_only,
            in_place=in_place,
            auto_decompress=auto_decompress,
            convert_to_posix_lines=to_posix_lines,
            convert_spaces_to_tabs=space_to_tab,
        )

        if link_data_only:
            # Never alter a file that will not be copied to Galaxy's local file store.
            if datatype.dataset_content_needs_grooming(path):
                err_msg = 'The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be ' + \
                    '<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed.'
                raise UploadProblemException(err_msg)

        # If this file is not in the workdir make sure it gets there.
        if not link_data_only and converted_path:
            path = upload_config.ensure_in_working_directory(converted_path, purge_source, in_place)
        elif not link_data_only:
            path = upload_config.ensure_in_working_directory(path, purge_source, in_place)

        if not link_data_only and datatype and datatype.dataset_content_needs_grooming(path):
            # Groom the dataset content if necessary
            datatype.groom_dataset_content(path)

        rval = {"name": name, "filename": path, "dbkey": dbkey, "ext": ext, "link_data_only": link_data_only}
        if info is not None:
            rval["info"] = info
        if object_id is not None:
            rval["object_id"] = object_id
        if tags:
            rval["tags"] = tags
        return rval

    elements = elements_tree_map(_resolve_src, items)
    fetched_target["elements"] = elements
    return fetched_target
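

# Illustrative shape of what main() ultimately writes to galaxy.json (the
# element values are hypothetical; "gxupload_0" is a name of the form produced
# by UploadConfig.__new_dataset_path below):
#
#     {"__unnamed_outputs": [{
#         "destination": {"type": "hdas"},
#         "elements": [{"name": "example.tsv", "filename": "gxupload_0",
#                       "dbkey": "?", "ext": "tabular", "link_data_only": False}],
#     }]}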


def _bagit_to_items(directory):
    # Resolve remote files listed in the bag's fetch.txt, validate the bag,
    # then treat its data/ payload directory as a plain directory of items.
    bdbag.bdbag_api.resolve_fetch(directory)
    bdbag.bdbag_api.validate_bag(directory)
    items = _directory_to_items(os.path.join(directory, "data"))
    return items


def _decompress_target(target):
    elements_from_name, elements_from_path = _has_src_to_path(target)
    temp_directory = tempfile.mkdtemp(prefix=elements_from_name, dir=".")
    decompressed_directory = CompressedFile(elements_from_path).extract(temp_directory)
    return decompressed_directory


def elements_tree_map(f, items):
    """Apply f to every leaf item, recursing through nested "elements" lists
    while preserving the tree structure."""
    new_items = []
    for item in items:
        if "elements" in item:
            new_item = item.copy()
            new_item["elements"] = elements_tree_map(f, item["elements"])
            new_items.append(new_item)
        else:
            new_items.append(f(item))
    return new_items
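

# A minimal usage sketch: leaves are transformed, nesting is preserved.
#
#     tree = [{"name": "outer", "elements": [{"src": "path", "path": "a.txt"}]}]
#     elements_tree_map(lambda item: dict(item, visited=True), tree)
#     # -> [{"name": "outer", "elements":
#     #       [{"src": "path", "path": "a.txt", "visited": True}]}]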


def _directory_to_items(directory):
    items = []
    dir_elements = {}
    for root, dirs, files in os.walk(directory):
        if root in dir_elements:
            target = dir_elements[root]
        else:
            target = items
        # Subdirectories become nested element lists and are appended before
        # files, both in sorted order.
        for dir in sorted(dirs):
            dir_dict = {"name": dir, "elements": []}
            dir_elements[os.path.join(root, dir)] = dir_dict["elements"]
            target.append(dir_dict)
        for file in sorted(files):
            target.append({"src": "path", "path": os.path.join(root, file)})
    return items
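

# For a directory laid out as (hypothetical example)
#
#     data/
#         a.txt
#         sub/
#             b.txt
#
# _directory_to_items("data") returns:
#
#     [{"name": "sub", "elements": [{"src": "path", "path": "data/sub/b.txt"}]},
#      {"src": "path", "path": "data/a.txt"}]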


def _has_src_to_path(item):
    assert "src" in item, item
    src = item.get("src")
    name = item.get("name")
    if src == "url":
        url = item.get("url")
        path = sniff.stream_url_to_file(url)
        if name is None:
            name = url.split("/")[-1]
    elif src == "pasted":
        path = sniff.stream_to_file(StringIO(item["paste_content"]))
        if name is None:
            name = "Pasted Entry"
    else:
        assert src == "path"
        path = item["path"]
        if name is None:
            name = os.path.basename(path)
    return name, path
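

# The three supported "src" forms (values illustrative):
#
#     _has_src_to_path({"src": "path", "path": "/tmp/a.txt"})
#     # -> ("a.txt", "/tmp/a.txt")
#     _has_src_to_path({"src": "url", "url": "https://example.org/f.tsv"})
#     # downloads the URL; -> ("f.tsv", <downloaded temp path>)
#     _has_src_to_path({"src": "pasted", "paste_content": "1\t2\n"})
#     # -> ("Pasted Entry", <temp path holding the pasted content>)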


def _arg_parser():
    parser = argparse.ArgumentParser(description=DESCRIPTION)
    parser.add_argument("--galaxy-root")
    parser.add_argument("--datatypes-registry")
    parser.add_argument("--request-version")
    parser.add_argument("--request")
    return parser


class UploadConfig(object):

    def __init__(self, request, registry):
        self.registry = registry
        self.check_content = request.get("check_content", True)
        self.to_posix_lines = request.get("to_posix_lines", False)
        self.space_to_tab = request.get("space_to_tab", False)
        self.auto_decompress = request.get("auto_decompress", False)
        self.link_data_only = _link_data_only(request)

        self.__workdir = os.path.abspath(".")
        self.__upload_count = 0

    def get_option(self, item, key):
        """Return item[key] if specified, otherwise fall back to the default
        from this UploadConfig.

        The UploadConfig attribute is the default for the whole request; the
        item entry is the option for an individual file.
        """
        if key in item:
            return item[key]
        else:
            return getattr(self, key)
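
    # Illustrative: with request {"to_posix_lines": True} and item
    # {"to_posix_lines": False}, get_option(item, "to_posix_lines") returns
    # False, i.e. the per-item value overrides the request-wide default.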

    def __new_dataset_path(self):
        path = "gxupload_%d" % self.__upload_count
        self.__upload_count += 1
        return path

    def ensure_in_working_directory(self, path, purge_source, in_place):
        if in_directory(path, self.__workdir):
            return path

        new_path = self.__new_dataset_path()
        if purge_source:
            try:
                shutil.move(path, new_path)
            except OSError as e:
                # We may not have permission to remove converted_path
                if e.errno != errno.EACCES:
                    raise
        else:
            shutil.copy(path, new_path)

        return new_path


def _link_data_only(has_config_dict):
    link_data_only = has_config_dict.get("link_data_only", False)
    if not isinstance(link_data_only, bool):
        # Allow the older string values of 'copy_files' and 'link_to_files';
        # only 'link_to_files' means the data should be linked rather than copied.
        link_data_only = link_data_only == "link_to_files"
    return link_data_only
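

# Illustrative mapping of the accepted values:
#
#     _link_data_only({"link_data_only": True})             # -> True
#     _link_data_only({"link_data_only": "link_to_files"})  # -> True
#     _link_data_only({"link_data_only": "copy_files"})     # -> False
#     _link_data_only({})                                   # -> False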


def _for_each_src(f, obj):
    if isinstance(obj, list):
        for item in obj:
            _for_each_src(f, item)
    if isinstance(obj, dict):
        if "src" in obj:
            f(obj)
        for key, value in obj.items():
            _for_each_src(f, value)
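

# _for_each_src visits every dict carrying a "src" key at any nesting depth,
# which is how expand_elements_from above reaches both a target and each of
# its (possibly nested) items. A minimal sketch:
#
#     seen = []
#     _for_each_src(lambda o: seen.append(o["src"]),
#                   {"src": "url", "elements": [{"src": "path", "path": "a"}]})
#     # seen == ["url", "path"]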


if __name__ == "__main__":
    main()