Source code for galaxy.objectstore.azure_blob

"""
Object Store plugin for the Microsoft Azure Block Blob Storage system
"""

import logging
import os
from datetime import (
    datetime,
    timedelta,
)

try:
    from azure.common import AzureHttpError
    from azure.storage.blob import (
        BlobSasPermissions,
        BlobServiceClient,
        generate_blob_sas,
    )
except ImportError:
    BlobServiceClient = None  # type: ignore[assignment,unused-ignore,misc]

from ._caching_base import CachingConcreteObjectStore
from .caching import (
    enable_cache_monitor,
    parse_caching_config_dict_from_xml,
)

NO_BLOBSERVICE_ERROR_MESSAGE = (
    "ObjectStore configured, but no azure.storage.blob dependency available."
    "Please install and properly configure azure.storage.blob or modify Object Store configuration."
)

log = logging.getLogger(__name__)


def parse_config_xml(config_xml):
    try:
        auth_xml = config_xml.findall("auth")[0]

        account_name = auth_xml.get("account_name")
        account_key = auth_xml.get("account_key")
        account_url = auth_xml.get("account_url")

        container_xml = config_xml.find("container")
        container_name = container_xml.get("name")

        transfer_xml = config_xml.findall("transfer")
        if not transfer_xml:
            transfer_xml = {}
        else:
            transfer_xml = transfer_xml[0]
        transfer_dict = {}
        for key in [
            "max_concurrency",
            "download_max_concurrency",
            "upload_max_concurrency",
            "max_single_put_size",
            "max_single_get_size",
            "max_block_size",
        ]:
            value = transfer_xml.get(key)
            if transfer_xml.get(key) is not None:
                transfer_dict[key] = value

        cache_dict = parse_caching_config_dict_from_xml(config_xml)

        tag, attrs = "extra_dir", ("type", "path")
        extra_dirs = config_xml.findall(tag)
        if not extra_dirs:
            msg = f"No {tag} element in XML tree"
            log.error(msg)
            raise Exception(msg)
        extra_dirs = [{k: e.get(k) for k in attrs} for e in extra_dirs]

        auth = {
            "account_name": account_name,
            "account_key": account_key,
        }
        if account_url:
            auth["account_url"] = account_url

        config_dict = {
            "auth": auth,
            "container": {
                "name": container_name,
            },
            "cache": cache_dict,
            "transfer": transfer_dict,
            "extra_dirs": extra_dirs,
            "private": CachingConcreteObjectStore.parse_private_from_config_xml(config_xml),
        }
        name = config_xml.attrib.get("name", None)
        if name is not None:
            config_dict["name"] = name
        device = config_xml.attrib.get("device", None)
        config_dict["device"] = device
        return config_dict
    except Exception:
        # Toss it back up after logging, we can't continue loading at this point.
        log.exception("Malformed ObjectStore Configuration XML -- unable to continue")
        raise
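
# Illustrative only, not part of the module: the XML shape parse_config_xml()
# expects, inferred from the element and attribute lookups above. All values are
# placeholders; the <cache> attribute names and the "private" attribute are
# assumptions, since parse_caching_config_dict_from_xml() and
# parse_private_from_config_xml() are defined in other modules.
#
#   <object_store type="azure_blob" name="azure_store" private="false">
#       <auth account_name="galaxyaccount" account_key="secretkey" />
#       <container name="galaxy-data" />
#       <transfer max_concurrency="4" max_block_size="8388608" />
#       <cache path="database/object_store_cache" size="1000" />
#       <extra_dir type="job_work" path="database/job_working_directory_azure" />
#       <extra_dir type="temp" path="database/tmp_azure" />
#   </object_store>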


class AzureBlobObjectStore(CachingConcreteObjectStore):
    """
    Object store that stores objects as blobs in an Azure Blob Container. A local
    cache exists that is used as an intermediate location for files between
    Galaxy and Azure.
    """

    store_type = "azure_blob"
    cloud = True

    def __init__(self, config, config_dict):
        super().__init__(config, config_dict)

        auth_dict = config_dict["auth"]
        container_dict = config_dict["container"]
        cache_dict = config_dict.get("cache") or {}
        self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict)

        self.account_name = auth_dict.get("account_name")
        self.account_url = auth_dict.get("account_url")
        self.account_key = auth_dict.get("account_key")

        self.container_name = container_dict.get("name")
        raw_transfer_dict = config_dict.get("transfer", {})
        typed_transfer_dict = {}
        for key in [
            "max_concurrency",
            "download_max_concurrency",
            "upload_max_concurrency",
            "max_single_put_size",
            "max_single_get_size",
            "max_block_size",
        ]:
            value = raw_transfer_dict.get(key)
            if value is not None:
                typed_transfer_dict[key] = int(value)
        self.transfer_dict = typed_transfer_dict

        self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size
        self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path
        self.cache_updated_data = cache_dict.get("cache_updated_data", True)

        self._initialize()
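
    # Illustrative only, not part of the module: a config_dict of the shape this
    # constructor reads, mirroring the dictionary parse_config_xml() builds above.
    # All values are placeholders; "account_url" and "transfer" are optional, and
    # transfer values may be strings since they are cast with int() here.
    #
    #   config_dict = {
    #       "auth": {"account_name": "galaxyaccount", "account_key": "secretkey"},
    #       "container": {"name": "galaxy-data"},
    #       "cache": {"size": 1000, "path": "database/object_store_cache", "cache_updated_data": True},
    #       "transfer": {"max_concurrency": "4"},
    #       "extra_dirs": [{"type": "job_work", "path": "database/job_working_directory_azure"}],
    #       "private": False,
    #   }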

    def _initialize(self):
        if BlobServiceClient is None:
            raise Exception(NO_BLOBSERVICE_ERROR_MESSAGE)

        self._configure_connection()
        self._ensure_staging_path_writable()
        self._start_cache_monitor_if_needed()

    def to_dict(self):
        as_dict = super().to_dict()
        auth = {
            "account_name": self.account_name,
            "account_key": self.account_key,
        }
        if self.account_url:
            auth["account_url"] = self.account_url
        as_dict.update(
            {
                "auth": auth,
                "container": {
                    "name": self.container_name,
                },
                "transfer": self.transfer_dict,
                "cache": {
                    "size": self.cache_size,
                    "path": self.staging_path,
                    "cache_updated_data": self.cache_updated_data,
                },
            }
        )
        return as_dict

    # config_xml is an ElementTree object.
    @classmethod
    def parse_xml(clazz, config_xml):
        return parse_config_xml(config_xml)
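
    # Illustrative only, not part of the module: feeding a parsed XML fragment to
    # parse_xml(), e.g. in a test. "azure_blob_conf.xml" is a hypothetical file
    # whose root element follows the shape sketched above parse_config_xml().
    #
    #   from xml.etree import ElementTree
    #   root = ElementTree.parse("azure_blob_conf.xml").getroot()
    #   config_dict = AzureBlobObjectStore.parse_xml(root)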

    def _configure_connection(self):
        log.debug("Configuring Connection")
        extra_kwds = {}
        for key in [
            "max_single_put_size",
            "max_single_get_size",
            "max_block_size",
        ]:
            if key in self.transfer_dict:
                extra_kwds[key] = self.transfer_dict[key]

        if self.account_url:
            # https://pypi.org/project/azure-storage-blob/
            service = BlobServiceClient(
                account_url=self.account_url,
                credential={"account_name": self.account_name, "account_key": self.account_key},
                **extra_kwds,
            )
        else:
            service = BlobServiceClient(
                account_url=f"https://{self.account_name}.blob.core.windows.net",
                credential=self.account_key,
                **extra_kwds,
            )
        self.service = service

    def _get_remote_size(self, rel_path):
        try:
            properties = self._blob_client(rel_path).get_blob_properties()
            size_in_bytes = properties.size
            return size_in_bytes
        except AzureHttpError:
            log.exception("Could not get size of blob '%s' from Azure", rel_path)
            return -1

    def _blobs_from(self, rel_path):
        return self.service.get_container_client(self.container_name).list_blobs(name_starts_with=rel_path)

    def _exists_remotely(self, rel_path: str):
        try:
            is_dir = rel_path[-1] == "/"
            if is_dir:
                blobs = self._blobs_from(rel_path)
                if blobs:
                    return True
                else:
                    return False
            else:
                exists = self._blob_client(rel_path).exists()
        except AzureHttpError:
            log.exception("Trouble checking existence of Azure blob '%s'", rel_path)
            return False
        return exists

    def _blob_client(self, rel_path: str):
        return self.service.get_blob_client(self.container_name, rel_path)

    def _download(self, rel_path):
        local_destination = self._get_cache_path(rel_path)
        try:
            log.debug("Pulling '%s' into cache to %s", rel_path, local_destination)
            if not self._caching_allowed(rel_path):
                return False
            else:
                self._download_to_file(rel_path, local_destination)
                return True
        except AzureHttpError:
            log.exception("Problem downloading '%s' from Azure", rel_path)
            return False

    def _download_to_file(self, rel_path, local_destination):
        kwd = {}
        max_concurrency = self.transfer_dict.get("download_max_concurrency") or self.transfer_dict.get(
            "max_concurrency"
        )
        if max_concurrency is not None:
            kwd["max_concurrency"] = max_concurrency
        with open(local_destination, "wb") as f:
            self._blob_client(rel_path).download_blob().download_to_stream(f, **kwd)

    def _download_directory_into_cache(self, rel_path, cache_path):
        blobs = self._blobs_from(rel_path)
        for blob in blobs:
            key = blob.name
            local_file_path = os.path.join(cache_path, os.path.relpath(key, rel_path))

            # Create directories if they don't exist
            os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

            # Download the file
            self._download_to_file(key, local_file_path)

    def _push_string_to_path(self, rel_path: str, from_string: str) -> bool:
        try:
            self._blob_client(rel_path).upload_blob(from_string, overwrite=True)
            return True
        except AzureHttpError:
            log.exception("Trouble pushing to Azure Blob '%s' from string", rel_path)
            return False

    def _push_file_to_path(self, rel_path: str, source_file: str) -> bool:
        try:
            with open(source_file, "rb") as f:
                kwd = {}
                max_concurrency = self.transfer_dict.get("upload_max_concurrency") or self.transfer_dict.get(
                    "max_concurrency"
                )
                if max_concurrency is not None:
                    kwd["max_concurrency"] = max_concurrency
                self._blob_client(rel_path).upload_blob(f, overwrite=True, **kwd)
            return True
        except AzureHttpError:
            log.exception("Trouble pushing to Azure Blob '%s' from file '%s'", rel_path, source_file)
            return False

    def _delete_remote_all(self, rel_path: str) -> bool:
        try:
            blobs = self._blobs_from(rel_path)
            for blob in blobs:
                log.debug("Deleting from Azure: %s", blob)
                self._blob_client(blob.name).delete_blob()
            return True
        except AzureHttpError:
            log.exception("Could not delete blob '%s' from Azure", rel_path)
            return False

    def _delete_existing_remote(self, rel_path: str) -> bool:
        try:
            self._blob_client(rel_path).delete_blob()
            return True
        except AzureHttpError:
            log.exception("Could not delete blob '%s' from Azure", rel_path)
            return False

    def _get_object_url(self, obj, **kwargs):
        if self._exists(obj, **kwargs):
            rel_path = self._construct_path(obj, **kwargs)
            try:
                url = self._blob_client(rel_path).url
                # https://learn.microsoft.com/en-us/azure/storage/blobs/sas-service-create-python
                token = generate_blob_sas(
                    account_name=self.account_name,
                    account_key=self.account_key,
                    container_name=self.container_name,
                    blob_name=rel_path,
                    permission=BlobSasPermissions(read=True),
                    expiry=datetime.utcnow() + timedelta(hours=1),
                )
                return f"{url}?{token}"
            except AzureHttpError:
                log.exception("Trouble generating URL for dataset '%s'", rel_path)
                return None

    def _get_store_usage_percent(self, obj):
        # Percent used for Azure blob containers is effectively zero realistically.
        # https://learn.microsoft.com/en-us/azure/storage/blobs/scalability-targets
        return 0.0

    def shutdown(self):
        self._shutdown_cache_monitor()