Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.files.sources.googlecloudstorage

try:
    from fs_gcsfs import GCSFS
    from google.cloud.storage import Client
    from google.oauth2.credentials import Credentials
except ImportError:
    GCSFS = None

from typing import (
    Optional,
    Union,
)

from galaxy.files.models import (
    BaseFileSourceConfiguration,
    BaseFileSourceTemplateConfiguration,
    FilesSourceRuntimeContext,
)
from galaxy.util.config_templates import TemplateExpansion
from ._pyfilesystem2 import PyFilesystem2FilesSource


class GoogleCloudStorageFileSourceTemplateConfiguration(BaseFileSourceTemplateConfiguration):
    bucket_name: Union[str, TemplateExpansion]
    root_path: Union[str, TemplateExpansion, None] = None
    project: Union[str, TemplateExpansion, None] = None
    anonymous: Union[bool, TemplateExpansion, None] = True
    token: Union[str, TemplateExpansion, None] = None
    token_uri: Union[str, TemplateExpansion, None] = None
    client_id: Union[str, TemplateExpansion, None] = None
    client_secret: Union[str, TemplateExpansion, None] = None
    refresh_token: Union[str, TemplateExpansion, None] = None


class GoogleCloudStorageFileSourceConfiguration(BaseFileSourceConfiguration):
    bucket_name: str
    root_path: Optional[str] = None
    project: Optional[str] = None
    anonymous: Optional[bool] = True
    token: Optional[str] = None
    token_uri: Optional[str] = None
    client_id: Optional[str] = None
    client_secret: Optional[str] = None
    refresh_token: Optional[str] = None


[docs] class GoogleCloudStorageFilesSource( PyFilesystem2FilesSource[ GoogleCloudStorageFileSourceTemplateConfiguration, GoogleCloudStorageFileSourceConfiguration ] ): plugin_type = "googlecloudstorage" required_module = GCSFS required_package = "fs-gcsfs" template_config_class = GoogleCloudStorageFileSourceTemplateConfiguration resolved_config_class = GoogleCloudStorageFileSourceConfiguration def _open_fs(self, context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration]): if GCSFS is None: raise self.required_package_exception config = context.config if config.anonymous: client = Client.create_anonymous_client() elif config.token: client = Client( project=config.project, credentials=Credentials( token=config.token, token_uri=config.token_uri, client_id=config.client_id, client_secret=config.client_secret, refresh_token=config.refresh_token, ), ) handle = GCSFS(bucket_name=config.bucket_name, root_path=config.root_path or "", retry=0, client=client) return handle
__all__ = ("GoogleCloudStorageFilesSource",)