Source code for galaxy.objectstore.cloud

"""
Object Store plugin for Cloud storage.
"""

import logging
import os
import os.path

from ._caching_base import CachingConcreteObjectStore
from ._util import UsesAxel
from .caching import enable_cache_monitor
from .s3 import parse_config_xml

try:
    from cloudbridge.factory import (
        CloudProviderFactory,
        ProviderList,
    )
    from cloudbridge.interfaces.exceptions import InvalidNameException
except ImportError:
    CloudProviderFactory = None
    ProviderList = None

log = logging.getLogger(__name__)

NO_CLOUDBRIDGE_ERROR_MESSAGE = (
    "Cloud ObjectStore is configured, but no CloudBridge dependency available."
    "Please install CloudBridge or modify ObjectStore configuration."
)


class Cloud(CachingConcreteObjectStore, UsesAxel):
    """
    Object store that stores objects as items in cloud storage. A local
    cache exists that is used as an intermediate location for files between
    Galaxy and the cloud storage.
    """

    store_type = "cloud"
    def __init__(self, config, config_dict):
        super().__init__(config, config_dict)
        bucket_dict = config_dict["bucket"]
        cache_dict = config_dict.get("cache") or {}
        self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict)

        self.provider = config_dict["provider"]
        self.credentials = config_dict["auth"]
        self.bucket_name = bucket_dict.get("name")
        self.use_rr = bucket_dict.get("use_reduced_redundancy", False)
        self.max_chunk_size = bucket_dict.get("max_chunk_size", 250)

        self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size
        self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path
        self.cache_updated_data = cache_dict.get("cache_updated_data", True)

        self._initialize()
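For reference, the constructor above reads only a handful of keys from config_dict. The following is a minimal sketch of that dictionary's shape, assuming an AWS-style auth block; every value is an illustrative placeholder rather than anything defined in this module, except use_reduced_redundancy and max_chunk_size, whose code defaults are shown.

# Illustrative sketch only -- placeholder values, AWS-style credentials assumed.
example_config_dict = {
    "provider": "aws",  # one of "aws", "azure", or "google"; see _get_connection below
    "auth": {
        # AWS variant shown; Azure and Google use the keys read in _get_connection
        "access_key": "AKIA-PLACEHOLDER",
        "secret_key": "SECRET-PLACEHOLDER",
    },
    "bucket": {
        "name": "my-galaxy-bucket",
        "use_reduced_redundancy": False,  # code default: False
        "max_chunk_size": 250,  # code default: 250
    },
    "cache": {
        # optional; size and path fall back to the global object store cache settings
        "size": 1000,
        "path": "/path/to/object_store_cache",
        "cache_updated_data": True,
    },
}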
    def _initialize(self):
        if CloudProviderFactory is None:
            raise Exception(NO_CLOUDBRIDGE_ERROR_MESSAGE)

        self.conn = self._get_connection(self.provider, self.credentials)
        self.bucket = self._get_bucket(self.bucket_name)
        self._ensure_staging_path_writable()
        self._start_cache_monitor_if_needed()
        self._init_axel()

    @staticmethod
    def _get_connection(provider, credentials):
        log.debug(f"Configuring `{provider}` Connection")
        if provider == "aws":
            config = {"aws_access_key": credentials["access_key"], "aws_secret_key": credentials["secret_key"]}
            if "region" in credentials:
                config["aws_region_name"] = credentials["region"]
            connection = CloudProviderFactory().create_provider(ProviderList.AWS, config)
        elif provider == "azure":
            config = {
                "azure_subscription_id": credentials["subscription_id"],
                "azure_client_id": credentials["client_id"],
                "azure_secret": credentials["secret"],
                "azure_tenant": credentials["tenant"],
            }
            connection = CloudProviderFactory().create_provider(ProviderList.AZURE, config)
        elif provider == "google":
            config = {"gcp_service_creds_file": credentials["credentials_file"]}
            connection = CloudProviderFactory().create_provider(ProviderList.GCP, config)
        else:
            raise Exception(f"Unsupported provider `{provider}`.")

        # Ideally it would be better to assert whether the connection is
        # authorized to perform the operations required by the ObjectStore
        # before returning it (and initializing the ObjectStore); any related
        # issues could then be handled properly here, and the ObjectStore
        # could "trust" that the connection is established.
        #
        # However, the mechanism implemented in CloudBridge to assert whether
        # a user/service is authorized to perform an operation assumes the
        # user/service is granted elevated privileges, such as admin/owner-level
        # access to all resources. For a detailed discussion see:
        #
        # https://github.com/CloudVE/cloudbridge/issues/135
        #
        # Hence, if a resource owner wants to authorize Galaxy only to r/w
        # a bucket/container on the provider, but does not allow it to access
        # other resources, CloudBridge may fail to assert the credentials.
        # For instance, to r/w an Amazon S3 bucket, the resource owner also
        # needs to authorize full access to Amazon EC2, because CloudBridge
        # leverages EC2-specific functions to assert the credentials.
        #
        # Therefore, to adhere to the principle of least privilege, we do not
        # assert credentials; instead, we handle exceptions raised as a result
        # of signing API calls to the cloud provider (e.g., GCP) using
        # incorrect, invalid, or unauthorized credentials.
        return connection
    @classmethod
    def parse_xml(clazz, config_xml):
        # The following reads common cloud-based storage configuration as
        # implemented for the S3 backend. Hence, it also attempts to parse
        # S3-specific configuration (e.g., credentials); however, such
        # provider-specific configuration is overwritten below.
        config = parse_config_xml(config_xml)
        try:
            provider = config_xml.attrib.get("provider")
            if provider is None:
                msg = "Missing `provider` attribute from the Cloud backend of the ObjectStore."
                log.error(msg)
                raise Exception(msg)
            provider = provider.lower()
            config["provider"] = provider

            # Read any provider-specific configuration.
            auth_element = config_xml.findall("auth")[0]
            missing_config = []
            if provider == "aws":
                akey = auth_element.get("access_key")
                skey = auth_element.get("secret_key")
                config["auth"] = {"access_key": akey, "secret_key": skey}
                if auth_element.get("region") is not None:
                    config["auth"]["region"] = auth_element.get("region")
            elif provider == "azure":
                sid = auth_element.get("subscription_id")
                if sid is None:
                    missing_config.append("subscription_id")
                cid = auth_element.get("client_id")
                if cid is None:
                    missing_config.append("client_id")
                sec = auth_element.get("secret")
                if sec is None:
                    missing_config.append("secret")
                ten = auth_element.get("tenant")
                if ten is None:
                    missing_config.append("tenant")
                config["auth"] = {"subscription_id": sid, "client_id": cid, "secret": sec, "tenant": ten}
            elif provider == "google":
                cre = auth_element.get("credentials_file")
                if cre is None:
                    missing_config.append("credentials_file")
                elif not os.path.isfile(cre):
                    msg = f"The following file specified for GCP credentials not found: {cre}"
                    log.error(msg)
                    raise OSError(msg)
                config["auth"] = {"credentials_file": cre}
            else:
                msg = f"Unsupported provider `{provider}`."
                log.error(msg)
                raise Exception(msg)

            if len(missing_config) > 0:
                msg = (
                    f"The following configuration values required for the {provider} cloud backend "
                    f"are missing: {missing_config}"
                )
                log.error(msg)
                raise Exception(msg)
            return config
        except Exception:
            log.exception("Malformed ObjectStore Configuration XML -- unable to continue")
            raise
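As a rough illustration of the XML this parser accepts, the standalone sketch below builds a cloud backend element and reads the same provider attribute and auth element that parse_xml reads. Only the provider attribute and the auth element are taken from the code above; the bucket and cache elements are assumptions modeled on the shared S3-style layout delegated to parse_config_xml.

import xml.etree.ElementTree as ET

# Hypothetical configuration fragment for an Azure-backed cloud object store;
# the <bucket> and <cache> elements follow the assumed S3-style layout.
config_xml = ET.fromstring(
    """
    <object_store type="cloud" provider="azure">
        <auth subscription_id="SUBSCRIPTION" client_id="CLIENT" secret="SECRET" tenant="TENANT"/>
        <bucket name="my-galaxy-container"/>
        <cache path="/path/to/object_store_cache" size="1000"/>
    </object_store>
    """
)

# The cloud-specific part of parse_xml reads exactly these pieces:
provider = config_xml.attrib.get("provider").lower()
auth_element = config_xml.findall("auth")[0]
print(provider, auth_element.get("subscription_id"))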
    def to_dict(self):
        as_dict = super().to_dict()
        as_dict.update(self._config_to_dict())
        return as_dict
    def _config_to_dict(self):
        return {
            "provider": self.provider,
            "auth": self.credentials,
            "bucket": {
                "name": self.bucket_name,
                "use_reduced_redundancy": self.use_rr,
            },
            "cache": {
                "size": self.cache_size,
                "path": self.staging_path,
                "cache_updated_data": self.cache_updated_data,
            },
        }

    def _get_bucket(self, bucket_name):
        try:
            bucket = self.conn.storage.buckets.get(bucket_name)
            if bucket is None:
                log.debug("Bucket not found, creating a bucket with handle '%s'", bucket_name)
                bucket = self.conn.storage.buckets.create(bucket_name)
            log.debug("Using cloud ObjectStore with bucket '%s'", bucket.name)
            return bucket
        except InvalidNameException:
            log.exception("Invalid bucket name -- unable to continue")
            raise
        except Exception:
            # These two generic exceptions will be replaced by specific exceptions
            # once proper exceptions are exposed by CloudBridge.
            log.exception(f"Could not get bucket '{bucket_name}'")
            raise Exception(f"Could not get bucket '{bucket_name}'")

    def _get_remote_size(self, rel_path):
        try:
            obj = self.bucket.objects.get(rel_path)
            return obj.size
        except Exception:
            log.exception("Could not get size of key '%s' from S3", rel_path)
            return -1

    def _exists_remotely(self, rel_path):
        exists = False
        try:
            # A hackish way of testing if the rel_path is a folder vs a file
            is_dir = rel_path[-1] == "/"
            if is_dir:
                keyresult = self.bucket.objects.list(prefix=rel_path)
                exists = len(keyresult) > 0
            else:
                exists = self.bucket.objects.get(rel_path) is not None
        except Exception:
            log.exception("Trouble checking existence of S3 key '%s'", rel_path)
            return False
        return exists

    def _download(self, rel_path):
        local_destination = self._get_cache_path(rel_path)
        try:
            log.debug("Pulling key '%s' into cache to %s", rel_path, local_destination)
            key = self.bucket.objects.get(rel_path)
            remote_size = key.size
            if not self._caching_allowed(rel_path, remote_size):
                return False
            log.debug("Pulled key '%s' into cache to %s", rel_path, local_destination)
            self._download_to(key, local_destination)
            return True
        except Exception:
            log.exception("Problem downloading key '%s' from S3 bucket '%s'", rel_path, self.bucket.name)
            return False

    def _download_directory_into_cache(self, rel_path, cache_path):
        # List objects in the specified cloud folder
        objects = self.bucket.objects.list(prefix=rel_path)

        for obj in objects:
            remote_file_path = obj.name
            local_file_path = os.path.join(cache_path, os.path.relpath(remote_file_path, rel_path))

            # Create directories if they don't exist
            os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

            # Download the file
            self._download_to(obj, local_file_path)

    def _download_to(self, key, local_destination):
        if self.use_axel:
            url = key.generate_url(7200)
            return self._axel_download(url, local_destination)
        else:
            with open(local_destination, "wb+") as downloaded_file_handle:
                key.save_content(downloaded_file_handle)

    def _push_string_to_path(self, rel_path: str, from_string: str) -> bool:
        try:
            if not self.bucket.objects.get(rel_path):
                created_obj = self.bucket.objects.create(rel_path)
                created_obj.upload(from_string)
            else:
                self.bucket.objects.get(rel_path).upload(from_string)
            return True
        except Exception:
            log.exception("Trouble pushing to cloud '%s' from string", rel_path)
            return False

    def _push_file_to_path(self, rel_path: str, source_file: str) -> bool:
        try:
            if not self.bucket.objects.get(rel_path):
                created_obj = self.bucket.objects.create(rel_path)
                created_obj.upload_from_file(source_file)
            else:
                self.bucket.objects.get(rel_path).upload_from_file(source_file)
            return True
        except Exception:
            log.exception("Trouble pushing to cloud '%s' from file '%s'", rel_path, source_file)
            return False

    def _delete_remote_all(self, rel_path: str) -> bool:
        try:
            results = self.bucket.objects.list(prefix=rel_path)
            for key in results:
                log.debug("Deleting key %s", key.name)
                key.delete()
            return True
        except Exception:
            log.exception("Could not delete key '%s' from cloud", rel_path)
            return False

    def _delete_existing_remote(self, rel_path: str) -> bool:
        try:
            key = self.bucket.objects.get(rel_path)
            log.debug("Deleting key %s", key.name)
            key.delete()
            return True
        except Exception:
            log.exception("Could not delete key '%s' from cloud", rel_path)
            return False

    def _get_object_url(self, obj, **kwargs):
        if self._exists(obj, **kwargs):
            rel_path = self._construct_path(obj, **kwargs)
            try:
                key = self.bucket.objects.get(rel_path)
                return key.generate_url(expires_in=86400)  # 24hrs
            except Exception:
                log.exception("Trouble generating URL for dataset '%s'", rel_path)
        return None

    def _get_store_usage_percent(self, obj):
        return 0.0
    def shutdown(self):
        self._shutdown_cache_monitor()