Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.tools.data_fetch
import argparse
import errno
import json
import os
import shutil
import sys
import tempfile
from io import StringIO
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
)
import bdbag.bdbag_api
from galaxy.datatypes import sniff
from galaxy.datatypes.registry import Registry
from galaxy.datatypes.upload_util import (
handle_upload,
UploadProblemException,
)
from galaxy.files.uris import (
stream_to_file,
stream_url_to_file,
)
from galaxy.util import (
in_directory,
safe_makedirs,
)
from galaxy.util.bunch import Bunch
from galaxy.util.compression_utils import CompressedFile
from galaxy.util.hash_util import (
HASH_NAMES,
memory_bound_hexdigest,
)
# Description text passed to argparse.ArgumentParser by _arg_parser().
DESCRIPTION = """Data Import Script"""
def main(argv=None):
    """Command-line entry point: parse arguments, load datatypes, run the fetch.

    :param argv: argument list to parse; ``sys.argv[1:]`` is used when ``None``.
    """
    cli_args = sys.argv[1:] if argv is None else argv
    args = _arg_parser().parse_args(cli_args)

    # Build the datatypes registry from the locations given on the command line.
    registry = Registry()
    registry.load_datatypes(root_dir=args.galaxy_root, config=args.datatypes_registry)

    # Fall back to the current directory when no working directory was given.
    working_directory = args.working_directory or os.getcwd()
    do_fetch(args.request, working_directory=working_directory, registry=registry)
def do_fetch(
    request_path: str,
    working_directory: str,
    registry: Registry,
    file_sources_dict: Optional[Dict] = None,
):
    """Execute the fetch request stored at ``request_path``.

    The JSON request is loaded, every target is fetched, and the outcome is
    written to ``galaxy.json`` inside ``working_directory``.

    :param request_path: path of the JSON request file; must exist.
    :param working_directory: directory used for staging and for ``galaxy.json``.
    :param registry: datatypes registry handed to the upload configuration.
    :param file_sources_dict: optional file sources configuration dictionary.
    :returns: ``working_directory``.
    """
    assert os.path.exists(request_path)
    with open(request_path) as request_handle:
        request = json.load(request_handle)

    upload_config = UploadConfig(
        request,
        registry,
        working_directory,
        request.get("allow_failed_collections", False),
        file_sources_dict,
    )

    # Resolve everything first so the output file is only created once the
    # fetch itself has produced a result.
    galaxy_json = _request_to_galaxy_json(upload_config, request)
    output_path = os.path.join(working_directory, "galaxy.json")
    with open(output_path, "w") as output_handle:
        json.dump(galaxy_json, output_handle)
    return working_directory
def _request_to_galaxy_json(upload_config: "UploadConfig", request):
    """Fetch every target of ``request`` and wrap the results for galaxy.json.

    :returns: a dict with the single key ``"__unnamed_outputs"`` mapping to the
        list of fetched targets, in request order.
    """
    return {
        "__unnamed_outputs": [_fetch_target(upload_config, target) for target in request.get("targets", [])],
    }
def _fetch_target(upload_config: "UploadConfig", target):
    """Resolve a single request target into its galaxy.json description.

    The target's ``elements_from`` sources (archive, bagit, bagit_archive,
    directory) are expanded into explicit ``elements`` first; every leaf element
    is then resolved into a dataset description.  Failures of individual
    elements are captured on the element itself (``error_message``) instead of
    aborting the whole target.

    :param upload_config: request-wide upload configuration.
    :param target: target dictionary from the request; modified in place while
        ``elements_from`` entries are expanded.
    :returns: dict with ``destination``, ``elements`` and, when present or
        applicable, ``collection_type``, ``name`` and ``error_message``.
    :raises AssertionError: if the target defines no destination, or no
        elements after a successful expansion.
    """
    destination = target.get("destination", None)
    assert destination, "No destination defined."

    def expand_elements_from(target_or_item):
        # Replace an "elements_from" directive with the concrete "elements" it denotes.
        elements_from = target_or_item.get("elements_from", None)
        items = None
        if elements_from:
            if elements_from == "archive":
                decompressed_directory = _decompress_target(upload_config, target_or_item)
                items = _directory_to_items(decompressed_directory)
            elif elements_from == "bagit":
                _, elements_from_path = _has_src_to_path(upload_config, target_or_item, is_dataset=False)
                items = _bagit_to_items(elements_from_path)
            elif elements_from == "bagit_archive":
                decompressed_directory = _decompress_target(upload_config, target_or_item)
                items = _bagit_to_items(decompressed_directory)
            elif elements_from == "directory":
                _, elements_from_path = _has_src_to_path(upload_config, target_or_item, is_dataset=False)
                items = _directory_to_items(elements_from_path)
            else:
                raise Exception(f"Unknown elements from type encountered [{elements_from}]")

        if items:
            del target_or_item["elements_from"]
            target_or_item["elements"] = items

    # An expansion failure is reported on the target rather than raised.
    expansion_error = None
    try:
        _for_each_src(expand_elements_from, target)
    except Exception as e:
        expansion_error = f"Error expanding elements/items for upload destination. {str(e)}"

    if expansion_error is None:
        items = target.get("elements", None)
        assert items is not None, f"No element definition found for destination [{destination}]"
    else:
        items = []

    fetched_target = {}
    fetched_target["destination"] = destination
    destination_type = destination["type"]
    is_collection = destination_type == "hdca"
    # Filled by _resolve_item_capture_error with the description of each failed element.
    failed_elements = []

    if "collection_type" in target:
        fetched_target["collection_type"] = target["collection_type"]
    if "name" in target:
        fetched_target["name"] = target["name"]

    def _copy_and_validate_simple_attributes(src_item, target_metadata):
        # Copy the optional pass-through attributes from the request item to the output dict.
        info = src_item.get("info", None)
        created_from_basename = src_item.get("created_from_basename", None)
        tags = src_item.get("tags", [])
        object_id = src_item.get("object_id", None)

        if info is not None:
            target_metadata["info"] = info
        if object_id is not None:
            target_metadata["object_id"] = object_id
        if tags:
            target_metadata["tags"] = tags
        if created_from_basename:
            target_metadata["created_from_basename"] = created_from_basename
        if "error_message" in src_item:
            target_metadata["error_message"] = src_item["error_message"]
        return target_metadata

    def _resolve_item(item):
        # Might be a dataset or a composite upload.
        requested_ext = item.get("ext", None)
        registry = upload_config.registry
        datatype = registry.get_datatype_by_extension(requested_ext)
        composite = item.pop("composite", None)
        if datatype and datatype.composite_type:
            composite_type = datatype.composite_type
            assert composite_type == "auto_primary_file", "basic composite uploads not yet implemented"

            # get_composite_dataset_name finds dataset name from basename of contents
            # and such but we're not implementing that here yet. yagni?
            # also need name...
            metadata = {
                composite_file.substitute_name_with_metadata: datatype.metadata_spec[
                    composite_file.substitute_name_with_metadata
                ].default
                for composite_file in datatype.composite_files.values()
                if composite_file.substitute_name_with_metadata
            }
            name = item.get("name") or "Composite Dataset"
            metadata["base_name"] = name
            dataset = Bunch(
                name=name,
                metadata=metadata,
            )
            writable_files = datatype.get_writable_files_for_dataset(dataset)
            primary_file = stream_to_file(
                StringIO(datatype.generate_primary_file(dataset)),
                prefix="upload_auto_primary_file",
                dir=upload_config.working_directory,
            )
            extra_files_path = f"{primary_file}_extra"
            os.mkdir(extra_files_path)
            rval: Dict[str, Any] = {
                "name": name,
                "filename": primary_file,
                "ext": requested_ext,
                "link_data_only": False,
                "sources": [],
                "hashes": [],
                "extra_files": extra_files_path,
            }
            _copy_and_validate_simple_attributes(item, rval)
            composite_items = composite.get("elements", [])
            keys = list(writable_files.keys())
            composite_item_idx = 0
            for composite_item in composite_items:
                if composite_item_idx >= len(keys):
                    # More items were supplied than the datatype has writable
                    # files: fail with a clear message instead of the IndexError
                    # the key lookup below would otherwise raise.
                    raise Exception(
                        f"Too many composite files supplied for datatype [{requested_ext}] - "
                        f"at most {len(keys)} expected."
                    )
                key = keys[composite_item_idx]
                writable_file = writable_files[key]
                _, src_target = _has_src_to_path(upload_config, composite_item)
                # do the writing
                sniff.handle_composite_file(
                    datatype,
                    src_target,
                    extra_files_path,
                    key,
                    writable_file.is_binary,
                    upload_config.working_directory,
                    f"{os.path.basename(extra_files_path)}_",
                    composite_item,
                )
                composite_item_idx += 1

            # Walk the writable files for which no composite item was supplied.
            writable_files_idx = composite_item_idx
            while writable_files_idx < len(keys):
                key = keys[writable_files_idx]
                writable_file = writable_files[key]
                if not writable_file.optional:
                    # raise Exception, non-optional file missing
                    pass
                writable_files_idx += 1
            return rval
        else:
            if composite:
                raise Exception(f"Non-composite datatype [{datatype}] attempting to be created with composite data.")
            return _resolve_item_with_primary(item)

    def _resolve_item_with_primary(item):
        error_message = None
        converted_path = None

        deferred = upload_config.get_option(item, "deferred")
        name: str
        path: Optional[str]
        if not deferred:
            name, path = _has_src_to_path(upload_config, item, is_dataset=True)
        else:
            # Deferred datasets are only named here, never fetched, so there is no path.
            name, path = (_has_src_to_name(item) or "Deferred Dataset"), None
        sources = []

        url = item.get("url")
        source_dict = {"source_uri": url}
        if url:
            sources.append(source_dict)
        hashes = item.get("hashes", [])
        for hash_dict in hashes:
            hash_function = hash_dict.get("hash_function")
            hash_value = hash_dict.get("hash_value")
            try:
                _handle_hash_validation(upload_config, hash_function, hash_value, path)
            except Exception as e:
                # A failed validation is recorded on the item instead of aborting.
                error_message = str(e)
                item["error_message"] = error_message

        dbkey = item.get("dbkey", "?")
        link_data_only = upload_config.link_data_only
        if "link_data_only" in item:
            # Allow overriding this on a per file basis.
            link_data_only = _link_data_only(item)

        ext = "data"
        staged_extra_files = None

        requested_ext = item.get("ext", "auto")
        to_posix_lines = upload_config.get_option(item, "to_posix_lines")
        space_to_tab = upload_config.get_option(item, "space_to_tab")
        auto_decompress = upload_config.get_option(item, "auto_decompress")

        effective_state = "ok"
        if not deferred and not error_message:
            in_place = item.get("in_place", False)
            purge_source = item.get("purge_source", True)

            registry = upload_config.registry
            check_content = upload_config.check_content

            assert path  # if deferred won't be in this branch.
            stdout, ext, datatype, is_binary, converted_path, converted_newlines, converted_spaces = handle_upload(
                registry=registry,
                path=path,
                requested_ext=requested_ext,
                name=name,
                tmp_prefix="data_fetch_upload_",
                tmp_dir=upload_config.working_directory,
                check_content=check_content,
                link_data_only=link_data_only,
                in_place=in_place,
                auto_decompress=auto_decompress,
                convert_to_posix_lines=to_posix_lines,
                convert_spaces_to_tabs=space_to_tab,
            )
            # Record which conversions were actually applied to the source.
            transform = []
            if converted_newlines:
                transform.append({"action": "to_posix_lines"})
            if converted_spaces:
                transform.append({"action": "spaces_to_tabs"})

            if link_data_only:
                # Never alter a file that will not be copied to Galaxy's local file store.
                if datatype.dataset_content_needs_grooming(path):
                    err_msg = (
                        "The uploaded files need grooming, so change your <b>Copy data into Galaxy?</b> selection to be "
                        + "<b>Copy files into Galaxy</b> instead of <b>Link to files without copying into Galaxy</b> so grooming can be performed."
                    )
                    raise UploadProblemException(err_msg)

            # If this file is not in the workdir make sure it gets there.
            if not link_data_only and converted_path:
                path = upload_config.ensure_in_working_directory(converted_path, purge_source, in_place)
            elif not link_data_only:
                path = upload_config.ensure_in_working_directory(path, purge_source, in_place)

            extra_files = item.get("extra_files")
            if extra_files:
                # TODO: optimize to just copy the whole directory to extra files instead.
                assert not upload_config.link_data_only, "linking composite dataset files not yet implemented"
                extra_files_path = f"{path}_extra"
                staged_extra_files = extra_files_path
                os.mkdir(extra_files_path)

                def walk_extra_files(items, prefix=""):
                    # Recreate the nested "elements" layout beneath extra_files_path.
                    for item in items:
                        if "elements" in item:
                            name = item.get("name")
                            if not prefix:
                                item_prefix = name
                            else:
                                item_prefix = os.path.join(prefix, name)
                            walk_extra_files(item.get("elements"), prefix=item_prefix)
                        else:
                            src_name, src_path = _has_src_to_path(upload_config, item)
                            if prefix:
                                rel_path = os.path.join(prefix, src_name)
                            else:
                                rel_path = src_name

                            file_output_path = os.path.join(extra_files_path, rel_path)
                            parent_dir = os.path.dirname(file_output_path)
                            if not os.path.exists(parent_dir):
                                safe_makedirs(parent_dir)
                            shutil.move(src_path, file_output_path)

                walk_extra_files(extra_files.get("elements", []))

            # TODO:
            # in galaxy json add 'extra_files' and point at target derived from extra_files:

            needs_grooming = not link_data_only and datatype and datatype.dataset_content_needs_grooming(path)
            if needs_grooming:
                # Groom the dataset content if necessary
                transform.append(
                    {"action": "datatype_groom", "datatype_ext": ext, "datatype_class": datatype.__class__.__name__}
                )
                datatype.groom_dataset_content(path)
            if len(transform) > 0:
                source_dict["transform"] = transform
        elif not error_message:
            # NOTE(review): this transform list is built but never attached to
            # source_dict in the deferred branch - confirm whether that is intended.
            transform = []
            if to_posix_lines:
                transform.append({"action": "to_posix_lines"})
            if space_to_tab:
                transform.append({"action": "spaces_to_tabs"})
            effective_state = "deferred"
            registry = upload_config.registry
            ext = sniff.guess_ext_from_file_name(name, registry=registry, requested_ext=requested_ext)

        rval = {
            "name": name,
            "dbkey": dbkey,
            "ext": ext,
            "link_data_only": link_data_only,
            "sources": sources,
            "hashes": hashes,
            "info": f"uploaded {ext} file",
            "state": effective_state,
        }
        if path:
            rval["filename"] = path
        if staged_extra_files:
            rval["extra_files"] = os.path.abspath(staged_extra_files)
        return _copy_and_validate_simple_attributes(item, rval)

    def _resolve_item_capture_error(item):
        # Turn any failure into an element description carrying the error message.
        try:
            return _resolve_item(item)
        except Exception as e:
            rval = {"error_message": str(e)}
            rval = _copy_and_validate_simple_attributes(item, rval)
            failed_elements.append(rval)
            return rval

    if expansion_error is None:
        elements = elements_tree_map(_resolve_item_capture_error, items)
        if is_collection and not upload_config.allow_failed_collections and len(failed_elements) > 0:
            # A collection with failed elements is rejected as a whole unless
            # the request allows failed collections.
            element_error = "Failed to fetch collection element(s):\n"
            for failed_element in failed_elements:
                element_error += f"\n- {failed_element['error_message']}"
            fetched_target["error_message"] = element_error
            fetched_target["elements"] = None
        else:
            fetched_target["elements"] = elements
    else:
        fetched_target["elements"] = []
        fetched_target["error_message"] = expansion_error

    return fetched_target
def _bagit_to_items(directory):
    """Resolve and validate the bag at ``directory``, then list its ``data`` payload as items."""
    bag_api = bdbag.bdbag_api
    bag_api.resolve_fetch(directory)
    bag_api.validate_bag(directory)
    payload_directory = os.path.join(directory, "data")
    return _directory_to_items(payload_directory)
def _decompress_target(upload_config: "UploadConfig", target):
    """Extract the archive referenced by ``target`` into a fresh temporary directory.

    :returns: the path reported by the extraction when ``fuzzy_root`` is enabled
        (the default), otherwise the temporary directory itself.
    """
    archive_name, archive_path = _has_src_to_path(upload_config, target, is_dataset=False)
    # by default Galaxy will check for a directory with a single file and interpret that
    # as the new root for expansion, this is a good user experience for uploading single
    # files in an archive but not great from an API perspective. Allow disabling by setting
    # fuzzy_root to False to literally interpret the target.
    fuzzy_root = target.get("fuzzy_root", True)
    extraction_root = os.path.abspath(
        tempfile.mkdtemp(prefix=archive_name, dir=upload_config.working_directory)
    )
    extracted_root = CompressedFile(archive_path).extract(extraction_root)
    if fuzzy_root:
        return extracted_root
    return extraction_root
def elements_tree_map(f, items):
    """Apply ``f`` to every leaf of a nested ``elements`` tree.

    Entries containing an ``"elements"`` key are shallow-copied with their
    children mapped recursively; every other entry is replaced by ``f(entry)``.

    :returns: a new list mirroring the structure of ``items``.
    """

    def _map_entry(entry):
        if "elements" not in entry:
            return f(entry)
        mapped = entry.copy()
        mapped["elements"] = elements_tree_map(f, entry["elements"])
        return mapped

    return [_map_entry(entry) for entry in items]
def _directory_to_items(directory):
    """Describe the contents of ``directory`` as a nested list of upload items.

    Sub-directories become ``{"name": ..., "elements": [...]}`` entries and
    files become ``{"src": "path", "path": ...}`` entries; within a directory
    the sub-directories come first and both groups are sorted by name.
    """
    top_level: List[Dict[str, Any]] = []
    # Maps each sub-directory path to the list that collects its children.
    children_by_path: Dict[str, Any] = {}
    for current, subdir_names, file_names in os.walk(directory):
        container = children_by_path.get(current, top_level)
        for subdir_name in sorted(subdir_names):
            children: List[Dict[str, Any]] = []
            children_by_path[os.path.join(current, subdir_name)] = children
            container.append({"name": subdir_name, "elements": children})
        container.extend(
            {"src": "path", "path": os.path.join(current, file_name)} for file_name in sorted(file_names)
        )
    return top_level
def _has_src_to_name(item) -> Optional[str]:
    """Derive a display name for ``item`` without fetching anything.

    Logic should broadly match logic of _has_src_to_path but not resolve the
    item into a path.  An explicit ``name`` wins; otherwise the last URL
    segment (``src == "url"``) or the path's basename (``src == "path"``) is
    used.  Any other source yields the explicit name, which may be ``None``.
    """
    explicit_name = item.get("name")
    source_kind = item.get("src")
    if source_kind == "url":
        url = item.get("url")
        return url.split("/")[-1] if explicit_name is None else explicit_name
    if source_kind == "path":
        path = item["path"]
        return os.path.basename(path) if explicit_name is None else explicit_name
    return explicit_name
def _has_src_to_path(upload_config, item, is_dataset=False) -> Tuple[str, str]:
    """Materialize ``item`` as a local file and return ``(name, path)``.

    * ``src == "url"``: the URL is streamed into the working directory; for
      non-dataset items any hash listed under a ``HASH_NAMES`` key is validated.
    * ``src == "pasted"``: ``paste_content`` is written to a new file.
    * otherwise ``src`` must be ``"path"`` and the given path is used as is.

    :raises AssertionError: if ``item`` has no ``src`` or an unsupported one.
    :raises Exception: if the URL cannot be fetched.
    """
    assert "src" in item, item
    source_kind = item.get("src")
    name = item.get("name")

    if source_kind == "url":
        url = item.get("url")
        try:
            path = stream_url_to_file(url, file_sources=upload_config.file_sources, dir=upload_config.working_directory)
        except Exception as e:
            raise Exception(f"Failed to fetch url {url}. {str(e)}")

        if not is_dataset:
            # Actual target dataset will validate and put results in dict
            # that gets passed back to Galaxy.
            for hash_function in HASH_NAMES:
                hash_value = item.get(hash_function)
                if hash_value:
                    _handle_hash_validation(upload_config, hash_function, hash_value, path)
        if name is None:
            name = url.split("/")[-1]
        return name, path

    if source_kind == "pasted":
        path = stream_to_file(StringIO(item["paste_content"]), dir=upload_config.working_directory)
        if name is None:
            name = "Pasted Entry"
        return name, path

    assert source_kind == "path"
    path = item["path"]
    if name is None:
        name = os.path.basename(path)
    return name, path
def _handle_hash_validation(upload_config, hash_function, hash_value, path):
    """Verify the digest of ``path`` when hash validation is enabled.

    Does nothing unless ``upload_config.validate_hashes`` is truthy.

    :raises Exception: if the calculated digest differs from ``hash_value``.
    """
    if not upload_config.validate_hashes:
        return
    calculated_hash_value = memory_bound_hexdigest(hash_func_name=hash_function, path=path)
    if calculated_hash_value == hash_value:
        return
    raise Exception(
        f"Failed to validate upload with [{hash_function}] - expected [{hash_value}] got [{calculated_hash_value}]"
    )
def _arg_parser():
    """Build the command-line parser accepted by :func:`main`."""
    parser = argparse.ArgumentParser(description=DESCRIPTION)
    option_flags = (
        "--galaxy-root",
        "--datatypes-registry",
        "--request-version",
        "--request",
        "--working-directory",
    )
    for flag in option_flags:
        parser.add_argument(flag)
    return parser
def get_file_sources(working_directory, file_sources_as_dict=None):
    """Build the configured file sources for a fetch job.

    :param working_directory: directory searched for ``file_sources.json`` when
        no dictionary is supplied.
    :param file_sources_as_dict: optional file sources configuration; takes
        precedence over the JSON file.
    :returns: the object produced by ``ConfiguredFileSources.from_dict`` - built
        from the supplied or loaded configuration when there is one, otherwise
        from ``None``.
    """
    from galaxy.files import ConfiguredFileSources

    file_sources = None
    if file_sources_as_dict is None:
        file_sources_path = os.path.join(working_directory, "file_sources.json")
        if os.path.exists(file_sources_path):
            with open(file_sources_path) as f:
                file_sources_as_dict = json.load(f)
    if file_sources_as_dict is not None:
        file_sources = ConfiguredFileSources.from_dict(file_sources_as_dict)
    if file_sources is None:
        # Keep the fallback object: the result of this call used to be
        # discarded, so the function returned None in this case.
        file_sources = ConfiguredFileSources.from_dict(None)
    return file_sources
class UploadConfig:
    """Request-wide upload options plus helpers for staging files in the working directory."""

    def __init__(
        self,
        request,
        registry,
        working_directory,
        allow_failed_collections,
        file_sources_dict=None,
    ):
        """Capture the request-level defaults; individual items may override them.

        :param request: request dictionary the option defaults are read from.
        :param registry: datatypes registry.
        :param working_directory: directory files are staged into.
        :param allow_failed_collections: whether collections with failed
            elements are accepted.
        :param file_sources_dict: optional file sources configuration.
        """
        # Collaborators and locations.
        self.registry = registry
        self.working_directory = working_directory
        self.allow_failed_collections = allow_failed_collections
        self.file_sources_dict = file_sources_dict
        self._file_sources = None

        # Request-level option defaults.
        self.check_content = request.get("check_content", True)
        self.to_posix_lines = request.get("to_posix_lines", False)
        self.space_to_tab = request.get("space_to_tab", False)
        self.auto_decompress = request.get("auto_decompress", False)
        self.validate_hashes = request.get("validate_hashes", False)
        self.deferred = request.get("deferred", False)
        self.link_data_only = _link_data_only(request)

        # Private staging state.
        self.__workdir = os.path.abspath(working_directory)
        self.__upload_count = 0

    @property
    def file_sources(self):
        """File sources for this job, built on first access via ``get_file_sources``."""
        if self._file_sources is None:
            self._file_sources = get_file_sources(self.working_directory, file_sources_as_dict=self.file_sources_dict)
        return self._file_sources

    def get_option(self, item, key):
        """Return item[key] if specified otherwise use default from UploadConfig.

        This default represents the default for the whole request instead item which
        is the option for individual files.
        """
        return item[key] if key in item else getattr(self, key)

    def __new_dataset_path(self):
        # Hand out gxupload_0, gxupload_1, ... inside the working directory.
        index = self.__upload_count
        self.__upload_count = index + 1
        return os.path.join(self.working_directory, f"gxupload_{index}")

    def ensure_in_working_directory(self, path, purge_source, in_place):
        """Return a location for ``path`` that lies inside the working directory.

        A path already inside the working directory is returned unchanged.
        Otherwise the file is moved (``purge_source`` truthy) or copied to a
        newly allocated ``gxupload_<n>`` path, which is returned.  ``in_place``
        is accepted but not used by this method.
        """
        if in_directory(path, self.__workdir):
            return path

        staged_path = self.__new_dataset_path()
        if not purge_source:
            shutil.copy(path, staged_path)
            return staged_path

        try:
            shutil.move(path, staged_path)
            # Drop .info file if it exists
            try:
                os.remove(f"{path}.info")
            except FileNotFoundError:
                pass
        except OSError as e:
            # We may not have permission to remove converted_path
            if e.errno != errno.EACCES:
                raise
        return staged_path
def _link_data_only(has_config_dict):
    """Return whether data should be linked rather than copied.

    :param has_config_dict: request or item dictionary that may carry a
        ``link_data_only`` entry.
    :returns: the boolean value as given; for the older string form, ``True``
        only for ``"link_to_files"``.  Defaults to ``False`` when the key is
        absent.
    """
    link_data_only = has_config_dict.get("link_data_only", False)
    if not isinstance(link_data_only, bool):
        # Allow the older string values of 'copy_files' and 'link_to_files'.
        # Only 'link_to_files' requests linking; 'copy_files' means copy, so it
        # must map to False (the comparison used to be inverted).
        link_data_only = link_data_only == "link_to_files"
    return link_data_only
def _for_each_src(f, obj):
    """Call ``f`` on every dict containing a ``"src"`` key within a nested structure.

    Lists and dicts are traversed depth-first; a dict is visited before its
    values, and anything that is neither a list nor a dict is ignored.
    """
    if isinstance(obj, list):
        children = obj
    elif isinstance(obj, dict):
        if "src" in obj:
            f(obj)
        children = obj.values()
    else:
        return
    for child in children:
        _for_each_src(f, child)
# Run the fetch when this module is executed as a script.
if __name__ == "__main__":
    main()