Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.files.sources.googlecloudstorage
import logging
from typing import (
Optional,
Union,
)
from galaxy.files.models import (
AnyRemoteEntry,
FilesSourceRuntimeContext,
)
from galaxy.files.sources._fsspec import (
CacheOptionsDictType,
FsspecBaseFileSourceConfiguration,
FsspecBaseFileSourceTemplateConfiguration,
FsspecFilesSource,
)
from galaxy.util.config_templates import TemplateExpansion
# gcsfs is an optional dependency. If it cannot be imported, GCSFileSystem is
# bound to None so that this module still imports; the missing package is only
# reported when a filesystem is actually opened (see the None check in
# GoogleCloudStorageFilesSource._open_fs).
try:
    from gcsfs import GCSFileSystem
except ImportError:
    GCSFileSystem = None
# Name of the package that provides GCSFileSystem; the file source class
# publishes it as its ``required_package`` attribute.
REQUIRED_PACKAGE = "gcsfs"
# Module-level logger named after this module.
log = logging.getLogger(__name__)
class GoogleCloudStorageFileSourceTemplateConfiguration(FsspecBaseFileSourceTemplateConfiguration):
    # Template-level settings for a Google Cloud Storage file source.
    # Field names and defaults mirror GoogleCloudStorageFileSourceConfiguration,
    # except that each value may also be a TemplateExpansion instead of the
    # plain value.
    # NOTE(review): presumably these are expanded into the resolved
    # configuration before a filesystem is opened - confirm against the
    # template machinery.

    # Bucket this source exposes (required). Read directly from the template
    # configuration by _adapt_entry_path and score_url_match.
    bucket_name: Union[str, TemplateExpansion]
    # Optional prefix inside the bucket; leading/trailing "/" are stripped
    # wherever it is used to build or strip paths.
    root_path: Union[str, TemplateExpansion, None] = None
    # Google Cloud project; the resolved value is passed as ``project`` to
    # GCSFileSystem.
    project: Union[str, TemplateExpansion, None] = None
    # Defaults to True. When the resolved value is truthy the filesystem is
    # opened with token "anon" and none of the credential fields are consulted.
    anonymous: Union[bool, TemplateExpansion, None] = True
    # Used as the gcsfs token when ``anonymous`` is falsy; treated by _open_fs
    # as a path to a service account JSON file.
    service_account_json: Union[str, TemplateExpansion, None] = None
    # OAuth credentials
    # Only used when ``anonymous`` is falsy, no ``service_account_json`` is set
    # and ``token`` is set; they are then sent to gcsfs together as one dict.
    client_id: Union[str, TemplateExpansion, None] = None
    client_secret: Union[str, TemplateExpansion, None] = None
    # Sent to gcsfs under the key "access_token".
    token: Union[str, TemplateExpansion, None] = None
    refresh_token: Union[str, TemplateExpansion, None] = None
    token_uri: Union[str, TemplateExpansion, None] = "https://oauth2.googleapis.com/token"
class GoogleCloudStorageFileSourceConfiguration(FsspecBaseFileSourceConfiguration):
    # Resolved (plain-valued) settings for a Google Cloud Storage file source.
    # This is the configuration type carried by the runtime context that
    # GoogleCloudStorageFilesSource receives in _open_fs, _list, _realize_to
    # and _write_from.

    # Bucket name (required); always the first component of the paths built by
    # _to_bucket_path.
    bucket_name: str
    # Optional prefix inside the bucket. Leading/trailing "/" are stripped
    # before it is inserted between the bucket name and the requested path.
    root_path: Optional[str] = None
    # Passed as ``project`` to GCSFileSystem.
    project: Optional[str] = None
    # Checked first when choosing credentials: if truthy (the default) the
    # filesystem is opened with token "anon" and the fields below are ignored.
    anonymous: Optional[bool] = True
    # Second choice: used verbatim as the gcsfs token; treated by _open_fs as a
    # path to a service account JSON file.
    service_account_json: Optional[str] = None
    # OAuth credentials
    # Third choice: when ``token`` is set, _open_fs sends a dict with the keys
    # "access_token", "refresh_token", "client_id", "client_secret" and
    # "token_uri" built from the five fields below. If none of the three
    # choices applies, the token passed to gcsfs is None.
    client_id: Optional[str] = None
    client_secret: Optional[str] = None
    token: Optional[str] = None
    refresh_token: Optional[str] = None
    token_uri: Optional[str] = "https://oauth2.googleapis.com/token"
[docs]
class GoogleCloudStorageFilesSource(
    FsspecFilesSource[GoogleCloudStorageFileSourceTemplateConfiguration, GoogleCloudStorageFileSourceConfiguration]
):
    """File source plugin exposing a single Google Cloud Storage bucket via ``gcsfs``.

    Callers address entries with paths relative to the configured bucket (and
    optional ``root_path``). The overrides below translate those paths to the
    ``bucket/root/path`` form before delegating to the fsspec base class, and
    translate filesystem paths back again for listed entries.
    """

    plugin_type = "googlecloudstorage"
    required_module = GCSFileSystem
    required_package = REQUIRED_PACKAGE
    template_config_class = GoogleCloudStorageFileSourceTemplateConfiguration
    resolved_config_class = GoogleCloudStorageFileSourceConfiguration

    def _open_fs(
        self,
        context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
        cache_options: CacheOptionsDictType,
    ):
        """Build the ``GCSFileSystem`` described by the resolved configuration.

        The ``token`` handed to gcsfs is picked by the first rule that applies:

        1. ``anonymous`` is truthy -> ``"anon"`` (anonymous access to public buckets);
        2. ``service_account_json`` is set -> that value (path to a service
           account JSON file);
        3. ``token`` is set -> a dict of OAuth credentials;
        4. otherwise ``None`` (application default credentials).

        :param context: runtime context whose ``config`` holds the resolved settings.
        :param cache_options: extra keyword arguments forwarded to ``GCSFileSystem``.
        :returns: the newly created filesystem object.
        :raises: ``self.required_package_exception`` if ``gcsfs`` could not be imported.
        """
        if GCSFileSystem is None:
            raise self.required_package_exception

        settings = context.config
        # Starts as None, which is what gcsfs receives when no rule below matches.
        credentials: Union[str, dict[str, Optional[str]], None] = None
        if settings.anonymous:
            credentials = "anon"
        elif settings.service_account_json:
            credentials = settings.service_account_json
        elif settings.token:
            credentials = {
                "access_token": settings.token,
                "refresh_token": settings.refresh_token,
                "client_id": settings.client_id,
                "client_secret": settings.client_secret,
                "token_uri": settings.token_uri,
            }

        return GCSFileSystem(project=settings.project, token=credentials, **cache_options)

    def _to_bucket_path(self, path: str, config: GoogleCloudStorageFileSourceConfiguration) -> str:
        """Map a source-relative *path* to ``bucket[/root_path][/path]``.

        A single leading "/" is removed from *path*, and ``root_path`` has its
        leading/trailing "/" stripped. Empty components after the bucket name
        are left out, so the root of the source maps to the bare bucket name
        (or ``bucket/root_path``).
        """
        prefix = (config.root_path or "").strip("/")
        relative = path[1:] if path.startswith("/") else path
        # The bucket name is always kept; only empty root/path parts are skipped.
        trailing = [part for part in (prefix, relative) if part]
        return "/".join([config.bucket_name, *trailing])

    def _adapt_entry_path(self, filesystem_path: str) -> str:
        """Turn a ``bucket/root/...`` filesystem path back into a source-relative path.

        The bucket name and ``root_path`` are read from ``self.template_config``.
        The result always starts with "/"; a path equal to the bucket/root
        prefix itself becomes "/". A path that does not start with the prefix
        is returned unchanged apart from the added leading "/".
        """
        template = self.template_config
        bucket = template.bucket_name
        if not bucket:
            return "/" + filesystem_path

        root = (template.root_path or "").strip("/")
        base = f"{bucket}/{root}" if root else bucket
        if filesystem_path == base:
            return "/"
        return "/" + filesystem_path.removeprefix(f"{base}/")

    def _list(
        self,
        context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
        path="/",
        recursive=False,
        write_intent: bool = False,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        query: Optional[str] = None,
        sort_by: Optional[str] = None,
    ) -> tuple[list[AnyRemoteEntry], int]:
        """List entries under *path* by delegating to the base class with a bucket path.

        Only *path* is rewritten (via ``_to_bucket_path``); ``recursive``,
        ``limit``, ``offset``, ``query`` and ``sort_by`` are forwarded as given.
        """
        # NOTE(review): ``write_intent`` is accepted but not forwarded to the
        # base implementation - confirm this is intentional.
        return super()._list(
            context=context,
            path=self._to_bucket_path(path, context.config),
            recursive=recursive,
            limit=limit,
            offset=offset,
            query=query,
            sort_by=sort_by,
        )

    def _realize_to(
        self,
        source_path: str,
        native_path: str,
        context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
    ):
        """Delegate to the base ``_realize_to`` with *source_path* rewritten to a bucket path."""
        remote = self._to_bucket_path(source_path, context.config)
        super()._realize_to(source_path=remote, native_path=native_path, context=context)

    def _write_from(
        self,
        target_path: str,
        native_path: str,
        context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
    ):
        """Delegate to the base ``_write_from`` with *target_path* rewritten to a bucket path."""
        remote = self._to_bucket_path(target_path, context.config)
        super()._write_from(target_path=remote, native_path=native_path, context=context)

    def score_url_match(self, url: str):
        """Score how well *url* matches this source's bucket.

        If the template configuration names a bucket and *url* is exactly
        ``gs://<bucket>`` / ``gcs://<bucket>`` or continues with "/" after it,
        the length of that ``scheme://bucket`` prefix is returned. Anything
        else is left to the base class.
        """
        bucket_name = self.template_config.bucket_name
        if bucket_name:
            for scheme in ("gs", "gcs"):
                bucket_url = f"{scheme}://{bucket_name}"
                # For security, a partial bucket-name match must not count:
                # require either the exact bucket URL or a "/" right after it.
                if url == bucket_url or url.startswith(f"{bucket_url}/"):
                    return len(bucket_url)
        return super().score_url_match(url)
__all__ = ("GoogleCloudStorageFilesSource",)