Source code for galaxy.objectstore.azure_blob

"""
Object Store plugin for the Microsoft Azure Block Blob Storage system
"""

import logging
import os
import shutil
import threading
import time
from datetime import datetime

try:
    from azure.common import AzureHttpError
    from azure.storage import CloudStorageAccount
    from azure.storage.blob import BlockBlobService
    from azure.storage.blob.models import Blob
except ImportError:
    BlockBlobService = None

from galaxy.exceptions import (
    ObjectInvalid,
    ObjectNotFound
)
from galaxy.util import (
    directory_hash_id,
    umask_fix_perms
)
from galaxy.util.path import safe_relpath
from galaxy.util.sleeper import Sleeper
from ..objectstore import (
    ConcreteObjectStore,
    convert_bytes,
)

# Message raised by AzureBlobObjectStore._initialize when the optional
# azure.storage.blob import at the top of this module failed.
# The trailing space after the first sentence matters: the two literals are
# concatenated by the parser, and without it the text reads "available.Please".
NO_BLOBSERVICE_ERROR_MESSAGE = ("ObjectStore configured, but no azure.storage.blob dependency available. "
                                "Please install and properly configure azure.storage.blob or modify Object Store configuration.")

# Module-level logger shared by every function and method in this file.
log = logging.getLogger(__name__)


def parse_config_xml(config_xml):
    """Translate an object store XML configuration element into a plain dict.

    The element is expected to contain ``auth``, ``container`` and ``cache``
    children plus at least one ``extra_dir`` child.

    :param config_xml: ElementTree element holding the object store configuration.
    :returns: dict with the keys ``auth`` (``account_name``, ``account_key``),
        ``container`` (``name``, ``max_chunk_size``), ``cache`` (``size``,
        ``path``) and ``extra_dirs`` (list of ``{'type': ..., 'path': ...}``).
    :raises Exception: any failure while reading the tree (missing element,
        unparsable number, no ``extra_dir``) is logged and re-raised unchanged.
    """
    try:
        # The sections are read in the same order as before so that the first
        # malformed element determines which exception propagates.
        auth_element = config_xml.findall('auth')[0]
        auth_section = {
            'account_name': auth_element.get('account_name'),
            'account_key': auth_element.get('account_key'),
        }

        container_element = config_xml.find('container')
        container_section = {
            'name': container_element.get('name'),
            # max_chunk_size is parsed but currently unused
            'max_chunk_size': int(container_element.get('max_chunk_size', 250)),
        }

        cache_element = config_xml.findall('cache')[0]
        cache_section = {
            'size': float(cache_element.get('size', -1)),
            'path': cache_element.get('path', None),
        }

        tag = 'extra_dir'
        extra_dir_elements = config_xml.findall(tag)
        if not extra_dir_elements:
            msg = f'No {tag} element in XML tree'
            log.error(msg)
            raise Exception(msg)

        extra_dirs = []
        for element in extra_dir_elements:
            extra_dirs.append({
                'type': element.get('type'),
                'path': element.get('path'),
            })

        return {
            'auth': auth_section,
            'container': container_section,
            'cache': cache_section,
            'extra_dirs': extra_dirs,
        }
    except Exception:
        # Toss it back up after logging, we can't continue loading at this point.
        log.exception("Malformed ObjectStore Configuration XML -- unable to continue")
        raise
class AzureBlobObjectStore(ConcreteObjectStore):
    """
    Object store that stores objects as blobs in an Azure Blob Container. A local
    cache exists that is used as an intermediate location for files between
    Galaxy and Azure.
    """
    store_type = 'azure_blob'

    def __init__(self, config, config_dict):
        """Read connection, container and cache settings, then connect.

        :param config: Galaxy configuration object; ``object_store_cache_path``,
            ``umask`` and ``gid`` are read from it by this class.
        :param config_dict: dict with ``auth``, ``container`` and ``cache``
            sections, in the shape produced by ``parse_config_xml``.
        """
        super().__init__(config, config_dict)

        self.transfer_progress = 0

        auth_dict = config_dict["auth"]
        container_dict = config_dict["container"]
        cache_dict = config_dict["cache"]

        self.account_name = auth_dict.get('account_name')
        self.account_key = auth_dict.get('account_key')

        self.container_name = container_dict.get('name')
        self.max_chunk_size = container_dict.get('max_chunk_size', 250)  # currently unused

        self.cache_size = cache_dict.get('size', -1)
        self.staging_path = cache_dict.get('path') or self.config.object_store_cache_path

        self._initialize()

    def _initialize(self):
        """Connect to Azure and, if a cache size is configured, start the cache monitor.

        :raises Exception: when the azure.storage.blob dependency could not be imported.
        """
        if BlockBlobService is None:
            raise Exception(NO_BLOBSERVICE_ERROR_MESSAGE)

        self._configure_connection()

        # Clean cache only if value is set in galaxy.ini
        if self.cache_size != -1:
            # Convert GBs to bytes for comparison
            self.cache_size = self.cache_size * 1073741824
            # Helper for interruptable sleep
            self.sleeper = Sleeper()
            self.cache_monitor_thread = threading.Thread(target=self.__cache_monitor)
            self.cache_monitor_thread.start()
            log.info("Cache cleaner manager started")

    def to_dict(self):
        """Return the parent's dict extended with the auth, container and cache settings.

        NOTE(review): once ``_initialize`` has run with a configured cache size,
        ``cache['size']`` is reported in bytes, not in the GB unit it was
        configured in -- confirm that consumers expect this.
        """
        as_dict = super().to_dict()
        as_dict.update({
            'auth': {
                'account_name': self.account_name,
                'account_key': self.account_key,
            },
            'container': {
                'name': self.container_name,
                'max_chunk_size': self.max_chunk_size,
            },
            'cache': {
                'size': self.cache_size,
                'path': self.staging_path,
            }
        })
        return as_dict

    ###################
    # Private Methods #
    ###################

    # config_xml is an ElementTree object.
    @classmethod
    def parse_xml(clazz, config_xml):
        """Parse ``config_xml`` by delegating to the module-level ``parse_config_xml``."""
        return parse_config_xml(config_xml)

    def _configure_connection(self):
        """Create the storage account handle and the block blob service client."""
        log.debug("Configuring Connection")
        self.account = CloudStorageAccount(self.account_name, self.account_key)
        self.service = self.account.create_block_blob_service()

    def _construct_path(self, obj, base_dir=None, dir_only=None, extra_dir=None, extra_dir_at_root=False, alt_name=None, obj_dir=False, **kwargs):
        """Build the relative path (also used as blob name) for ``obj``.

        :param base_dir: key into ``self.extra_dirs``; when given, the result is
            joined onto that directory and returned without a file name.
        :param dir_only: when true, no file name is appended.
        :param extra_dir: extra path component, placed before the hashed path
            when ``extra_dir_at_root`` is true and after it otherwise.
        :param alt_name: file name to use instead of ``dataset_<id>.dat``.
        :param obj_dir: when true, append the object id as a directory.
        :raises ObjectInvalid: if ``extra_dir`` is not normalized or ``alt_name``
            would escape the constructed directory.
        """
        # extra_dir should never be constructed from provided data but just
        # make sure there are no shenannigans afoot
        if extra_dir and extra_dir != os.path.normpath(extra_dir):
            log.warning('extra_dir is not normalized: %s', extra_dir)
            raise ObjectInvalid("The requested object is invalid")
        # ensure that any parent directory references in alt_name would not
        # result in a path not contained in the directory path constructed here
        if alt_name:
            if not safe_relpath(alt_name):
                log.warning('alt_name would locate path outside dir: %s', alt_name)
                raise ObjectInvalid("The requested object is invalid")
            # alt_name can contain parent directory references, but S3 will not
            # follow them, so if they are valid we normalize them out
            alt_name = os.path.normpath(alt_name)

        rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj)))

        if extra_dir is not None:
            if extra_dir_at_root:
                rel_path = os.path.join(extra_dir, rel_path)
            else:
                rel_path = os.path.join(rel_path, extra_dir)

        # for JOB_WORK directory
        if obj_dir:
            rel_path = os.path.join(rel_path, str(self._get_object_id(obj)))
        if base_dir:
            base = self.extra_dirs.get(base_dir)
            return os.path.join(base, rel_path)

        # S3 folders are marked by having trailing '/' so add it now
        # rel_path = '%s/' % rel_path # assume for now we don't need this in Azure blob storage.

        if not dir_only:
            rel_path = os.path.join(rel_path, alt_name if alt_name else "dataset_%s.dat" % self._get_object_id(obj))

        return rel_path

    def _fix_permissions(self, rel_path):
        """ Set permissions on rel_path"""
        for basedir, _, files in os.walk(rel_path):
            umask_fix_perms(basedir, self.config.umask, 0o777, self.config.gid)
            for filename in files:
                path = os.path.join(basedir, filename)
                # Ignore symlinks
                if os.path.islink(path):
                    continue
                umask_fix_perms(path, self.config.umask, 0o666, self.config.gid)

    def _get_cache_path(self, rel_path):
        """Return the absolute path of ``rel_path`` inside the local staging directory."""
        return os.path.abspath(os.path.join(self.staging_path, rel_path))

    def _get_transfer_progress(self):
        """Return the progress of the most recent transfer, in percent."""
        return self.transfer_progress

    def _get_size_in_azure(self, rel_path):
        """Return the size in bytes of blob ``rel_path``, or ``-1`` if it cannot be determined."""
        try:
            properties = self.service.get_blob_properties(self.container_name, rel_path)
            # Currently this returns a blob and not a BlobProperties object
            # Similar issue for the ruby https://github.com/Azure/azure-storage-ruby/issues/13
            # The typecheck is an attempt at future-proofing this when/if the bug is fixed.
            if isinstance(properties, Blob):
                properties = properties.properties
            if properties:
                return properties.content_length
        except AzureHttpError:
            log.exception("Could not get size of blob '%s' from Azure", rel_path)
        # Always hand back the -1 sentinel rather than an implicit None, so
        # callers that compare the size with ``>`` or ``==`` do not hit a TypeError.
        return -1

    def _in_azure(self, rel_path):
        """Return whether blob ``rel_path`` exists in the container; ``False`` on Azure errors."""
        try:
            exists = self.service.exists(self.container_name, rel_path)
        except AzureHttpError:
            log.exception("Trouble checking existence of Azure blob '%s'", rel_path)
            return False
        return exists

    def _in_cache(self, rel_path):
        """ Check if the given dataset is in the local cache. """
        cache_path = self._get_cache_path(rel_path)
        return os.path.exists(cache_path)

    def _pull_into_cache(self, rel_path):
        """Download blob ``rel_path`` into the cache; return ``True`` on success."""
        # Ensure the cache directory structure exists (e.g., dataset_#_files/)
        rel_path_dir = os.path.dirname(rel_path)
        if not os.path.exists(self._get_cache_path(rel_path_dir)):
            os.makedirs(self._get_cache_path(rel_path_dir))
        # Now pull in the file
        file_ok = self._download(rel_path)
        self._fix_permissions(self._get_cache_path(rel_path_dir))
        return file_ok

    def _transfer_cb(self, complete, total):
        """Progress callback handed to the blob service; records progress in percent."""
        if not total:
            # A zero or missing total gives nothing to measure against; keep
            # the previous value instead of raising from inside the transfer.
            return
        self.transfer_progress = float(complete) / float(total) * 100  # in percent

    def _download(self, rel_path):
        """Fetch blob ``rel_path`` into its cache location.

        :returns: ``True`` on success; ``False`` if the blob is larger than the
            configured cache or Azure reported an error.
        """
        local_destination = self._get_cache_path(rel_path)
        try:
            log.debug("Pulling '%s' into cache to %s", rel_path, local_destination)
            if self.cache_size > 0:
                # Look the remote size up once and reuse it for the log message.
                remote_size = self._get_size_in_azure(rel_path)
                if remote_size > self.cache_size:
                    log.critical("File %s is larger (%s) than the cache size (%s). Cannot download.",
                                 rel_path, remote_size, self.cache_size)
                    return False
            self.transfer_progress = 0  # Reset transfer progress counter
            self.service.get_blob_to_path(self.container_name, rel_path, local_destination, progress_callback=self._transfer_cb)
            return True
        except AzureHttpError:
            log.exception("Problem downloading '%s' from Azure", rel_path)
        return False

    def _push_to_os(self, rel_path, source_file=None, from_string=None):
        """
        Push the file pointed to by ``rel_path`` to the object store naming the blob
        ``rel_path``. If ``source_file`` is provided, push that file instead while
        still using ``rel_path`` as the blob name.
        If ``from_string`` is provided, set contents of the file to the value of
        the string.

        Note that the source file must exist and be non-empty even when
        ``from_string`` is given: a missing file returns ``False`` and an empty
        one is skipped with ``True`` before ``from_string`` is looked at.

        :returns: ``True`` if the blob was pushed or skipped, ``False`` otherwise.
        """
        try:
            source_file = source_file or self._get_cache_path(rel_path)

            if not os.path.exists(source_file):
                log.error("Tried updating blob '%s' from source file '%s', but source file does not exist.", rel_path, source_file)
                return False

            if os.path.getsize(source_file) == 0:
                log.debug("Wanted to push file '%s' to azure blob '%s' but its size is 0; skipping.", source_file, rel_path)
                return True

            if from_string:
                self.service.create_blob_from_text(self.container_name, rel_path, from_string, progress_callback=self._transfer_cb)
                log.debug("Pushed data from string '%s' to blob '%s'", from_string, rel_path)
            else:
                start_time = datetime.now()
                log.debug("Pushing cache file '%s' of size %s bytes to '%s'", source_file, os.path.getsize(source_file), rel_path)
                self.transfer_progress = 0  # Reset transfer progress counter
                self.service.create_blob_from_path(self.container_name, rel_path, source_file, progress_callback=self._transfer_cb)
                end_time = datetime.now()
                log.debug("Pushed cache file '%s' to blob '%s' (%s bytes transfered in %s sec)",
                          source_file, rel_path, os.path.getsize(source_file), end_time - start_time)
            return True

        except AzureHttpError:
            log.exception("Trouble pushing to Azure Blob '%s' from file '%s'", rel_path, source_file)
        return False

    ##################
    # Public Methods #
    ##################

    def _exists(self, obj, **kwargs):
        """Return whether ``obj`` exists in the cache or in Azure.

        Side effects: a file found only in the cache is pushed to Azure, and a
        missing ``base_dir`` directory is created when ``dir_only`` is set.
        """
        in_cache = in_azure = False
        rel_path = self._construct_path(obj, **kwargs)

        in_cache = self._in_cache(rel_path)
        in_azure = self._in_azure(rel_path)
        # log.debug("~~~~~~ File '%s' exists in cache: %s; in azure: %s" % (rel_path, in_cache, in_azure))
        # dir_only does not get synced so shortcut the decision
        dir_only = kwargs.get('dir_only', False)
        base_dir = kwargs.get('base_dir', None)
        if dir_only:
            if in_cache or in_azure:
                return True
            # for JOB_WORK directory
            elif base_dir:
                if not os.path.exists(rel_path):
                    os.makedirs(rel_path)
                return True
            else:
                return False

        # TODO: Sync should probably not be done here. Add this to an async upload stack?
        if in_cache and not in_azure:
            self._push_to_os(rel_path, source_file=self._get_cache_path(rel_path))
            return True
        elif in_azure:
            return True
        else:
            return False

    def file_ready(self, obj, **kwargs):
        """
        A helper method that checks if a file corresponding to a dataset is
        ready and available to be used. Return ``True`` if so, ``False`` otherwise.
        """
        rel_path = self._construct_path(obj, **kwargs)
        # Make sure the size in cache is available in its entirety
        if self._in_cache(rel_path):
            local_size = os.path.getsize(self._get_cache_path(rel_path))
            remote_size = self._get_size_in_azure(rel_path)
            if local_size == remote_size:
                return True
            else:
                log.debug("Waiting for dataset %s to transfer from OS: %s/%s", rel_path, local_size, remote_size)

        return False

    def _create(self, obj, **kwargs):
        """Create the cache directory and, unless ``dir_only``, an empty dataset file for ``obj``.

        Nothing is done if the object already exists.
        """
        if not self._exists(obj, **kwargs):
            # Pull out locally used fields
            extra_dir = kwargs.get('extra_dir', None)
            extra_dir_at_root = kwargs.get('extra_dir_at_root', False)
            dir_only = kwargs.get('dir_only', False)
            alt_name = kwargs.get('alt_name', None)

            # Construct hashed path
            rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj)))

            # Optionally append extra_dir
            if extra_dir is not None:
                if extra_dir_at_root:
                    rel_path = os.path.join(extra_dir, rel_path)
                else:
                    rel_path = os.path.join(rel_path, extra_dir)

            # Create given directory in cache
            cache_dir = os.path.join(self.staging_path, rel_path)
            if not os.path.exists(cache_dir):
                os.makedirs(cache_dir)

            # Although not really necessary to create S3 folders (because S3 has
            # flat namespace), do so for consistency with the regular file system
            # S3 folders are marked by having trailing '/' so add it now
            # s3_dir = '%s/' % rel_path
            # self._push_to_os(s3_dir, from_string='')
            # If instructed, create the dataset in cache & in S3
            if not dir_only:
                rel_path = os.path.join(rel_path, alt_name if alt_name else "dataset_%s.dat" % self._get_object_id(obj))
                open(os.path.join(self.staging_path, rel_path), 'w').close()
                self._push_to_os(rel_path, from_string='')

    def _empty(self, obj, **kwargs):
        """Return the result of comparing the size of ``obj`` with ``> 0``.

        :raises ObjectNotFound: if the object does not exist.
        """
        if self._exists(obj, **kwargs):
            return bool(self._size(obj, **kwargs) > 0)
        else:
            raise ObjectNotFound('objectstore.empty, object does not exist: {}, kwargs: {}'.format(str(obj), str(kwargs)))

    def _size(self, obj, **kwargs):
        """Return the size of ``obj`` in bytes, preferring the cached copy.

        Falls back to the size reported by Azure when the object is not cached
        or the cached file cannot be stat'ed; returns 0 if it is found nowhere.
        """
        rel_path = self._construct_path(obj, **kwargs)
        if self._in_cache(rel_path):
            try:
                return os.path.getsize(self._get_cache_path(rel_path))
            except OSError as ex:
                log.info("Could not get size of file '%s' in local cache, will try Azure. Error: %s", rel_path, ex)
        # Deliberately a plain ``if`` rather than ``elif``: after a failed cache
        # lookup we must actually fall through to Azure, as the message above says.
        if self._exists(obj, **kwargs):
            return self._get_size_in_azure(rel_path)
        log.warning("Did not find dataset '%s', returning 0 for size", rel_path)
        return 0

    def _delete(self, obj, entire_dir=False, **kwargs):
        """Delete ``obj`` from the cache and from Azure.

        :param entire_dir: together with ``extra_dir``, remove the whole
            directory tree in the cache and every blob under that prefix.
        :returns: ``True`` on success, ``False`` if an Azure or OS error was logged.
        """
        rel_path = self._construct_path(obj, **kwargs)
        extra_dir = kwargs.get('extra_dir', None)
        base_dir = kwargs.get('base_dir', None)
        dir_only = kwargs.get('dir_only', False)
        obj_dir = kwargs.get('obj_dir', False)
        try:
            if base_dir and dir_only and obj_dir:
                # Remove temporary data in JOB_WORK directory
                shutil.rmtree(os.path.abspath(rel_path))
                return True

            # For the case of extra_files, because we don't have a reference to
            # individual files/blobs we need to remove the entire directory structure
            # with all the files in it. This is easy for the local file system,
            # but requires iterating through each individual blob in Azure and deleing it.
            if entire_dir and extra_dir:
                shutil.rmtree(self._get_cache_path(rel_path))
                blobs = self.service.list_blobs(self.container_name, prefix=rel_path)
                for blob in blobs:
                    log.debug("Deleting from Azure: %s", blob)
                    self.service.delete_blob(self.container_name, blob.name)
                return True
            else:
                # Delete from cache first
                # NOTE(review): if the file is not cached, unlink raises OSError
                # and the blob is never removed from Azure -- confirm intended.
                os.unlink(self._get_cache_path(rel_path))
                # Delete from S3 as well
                if self._in_azure(rel_path):
                    log.debug("Deleting from Azure: %s", rel_path)
                    self.service.delete_blob(self.container_name, rel_path)
                return True
        except AzureHttpError:
            log.exception("Could not delete blob '%s' from Azure", rel_path)
        except OSError:
            log.exception('%s delete error', self._get_filename(obj, **kwargs))
        return False

    def _get_data(self, obj, start=0, count=-1, **kwargs):
        """Return up to ``count`` characters of ``obj`` starting at offset ``start``.

        The object is pulled into the cache first if it is not already there.
        """
        rel_path = self._construct_path(obj, **kwargs)
        # Check cache first and get file if not there
        if not self._in_cache(rel_path):
            self._pull_into_cache(rel_path)
        # Read the file content from cache; the context manager closes the
        # handle even if seek() or read() raises.
        with open(self._get_cache_path(rel_path)) as data_file:
            data_file.seek(start)
            return data_file.read(count)

    def _get_filename(self, obj, **kwargs):
        """Return the local path of ``obj``, pulling it into the cache if needed.

        :raises ObjectNotFound: if the object is in neither the cache nor Azure,
            or could not be pulled into the cache.
        """
        rel_path = self._construct_path(obj, **kwargs)
        base_dir = kwargs.get('base_dir', None)
        dir_only = kwargs.get('dir_only', False)
        obj_dir = kwargs.get('obj_dir', False)

        # for JOB_WORK directory
        if base_dir and dir_only and obj_dir:
            return os.path.abspath(rel_path)

        cache_path = self._get_cache_path(rel_path)
        # S3 does not recognize directories as files so cannot check if those exist.
        # So, if checking dir only, ensure given dir exists in cache and return
        # the expected cache path.
        # dir_only = kwargs.get('dir_only', False)
        # if dir_only:
        #     if not os.path.exists(cache_path):
        #         os.makedirs(cache_path)
        #     return cache_path
        # Check if the file exists in the cache first
        if self._in_cache(rel_path):
            return cache_path
        # Check if the file exists in persistent storage and, if it does, pull it into cache
        elif self._exists(obj, **kwargs):
            if dir_only:  # Directories do not get pulled into cache
                return cache_path
            else:
                if self._pull_into_cache(rel_path):
                    return cache_path
        # For the case of retrieving a directory only, return the expected path
        # even if it does not exist.
        # if dir_only:
        #     return cache_path
        raise ObjectNotFound('objectstore.get_filename, no cache_path: {}, kwargs: {}'.format(str(obj), str(kwargs)))

    def _update_from_file(self, obj, file_name=None, create=False, **kwargs):
        """Update ``obj`` from ``file_name`` (or from its cached copy) and push it to Azure.

        When ``create`` is ``True`` the object is only created; the copy and
        push below run only for an already existing object.

        :raises ObjectNotFound: if ``create`` is not ``True`` and the object does not exist.
        """
        if create is True:
            self._create(obj, **kwargs)
        elif self._exists(obj, **kwargs):
            rel_path = self._construct_path(obj, **kwargs)
            # Chose whether to use the dataset file itself or an alternate file
            if file_name:
                source_file = os.path.abspath(file_name)
                # Copy into cache
                cache_file = self._get_cache_path(rel_path)
                try:
                    if source_file != cache_file:
                        # FIXME? Should this be a `move`?
                        shutil.copy2(source_file, cache_file)
                    self._fix_permissions(cache_file)
                except OSError:
                    log.exception("Trouble copying source file '%s' to cache '%s'", source_file, cache_file)
            else:
                source_file = self._get_cache_path(rel_path)

            self._push_to_os(rel_path, source_file)

        else:
            raise ObjectNotFound('objectstore.update_from_file, object does not exist: {}, kwargs: {}'.format(str(obj), str(kwargs)))

    def _get_object_url(self, obj, **kwargs):
        """Return the blob URL for ``obj``, or ``None`` if it does not exist or Azure fails."""
        if self._exists(obj, **kwargs):
            rel_path = self._construct_path(obj, **kwargs)
            try:
                url = self.service.make_blob_url(container_name=self.container_name, blob_name=rel_path)
                return url
            except AzureHttpError:
                log.exception("Trouble generating URL for dataset '%s'", rel_path)
        return None

    def _get_store_usage_percent(self):
        """Usage is not tracked for this store; always returns ``0.0``."""
        return 0.0

    ##################
    # Secret Methods #
    ##################

    def __cache_monitor(self):
        """Background loop that keeps the local cache below 90% of ``cache_size``.

        Every 30 seconds the staging directory is walked; if its total size
        exceeds the limit, files are removed starting with the least recently
        accessed until the excess has been freed. The loop runs while
        ``self.running`` is true (presumably maintained by the base class --
        it is not set anywhere in this class).
        """
        time.sleep(2)  # Wait for things to load before starting the monitor
        while self.running:
            total_size = 0
            # Is this going to be too expensive of an operation to be done frequently?
            file_list = []
            for dirpath, _, filenames in os.walk(self.staging_path):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    try:
                        file_stat = os.stat(filepath)
                    except OSError:
                        # The file disappeared between the walk and the stat
                        # (e.g. deleted by another thread); nothing to account for.
                        log.debug("Skipping '%s' during cache scan; it could not be stat'ed", filepath)
                        continue
                    total_size += file_stat.st_size
                    # (access time, path, size): sorting the tuples puts the
                    # least recently accessed files first.
                    file_list.append((file_stat.st_atime, filepath, file_stat.st_size))
            # Sort the file list (based on access time)
            file_list.sort()
            # Initiate cleaning once within 10% of the defined cache size?
            cache_limit = self.cache_size * 0.9
            if total_size > cache_limit:
                log.info("Initiating cache cleaning: current cache size: %s; clean until smaller than: %s",
                         convert_bytes(total_size), convert_bytes(cache_limit))
                # How much to delete? If simply deleting up to the cache-10% limit,
                # is likely to be deleting frequently and may run the risk of hitting
                # the limit - maybe delete additional #%?
                # For now, delete enough to leave at least 10% of the total cache free
                delete_this_much = total_size - cache_limit
                # Delete from the front of the list (oldest access time first)
                # until at least delete_this_much bytes have been freed.
                deleted_amount = 0
                for _, filepath, file_size in file_list:
                    if deleted_amount >= delete_this_much:
                        break
                    try:
                        os.remove(filepath)
                    except OSError:
                        # Keep the monitor alive; just do not count this file as freed.
                        log.exception("Could not remove '%s' from cache", filepath)
                        continue
                    deleted_amount += file_size
                log.debug("Cache cleaning done. Total space freed: %s", convert_bytes(deleted_amount))

            self.sleeper.sleep(30)  # Test cache size every 30 seconds?