try:
from fs_gcsfs import GCSFS
from google.cloud.storage import Client
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
except ImportError:
GCSFS = None
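
# fs-gcsfs (and the google-cloud-storage / google-auth packages it depends on) is an
# optional dependency: when it is not installed, GCSFS stays None and the plugin raises
# a descriptive error the first time a filesystem handle is requested (see _open_fs).
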
import os
from typing import (
Optional,
Union,
)
from galaxy.files.models import (
AnyRemoteEntry,
BaseFileSourceConfiguration,
BaseFileSourceTemplateConfiguration,
FilesSourceRuntimeContext,
RemoteDirectory,
RemoteFile,
)
from galaxy.util.config_templates import TemplateExpansion
from ._pyfilesystem2 import PyFilesystem2FilesSource
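

# Template-level configuration: each field may be either a literal value or a
# TemplateExpansion that Galaxy expands (for example from user-supplied template
# variables) before producing the resolved configuration class below.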
class GoogleCloudStorageFileSourceTemplateConfiguration(BaseFileSourceTemplateConfiguration):
bucket_name: Union[str, TemplateExpansion]
root_path: Union[str, TemplateExpansion, None] = None
project: Union[str, TemplateExpansion, None] = None
anonymous: Union[bool, TemplateExpansion, None] = True
service_account_json: Union[str, TemplateExpansion, None] = None
token: Union[str, TemplateExpansion, None] = None
token_uri: Union[str, TemplateExpansion, None] = None
client_id: Union[str, TemplateExpansion, None] = None
client_secret: Union[str, TemplateExpansion, None] = None
refresh_token: Union[str, TemplateExpansion, None] = None
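

# Fully resolved configuration used at runtime, after every TemplateExpansion field
# above has been expanded to a concrete value.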
class GoogleCloudStorageFileSourceConfiguration(BaseFileSourceConfiguration):
bucket_name: str
root_path: Optional[str] = None
project: Optional[str] = None
anonymous: Optional[bool] = True
service_account_json: Optional[str] = None
token: Optional[str] = None
token_uri: Optional[str] = None
client_id: Optional[str] = None
client_secret: Optional[str] = None
refresh_token: Optional[str] = None
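
# For illustration only (not part of this module): a plain file_sources_conf.yml entry
# for anonymous access to a public bucket could look roughly like the sketch below. The
# id and bucket name are made up; the plugin-specific fields mirror
# GoogleCloudStorageFileSourceConfiguration above.
#
#   - type: googlecloudstorage
#     id: example-gcs-bucket
#     label: Example public GCS bucket
#     doc: Anonymous access to a public Google Cloud Storage bucket
#     bucket_name: example-public-bucket
#     anonymous: true
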
class GoogleCloudStorageFilesSource(
PyFilesystem2FilesSource[
GoogleCloudStorageFileSourceTemplateConfiguration, GoogleCloudStorageFileSourceConfiguration
]
):
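    # Registration metadata: plugin_type is the "type" key used in file source
    # configuration, and required_module / required_package let the base class fail with
    # a clear message when the optional fs-gcsfs dependency is missing.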
plugin_type = "googlecloudstorage"
required_module = GCSFS
required_package = "fs-gcsfs"
template_config_class = GoogleCloudStorageFileSourceTemplateConfiguration
resolved_config_class = GoogleCloudStorageFileSourceConfiguration
def _open_fs(self, context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration]):
if GCSFS is None:
raise self.required_package_exception
config = context.config
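        # Pick a storage client based on the configured credential style: anonymous
        # access for public buckets, a service account key file, or an OAuth2 token.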
if config.anonymous:
client = Client.create_anonymous_client()
elif config.service_account_json:
credentials = service_account.Credentials.from_service_account_file(config.service_account_json)
client = Client(project=config.project, credentials=credentials)
elif config.token:
client = Client(
project=config.project,
credentials=Credentials(
token=config.token,
token_uri=config.token_uri,
client_id=config.client_id,
client_secret=config.client_secret,
refresh_token=config.refresh_token,
),
)
        else:
            # No usable credential configuration; fail early instead of leaving `client` unbound.
            raise Exception(
                "Google Cloud Storage file source requires anonymous access, a service account JSON file, or an OAuth2 token"
            )
        handle = GCSFS(bucket_name=config.bucket_name, root_path=config.root_path or "", retry=0, client=client)
return handle
def _list(
self,
context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
path="/",
recursive=False,
write_intent: bool = False,
limit: Optional[int] = None,
offset: Optional[int] = None,
query: Optional[str] = None,
sort_by: Optional[str] = None,
) -> tuple[list[AnyRemoteEntry], int]:
"""
Override base class _list to work around fs_gcsfs limitation with virtual directories.
GCS doesn't require directory marker objects, but fs_gcsfs's getinfo() requires them.
This implementation uses the GCS API directly to list blobs, bypassing the problematic
getinfo() validation that fails for virtual directories.
"""
if recursive:
# For recursive listing, fall back to the base implementation
return super()._list(context, path, recursive, write_intent, limit, offset, query, sort_by)
# Open filesystem to get access to the bucket
with self._open_fs(context) as fs_handle:
# Access the bucket from the GCSFS object
bucket = fs_handle.bucket
# Convert path to GCS prefix format
# Remove leading/trailing slashes and add trailing slash for directory prefix
normalized_path = path.strip("/")
if normalized_path:
prefix = normalized_path + "/"
else:
prefix = ""
# List blobs with delimiter to get immediate children only (non-recursive)
delimiter = "/"
# Collect directories (prefixes) and files (blobs)
entries: list[AnyRemoteEntry] = []
# First iterator: Get directories from prefixes
page_iterator_dirs = bucket.list_blobs(prefix=prefix, delimiter=delimiter)
for page in page_iterator_dirs.pages:
for dir_prefix in page.prefixes:
# Remove the parent prefix and trailing slash to get just the dir name
dir_name = dir_prefix[len(prefix) :].rstrip("/")
if dir_name:
full_path = os.path.join("/", normalized_path, dir_name) if normalized_path else f"/{dir_name}"
uri = self.uri_from_path(full_path)
entries.append(RemoteDirectory(name=dir_name, uri=uri, path=full_path))
# Second iterator: Get files from blobs
page_iterator_files = bucket.list_blobs(prefix=prefix, delimiter=delimiter)
for blob in page_iterator_files:
                # Skip directory marker objects (blobs whose names end with a trailing "/")
if blob.name.endswith("/"):
continue
# Get just the filename (remove prefix)
file_name = blob.name[len(prefix) :]
if file_name:
full_path = os.path.join("/", normalized_path, file_name) if normalized_path else f"/{file_name}"
uri = self.uri_from_path(full_path)
# Convert blob metadata to RemoteFile
ctime = None
if blob.time_created:
ctime = blob.time_created.isoformat()
entries.append(
RemoteFile(name=file_name, size=blob.size or 0, ctime=ctime, uri=uri, path=full_path)
)
# Apply query filter if provided
if query:
query_lower = query.lower()
entries = [e for e in entries if query_lower in e.name.lower()]
# Get total count before pagination
total_count = len(entries)
# Apply pagination
if offset is not None or limit is not None:
start = offset or 0
end = start + limit if limit is not None else None
entries = entries[start:end]
return entries, total_count
def _realize_to(
self,
source_path: str,
native_path: str,
context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
):
"""
Override to download files directly from GCS, bypassing fs_gcsfs's directory marker checks.
"""
with self._open_fs(context) as fs_handle:
bucket = fs_handle.bucket
# Convert path to GCS blob key
normalized_path = source_path.strip("/")
# Get the blob
blob = bucket.get_blob(normalized_path)
if not blob:
raise Exception(f"File not found: {source_path}")
# Download directly to file
with open(native_path, "wb") as write_file:
blob.download_to_file(write_file)
def _write_from(
self,
target_path: str,
native_path: str,
context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
):
"""
Override to upload files directly to GCS, bypassing fs_gcsfs's directory marker checks.
"""
with self._open_fs(context) as fs_handle:
bucket = fs_handle.bucket
# Convert path to GCS blob key
normalized_path = target_path.strip("/")
# Create blob and upload
blob = bucket.blob(normalized_path)
with open(native_path, "rb") as read_file:
blob.upload_from_file(read_file)
__all__ = ("GoogleCloudStorageFilesSource",)