Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.files.sources.s3fs
import functools
import logging
import os
from typing import (
Any,
Dict,
List,
)
try:
import s3fs
except ImportError:
s3fs = None
from ..sources import BaseFilesSource
DEFAULT_ENFORCE_SYMLINK_SECURITY = True
DEFAULT_DELETE_ON_REALIZE = False
log = logging.getLogger(__name__)
[docs]class S3FsFilesSource(BaseFilesSource):
plugin_type = "s3fs"
[docs] def __init__(self, **kwd):
if s3fs is None:
raise Exception("Package s3fs unavailable but required for this file source plugin.")
props = self._parse_common_config_opts(kwd)
self._bucket = props.pop("bucket", "")
self._endpoint_url = props.pop("endpoint_url", None)
assert self._endpoint_url or self._bucket
self._props = props
def _list(self, path="/", recursive=True, user_context=None):
fs = self._open_fs(user_context=user_context)
if recursive:
res: List[Dict[str, Any]] = []
bucket_path = self._bucket_path(path)
for p, dirs, files in fs.walk(bucket_path, detail=True):
to_dict = functools.partial(self._resource_info_to_dict, p)
res.extend(map(to_dict, dirs.values()))
res.extend(map(to_dict, files.values()))
return res
else:
bucket_path = self._bucket_path(path)
res = fs.ls(bucket_path, detail=True)
to_dict = functools.partial(self._resource_info_to_dict, path)
return list(map(to_dict, res))
def _realize_to(self, source_path, native_path, user_context=None):
bucket_path = self._bucket_path(source_path)
self._open_fs(user_context=user_context).download(bucket_path, native_path)
def _write_from(self, target_path, native_path, user_context=None):
raise NotImplementedError()
def _bucket_path(self, path):
if not path.startswith("/"):
path = f"/{path}"
return f"{self._bucket}{path}"
def _open_fs(self, user_context=None):
if self._endpoint_url:
self._props.update({"client_kwargs": {"endpoint_url": self._endpoint_url}})
fs = s3fs.S3FileSystem(**self._props)
return fs
def _resource_info_to_dict(self, dir_path, resource_info):
name = os.path.basename(resource_info["name"])
path = os.path.join(dir_path, name)
uri = self.uri_from_path(path)
if resource_info["type"] == "directory":
return {"class": "Directory", "name": name, "uri": uri, "path": path}
else:
return {
"class": "File",
"name": name,
"size": resource_info["size"],
# should this be mtime...
"ctime": self.to_dict_time(resource_info["LastModified"]),
"uri": uri,
"path": path,
}
def _serialization_props(self, user_context=None):
effective_props = {}
for key, val in self._props.items():
effective_props[key] = self._evaluate_prop(val, user_context=user_context)
effective_props["bucket"] = self._bucket
return effective_props
__all__ = ("S3FsFilesSource",)