Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.files.sources.googlecloudstorage

import logging
from typing import (
    Optional,
    Union,
)

from galaxy.files.models import (
    AnyRemoteEntry,
    FilesSourceRuntimeContext,
)
from galaxy.files.sources._fsspec import (
    CacheOptionsDictType,
    FsspecBaseFileSourceConfiguration,
    FsspecBaseFileSourceTemplateConfiguration,
    FsspecFilesSource,
)
from galaxy.util.config_templates import TemplateExpansion

try:
    from gcsfs import GCSFileSystem
except ImportError:
    GCSFileSystem = None


REQUIRED_PACKAGE = "gcsfs"

log = logging.getLogger(__name__)


class GoogleCloudStorageFileSourceTemplateConfiguration(FsspecBaseFileSourceTemplateConfiguration):
    bucket_name: Union[str, TemplateExpansion]
    root_path: Union[str, TemplateExpansion, None] = None
    project: Union[str, TemplateExpansion, None] = None
    anonymous: Union[bool, TemplateExpansion, None] = True
    service_account_json: Union[str, TemplateExpansion, None] = None
    # OAuth credentials
    client_id: Union[str, TemplateExpansion, None] = None
    client_secret: Union[str, TemplateExpansion, None] = None
    token: Union[str, TemplateExpansion, None] = None
    refresh_token: Union[str, TemplateExpansion, None] = None
    token_uri: Union[str, TemplateExpansion, None] = "https://oauth2.googleapis.com/token"


class GoogleCloudStorageFileSourceConfiguration(FsspecBaseFileSourceConfiguration):
    bucket_name: str
    root_path: Optional[str] = None
    project: Optional[str] = None
    anonymous: Optional[bool] = True
    service_account_json: Optional[str] = None
    # OAuth credentials
    client_id: Optional[str] = None
    client_secret: Optional[str] = None
    token: Optional[str] = None
    refresh_token: Optional[str] = None
    token_uri: Optional[str] = "https://oauth2.googleapis.com/token"


[docs] class GoogleCloudStorageFilesSource( FsspecFilesSource[GoogleCloudStorageFileSourceTemplateConfiguration, GoogleCloudStorageFileSourceConfiguration] ): plugin_type = "googlecloudstorage" required_module = GCSFileSystem required_package = REQUIRED_PACKAGE template_config_class = GoogleCloudStorageFileSourceTemplateConfiguration resolved_config_class = GoogleCloudStorageFileSourceConfiguration def _open_fs( self, context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration], cache_options: CacheOptionsDictType, ): if GCSFileSystem is None: raise self.required_package_exception config = context.config token: Union[str, dict[str, Optional[str]], None] if config.anonymous: # Use token='anon' for anonymous access to public buckets token = "anon" elif config.service_account_json: # Path to service account JSON file token = config.service_account_json elif config.token: # OAuth credentials passed as a dictionary token = { "access_token": config.token, "refresh_token": config.refresh_token, "client_id": config.client_id, "client_secret": config.client_secret, "token_uri": config.token_uri, } else: # Default: use application default credentials token = None fs = GCSFileSystem( project=config.project, token=token, **cache_options, ) return fs def _to_bucket_path(self, path: str, config: GoogleCloudStorageFileSourceConfiguration) -> str: """Adapt the path to the GCS bucket format, including root_path if configured.""" bucket = config.bucket_name root = (config.root_path or "").strip("/") if path.startswith("/"): path = path[1:] # Build path: bucket / root_path / path if root and path: return f"{bucket}/{root}/{path}" elif root: return f"{bucket}/{root}" elif path: return f"{bucket}/{path}" return bucket def _adapt_entry_path(self, filesystem_path: str) -> str: """Remove the GCS bucket name and root_path from the filesystem path.""" if self.template_config.bucket_name: bucket = self.template_config.bucket_name root = (self.template_config.root_path or "").strip("/") full_prefix = f"{bucket}/{root}" if root else bucket if filesystem_path == full_prefix: return "/" return "/" + filesystem_path.removeprefix(f"{full_prefix}/") return "/" + filesystem_path def _list( self, context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration], path="/", recursive=False, write_intent: bool = False, limit: Optional[int] = None, offset: Optional[int] = None, query: Optional[str] = None, sort_by: Optional[str] = None, ) -> tuple[list[AnyRemoteEntry], int]: bucket_path = self._to_bucket_path(path, context.config) return super()._list( context=context, path=bucket_path, recursive=recursive, limit=limit, offset=offset, query=query, sort_by=sort_by, ) def _realize_to( self, source_path: str, native_path: str, context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration], ): bucket_path = self._to_bucket_path(source_path, context.config) super()._realize_to(source_path=bucket_path, native_path=native_path, context=context) def _write_from( self, target_path: str, native_path: str, context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration], ): bucket_path = self._to_bucket_path(target_path, context.config) super()._write_from(target_path=bucket_path, native_path=native_path, context=context)
[docs] def score_url_match(self, url: str): bucket_name = self.template_config.bucket_name # For security, we need to ensure that a partial match doesn't work if bucket_name and (url.startswith(f"gs://{bucket_name}/") or url == f"gs://{bucket_name}"): return len(f"gs://{bucket_name}") elif bucket_name and (url.startswith(f"gcs://{bucket_name}/") or url == f"gcs://{bucket_name}"): return len(f"gcs://{bucket_name}") else: return super().score_url_match(url)
__all__ = ("GoogleCloudStorageFilesSource",)