
Source code for galaxy.objectstore.cloud

"""
Object Store plugin for Cloud storage.
"""

import logging
import os
import os.path

from ._caching_base import CachingConcreteObjectStore
from ._util import UsesAxel
from .caching import enable_cache_monitor
from .s3 import parse_config_xml

try:
    from cloudbridge.factory import (
        CloudProviderFactory,
        ProviderList,
    )
    from cloudbridge.interfaces.exceptions import InvalidNameException
except ImportError:
    CloudProviderFactory = None
    ProviderList = None
    InvalidNameException = None

log = logging.getLogger(__name__)

NO_CLOUDBRIDGE_ERROR_MESSAGE = (
    "Cloud ObjectStore is configured, but no CloudBridge dependency available. "
    "Please install CloudBridge or modify the ObjectStore configuration."
)


class Cloud(CachingConcreteObjectStore, UsesAxel):
    """
    Object store that stores objects as items in cloud storage. A local
    cache exists that is used as an intermediate location for files between
    Galaxy and the cloud storage.
    """

    store_type = "cloud"
    def __init__(self, config, config_dict):
        super().__init__(config, config_dict)
        bucket_dict = config_dict["bucket"]
        cache_dict = config_dict.get("cache") or {}
        self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict)
        self.provider = config_dict["provider"]
        self.credentials = config_dict["auth"]
        self.bucket_name = bucket_dict.get("name")
        self.use_rr = bucket_dict.get("use_reduced_redundancy", False)
        self.max_chunk_size = bucket_dict.get("max_chunk_size", 250)
        self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size
        self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path
        self.cache_updated_data = cache_dict.get("cache_updated_data", True)
        self._initialize()
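For orientation, the constructor above pulls its settings from a nested config_dict. A minimal sketch of that dictionary for an AWS-backed store follows; the keys mirror what __init__ reads, while every value is an illustrative placeholder rather than a documented default.

# Illustrative only -- not part of the module. Keys mirror those read by
# __init__ above; values are placeholders, not defaults.
example_config_dict = {
    "provider": "aws",
    "auth": {"access_key": "AKIA...", "secret_key": "..."},
    "bucket": {
        "name": "galaxy-datasets",
        "use_reduced_redundancy": False,
        "max_chunk_size": 250,
    },
    "cache": {
        "size": 1000,
        "path": "database/object_store_cache",
        "cache_updated_data": True,
    },
}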
    def _initialize(self):
        if CloudProviderFactory is None:
            raise Exception(NO_CLOUDBRIDGE_ERROR_MESSAGE)

        self.conn = self._get_connection(self.provider, self.credentials)
        self.bucket = self._get_bucket(self.bucket_name)
        self._ensure_staging_path_writable()
        self._start_cache_monitor_if_needed()
        self._init_axel()

    @staticmethod
    def _get_connection(provider, credentials):
        log.debug(f"Configuring `{provider}` Connection")
        if provider == "aws":
            config = {"aws_access_key": credentials["access_key"], "aws_secret_key": credentials["secret_key"]}
            if "region" in credentials:
                config["aws_region_name"] = credentials["region"]
            connection = CloudProviderFactory().create_provider(ProviderList.AWS, config)
        elif provider == "azure":
            config = {
                "azure_subscription_id": credentials["subscription_id"],
                "azure_client_id": credentials["client_id"],
                "azure_secret": credentials["secret"],
                "azure_tenant": credentials["tenant"],
            }
            connection = CloudProviderFactory().create_provider(ProviderList.AZURE, config)
        elif provider == "google":
            config = {"gcp_service_creds_file": credentials["credentials_file"]}
            connection = CloudProviderFactory().create_provider(ProviderList.GCP, config)
        else:
            raise Exception(f"Unsupported provider `{provider}`.")

        # Ideally it would be better to assert whether the connection is
        # authorized to perform the operations required by the ObjectStore
        # before returning it (and initializing the ObjectStore); then any
        # related issues could be handled properly here, and the ObjectStore
        # could "trust" that the connection is established.
        #
        # However, the mechanism implemented in CloudBridge to assert whether
        # a user/service is authorized to perform an operation assumes the
        # user/service is granted elevated privileges, such as
        # admin/owner-level access to all resources. For a detailed
        # discussion see:
        #
        # https://github.com/CloudVE/cloudbridge/issues/135
        #
        # Hence, if a resource owner wants to authorize Galaxy only to r/w
        # a bucket/container on the provider, but does not allow it to access
        # other resources, CloudBridge may fail to assert the credentials.
        # For instance, to r/w an Amazon S3 bucket, the resource owner
        # also needs to authorize full access to Amazon EC2, because CloudBridge
        # leverages EC2-specific functions to assert the credentials.
        #
        # Therefore, to adhere to the principle of least privilege, we do not
        # assert credentials; instead, we handle exceptions raised as a
        # result of signing API calls to the cloud provider (e.g., GCP) with
        # incorrect, invalid, or unauthorized credentials.
        return connection
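The credential dictionary passed as config_dict["auth"] has to match the provider named in the configuration. The AWS shape is sketched above; for completeness, hypothetical Azure and Google shapes, again with placeholder values only, would look like this:

# Illustrative only -- keys mirror those read by _get_connection above.
azure_credentials = {
    "subscription_id": "...",
    "client_id": "...",
    "secret": "...",
    "tenant": "...",
}
google_credentials = {"credentials_file": "/path/to/service-account.json"}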
    @classmethod
    def parse_xml(clazz, config_xml):
        # The following reads common cloud-based storage configuration
        # as implemented for the S3 backend. Hence, it also attempts to
        # parse S3-specific configuration (e.g., credentials); however,
        # such provider-specific configuration is overwritten below.
        config = parse_config_xml(config_xml)
        try:
            provider = config_xml.attrib.get("provider")
            if provider is None:
                msg = "Missing `provider` attribute from the Cloud backend of the ObjectStore."
                log.error(msg)
                raise Exception(msg)
            provider = provider.lower()
            config["provider"] = provider

            # Read any provider-specific configuration.
            auth_element = config_xml.findall("auth")[0]
            missing_config = []
            if provider == "aws":
                akey = auth_element.get("access_key")
                skey = auth_element.get("secret_key")
                config["auth"] = {"access_key": akey, "secret_key": skey}
                region = auth_element.get("region")
                if region is not None:
                    config["auth"]["region"] = region
            elif provider == "azure":
                sid = auth_element.get("subscription_id")
                if sid is None:
                    missing_config.append("subscription_id")
                cid = auth_element.get("client_id")
                if cid is None:
                    missing_config.append("client_id")
                sec = auth_element.get("secret")
                if sec is None:
                    missing_config.append("secret")
                ten = auth_element.get("tenant")
                if ten is None:
                    missing_config.append("tenant")
                config["auth"] = {"subscription_id": sid, "client_id": cid, "secret": sec, "tenant": ten}
            elif provider == "google":
                cre = auth_element.get("credentials_file")
                if cre is None:
                    missing_config.append("credentials_file")
                elif not os.path.isfile(cre):
                    msg = f"The file specified for GCP credentials was not found: {cre}"
                    log.error(msg)
                    raise OSError(msg)
                config["auth"] = {"credentials_file": cre}
            else:
                msg = f"Unsupported provider `{provider}`."
                log.error(msg)
                raise Exception(msg)

            if len(missing_config) > 0:
                msg = (
                    f"The following configuration options required for the {provider} cloud backend "
                    f"are missing: {missing_config}"
                )
                log.error(msg)
                raise Exception(msg)
            return config
        except Exception:
            log.exception("Malformed ObjectStore Configuration XML -- unable to continue")
            raise
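As a rough sketch of the XML this classmethod consumes: the provider attribute and the auth child below correspond to what parse_xml reads directly, while the remaining elements are only indicative of what the shared S3 parser (parse_config_xml) handles, so the exact set of required children may differ.

import xml.etree.ElementTree as ElementTree

# Hypothetical configuration fragment -- placeholder values throughout.
fragment = ElementTree.fromstring(
    """
    <object_store type="cloud" provider="azure">
        <auth subscription_id="..." client_id="..." secret="..." tenant="..." />
        <bucket name="galaxy-datasets" />
        <cache path="database/object_store_cache" size="1000" />
    </object_store>
    """
)
provider = fragment.attrib.get("provider")  # "azure"
auth_element = fragment.findall("auth")[0]  # element carrying the credentials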
    def to_dict(self):
        as_dict = super().to_dict()
        as_dict.update(self._config_to_dict())
        return as_dict
    def _config_to_dict(self):
        return {
            "provider": self.provider,
            "auth": self.credentials,
            "bucket": {
                "name": self.bucket_name,
                "use_reduced_redundancy": self.use_rr,
            },
            "cache": {
                "size": self.cache_size,
                "path": self.staging_path,
                "cache_updated_data": self.cache_updated_data,
            },
        }

    def _get_bucket(self, bucket_name):
        try:
            bucket = self.conn.storage.buckets.get(bucket_name)
            if bucket is None:
                log.debug("Bucket not found, creating a bucket with handle '%s'", bucket_name)
                bucket = self.conn.storage.buckets.create(bucket_name)
            log.debug("Using cloud ObjectStore with bucket '%s'", bucket.name)
            return bucket
        except InvalidNameException:
            log.exception("Invalid bucket name -- unable to continue")
            raise
        except Exception:
            # These two generic exceptions will be replaced by specific exceptions
            # once proper exceptions are exposed by CloudBridge.
            log.exception(f"Could not get bucket '{bucket_name}'")
            raise Exception(f"Could not get bucket '{bucket_name}'")

    def _get_remote_size(self, rel_path):
        try:
            obj = self.bucket.objects.get(rel_path)
            return obj.size
        except Exception:
            log.exception("Could not get size of key '%s' from cloud storage", rel_path)
            return -1

    def _exists_remotely(self, rel_path):
        exists = False
        try:
            # A hackish way of testing if the rel_path is a folder vs a file
            is_dir = rel_path[-1] == "/"
            if is_dir:
                keyresult = self.bucket.objects.list(prefix=rel_path)
                exists = len(keyresult) > 0
            else:
                exists = self.bucket.objects.get(rel_path) is not None
        except Exception:
            log.exception("Trouble checking existence of cloud key '%s'", rel_path)
            return False
        return exists

    def _download(self, rel_path):
        local_destination = self._get_cache_path(rel_path)
        try:
            log.debug("Pulling key '%s' into cache to %s", rel_path, local_destination)
            key = self.bucket.objects.get(rel_path)
            remote_size = key.size
            if not self._caching_allowed(rel_path, remote_size):
                return False
            log.debug("Pulled key '%s' into cache to %s", rel_path, local_destination)
            self._download_to(key, local_destination)
            return True
        except Exception:
            log.exception("Problem downloading key '%s' from cloud bucket '%s'", rel_path, self.bucket.name)
            return False

    def _download_directory_into_cache(self, rel_path, cache_path):
        # List objects in the specified cloud folder
        objects = self.bucket.objects.list(prefix=rel_path)
        for obj in objects:
            remote_file_path = obj.name
            local_file_path = os.path.join(cache_path, os.path.relpath(remote_file_path, rel_path))

            # Create directories if they don't exist
            os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

            # Download the file
            self._download_to(obj, local_file_path)

    def _download_to(self, key, local_destination):
        if self.use_axel:
            url = key.generate_url(7200)
            return self._axel_download(url, local_destination)
        else:
            with open(local_destination, "wb+") as downloaded_file_handle:
                key.save_content(downloaded_file_handle)

    def _push_string_to_path(self, rel_path: str, from_string: str) -> bool:
        try:
            if not self.bucket.objects.get(rel_path):
                created_obj = self.bucket.objects.create(rel_path)
                created_obj.upload(from_string)
            else:
                self.bucket.objects.get(rel_path).upload(from_string)
            return True
        except Exception:
            log.exception("Trouble pushing to cloud '%s' from string", rel_path)
            return False

    def _push_file_to_path(self, rel_path: str, source_file: str) -> bool:
        try:
            if not self.bucket.objects.get(rel_path):
                created_obj = self.bucket.objects.create(rel_path)
                created_obj.upload_from_file(source_file)
            else:
                self.bucket.objects.get(rel_path).upload_from_file(source_file)
            return True
        except Exception:
            log.exception("Trouble pushing to cloud '%s' from file '%s'", rel_path, source_file)
            return False

    def _delete_remote_all(self, rel_path: str) -> bool:
        try:
            results = self.bucket.objects.list(prefix=rel_path)
            for key in results:
                log.debug("Deleting key %s", key.name)
                key.delete()
            return True
        except Exception:
            log.exception("Could not delete key '%s' from cloud", rel_path)
            return False

    def _delete_existing_remote(self, rel_path: str) -> bool:
        try:
            key = self.bucket.objects.get(rel_path)
            log.debug("Deleting key %s", key.name)
            key.delete()
            return True
        except Exception:
            log.exception("Could not delete key '%s' from cloud", rel_path)
            return False

    def _get_object_url(self, obj, **kwargs):
        if self._exists(obj, **kwargs):
            rel_path = self._construct_path(obj, **kwargs)
            try:
                key = self.bucket.objects.get(rel_path)
                return key.generate_url(expires_in=86400)  # 24hrs
            except Exception:
                log.exception("Trouble generating URL for dataset '%s'", rel_path)
        return None

    def _get_store_usage_percent(self, obj):
        return 0.0
    def shutdown(self):
        self._shutdown_cache_monitor()