Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.files.sources.s3fs
import logging
from typing import (
Optional,
Union,
)
from galaxy import exceptions
from galaxy.files.models import (
AnyRemoteEntry,
FilesSourceRuntimeContext,
)
from galaxy.files.sources._fsspec import (
CacheOptionsDictType,
FsspecBaseFileSourceConfiguration,
FsspecBaseFileSourceTemplateConfiguration,
FsspecFilesSource,
)
from galaxy.util.config_templates import TemplateExpansion
try:
from s3fs import S3FileSystem
except ImportError:
S3FileSystem = None
# NOTE(review): the two DEFAULT_* constants below are not referenced anywhere in the
# visible part of this module - presumably they are consumed elsewhere; confirm before removing.
DEFAULT_ENFORCE_SYMLINK_SECURITY = True
DEFAULT_DELETE_ON_REALIZE = False
# The required package name and the plugin type identifier are the same string.
REQUIRED_PACKAGE = FS_PLUGIN_TYPE = "s3fs"
# Module-level logger named after this module.
log = logging.getLogger(__name__)
class S3FSFileSourceTemplateConfiguration(FsspecBaseFileSourceTemplateConfiguration):
    # Template-stage configuration of the S3 files source: each option may still
    # be a TemplateExpansion placeholder instead of a concrete value.

    # Forwarded as ``anon`` to S3FileSystem when the filesystem is opened.
    anon: Union[bool, TemplateExpansion] = False
    # Custom endpoint; when set it is passed as ``client_kwargs["endpoint_url"]``.
    endpoint_url: Optional[Union[str, TemplateExpansion]] = None
    # Bucket that relative paths are resolved against.
    bucket: Optional[Union[str, TemplateExpansion]] = None
    # Credentials forwarded as ``secret`` / ``key`` to S3FileSystem.
    secret: Optional[Union[str, TemplateExpansion]] = None
    key: Optional[Union[str, TemplateExpansion]] = None
class S3FSFileSourceConfiguration(FsspecBaseFileSourceConfiguration):
    # Resolved configuration of the S3 files source: every option holds a
    # concrete value (or None when it was not provided).

    # Forwarded as ``anon`` to S3FileSystem when the filesystem is opened.
    anon: bool = False
    # Custom endpoint; when set it is passed as ``client_kwargs["endpoint_url"]``.
    endpoint_url: Union[str, None] = None
    # Bucket that relative paths are resolved against.
    bucket: Union[str, None] = None
    # Credentials forwarded as ``secret`` / ``key`` to S3FileSystem.
    secret: Union[str, None] = None
    key: Union[str, None] = None
[docs]
# Files source plugin built on s3fs' ``S3FileSystem``. It subclasses the generic
# fsspec files source, parametrized with the S3 template/resolved configuration models.
class S3FsFilesSource(FsspecFilesSource[S3FSFileSourceTemplateConfiguration, S3FSFileSourceConfiguration]):
# Plugin type identifier ("s3fs").
plugin_type = FS_PLUGIN_TYPE
# The imported S3FileSystem class, or None when the s3fs package could not be imported.
required_module = S3FileSystem
# Name of the package that provides the required module.
required_package = REQUIRED_PACKAGE
# Configuration model whose fields may still hold TemplateExpansion values.
template_config_class = S3FSFileSourceTemplateConfiguration
# Configuration model holding concrete (expanded) values.
resolved_config_class = S3FSFileSourceConfiguration
def _open_fs(
    self,
    context: FilesSourceRuntimeContext[S3FSFileSourceConfiguration],
    cache_options: CacheOptionsDictType,
):
    """Create the ``S3FileSystem`` described by the resolved configuration.

    :param context: runtime context whose ``config`` supplies ``anon``, ``key``,
        ``secret`` and the optional ``endpoint_url``.
    :param cache_options: extra keyword arguments passed straight through to
        the ``S3FileSystem`` constructor.
    :returns: the newly constructed filesystem object.
    :raises: ``self.required_package_exception`` when the ``s3fs`` package
        could not be imported.
    """
    if S3FileSystem is None:
        raise self.required_package_exception

    settings = context.config
    # Only override the client endpoint when one is configured; otherwise let
    # the filesystem fall back to its own default.
    if settings.endpoint_url:
        client_options = {"endpoint_url": settings.endpoint_url}
    else:
        client_options = None

    return S3FileSystem(
        anon=settings.anon,
        key=settings.key,
        secret=settings.secret,
        client_kwargs=client_options,
        **cache_options,
    )
def _to_bucket_path(self, path: str, config: S3FSFileSourceConfiguration) -> str:
"""Adapt the path to the S3 bucket format."""
if path.startswith("s3://"):
return path.replace("s3://", "")
bucket = config.bucket
if not bucket and not path.startswith("s3://"):
raise exceptions.MessageException("Bucket name is required for S3FsFilesSource.")
return self._bucket_path(bucket or "", path)
def _adapt_entry_path(self, filesystem_path: str) -> str:
"""Remove the S3 bucket name from the filesystem path."""
if self.template_config.bucket:
bucket_prefix = f"{self.template_config.bucket}/"
return filesystem_path.replace(bucket_prefix, "", 1)
return filesystem_path
def _list(
    self,
    context: FilesSourceRuntimeContext[S3FSFileSourceConfiguration],
    path="/",
    recursive=False,
    write_intent: bool = False,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    query: Optional[str] = None,
    sort_by: Optional[str] = None,
) -> tuple[list[AnyRemoteEntry], int]:
    """List entries under ``path`` after translating it to bucket form.

    The path is converted with ``_to_bucket_path`` and the listing itself is
    delegated to the parent implementation, whose result is returned as is.
    """
    # NOTE(review): ``write_intent`` is accepted but not forwarded to the
    # parent implementation - confirm this is intentional.
    resolved_path = self._to_bucket_path(path, context.config)
    parent_listing = super()._list(
        context=context,
        path=resolved_path,
        recursive=recursive,
        limit=limit,
        offset=offset,
        query=query,
        sort_by=sort_by,
    )
    return parent_listing
def _realize_to(
    self,
    source_path: str,
    native_path: str,
    context: FilesSourceRuntimeContext[S3FSFileSourceConfiguration],
):
    """Translate ``source_path`` to bucket form and delegate to the parent ``_realize_to``."""
    resolved_source = self._to_bucket_path(source_path, context.config)
    super()._realize_to(
        source_path=resolved_source,
        native_path=native_path,
        context=context,
    )
def _write_from(
    self,
    target_path: str,
    native_path: str,
    context: FilesSourceRuntimeContext[S3FSFileSourceConfiguration],
):
    """Translate ``target_path`` to bucket form and delegate to the parent ``_write_from``."""
    resolved_target = self._to_bucket_path(target_path, context.config)
    super()._write_from(
        target_path=resolved_target,
        native_path=native_path,
        context=context,
    )
def _bucket_path(self, bucket_name: str, path: str):
if path.startswith("s3://"):
return path.replace("s3://", "")
elif not path.startswith("/"):
path = f"/{path}"
return f"{bucket_name}{path}"
[docs]
def score_url_match(self, url: str):
    """Score how well ``url`` matches this files source.

    With a configured bucket, the score is the length of ``s3://<bucket>``
    when the URL is exactly that root or lies below it. Without a bucket, any
    ``s3://`` URL scores the length of the scheme. Everything else is
    delegated to the parent implementation.
    """
    # We need to use template_config here because this is called before the template is expanded.
    configured_bucket = self.template_config.bucket

    if not configured_bucket:
        if url.startswith("s3://"):
            return len("s3://")
        return super().score_url_match(url)

    bucket_root = f"s3://{configured_bucket}"
    # For security, we need to ensure that a partial match doesn't work. e.g. s3://{bucket}something/myfiles
    if url == bucket_root or url.startswith(f"{bucket_root}/"):
        return len(bucket_root)
    return super().score_url_match(url)
# Public API of this module: only the files source plugin class is exported.
__all__ = ("S3FsFilesSource",)