Warning

This document describes an in-development version of Galaxy. You can instead view this page in the latest release, if it exists, or view the top of the latest release's documentation.

Source code for galaxy.files.sources.s3fs

import logging
from typing import (
    Optional,
    Union,
)

from galaxy import exceptions
from galaxy.files.models import (
    AnyRemoteEntry,
    FilesSourceRuntimeContext,
)
from galaxy.files.sources._fsspec import (
    CacheOptionsDictType,
    FsspecBaseFileSourceConfiguration,
    FsspecBaseFileSourceTemplateConfiguration,
    FsspecFilesSource,
)
from galaxy.util.config_templates import TemplateExpansion

try:
    from s3fs import S3FileSystem
except ImportError:
    S3FileSystem = None


# Plugin identity: the files-source type key and the pip package that must be
# installed for this source to work are both "s3fs".
FS_PLUGIN_TYPE = "s3fs"
REQUIRED_PACKAGE = FS_PLUGIN_TYPE

# Default flags; not referenced elsewhere in this module — presumably kept for
# external importers (confirm before removing).
DEFAULT_DELETE_ON_REALIZE = False
DEFAULT_ENFORCE_SYMLINK_SECURITY = True

log = logging.getLogger(__name__)


class S3FSFileSourceTemplateConfiguration(FsspecBaseFileSourceTemplateConfiguration):
    # Template-level (pre-expansion) settings for S3FsFilesSource. Each field may be
    # a literal value or a TemplateExpansion; presumably the expanded values
    # populate S3FSFileSourceConfiguration (the source's resolved_config_class).
    # NOTE: field order defines the model/schema order — keep it stable.

    # Resolved value is forwarded to S3FileSystem(anon=...).
    anon: Union[bool, TemplateExpansion] = False
    # Resolved value, when truthy, is passed to S3FileSystem via client_kwargs.
    endpoint_url: Union[str, TemplateExpansion, None] = None
    # Bucket that relative paths are joined onto. S3FsFilesSource.score_url_match
    # and _adapt_entry_path read this template-level value directly (unexpanded).
    bucket: Union[str, TemplateExpansion, None] = None
    # Credentials; resolved values are forwarded to S3FileSystem(secret=..., key=...).
    secret: Union[str, TemplateExpansion, None] = None
    key: Union[str, TemplateExpansion, None] = None


class S3FSFileSourceConfiguration(FsspecBaseFileSourceConfiguration):
    # Resolved (post-template-expansion) settings consumed by S3FsFilesSource at
    # runtime via context.config.
    # NOTE: field order defines the model/schema order — keep it stable.

    # Forwarded to S3FileSystem(anon=...) in S3FsFilesSource._open_fs.
    anon: bool = False
    # When truthy, passed as client_kwargs={"endpoint_url": ...} to S3FileSystem.
    endpoint_url: Optional[str] = None
    # Bucket joined in front of non-"s3://" paths; required for such paths
    # (S3FsFilesSource._to_bucket_path raises MessageException without it).
    bucket: Optional[str] = None
    # Credentials forwarded to S3FileSystem(secret=..., key=...).
    secret: Optional[str] = None
    key: Optional[str] = None


class S3FsFilesSource(FsspecFilesSource[S3FSFileSourceTemplateConfiguration, S3FSFileSourceConfiguration]):
    """Files source backed by an S3 bucket through the ``s3fs`` fsspec implementation.

    Incoming paths are either plain paths relative to the configured bucket
    (e.g. ``/dir/file.txt``) or full ``s3://bucket/key`` URIs. Both forms are
    normalized to the ``bucket/key`` form before delegating to
    :class:`FsspecFilesSource`.
    """

    plugin_type = FS_PLUGIN_TYPE
    required_module = S3FileSystem
    required_package = REQUIRED_PACKAGE
    template_config_class = S3FSFileSourceTemplateConfiguration
    resolved_config_class = S3FSFileSourceConfiguration

    def _open_fs(
        self,
        context: FilesSourceRuntimeContext[S3FSFileSourceConfiguration],
        cache_options: CacheOptionsDictType,
    ):
        """Build the ``S3FileSystem`` for the resolved configuration.

        :param context: runtime context whose ``config`` supplies credentials,
            ``anon`` and the optional ``endpoint_url``.
        :param cache_options: extra keyword arguments forwarded to ``S3FileSystem``.
        :raises: ``self.required_package_exception`` when ``s3fs`` is not importable.
        """
        if S3FileSystem is None:
            raise self.required_package_exception

        config = context.config
        # Only override the endpoint when one is configured; otherwise pass None.
        client_kwargs = {"endpoint_url": config.endpoint_url} if config.endpoint_url else None
        return S3FileSystem(
            anon=config.anon,
            key=config.key,
            secret=config.secret,
            client_kwargs=client_kwargs,
            **cache_options,
        )

    def _to_bucket_path(self, path: str, config: S3FSFileSourceConfiguration) -> str:
        """Adapt ``path`` to the ``bucket/key`` form used by s3fs.

        An ``s3://`` URI has only its leading scheme removed and keeps its own
        bucket. Any other path is joined onto ``config.bucket``.

        :raises exceptions.MessageException: if ``path`` is not an ``s3://`` URI
            and no bucket is configured.
        """
        if path.startswith("s3://"):
            # Strip only the leading scheme; str.replace would also drop any
            # "s3://" appearing later inside the key.
            # NOTE(review): this bypasses config.bucket, so an s3:// URI can name a
            # different bucket than the configured one — confirm this is intended.
            return path.removeprefix("s3://")
        bucket = config.bucket
        if not bucket:
            raise exceptions.MessageException("Bucket name is required for S3FsFilesSource.")
        return self._bucket_path(bucket, path)

    def _adapt_entry_path(self, filesystem_path: str) -> str:
        """Remove the configured bucket name from an s3fs ``bucket/key`` path.

        Only a *leading* ``"<bucket>/"`` is stripped. A matching substring
        elsewhere in the path (e.g. bucket ``data`` with path ``mydata/data/x``)
        is left intact.
        """
        # NOTE(review): this reads the template (unexpanded) config, so a bucket
        # given as a TemplateExpansion is used in its unexpanded form — confirm.
        bucket = self.template_config.bucket
        if bucket:
            return filesystem_path.removeprefix(f"{bucket}/")
        return filesystem_path

    def _list(
        self,
        context: FilesSourceRuntimeContext[S3FSFileSourceConfiguration],
        path="/",
        recursive=False,
        write_intent: bool = False,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        query: Optional[str] = None,
        sort_by: Optional[str] = None,
    ) -> tuple[list[AnyRemoteEntry], int]:
        """List entries under ``path`` after mapping it to ``bucket/key`` form.

        All options, including ``write_intent``, are forwarded to the parent
        implementation. Previously ``write_intent`` was silently dropped.
        """
        bucket_path = self._to_bucket_path(path, context.config)
        return super()._list(
            context=context,
            path=bucket_path,
            recursive=recursive,
            write_intent=write_intent,
            limit=limit,
            offset=offset,
            query=query,
            sort_by=sort_by,
        )

    def _realize_to(
        self, source_path: str, native_path: str, context: FilesSourceRuntimeContext[S3FSFileSourceConfiguration]
    ):
        """Download ``source_path`` (mapped to ``bucket/key``) to local ``native_path``."""
        bucket_path = self._to_bucket_path(source_path, context.config)
        super()._realize_to(source_path=bucket_path, native_path=native_path, context=context)

    def _write_from(
        self, target_path: str, native_path: str, context: FilesSourceRuntimeContext[S3FSFileSourceConfiguration]
    ):
        """Upload local ``native_path`` to ``target_path`` (mapped to ``bucket/key``)."""
        bucket_path = self._to_bucket_path(target_path, context.config)
        super()._write_from(target_path=bucket_path, native_path=native_path, context=context)

    def _bucket_path(self, bucket_name: str, path: str) -> str:
        """Join ``bucket_name`` and ``path`` into ``bucket/path`` form.

        ``s3://`` URIs are returned with only the leading scheme stripped.
        Paths lacking a leading ``/`` get one before being appended.
        """
        if path.startswith("s3://"):
            return path.removeprefix("s3://")
        if not path.startswith("/"):
            path = f"/{path}"
        return f"{bucket_name}{path}"

    def score_url_match(self, url: str):
        """Return how many leading characters of ``url`` this source claims.

        With a configured bucket, only ``s3://<bucket>`` or ``s3://<bucket>/...``
        match. Without one, any ``s3://`` URL matches. Otherwise defer to the
        parent implementation.
        """
        # We need to use template_config here because this is called before the template is expanded.
        bucket_name = self.template_config.bucket
        # For security, we need to ensure that a partial match doesn't work. e.g. s3://{bucket}something/myfiles
        if bucket_name and (url.startswith(f"s3://{bucket_name}/") or url == f"s3://{bucket_name}"):
            return len(f"s3://{bucket_name}")
        elif not bucket_name and url.startswith("s3://"):
            return len("s3://")
        else:
            return super().score_url_match(url)
__all__ = ("S3FsFilesSource",)