This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.files.sources.s3fs

import functools
import logging
import os
from typing import Any, Dict, List

try:
    import s3fs
except ImportError:
    s3fs = None

from ..sources import BaseFilesSource


# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)

class S3FsFilesSource(BaseFilesSource):
    """File source plugin that reads from S3-style storage through ``s3fs``.

    Configuration keys consumed here are ``bucket`` and ``endpoint_url``; at
    least one of them must be set.  Every other option left over after
    ``_parse_common_config_opts`` is kept in ``self._props`` and forwarded as
    keyword arguments to ``s3fs.S3FileSystem``.
    """

    plugin_type = 's3fs'

    def __init__(self, **kwd):
        """Store the bucket, endpoint and remaining filesystem options.

        Raises:
            Exception: if the optional ``s3fs`` package could not be imported.
            AssertionError: if neither ``endpoint_url`` nor ``bucket`` is set.
        """
        if s3fs is None:
            raise Exception("Package s3fs unavailable but required for this file source plugin.")
        props = self._parse_common_config_opts(kwd)
        self._bucket = props.pop("bucket", '')
        self._endpoint_url = props.pop('endpoint_url', None)
        assert self._endpoint_url or self._bucket, \
            "The s3fs file source requires 'endpoint_url' and/or 'bucket' to be configured."
        self._props = props

    def _list(self, path="/", recursive=True, user_context=None):
        """Return a list of dicts describing the entries below ``path``.

        With ``recursive`` true the tree is traversed with ``fs.walk``;
        otherwise only the direct children are listed with ``fs.ls``.
        """
        fs = self._open_fs(user_context=user_context)
        bucket_path = self._bucket_path(path)
        if recursive:
            res: List[Dict[str, Any]] = []
            for p, dirs, files in fs.walk(bucket_path, detail=True):
                # NOTE(review): ``p`` comes from walking ``bucket_path``, so it
                # presumably carries the bucket prefix, whereas the
                # non-recursive branch joins entries onto the caller's
                # un-prefixed ``path`` - confirm that this is intended.
                to_dict = functools.partial(self._resource_info_to_dict, p)
                res.extend(map(to_dict, dirs.values()))
                res.extend(map(to_dict, files.values()))
            return res
        to_dict = functools.partial(self._resource_info_to_dict, path)
        return [to_dict(info) for info in fs.ls(bucket_path, detail=True)]

    def _realize_to(self, source_path, native_path, user_context=None):
        """Download ``source_path`` (relative to the bucket) to ``native_path``."""
        bucket_path = self._bucket_path(source_path)
        self._open_fs(user_context=user_context).download(bucket_path, native_path)

    def _write_from(self, target_path, native_path, user_context=None):
        """Uploading is not supported by this plugin."""
        raise NotImplementedError()

    def _bucket_path(self, path):
        """Prefix ``path`` with the configured bucket, ensuring a ``/`` separator."""
        if not path.startswith("/"):
            path = f"/{path}"
        return f"{self._bucket}{path}"

    def _open_fs(self, user_context=None):
        """Create an ``s3fs.S3FileSystem`` from the configured options.

        The keyword arguments are assembled in a local copy so that
        ``self._props`` is never modified; this keeps the output of
        ``_serialization_props`` independent of whether a filesystem has been
        opened before.  ``user_context`` is accepted for interface symmetry
        but is not used here.
        """
        fs_kwargs = dict(self._props)
        if self._endpoint_url:
            # Merge into any configured client_kwargs instead of discarding
            # them; the explicit endpoint_url setting takes precedence.
            client_kwargs = dict(fs_kwargs.get('client_kwargs') or {})
            client_kwargs['endpoint_url'] = self._endpoint_url
            fs_kwargs['client_kwargs'] = client_kwargs
        return s3fs.S3FileSystem(**fs_kwargs)

    def _resource_info_to_dict(self, dir_path, resource_info):
        """Convert one filesystem info record into a listing dict.

        ``dir_path`` is the directory the entry is reported under; the entry
        name is the basename of ``resource_info["name"]``.
        """
        name = os.path.basename(resource_info["name"])
        path = os.path.join(dir_path, name)
        uri = self.uri_from_path(path)
        if resource_info["type"] == "directory":
            return {"class": "Directory", "name": name, "uri": uri, "path": path}
        return {
            "class": "File",
            "name": name,
            "size": resource_info["size"],
            # should this be mtime...
            "ctime": self.to_dict_time(resource_info["LastModified"]),
            "uri": uri,
            "path": path,
        }

    def _serialization_props(self, user_context=None):
        """Return the options needed to re-create this file source.

        Each stored option is passed through ``_evaluate_prop``.  ``bucket``
        is added back, and so is ``endpoint_url`` when one is configured,
        because ``__init__`` removes both from the stored options.
        """
        effective_props = {
            key: self._evaluate_prop(val, user_context=user_context)
            for key, val in self._props.items()
        }
        effective_props["bucket"] = self._bucket
        if self._endpoint_url:
            effective_props["endpoint_url"] = self._endpoint_url
        return effective_props
# Names exported by a star-import of this module.
__all__ = ('S3FsFilesSource',)