Warning
This document is for an old release of Galaxy. You can instead view this page in the latest release, if it exists, or go to the top of the latest release's documentation.
Source code for galaxy.files.sources.s3fs
import functools
import logging
import os
from typing import (
Any,
cast,
Dict,
List,
Optional,
)
from typing_extensions import Unpack
from . import (
FilesSourceOptions,
FilesSourceProperties,
)
try:
import s3fs
except ImportError:
s3fs = None
from . import BaseFilesSource
# Module-level defaults.
# NOTE(review): neither constant is referenced by the code visible in this module —
# confirm whether anything else still imports them.
DEFAULT_ENFORCE_SYMLINK_SECURITY = True
DEFAULT_DELETE_ON_REALIZE = False
# Logger named after this module.
log = logging.getLogger(__name__)
class S3FsFilesSourceProperties(FilesSourceProperties, total=False):
    """Configuration keys understood by the ``s3fs`` files source.

    Declared with ``total=False``, so none of the keys listed here is required.
    """

    bucket: str
    # A URL string; the plugin forwards it to s3fs as ``client_kwargs["endpoint_url"]``.
    endpoint_url: str
    user: str
    passwd: str
    client_kwargs: dict  # internally computed. Should not be specified in config file
class S3FsFilesSource(BaseFilesSource):
    """Files source plugin that reads from and writes to a bucket through ``s3fs``.

    Configuration is captured once in ``__init__``; every operation re-evaluates
    the stored properties against the calling user's context before opening a
    fresh ``s3fs.S3FileSystem``.
    """

    plugin_type = "s3fs"

    def __init__(self, **kwd: Unpack[S3FsFilesSourceProperties]):
        """Store the plugin configuration.

        :param kwd: configuration options, see ``S3FsFilesSourceProperties``.
        :raises Exception: if the optional ``s3fs`` package could not be imported.
        """
        if s3fs is None:
            raise Exception("Package s3fs unavailable but required for this file source plugin.")
        props: S3FsFilesSourceProperties = cast(S3FsFilesSourceProperties, self._parse_common_config_opts(kwd))
        # There is a possibility that the bucket name could be parameterized: e.g.
        # bucket: ${user.preferences['generic_s3|bucket']}
        # that's ok, because we evaluate the bucket name again later. The bucket property here will only
        # be used by `score_url_match`. In the case of a parameterized bucket name, we will always return
        # a score of 4 as the url will only match the s3:// part.
        self._bucket = props.get("bucket", "")
        # `endpoint_url` is removed from the stored properties and re-added below,
        # nested under `client_kwargs`.
        self._endpoint_url = props.pop("endpoint_url", None)
        self._props = props
        if self._endpoint_url:
            self._props.update({"client_kwargs": {"endpoint_url": self._endpoint_url}})

    def _open_bucket_fs(self, user_context=None, opts: Optional[FilesSourceOptions] = None):
        """Evaluate the stored properties and open a filesystem for them.

        :returns: a ``(fs, bucket_name)`` tuple; ``bucket_name`` is ``""`` when
            no bucket is configured.
        """
        props = self._serialization_props(user_context)
        # we need to pop the 'bucket' here, because the argument is not recognised in a downstream function
        bucket_name = props.pop("bucket", "")
        return self._open_fs(props=props, opts=opts), bucket_name

    def _list(self, path="/", recursive=True, user_context=None, opts: Optional[FilesSourceOptions] = None):
        """List the entries below ``path``.

        :param path: path inside the bucket, or a full ``s3://`` URL.
        :param recursive: walk the whole tree when true, otherwise list a single level.
        :returns: a list of dicts as produced by ``_resource_info_to_dict``.
        """
        fs, bucket_name = self._open_bucket_fs(user_context, opts)
        bucket_path = self._bucket_path(bucket_name, path)
        if not recursive:
            to_dict = functools.partial(self._resource_info_to_dict, path)
            return [to_dict(info) for info in fs.ls(bucket_path, detail=True)]
        res: List[Dict[str, Any]] = []
        # With detail=True, `dirs` and `files` are mappings whose values are the info dicts.
        for p, dirs, files in fs.walk(bucket_path, detail=True):
            to_dict = functools.partial(self._resource_info_to_dict, p)
            res.extend(map(to_dict, dirs.values()))
            res.extend(map(to_dict, files.values()))
        return res

    def _realize_to(self, source_path, native_path, user_context=None, opts: Optional[FilesSourceOptions] = None):
        """Download ``source_path`` from the bucket to the local ``native_path``."""
        fs, bucket_name = self._open_bucket_fs(user_context, opts)
        fs.download(self._bucket_path(bucket_name, source_path), native_path)

    def _write_from(self, target_path, native_path, user_context=None, opts: Optional[FilesSourceOptions] = None):
        """Upload the local ``native_path`` to ``target_path`` in the bucket."""
        fs, bucket_name = self._open_bucket_fs(user_context, opts)
        fs.upload(native_path, self._bucket_path(bucket_name, target_path))

    def _bucket_path(self, bucket_name: str, path: str):
        """Build the ``bucket/key`` path handed to the filesystem.

        A full ``s3://`` URL is returned with only its scheme removed (the URL
        already names the bucket). Any other path is prefixed with
        ``bucket_name``, inserting a ``/`` separator when ``path`` lacks one.
        """
        scheme = "s3://"
        if path.startswith(scheme):
            # Strip only the leading scheme; a later "s3://" inside the key is kept.
            return path[len(scheme) :]
        if not path.startswith("/"):
            path = f"/{path}"
        return f"{bucket_name}{path}"

    def _open_fs(self, props: FilesSourceProperties, opts: Optional[FilesSourceOptions] = None):
        """Create an ``s3fs.S3FileSystem`` from ``props``.

        Entries in ``opts.extra_props``, when present, override same-named
        entries in ``props``.
        """
        extra_props = (opts.extra_props or {}) if opts else {}
        return s3fs.S3FileSystem(**{**props, **extra_props})

    def _resource_info_to_dict(self, dir_path: str, resource_info):
        """Convert one filesystem info dict into a listing entry.

        :param dir_path: directory the entry was listed under; joined with the
            entry's base name to form the reported ``path``.
        :param resource_info: info dict with at least ``name`` and ``type``, plus
            ``size`` and ``LastModified`` for non-directory entries.
        """
        name = os.path.basename(resource_info["name"])
        path = os.path.join(dir_path, name)
        uri = self.uri_from_path(path)
        if resource_info["type"] == "directory":
            return {"class": "Directory", "name": name, "uri": uri, "path": path}
        return {
            "class": "File",
            "name": name,
            "size": resource_info["size"],
            # should this be mtime...
            "ctime": self.to_dict_time(resource_info["LastModified"]),
            "uri": uri,
            "path": path,
        }

    def _serialization_props(self, user_context=None) -> S3FsFilesSourceProperties:
        """Return a new dict of the stored properties, each passed through ``_evaluate_prop``."""
        effective_props = {
            key: self._evaluate_prop(val, user_context=user_context) for key, val in self._props.items()
        }
        return cast(S3FsFilesSourceProperties, effective_props)

    def score_url_match(self, url: str):
        """Score how well ``url`` matches this files source.

        Returns the length of the matched ``s3://[bucket]`` prefix, or defers to
        the base class when the URL does not match.
        """
        # For security, we need to ensure that a partial match doesn't work. e.g. s3://{bucket}something/myfiles
        if self._bucket:
            bucket_url = f"s3://{self._bucket}"
            if url == bucket_url or url.startswith(f"{bucket_url}/"):
                return len(bucket_url)
        elif url.startswith("s3://"):
            return len("s3://")
        return super().score_url_match(url)
# Names exported by `from <this module> import *`.
__all__ = ("S3FsFilesSource",)