Source code for galaxy.files.sources.s3fs

import functools
import logging
import os
from typing import (
    Any,
    cast,
    Dict,
    List,
    Optional,
)

from typing_extensions import Unpack

from . import (
    FilesSourceOptions,
    FilesSourceProperties,
)

try:
    import s3fs
except ImportError:
    s3fs = None

from . import BaseFilesSource

DEFAULT_ENFORCE_SYMLINK_SECURITY = True
DEFAULT_DELETE_ON_REALIZE = False

log = logging.getLogger(__name__)


class S3FsFilesSourceProperties(FilesSourceProperties, total=False):
    bucket: str
    endpoint_url: str
    user: str
    passwd: str
    client_kwargs: dict  # internally computed. Should not be specified in config file
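
# Illustrative sketch (not part of the Galaxy source): how the typed properties
# above map onto keyword arguments for the plugin defined below. The bucket
# name, endpoint and credential strings are hypothetical placeholders, and
# `endpoint_url` is only needed for non-AWS, S3-compatible object stores.
example_props: S3FsFilesSourceProperties = {
    "bucket": "example-bucket",
    "endpoint_url": "https://s3.example.org",
    "user": "EXAMPLE_ACCESS_KEY_ID",
    "passwd": "EXAMPLE_SECRET_ACCESS_KEY",
}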


class S3FsFilesSource(BaseFilesSource):
    plugin_type = "s3fs"

    def __init__(self, **kwd: Unpack[S3FsFilesSourceProperties]):
        if s3fs is None:
            raise Exception("Package s3fs unavailable but required for this file source plugin.")
        props: S3FsFilesSourceProperties = cast(S3FsFilesSourceProperties, self._parse_common_config_opts(kwd))
        # There is a possibility that the bucket name could be parameterized: e.g.
        # bucket: ${user.preferences['generic_s3|bucket']}
        # that's ok, because we evaluate the bucket name again later. The bucket property here will only
        # be used by `score_url_match`. In the case of a parameterized bucket name, we will always return
        # a score of 4 as the url will only match the s3:// part.
        self._bucket = props.get("bucket", "")
        self._endpoint_url = props.pop("endpoint_url", None)
        self._props = props
        if self._endpoint_url:
            self._props.update({"client_kwargs": {"endpoint_url": self._endpoint_url}})

    def _list(self, path="/", recursive=True, user_context=None, opts: Optional[FilesSourceOptions] = None):
        _props = self._serialization_props(user_context)
        # we need to pop the 'bucket' here, because the argument is not recognised in a downstream function
        _bucket_name = _props.pop("bucket", "")
        fs = self._open_fs(props=_props, opts=opts)
        if recursive:
            res: List[Dict[str, Any]] = []
            bucket_path = self._bucket_path(_bucket_name, path)
            for p, dirs, files in fs.walk(bucket_path, detail=True):
                to_dict = functools.partial(self._resource_info_to_dict, p)
                res.extend(map(to_dict, dirs.values()))
                res.extend(map(to_dict, files.values()))
            return res
        else:
            bucket_path = self._bucket_path(_bucket_name, path)
            res = fs.ls(bucket_path, detail=True)
            to_dict = functools.partial(self._resource_info_to_dict, path)
            return list(map(to_dict, res))

    def _realize_to(self, source_path, native_path, user_context=None, opts: Optional[FilesSourceOptions] = None):
        _props = self._serialization_props(user_context)
        _bucket_name = _props.pop("bucket", "")
        fs = self._open_fs(props=_props, opts=opts)
        bucket_path = self._bucket_path(_bucket_name, source_path)
        fs.download(bucket_path, native_path)

    def _write_from(self, target_path, native_path, user_context=None, opts: Optional[FilesSourceOptions] = None):
        _props = self._serialization_props(user_context)
        _bucket_name = _props.pop("bucket", "")
        fs = self._open_fs(props=_props, opts=opts)
        bucket_path = self._bucket_path(_bucket_name, target_path)
        fs.upload(native_path, bucket_path)

    def _bucket_path(self, bucket_name: str, path: str):
        if path.startswith("s3://"):
            return path.replace("s3://", "")
        elif not path.startswith("/"):
            path = f"/{path}"
        return f"{bucket_name}{path}"

    def _open_fs(self, props: FilesSourceProperties, opts: Optional[FilesSourceOptions] = None):
        extra_props = opts.extra_props or {} if opts else {}
        fs = s3fs.S3FileSystem(**{**props, **extra_props})
        return fs

    def _resource_info_to_dict(self, dir_path: str, resource_info):
        name = os.path.basename(resource_info["name"])
        path = os.path.join(dir_path, name)
        uri = self.uri_from_path(path)
        if resource_info["type"] == "directory":
            return {"class": "Directory", "name": name, "uri": uri, "path": path}
        else:
            return {
                "class": "File",
                "name": name,
                "size": resource_info["size"],
                # should this be mtime...
                "ctime": self.to_dict_time(resource_info["LastModified"]),
                "uri": uri,
                "path": path,
            }

    def _serialization_props(self, user_context=None) -> S3FsFilesSourceProperties:
        effective_props = {}
        for key, val in self._props.items():
            effective_props[key] = self._evaluate_prop(val, user_context=user_context)
        return cast(S3FsFilesSourceProperties, effective_props)

    def score_url_match(self, url: str):
        # For security, we need to ensure that a partial match doesn't work. e.g. s3://{bucket}something/myfiles
        if self._bucket and (url.startswith(f"s3://{self._bucket}/") or url == f"s3://{self._bucket}"):
            return len(f"s3://{self._bucket}")
        elif not self._bucket and url.startswith("s3://"):
            return len("s3://")
        else:
            return super().score_url_match(url)

__all__ = ("S3FsFilesSource",)
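
# A minimal usage sketch (illustrative, not part of the module). It assumes the
# common plugin options `id` and `label` are accepted via FilesSourceProperties;
# the bucket name and endpoint are hypothetical placeholders. Constructing the
# plugin and scoring a URL does not touch the network.
if __name__ == "__main__":
    example_source = S3FsFilesSource(
        id="example_s3",
        label="Example S3 bucket",
        bucket="example-bucket",
        endpoint_url="https://s3.example.org",
    )
    # URLs inside the configured bucket score len("s3://example-bucket");
    # anything else falls back to the BaseFilesSource implementation.
    print(example_source.score_url_match("s3://example-bucket/data/sample.txt"))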