Source code for galaxy.files.sources.googlecloudstorage
try:
    from fs_gcsfs import GCSFS
    from google.cloud.storage import Client
    from google.oauth2 import service_account
    from google.oauth2.credentials import Credentials
except ImportError:
    GCSFS = None

import os
from typing import (
    Optional,
    Union,
)

from galaxy.files.models import (
    AnyRemoteEntry,
    BaseFileSourceConfiguration,
    BaseFileSourceTemplateConfiguration,
    FilesSourceRuntimeContext,
    RemoteDirectory,
    RemoteFile,
)
from galaxy.util.config_templates import TemplateExpansion

from ._pyfilesystem2 import PyFilesystem2FilesSource


class GoogleCloudStorageFileSourceTemplateConfiguration(BaseFileSourceTemplateConfiguration):
    bucket_name: Union[str, TemplateExpansion]
    root_path: Union[str, TemplateExpansion, None] = None
    project: Union[str, TemplateExpansion, None] = None
    anonymous: Union[bool, TemplateExpansion, None] = True
    service_account_json: Union[str, TemplateExpansion, None] = None
    token: Union[str, TemplateExpansion, None] = None
    token_uri: Union[str, TemplateExpansion, None] = None
    client_id: Union[str, TemplateExpansion, None] = None
    client_secret: Union[str, TemplateExpansion, None] = None
    refresh_token: Union[str, TemplateExpansion, None] = None
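
# Illustration only, not part of the upstream module: the template model above
# accepts TemplateExpansion values, so an administrator-facing template could
# defer credentials to user-supplied variables and secrets. The expansion
# strings below are hypothetical placeholders, not values from this file.
#
#     GoogleCloudStorageFileSourceTemplateConfiguration(
#         bucket_name="{{ variables.bucket_name }}",
#         anonymous=False,
#         service_account_json="{{ secrets.service_account_json }}",
#     )
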

class GoogleCloudStorageFileSourceConfiguration(BaseFileSourceConfiguration):
    bucket_name: str
    root_path: Optional[str] = None
    project: Optional[str] = None
    anonymous: Optional[bool] = True
    service_account_json: Optional[str] = None
    token: Optional[str] = None
    token_uri: Optional[str] = None
    client_id: Optional[str] = None
    client_secret: Optional[str] = None
    refresh_token: Optional[str] = None
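
# Illustration only, not part of the upstream module: a minimal resolved
# configuration for anonymous, read-only access to a public bucket. Field
# names mirror the model above; any extra fields inherited from
# BaseFileSourceConfiguration are assumed to be optional here.
#
#     GoogleCloudStorageFileSourceConfiguration(
#         bucket_name="my-public-bucket",
#         anonymous=True,
#     )
#
# A credentialed variant would set ``anonymous`` to False and supply either
# ``service_account_json`` or the OAuth ``token`` fields, matching the
# branches in ``_open_fs`` below.
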

class GoogleCloudStorageFilesSource(
    PyFilesystem2FilesSource[
        GoogleCloudStorageFileSourceTemplateConfiguration, GoogleCloudStorageFileSourceConfiguration
    ]
):
    plugin_type = "googlecloudstorage"
    required_module = GCSFS
    required_package = "fs-gcsfs"
    template_config_class = GoogleCloudStorageFileSourceTemplateConfiguration
    resolved_config_class = GoogleCloudStorageFileSourceConfiguration

    def _open_fs(self, context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration]):
        if GCSFS is None:
            raise self.required_package_exception
        config = context.config
        # Build a storage client from whichever credential option is configured;
        # exactly one of the branches below is expected to apply.
        if config.anonymous:
            # Anonymous client for public buckets
            client = Client.create_anonymous_client()
        elif config.service_account_json:
            # Service account credentials loaded from a JSON key file
            credentials = service_account.Credentials.from_service_account_file(config.service_account_json)
            client = Client(project=config.project, credentials=credentials)
        elif config.token:
            # OAuth2 user credentials (access token plus optional refresh details)
            client = Client(
                project=config.project,
                credentials=Credentials(
                    token=config.token,
                    token_uri=config.token_uri,
                    client_id=config.client_id,
                    client_secret=config.client_secret,
                    refresh_token=config.refresh_token,
                ),
            )
        handle = GCSFS(bucket_name=config.bucket_name, root_path=config.root_path or "", retry=0, client=client)
        return handle

    def _list(
        self,
        context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
        path="/",
        recursive=False,
        write_intent: bool = False,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        query: Optional[str] = None,
        sort_by: Optional[str] = None,
    ) -> tuple[list[AnyRemoteEntry], int]:
        """
        Override base class _list to work around fs_gcsfs limitation with virtual directories.

        GCS doesn't require directory marker objects, but fs_gcsfs's getinfo() requires them.
        This implementation uses the GCS API directly to list blobs, bypassing the problematic
        getinfo() validation that fails for virtual directories.
        """
        if recursive:
            # For recursive listing, fall back to the base implementation
            return super()._list(context, path, recursive, write_intent, limit, offset, query, sort_by)

        # Open filesystem to get access to the bucket
        with self._open_fs(context) as fs_handle:
            # Access the bucket from the GCSFS object
            bucket = fs_handle.bucket

            # Convert path to GCS prefix format
            # Remove leading/trailing slashes and add trailing slash for directory prefix
            normalized_path = path.strip("/")
            if normalized_path:
                prefix = normalized_path + "/"
            else:
                prefix = ""

            # List blobs with delimiter to get immediate children only (non-recursive)
            delimiter = "/"

            # Collect directories (prefixes) and files (blobs)
            entries: list[AnyRemoteEntry] = []

            # First iterator: Get directories from prefixes
            page_iterator_dirs = bucket.list_blobs(prefix=prefix, delimiter=delimiter)
            for page in page_iterator_dirs.pages:
                for dir_prefix in page.prefixes:
                    # Remove the parent prefix and trailing slash to get just the dir name
                    dir_name = dir_prefix[len(prefix) :].rstrip("/")
                    if dir_name:
                        full_path = os.path.join("/", normalized_path, dir_name) if normalized_path else f"/{dir_name}"
                        uri = self.uri_from_path(full_path)
                        entries.append(RemoteDirectory(name=dir_name, uri=uri, path=full_path))

            # Second iterator: Get files from blobs
            page_iterator_files = bucket.list_blobs(prefix=prefix, delimiter=delimiter)
            for blob in page_iterator_files:
                # Skip directory marker objects (empty blobs ending with /)
                if blob.name.endswith("/"):
                    continue

                # Get just the filename (remove prefix)
                file_name = blob.name[len(prefix) :]
                if file_name:
                    full_path = os.path.join("/", normalized_path, file_name) if normalized_path else f"/{file_name}"
                    uri = self.uri_from_path(full_path)

                    # Convert blob metadata to RemoteFile
                    ctime = None
                    if blob.time_created:
                        ctime = blob.time_created.isoformat()

                    entries.append(
                        RemoteFile(name=file_name, size=blob.size or 0, ctime=ctime, uri=uri, path=full_path)
                    )

            # Apply query filter if provided
            if query:
                query_lower = query.lower()
                entries = [e for e in entries if query_lower in e.name.lower()]

            # Get total count before pagination
            total_count = len(entries)

            # Apply pagination
            if offset is not None or limit is not None:
                start = offset or 0
                end = start + limit if limit is not None else None
                entries = entries[start:end]

            return entries, total_count

    def _realize_to(
        self,
        source_path: str,
        native_path: str,
        context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
    ):
        """
        Override to download files directly from GCS, bypassing fs_gcsfs's directory marker checks.
        """
        with self._open_fs(context) as fs_handle:
            bucket = fs_handle.bucket
            # Convert path to GCS blob key
            normalized_path = source_path.strip("/")
            # Get the blob
            blob = bucket.get_blob(normalized_path)
            if not blob:
                raise Exception(f"File not found: {source_path}")
            # Download directly to file
            with open(native_path, "wb") as write_file:
                blob.download_to_file(write_file)

    def _write_from(
        self,
        target_path: str,
        native_path: str,
        context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
    ):
        """
        Override to upload files directly to GCS, bypassing fs_gcsfs's directory marker checks.
        """
        with self._open_fs(context) as fs_handle:
            bucket = fs_handle.bucket
            # Convert path to GCS blob key
            normalized_path = target_path.strip("/")
            # Create blob and upload
            blob = bucket.blob(normalized_path)
            with open(native_path, "rb") as read_file:
                blob.upload_from_file(read_file)


__all__ = ("GoogleCloudStorageFilesSource",)
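
# Illustration only, not part of the upstream module: a Galaxy file sources
# configuration entry selects this plugin through its ``type`` key, which
# matches ``plugin_type`` above, and supplies the fields defined by the
# configuration models. A hypothetical YAML sketch:
#
#     - type: googlecloudstorage
#       id: public_gcs_example
#       label: Example public GCS bucket
#       doc: Anonymous, read-only access to a public bucket
#       bucket_name: my-public-bucket
#       anonymous: true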