Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.objectstore.azure_blob

"""
Object Store plugin for the Microsoft Azure Block Blob Storage system
"""

import logging
import os
import shutil
from datetime import datetime
from typing import Optional

try:
    from azure.common import AzureHttpError
    from azure.storage import CloudStorageAccount
    from azure.storage.blob import BlockBlobService
    from azure.storage.blob.models import Blob
except ImportError:
    BlockBlobService = None

from galaxy.exceptions import (
    ObjectInvalid,
    ObjectNotFound,
)
from galaxy.util import (
    directory_hash_id,
    umask_fix_perms,
    unlink,
)
from galaxy.util.path import safe_relpath
from . import ConcreteObjectStore
from .caching import (
    CacheTarget,
    enable_cache_monitor,
    InProcessCacheMonitor,
    parse_caching_config_dict_from_xml,
)

# Raised (as a plain Exception message) when the optional azure.storage.blob
# dependency could not be imported. The two literals are joined by implicit
# concatenation, so the separating space must be part of one of them.
NO_BLOBSERVICE_ERROR_MESSAGE = (
    "ObjectStore configured, but no azure.storage.blob dependency available. "
    "Please install and properly configure azure.storage.blob or modify Object Store configuration."
)

# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)


def parse_config_xml(config_xml):
    """Translate an Azure object store XML element into a configuration dict.

    Reads the first ``auth`` element (``account_name``, ``account_key``), the
    ``container`` element (``name``, ``max_chunk_size``), the caching settings
    and every ``extra_dir`` element (``type``, ``path``).

    :param config_xml: element exposing ``find``/``findall`` (an ElementTree element).
    :returns: dict with the keys ``auth``, ``container``, ``cache``,
        ``extra_dirs`` and ``private``.
    :raises Exception: any failure while reading the XML (including a missing
        ``extra_dir`` element) is logged and then re-raised unchanged.
    """
    try:
        auth_element = config_xml.findall("auth")[0]
        auth_section = {
            "account_name": auth_element.get("account_name"),
            "account_key": auth_element.get("account_key"),
        }

        container_element = config_xml.find("container")
        container_section = {
            "name": container_element.get("name"),
            # currently unused
            "max_chunk_size": int(container_element.get("max_chunk_size", 250)),
        }

        cache_section = parse_caching_config_dict_from_xml(config_xml)

        tag = "extra_dir"
        extra_dir_elements = config_xml.findall(tag)
        if not extra_dir_elements:
            msg = f"No {tag} element in XML tree"
            log.error(msg)
            raise Exception(msg)
        extra_dirs = []
        for element in extra_dir_elements:
            extra_dirs.append({"type": element.get("type"), "path": element.get("path")})

        return {
            "auth": auth_section,
            "container": container_section,
            "cache": cache_section,
            "extra_dirs": extra_dirs,
            "private": ConcreteObjectStore.parse_private_from_config_xml(config_xml),
        }
    except Exception:
        # Toss it back up after logging, we can't continue loading at this point.
        log.exception("Malformed ObjectStore Configuration XML -- unable to continue")
        raise
class AzureBlobObjectStore(ConcreteObjectStore):
    """
    Object store that stores objects as blobs in an Azure Blob Container. A local
    cache exists that is used as an intermediate location for files between
    Galaxy and Azure.
    """

    # Set in ``_initialize`` only when cache monitoring is enabled.
    cache_monitor: Optional[InProcessCacheMonitor] = None
    store_type = "azure_blob"

    def __init__(self, config, config_dict):
        """Read the auth, container and cache settings and connect to Azure.

        :param config: Galaxy configuration object; supplies the fallback
            ``object_store_cache_size`` and ``object_store_cache_path``.
        :param config_dict: dict with ``auth`` and ``container`` entries and an
            optional ``cache`` entry (the shape produced by ``parse_config_xml``).
        :raises Exception: if the azure.storage.blob dependency is unavailable.
        """
        super().__init__(config, config_dict)

        self.transfer_progress = 0

        auth_dict = config_dict["auth"]
        container_dict = config_dict["container"]
        cache_dict = config_dict.get("cache") or {}
        self.enable_cache_monitor, self.cache_monitor_interval = enable_cache_monitor(config, config_dict)

        self.account_name = auth_dict.get("account_name")
        self.account_key = auth_dict.get("account_key")

        self.container_name = container_dict.get("name")
        self.max_chunk_size = container_dict.get("max_chunk_size", 250)  # currently unused

        # Cache settings fall back to the global Galaxy configuration.
        self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size
        self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path
        self.cache_updated_data = cache_dict.get("cache_updated_data", True)

        self._initialize()

    def _initialize(self):
        """Open the Azure connection and, if enabled, start the cache monitor."""
        if BlockBlobService is None:
            raise Exception(NO_BLOBSERVICE_ERROR_MESSAGE)

        self._configure_connection()

        if self.enable_cache_monitor:
            self.cache_monitor = InProcessCacheMonitor(self.cache_target, self.cache_monitor_interval)

    def to_dict(self):
        """Return the parent's dict extended with ``auth``, ``container`` and ``cache``."""
        as_dict = super().to_dict()
        as_dict.update(
            {
                "auth": {
                    "account_name": self.account_name,
                    "account_key": self.account_key,
                },
                "container": {
                    "name": self.container_name,
                    "max_chunk_size": self.max_chunk_size,
                },
                "cache": {
                    "size": self.cache_size,
                    "path": self.staging_path,
                    "cache_updated_data": self.cache_updated_data,
                },
            }
        )
        return as_dict

    ###################
    # Private Methods #
    ###################

    # config_xml is an ElementTree object.
    @classmethod
    def parse_xml(clazz, config_xml):
        """Delegate to the module-level ``parse_config_xml``."""
        return parse_config_xml(config_xml)

    def _configure_connection(self):
        """Create the storage account object and its block blob service."""
        log.debug("Configuring Connection")
        self.account = CloudStorageAccount(self.account_name, self.account_key)
        self.service = self.account.create_block_blob_service()

    def _construct_path(
        self,
        obj,
        base_dir=None,
        dir_only=None,
        extra_dir=None,
        extra_dir_at_root=False,
        alt_name=None,
        obj_dir=False,
        in_cache=False,
        **kwargs,
    ):
        """Build the path for ``obj``, which is also used as the blob name.

        :returns: the path relative to the container; joined onto the
            configured extra directory when ``base_dir`` is given; or the
            absolute cache path when ``in_cache`` is true.
        :raises ObjectInvalid: if ``extra_dir`` is not normalized or
            ``alt_name`` would escape the constructed directory.
        """
        # extra_dir should never be constructed from provided data but just
        # make sure there are no shenanigans afoot
        if extra_dir and extra_dir != os.path.normpath(extra_dir):
            log.warning("extra_dir is not normalized: %s", extra_dir)
            raise ObjectInvalid("The requested object is invalid")
        # ensure that any parent directory references in alt_name would not
        # result in a path not contained in the directory path constructed here
        if alt_name:
            if not safe_relpath(alt_name):
                log.warning("alt_name would locate path outside dir: %s", alt_name)
                raise ObjectInvalid("The requested object is invalid")
            # alt_name can contain parent directory references, but S3 will not
            # follow them, so if they are valid we normalize them out
            alt_name = os.path.normpath(alt_name)

        rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj)))

        if extra_dir is not None:
            if extra_dir_at_root:
                rel_path = os.path.join(extra_dir, rel_path)
            else:
                rel_path = os.path.join(rel_path, extra_dir)

        # for JOB_WORK directory
        if obj_dir:
            rel_path = os.path.join(rel_path, str(self._get_object_id(obj)))
        if base_dir:
            base = self.extra_dirs.get(base_dir)
            return os.path.join(base, rel_path)

        # S3 folders are marked by having trailing '/' so add it now
        # rel_path = '%s/' % rel_path # assume for now we don't need this in Azure blob storage.

        if not dir_only:
            rel_path = os.path.join(rel_path, alt_name if alt_name else f"dataset_{self._get_object_id(obj)}.dat")

        if in_cache:
            return self._get_cache_path(rel_path)

        return rel_path

    def _fix_permissions(self, rel_path):
        """Set permissions on rel_path"""
        # os.walk yields nothing for a path that is not a directory, so this
        # only has an effect on directory trees.
        for basedir, _, files in os.walk(rel_path):
            umask_fix_perms(basedir, self.config.umask, 0o777, self.config.gid)
            for filename in files:
                path = os.path.join(basedir, filename)
                # Ignore symlinks
                if os.path.islink(path):
                    continue
                umask_fix_perms(path, self.config.umask, 0o666, self.config.gid)

    def _get_cache_path(self, rel_path):
        """Return the absolute location of ``rel_path`` inside the local cache."""
        return os.path.abspath(os.path.join(self.staging_path, rel_path))

    def _get_transfer_progress(self):
        """Return the progress (in percent) last recorded by ``_transfer_cb``."""
        return self.transfer_progress

    def _get_size_in_azure(self, rel_path):
        """Return the content length of blob ``rel_path``.

        :returns: the size in bytes; ``-1`` if Azure raised ``AzureHttpError``;
            ``None`` if no properties object was returned.
        """
        try:
            properties = self.service.get_blob_properties(self.container_name, rel_path)
            # Currently this returns a blob and not a BlobProperties object
            # Similar issue for the ruby https://github.com/Azure/azure-storage-ruby/issues/13
            # The typecheck is an attempt at future-proofing this when/if the bug is fixed.
            if isinstance(properties, Blob):
                properties = properties.properties
            if properties:
                size_in_bytes = properties.content_length
                return size_in_bytes
        except AzureHttpError:
            log.exception("Could not get size of blob '%s' from Azure", rel_path)
            return -1

    def _in_azure(self, rel_path):
        """Return whether blob ``rel_path`` exists; ``False`` on ``AzureHttpError``."""
        try:
            exists = self.service.exists(self.container_name, rel_path)
        except AzureHttpError:
            log.exception("Trouble checking existence of Azure blob '%s'", rel_path)
            return False
        return exists

    def _in_cache(self, rel_path):
        """Check if the given dataset is in the local cache."""
        cache_path = self._get_cache_path(rel_path)
        return os.path.exists(cache_path)

    def _pull_into_cache(self, rel_path):
        """Download ``rel_path`` into the cache; return whether the download succeeded."""
        # Ensure the cache directory structure exists (e.g., dataset_#_files/)
        rel_path_dir = os.path.dirname(rel_path)
        if not os.path.exists(self._get_cache_path(rel_path_dir)):
            os.makedirs(self._get_cache_path(rel_path_dir), exist_ok=True)
        # Now pull in the file
        file_ok = self._download(rel_path)
        self._fix_permissions(self._get_cache_path(rel_path_dir))
        return file_ok

    def _transfer_cb(self, complete, total):
        """Progress callback handed to the Azure SDK; stores progress in percent."""
        if not total:
            # A total of 0 (empty blob) would raise ZeroDivisionError and a
            # missing total would raise TypeError -- presumably the SDK can pass
            # None when the size is unknown (TODO confirm). Keep the previous
            # value rather than failing the transfer from inside the callback.
            return
        self.transfer_progress = float(complete) / float(total) * 100  # in percent

    def _download(self, rel_path):
        """Fetch blob ``rel_path`` into the cache.

        :returns: ``True`` on success; ``False`` if the blob does not fit in
            the cache or Azure raised ``AzureHttpError``.
        """
        local_destination = self._get_cache_path(rel_path)
        try:
            log.debug("Pulling '%s' into cache to %s", rel_path, local_destination)
            # Query the remote size once and reuse it for the check and the log message.
            remote_size = self._get_size_in_azure(rel_path)
            if not self.cache_target.fits_in_cache(remote_size):
                log.critical(
                    "File %s is larger (%s bytes) than the configured cache allows (%s). Cannot download.",
                    rel_path,
                    remote_size,
                    self.cache_target.log_description,
                )
                return False
            self.transfer_progress = 0  # Reset transfer progress counter
            self.service.get_blob_to_path(
                self.container_name, rel_path, local_destination, progress_callback=self._transfer_cb
            )
            return True
        except AzureHttpError:
            log.exception("Problem downloading '%s' from Azure", rel_path)
        return False

    def _push_to_os(self, rel_path, source_file=None, from_string=None):
        """
        Push the file pointed to by ``rel_path`` to the object store naming the blob
        ``rel_path``. If ``source_file`` is provided, push that file instead while
        still using ``rel_path`` as the blob name.
        If ``from_string`` is provided, set contents of the file to the value of
        the string.

        Returns ``True`` when the data was pushed or the source file is empty
        (nothing is uploaded in that case), ``False`` when the source file is
        missing or Azure raised ``AzureHttpError``.
        """
        try:
            source_file = source_file or self._get_cache_path(rel_path)

            if not os.path.exists(source_file):
                log.error(
                    "Tried updating blob '%s' from source file '%s', but source file does not exist.",
                    rel_path,
                    source_file,
                )
                return False

            if os.path.getsize(source_file) == 0:
                log.debug(
                    "Wanted to push file '%s' to azure blob '%s' but its size is 0; skipping.", source_file, rel_path
                )
                return True

            if from_string:
                self.service.create_blob_from_text(
                    self.container_name, rel_path, from_string, progress_callback=self._transfer_cb
                )
                log.debug("Pushed data from string '%s' to blob '%s'", from_string, rel_path)
            else:
                start_time = datetime.now()
                log.debug(
                    "Pushing cache file '%s' of size %s bytes to '%s'",
                    source_file,
                    os.path.getsize(source_file),
                    rel_path,
                )
                self.transfer_progress = 0  # Reset transfer progress counter
                self.service.create_blob_from_path(
                    self.container_name, rel_path, source_file, progress_callback=self._transfer_cb
                )
                end_time = datetime.now()
                log.debug(
                    "Pushed cache file '%s' to blob '%s' (%s bytes transfered in %s sec)",
                    source_file,
                    rel_path,
                    os.path.getsize(source_file),
                    end_time - start_time,
                )
            return True

        except AzureHttpError:
            log.exception("Trouble pushing to Azure Blob '%s' from file '%s'", rel_path, source_file)
        return False

    ##################
    # Public Methods #
    ##################

    def _exists(self, obj, **kwargs):
        """Return whether ``obj`` exists in the cache or in Azure.

        Side effects: a file found only in the cache is pushed to Azure, and
        for a ``dir_only`` request with ``base_dir`` the directory is created
        if it is missing.
        """
        rel_path = self._construct_path(obj, **kwargs)

        in_cache = self._in_cache(rel_path)
        in_azure = self._in_azure(rel_path)
        # log.debug("~~~~~~ File '%s' exists in cache: %s; in azure: %s" % (rel_path, in_cache, in_azure))
        # dir_only does not get synced so shortcut the decision
        dir_only = kwargs.get("dir_only", False)
        base_dir = kwargs.get("base_dir", None)
        if dir_only:
            if in_cache or in_azure:
                return True
            # for JOB_WORK directory
            elif base_dir:
                if not os.path.exists(rel_path):
                    os.makedirs(rel_path, exist_ok=True)
                return True
            else:
                return False

        # TODO: Sync should probably not be done here. Add this to an async upload stack?
        if in_cache and not in_azure:
            self._push_to_os(rel_path, source_file=self._get_cache_path(rel_path))
            return True
        elif in_azure:
            return True
        else:
            return False

    def file_ready(self, obj, **kwargs):
        """
        A helper method that checks if a file corresponding to a dataset is
        ready and available to be used. Return ``True`` if so, ``False`` otherwise.
        """
        rel_path = self._construct_path(obj, **kwargs)
        # Make sure the size in cache is available in its entirety
        if self._in_cache(rel_path):
            local_size = os.path.getsize(self._get_cache_path(rel_path))
            remote_size = self._get_size_in_azure(rel_path)
            if local_size == remote_size:
                return True
            else:
                log.debug("Waiting for dataset %s to transfer from OS: %s/%s", rel_path, local_size, remote_size)

        return False

    def _create(self, obj, **kwargs):
        """Create the cache directory (and, unless ``dir_only``, an empty file) for ``obj``.

        Does nothing when the object already exists.
        """
        if not self._exists(obj, **kwargs):
            # Pull out locally used fields
            extra_dir = kwargs.get("extra_dir", None)
            extra_dir_at_root = kwargs.get("extra_dir_at_root", False)
            dir_only = kwargs.get("dir_only", False)
            alt_name = kwargs.get("alt_name", None)

            # Construct hashed path
            rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj)))

            # Optionally append extra_dir
            if extra_dir is not None:
                if extra_dir_at_root:
                    rel_path = os.path.join(extra_dir, rel_path)
                else:
                    rel_path = os.path.join(rel_path, extra_dir)

            # Create given directory in cache
            cache_dir = os.path.join(self.staging_path, rel_path)
            if not os.path.exists(cache_dir):
                os.makedirs(cache_dir, exist_ok=True)

            # Although not really necessary to create S3 folders (because S3 has
            # flat namespace), do so for consistency with the regular file system
            # S3 folders are marked by having trailing '/' so add it now
            # s3_dir = '%s/' % rel_path
            # self._push_to_os(s3_dir, from_string='')
            # If instructed, create the dataset in cache & in S3
            if not dir_only:
                rel_path = os.path.join(rel_path, alt_name if alt_name else f"dataset_{self._get_object_id(obj)}.dat")
                open(os.path.join(self.staging_path, rel_path), "w").close()
                self._push_to_os(rel_path, from_string="")

    def _empty(self, obj, **kwargs):
        """Return ``True`` if ``obj`` has a size greater than zero.

        :raises ObjectNotFound: if the object does not exist.
        """
        if self._exists(obj, **kwargs):
            return bool(self._size(obj, **kwargs) > 0)
        else:
            raise ObjectNotFound(f"objectstore.empty, object does not exist: {str(obj)}, kwargs: {str(kwargs)}")

    def _size(self, obj, **kwargs):
        """Return the size of ``obj`` from the cache, else from Azure, else 0."""
        rel_path = self._construct_path(obj, **kwargs)
        if self._in_cache(rel_path):
            try:
                return os.path.getsize(self._get_cache_path(rel_path))
            except OSError as ex:
                # NOTE: despite the message, the ``elif`` below is skipped on
                # this path, so Azure is not consulted and 0 is returned.
                log.info("Could not get size of file '%s' in local cache, will try Azure. Error: %s", rel_path, ex)
        elif self._exists(obj, **kwargs):
            return self._get_size_in_azure(rel_path)
        log.warning("Did not find dataset '%s', returning 0 for size", rel_path)
        return 0

    def _delete(self, obj, entire_dir=False, **kwargs):
        """Delete ``obj`` from the cache and from Azure.

        :returns: ``True`` if something was deleted, ``False`` if the blob was
            not in Azure or an ``AzureHttpError``/``OSError`` occurred.
        """
        rel_path = self._construct_path(obj, **kwargs)
        extra_dir = kwargs.get("extra_dir", None)
        base_dir = kwargs.get("base_dir", None)
        dir_only = kwargs.get("dir_only", False)
        obj_dir = kwargs.get("obj_dir", False)
        try:
            if base_dir and dir_only and obj_dir:
                # Remove temporary data in JOB_WORK directory
                shutil.rmtree(os.path.abspath(rel_path))
                return True

            # For the case of extra_files, because we don't have a reference to
            # individual files/blobs we need to remove the entire directory structure
            # with all the files in it. This is easy for the local file system,
            # but requires iterating through each individual blob in Azure and deleting it.
            if entire_dir and extra_dir:
                shutil.rmtree(self._get_cache_path(rel_path), ignore_errors=True)
                blobs = self.service.list_blobs(self.container_name, prefix=rel_path)
                for blob in blobs:
                    log.debug("Deleting from Azure: %s", blob)
                    self.service.delete_blob(self.container_name, blob.name)
                return True
            else:
                # Delete from cache first
                unlink(self._get_cache_path(rel_path), ignore_errors=True)
                # Delete from Azure as well
                if self._in_azure(rel_path):
                    log.debug("Deleting from Azure: %s", rel_path)
                    self.service.delete_blob(self.container_name, rel_path)
                    return True
        except AzureHttpError:
            log.exception("Could not delete blob '%s' from Azure", rel_path)
        except OSError:
            log.exception("%s delete error", self._get_filename(obj, **kwargs))
        return False

    def _get_data(self, obj, start=0, count=-1, **kwargs):
        """Return ``count`` characters of ``obj`` starting at offset ``start``.

        The file is pulled into the cache first if it is not already there.
        """
        rel_path = self._construct_path(obj, **kwargs)
        # Check cache first and get file if not there
        if not self._in_cache(rel_path):
            self._pull_into_cache(rel_path)
        # Read the file content from cache; the context manager closes the
        # handle even when seek/read raises.
        with open(self._get_cache_path(rel_path)) as data_file:
            data_file.seek(start)
            return data_file.read(count)

    def _get_filename(self, obj, **kwargs):
        """Return a local path for ``obj``, pulling it into the cache if needed.

        :raises ObjectNotFound: if the object is neither usable from the cache
            nor retrievable from Azure.
        """
        rel_path = self._construct_path(obj, **kwargs)
        base_dir = kwargs.get("base_dir", None)
        dir_only = kwargs.get("dir_only", False)
        obj_dir = kwargs.get("obj_dir", False)
        sync_cache = kwargs.get("sync_cache", True)

        # for JOB_WORK directory
        if base_dir and dir_only and obj_dir:
            return os.path.abspath(rel_path)

        cache_path = self._get_cache_path(rel_path)
        if not sync_cache:
            return cache_path
        # S3 does not recognize directories as files so cannot check if those exist.
        # So, if checking dir only, ensure given dir exists in cache and return
        # the expected cache path.
        # dir_only = kwargs.get('dir_only', False)
        # if dir_only:
        #     if not os.path.exists(cache_path):
        #         os.makedirs(cache_path)
        #     return cache_path
        # Check if the file exists in the cache first, always pull if file size in cache is zero
        if self._in_cache(rel_path) and (dir_only or os.path.getsize(self._get_cache_path(rel_path)) > 0):
            return cache_path
        # Check if the file exists in persistent storage and, if it does, pull it into cache
        elif self._exists(obj, **kwargs):
            if dir_only:  # Directories do not get pulled into cache
                return cache_path
            else:
                if self._pull_into_cache(rel_path):
                    return cache_path
        # For the case of retrieving a directory only, return the expected path
        # even if it does not exist.
        # if dir_only:
        #     return cache_path
        raise ObjectNotFound(f"objectstore.get_filename, no cache_path: {str(obj)}, kwargs: {str(kwargs)}")

    def _update_from_file(self, obj, file_name=None, create=False, **kwargs):
        """Push new content for ``obj`` to Azure.

        :param file_name: file to take the content from; when omitted the
            cached copy of the object is pushed.
        :param create: when ``True``, create the object first if it is missing.
        :raises ObjectNotFound: if the object does not exist (and was not created).
        """
        if create is True:
            self._create(obj, **kwargs)

        # A separate ``if`` (not ``elif``): after creating the object the
        # supplied file must still be copied into the cache and uploaded.
        if self._exists(obj, **kwargs):
            rel_path = self._construct_path(obj, **kwargs)
            # Chose whether to use the dataset file itself or an alternate file
            if file_name:
                source_file = os.path.abspath(file_name)
                # Copy into cache
                cache_file = self._get_cache_path(rel_path)
                try:
                    if source_file != cache_file and self.cache_updated_data:
                        # FIXME? Should this be a `move`?
                        shutil.copy2(source_file, cache_file)
                    self._fix_permissions(cache_file)
                except OSError:
                    log.exception("Trouble copying source file '%s' to cache '%s'", source_file, cache_file)
            else:
                source_file = self._get_cache_path(rel_path)

            self._push_to_os(rel_path, source_file)

        else:
            raise ObjectNotFound(
                f"objectstore.update_from_file, object does not exist: {str(obj)}, kwargs: {str(kwargs)}"
            )

    def _get_object_url(self, obj, **kwargs):
        """Return the blob URL for ``obj``, or ``None`` if it is missing or Azure fails."""
        if self._exists(obj, **kwargs):
            rel_path = self._construct_path(obj, **kwargs)
            try:
                url = self.service.make_blob_url(container_name=self.container_name, blob_name=rel_path)
                return url
            except AzureHttpError:
                log.exception("Trouble generating URL for dataset '%s'", rel_path)
        return None

    def _get_store_usage_percent(self):
        """Always report 0.0; usage is not computed for this store."""
        return 0.0

    @property
    def cache_target(self) -> CacheTarget:
        """Build a ``CacheTarget`` from the staging path, cache size and the constant 0.9."""
        return CacheTarget(
            self.staging_path,
            self.cache_size,
            0.9,
        )

    def shutdown(self):
        """Stop the cache monitor if one was started."""
        if self.cache_monitor:
            self.cache_monitor.shutdown()