
Source code for galaxy.objectstore.azure_blob

"""
Object Store plugin for the Microsoft Azure Block Blob Storage system
"""

import logging
import os
from datetime import (
    datetime,
    timedelta,
)

try:
    from azure.common import AzureHttpError
    from azure.storage.blob import (
        BlobSasPermissions,
        BlobServiceClient,
        generate_blob_sas,
    )
except ImportError:
    BlobServiceClient = None  # type: ignore[assignment,unused-ignore,misc]

from ._caching_base import CachingConcreteObjectStore
from .caching import (
    enable_cache_monitor,
    parse_caching_config_dict_from_xml,
)

NO_BLOBSERVICE_ERROR_MESSAGE = (
    "ObjectStore configured, but no azure.storage.blob dependency available."
    "Please install and properly configure azure.storage.blob or modify Object Store configuration."
)

log = logging.getLogger(__name__)


def parse_config_xml(config_xml):
    try:
        auth_xml = config_xml.findall("auth")[0]
        account_name = auth_xml.get("account_name")
        account_key = auth_xml.get("account_key")
        account_url = auth_xml.get("account_url")

        container_xml = config_xml.find("container")
        container_name = container_xml.get("name")

        transfer_xml = config_xml.findall("transfer")
        if not transfer_xml:
            transfer_xml = {}
        else:
            transfer_xml = transfer_xml[0]
        transfer_dict = {}
        for key in [
            "max_concurrency",
            "download_max_concurrency",
            "upload_max_concurrency",
            "max_single_put_size",
            "max_single_get_size",
            "max_block_size",
        ]:
            value = transfer_xml.get(key)
            if value is not None:
                transfer_dict[key] = value

        cache_dict = parse_caching_config_dict_from_xml(config_xml)

        tag, attrs = "extra_dir", ("type", "path")
        extra_dirs = config_xml.findall(tag)
        if not extra_dirs:
            msg = f"No {tag} element in XML tree"
            log.error(msg)
            raise Exception(msg)
        extra_dirs = [{k: e.get(k) for k in attrs} for e in extra_dirs]

        auth = {
            "account_name": account_name,
            "account_key": account_key,
        }
        if account_url:
            auth["account_url"] = account_url

        config_dict = {
            "auth": auth,
            "container": {
                "name": container_name,
            },
            "cache": cache_dict,
            "transfer": transfer_dict,
            "extra_dirs": extra_dirs,
            "private": CachingConcreteObjectStore.parse_private_from_config_xml(config_xml),
        }
        name = config_xml.attrib.get("name", None)
        if name is not None:
            config_dict["name"] = name
        device = config_xml.attrib.get("device", None)
        config_dict["device"] = device
        return config_dict
    except Exception:
        # Toss it back up after logging, we can't continue loading at this point.
        log.exception("Malformed ObjectStore Configuration XML -- unable to continue")
        raise
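
For orientation, the following is a minimal sketch of the kind of XML fragment this parser accepts and how it would be invoked. The element and attribute names mirror the lookups above; the account, key, container, and path values are illustrative placeholders rather than defaults.

from xml.etree.ElementTree import fromstring

# Hypothetical configuration; credential and path values are placeholders.
example_xml = fromstring(
    """
    <object_store type="azure_blob" name="azure_store">
        <auth account_name="myaccount" account_key="placeholder-key" />
        <container name="mycontainer" />
        <cache path="database/object_store_cache" size="100" />
        <extra_dir type="job_work" path="database/job_working_directory_azure" />
        <extra_dir type="temp" path="database/tmp_azure" />
    </object_store>
    """
)
config_dict = parse_config_xml(example_xml)  # dict with "auth", "container", "cache", "transfer", ...
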
class AzureBlobObjectStore(CachingConcreteObjectStore):
    """
    Object store that stores objects as blobs in an Azure Blob Container. A local
    cache exists that is used as an intermediate location for files between
    Galaxy and Azure.
    """

    store_type = "azure_blob"
    cloud = True

    def __init__(self, config, config_dict):
        super().__init__(config, config_dict)
        auth_dict = config_dict["auth"]
        container_dict = config_dict["container"]
        cache_dict = config_dict.get("cache") or {}
        self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict)

        self.account_name = auth_dict.get("account_name")
        self.account_url = auth_dict.get("account_url")
        self.account_key = auth_dict.get("account_key")

        self.container_name = container_dict.get("name")

        raw_transfer_dict = config_dict.get("transfer", {})
        typed_transfer_dict = {}
        for key in [
            "max_concurrency",
            "download_max_concurrency",
            "upload_max_concurrency",
            "max_single_put_size",
            "max_single_get_size",
            "max_block_size",
        ]:
            value = raw_transfer_dict.get(key)
            if value is not None:
                typed_transfer_dict[key] = int(value)
        self.transfer_dict = typed_transfer_dict

        self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size
        self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path
        self.cache_updated_data = cache_dict.get("cache_updated_data", True)

        self._initialize()

    def _initialize(self):
        if BlobServiceClient is None:
            raise Exception(NO_BLOBSERVICE_ERROR_MESSAGE)

        self._configure_connection()
        self._ensure_staging_path_writable()
        self._start_cache_monitor_if_needed()

    def to_dict(self):
        as_dict = super().to_dict()
        auth = {
            "account_name": self.account_name,
            "account_key": self.account_key,
        }
        if self.account_url:
            auth["account_url"] = self.account_url
        as_dict.update(
            {
                "auth": auth,
                "container": {
                    "name": self.container_name,
                },
                "transfer": self.transfer_dict,
                "cache": {
                    "size": self.cache_size,
                    "path": self.staging_path,
                    "cache_updated_data": self.cache_updated_data,
                },
            }
        )
        return as_dict

    # config_xml is an ElementTree object.
    @classmethod
    def parse_xml(clazz, config_xml):
        return parse_config_xml(config_xml)

    def _configure_connection(self):
        log.debug("Configuring Connection")
        extra_kwds = {}
        for key in [
            "max_single_put_size",
            "max_single_get_size",
            "max_block_size",
        ]:
            if key in self.transfer_dict:
                extra_kwds[key] = self.transfer_dict[key]

        if self.account_url:
            # https://pypi.org/project/azure-storage-blob/
            service = BlobServiceClient(
                account_url=self.account_url,
                credential={"account_name": self.account_name, "account_key": self.account_key},
                **extra_kwds,
            )
        else:
            service = BlobServiceClient(
                account_url=f"https://{self.account_name}.blob.core.windows.net",
                credential=self.account_key,
                **extra_kwds,
            )
        self.service = service

    def _get_remote_size(self, rel_path):
        try:
            properties = self._blob_client(rel_path).get_blob_properties()
            size_in_bytes = properties.size
            return size_in_bytes
        except AzureHttpError:
            log.exception("Could not get size of blob '%s' from Azure", rel_path)
            return -1

    def _blobs_from(self, rel_path):
        return self.service.get_container_client(self.container_name).list_blobs(name_starts_with=rel_path)

    def _exists_remotely(self, rel_path: str):
        try:
            is_dir = rel_path[-1] == "/"
            if is_dir:
                blobs = self._blobs_from(rel_path)
                if blobs:
                    return True
                else:
                    return False
            else:
                exists = self._blob_client(rel_path).exists()
        except AzureHttpError:
            log.exception("Trouble checking existence of Azure blob '%s'", rel_path)
            return False
        return exists

    def _blob_client(self, rel_path: str):
        return self.service.get_blob_client(self.container_name, rel_path)

    def _download(self, rel_path):
        local_destination = self._get_cache_path(rel_path)
        try:
            log.debug("Pulling '%s' into cache to %s", rel_path, local_destination)
            if not self._caching_allowed(rel_path):
                return False
            else:
                self._download_to_file(rel_path, local_destination)
                return True
        except AzureHttpError:
            log.exception("Problem downloading '%s' from Azure", rel_path)
            return False

    def _download_to_file(self, rel_path, local_destination):
        kwd = {}
        max_concurrency = self.transfer_dict.get("download_max_concurrency") or self.transfer_dict.get(
            "max_concurrency"
        )
        if max_concurrency is not None:
            kwd["max_concurrency"] = max_concurrency
        with open(local_destination, "wb") as f:
            self._blob_client(rel_path).download_blob().download_to_stream(f, **kwd)

    def _download_directory_into_cache(self, rel_path, cache_path):
        blobs = self._blobs_from(rel_path)
        for blob in blobs:
            key = blob.name
            local_file_path = os.path.join(cache_path, os.path.relpath(key, rel_path))

            # Create directories if they don't exist
            os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

            # Download the file
            self._download_to_file(key, local_file_path)

    def _push_string_to_path(self, rel_path: str, from_string: str) -> bool:
        try:
            self._blob_client(rel_path).upload_blob(from_string, overwrite=True)
            return True
        except AzureHttpError:
            log.exception("Trouble pushing to Azure Blob '%s' from string", rel_path)
            return False

    def _push_file_to_path(self, rel_path: str, source_file: str) -> bool:
        try:
            with open(source_file, "rb") as f:
                kwd = {}
                max_concurrency = self.transfer_dict.get("upload_max_concurrency") or self.transfer_dict.get(
                    "max_concurrency"
                )
                if max_concurrency is not None:
                    kwd["max_concurrency"] = max_concurrency
                self._blob_client(rel_path).upload_blob(f, overwrite=True, **kwd)
            return True
        except AzureHttpError:
            log.exception("Trouble pushing to Azure Blob '%s' from file '%s'", rel_path, source_file)
            return False

    def _delete_remote_all(self, rel_path: str) -> bool:
        try:
            blobs = self._blobs_from(rel_path)
            for blob in blobs:
                log.debug("Deleting from Azure: %s", blob)
                self._blob_client(blob.name).delete_blob()
            return True
        except AzureHttpError:
            log.exception("Could not delete blob '%s' from Azure", rel_path)
            return False

    def _delete_existing_remote(self, rel_path: str) -> bool:
        try:
            self._blob_client(rel_path).delete_blob()
            return True
        except AzureHttpError:
            log.exception("Could not delete blob '%s' from Azure", rel_path)
            return False

    def _get_object_url(self, obj, **kwargs):
        if self._exists(obj, **kwargs):
            rel_path = self._construct_path(obj, **kwargs)
            try:
                url = self._blob_client(rel_path).url
                # https://learn.microsoft.com/en-us/azure/storage/blobs/sas-service-create-python
                token = generate_blob_sas(
                    account_name=self.account_name,
                    account_key=self.account_key,
                    container_name=self.container_name,
                    blob_name=rel_path,
                    permission=BlobSasPermissions(read=True),
                    expiry=datetime.utcnow() + timedelta(hours=1),
                )
                return f"{url}?{token}"
            except AzureHttpError:
                log.exception("Trouble generating URL for dataset '%s'", rel_path)
        return None

    def _get_store_usage_percent(self, obj):
        # Percent used for Azure blob containers is effectively zero realistically.
        # https://learn.microsoft.com/en-us/azure/storage/blobs/scalability-targets
        return 0.0

    def shutdown(self):
        self._shutdown_cache_monitor()
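
For comparison, here is a rough sketch of the dictionary form that __init__ consumes (and that parse_config_xml and to_dict produce). The keys follow the lookups above, and all values are illustrative placeholders; such a dictionary would be passed as config_dict to AzureBlobObjectStore together with a Galaxy config object.

# Hypothetical configuration dictionary; values are placeholders.
example_config_dict = {
    "auth": {
        "account_name": "myaccount",
        "account_key": "placeholder-key",
        # "account_url": "https://myaccount.blob.core.windows.net",  # optional
    },
    "container": {"name": "mycontainer"},
    "transfer": {"max_concurrency": "4"},  # values are coerced to int in __init__
    "cache": {"path": "database/object_store_cache", "size": 100, "cache_updated_data": True},
    "extra_dirs": [
        {"type": "job_work", "path": "database/job_working_directory_azure"},
        {"type": "temp", "path": "database/tmp_azure"},
    ],
    "private": False,
}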