Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.objectstore.azure_blob
"""
Object Store plugin for the Microsoft Azure Block Blob Storage system
"""
import logging
import os
import shutil
import threading
import time
from datetime import datetime
try:
from azure.common import AzureHttpError
from azure.storage import CloudStorageAccount
from azure.storage.blob import BlockBlobService
from azure.storage.blob.models import Blob
except ImportError:
BlockBlobService = None
from galaxy.exceptions import (
ObjectInvalid,
ObjectNotFound
)
from galaxy.util import (
directory_hash_id,
umask_fix_perms
)
from galaxy.util.path import safe_relpath
from galaxy.util.sleeper import Sleeper
from ..objectstore import (
convert_bytes,
ObjectStore
)
# Error text used when the optional azure.storage.blob dependency could not be
# imported. The trailing space after the first sentence matters: the two
# literals are concatenated, and without it the message reads "available.Please".
NO_BLOBSERVICE_ERROR_MESSAGE = ("ObjectStore configured, but no azure.storage.blob dependency available. "
                                "Please install and properly configure azure.storage.blob or modify Object Store configuration.")

# Module-level logger.
log = logging.getLogger(__name__)
def parse_config_xml(config_xml):
    """
    Translate the object store XML configuration into a plain dictionary.

    :param config_xml: ElementTree element holding the ``auth``, ``container``,
        ``cache`` and ``extra_dir`` child elements.
    :returns: dictionary with the keys ``auth``, ``container``, ``cache`` and
        ``extra_dirs``.
    :raises Exception: any error hit while reading the XML (including a missing
        ``extra_dir`` element) is logged and re-raised unchanged.
    """
    try:
        # Credentials come from the first <auth> element.
        auth_node = config_xml.findall('auth')[0]
        auth_section = {
            'account_name': auth_node.get('account_name'),
            'account_key': auth_node.get('account_key'),
        }

        container_node = config_xml.find('container')
        container_section = {
            'name': container_node.get('name'),
            'max_chunk_size': int(container_node.get('max_chunk_size', 250)),  # currently unused
        }

        # Cache settings come from the first <cache> element; a size of -1 means "not set".
        cache_node = config_xml.findall('cache')[0]
        cache_section = {
            'size': float(cache_node.get('size', -1)),
            'path': cache_node.get('path', None),
        }

        tag = 'extra_dir'
        extra_dir_nodes = config_xml.findall(tag)
        if not extra_dir_nodes:
            msg = 'No {tag} element in XML tree'.format(tag=tag)
            log.error(msg)
            raise Exception(msg)
        wanted_attrs = ('type', 'path')
        extra_dirs = [{attr: node.get(attr) for attr in wanted_attrs} for node in extra_dir_nodes]

        return {
            'auth': auth_section,
            'container': container_section,
            'cache': cache_section,
            'extra_dirs': extra_dirs,
        }
    except Exception:
        # Toss it back up after logging, we can't continue loading at this point.
        log.exception("Malformed ObjectStore Configuration XML -- unable to continue")
        raise
class AzureBlobObjectStore(ObjectStore):
    """
    Object store that stores objects as blobs in an Azure Blob Container. A local
    cache exists that is used as an intermediate location for files between
    Galaxy and Azure.
    """
    store_type = 'azure_blob'

    # The cache size is configured in GB but compared against byte counts.
    _BYTES_PER_GB = 1073741824

    def __init__(self, config, config_dict):
        """
        Read the ``auth``, ``container`` and ``cache`` sections of
        ``config_dict`` and initialize the store.

        :param config: configuration object handed to the base class; its
            ``object_store_cache_path`` is the fallback cache location.
        :param config_dict: dictionary with ``auth``, ``container`` and
            ``cache`` sub-dictionaries (the shape produced by ``to_dict``).
        """
        super(AzureBlobObjectStore, self).__init__(config, config_dict)

        self.transfer_progress = 0

        auth_dict = config_dict["auth"]
        container_dict = config_dict["container"]
        cache_dict = config_dict["cache"]

        self.account_name = auth_dict.get('account_name')
        self.account_key = auth_dict.get('account_key')

        self.container_name = container_dict.get('name')
        self.max_chunk_size = container_dict.get('max_chunk_size', 250)  # currently unused

        self.cache_size = cache_dict.get('size', -1)
        self.staging_path = cache_dict.get('path') or self.config.object_store_cache_path

        self._initialize()

    def _initialize(self):
        """
        Connect to Azure and, when a cache size is configured, start the
        background thread that keeps the local cache below that size.

        :raises Exception: if the azure.storage.blob dependency is unavailable.
        """
        if BlockBlobService is None:
            raise Exception(NO_BLOBSERVICE_ERROR_MESSAGE)

        self._configure_connection()

        # Clean cache only if value is set in galaxy.ini
        if self.cache_size != -1:
            # Convert GBs to bytes for comparison
            self.cache_size = self.cache_size * self._BYTES_PER_GB
            # Helper for interruptable sleep
            self.sleeper = Sleeper()
            self.cache_monitor_thread = threading.Thread(target=self.__cache_monitor)
            self.cache_monitor_thread.start()
            log.info("Cache cleaner manager started")

    def to_dict(self):
        """
        Return the store configuration as a dictionary, in the same shape
        ``__init__`` accepts as ``config_dict``.
        """
        cache_size = self.cache_size
        if cache_size != -1:
            # _initialize() turned the configured GB value into bytes. Report GB
            # again so that feeding this dictionary back into __init__ does not
            # multiply the cache size a second time.
            cache_size = cache_size / self._BYTES_PER_GB

        as_dict = super(AzureBlobObjectStore, self).to_dict()
        as_dict.update({
            'auth': {
                'account_name': self.account_name,
                'account_key': self.account_key,
            },
            'container': {
                'name': self.container_name,
                'max_chunk_size': self.max_chunk_size,
            },
            'cache': {
                'size': cache_size,
                'path': self.staging_path,
            }
        })
        return as_dict

    ###################
    # Private Methods #
    ###################

    def _configure_connection(self):
        """Create the storage account object and the block blob service client."""
        log.debug("Configuring Connection")
        self.account = CloudStorageAccount(self.account_name, self.account_key)
        self.service = self.account.create_block_blob_service()

    def _construct_path(self, obj, base_dir=None, dir_only=None, extra_dir=None, extra_dir_at_root=False, alt_name=None, obj_dir=False, **kwargs):
        """
        Build the relative path (also used as the blob name) for ``obj``.

        :param obj: object whose ``id`` determines the hashed directory.
        :param base_dir: key into ``self.extra_dirs``; when given, the result is
            rooted at that directory instead of being a blob name.
        :param dir_only: when true, return the directory without a file name.
        :param extra_dir: additional directory component.
        :param extra_dir_at_root: put ``extra_dir`` before the hashed directory
            instead of after it.
        :param alt_name: file name to use instead of ``dataset_<id>.dat``.
        :param obj_dir: append the object id as a further directory level.
        :raises ObjectInvalid: if ``extra_dir`` is not normalized or ``alt_name``
            would point outside the constructed directory.
        """
        # extra_dir should never be constructed from provided data but just
        # make sure there are no shenanigans afoot
        if extra_dir and extra_dir != os.path.normpath(extra_dir):
            log.warning('extra_dir is not normalized: %s', extra_dir)
            raise ObjectInvalid("The requested object is invalid")

        # ensure that any parent directory references in alt_name would not
        # result in a path not contained in the directory path constructed here
        if alt_name:
            if not safe_relpath(alt_name):
                log.warning('alt_name would locate path outside dir: %s', alt_name)
                raise ObjectInvalid("The requested object is invalid")
            # alt_name can contain parent directory references; once they are
            # known to be safe they are normalized out of the blob name
            alt_name = os.path.normpath(alt_name)

        rel_path = os.path.join(*directory_hash_id(obj.id))

        if extra_dir is not None:
            if extra_dir_at_root:
                rel_path = os.path.join(extra_dir, rel_path)
            else:
                rel_path = os.path.join(rel_path, extra_dir)

        # for JOB_WORK directory
        if obj_dir:
            rel_path = os.path.join(rel_path, str(obj.id))
        if base_dir:
            base = self.extra_dirs.get(base_dir)
            return os.path.join(base, rel_path)

        # No trailing '/' directory marker is added; assumed unnecessary for
        # Azure blob storage.
        if not dir_only:
            rel_path = os.path.join(rel_path, alt_name if alt_name else "dataset_%s.dat" % obj.id)

        return rel_path

    def _fix_permissions(self, rel_path):
        """ Set permissions on rel_path"""
        for basedir, _, files in os.walk(rel_path):
            umask_fix_perms(basedir, self.config.umask, 0o777, self.config.gid)
            for filename in files:
                path = os.path.join(basedir, filename)
                # Ignore symlinks
                if os.path.islink(path):
                    continue
                umask_fix_perms(path, self.config.umask, 0o666, self.config.gid)

    def _get_cache_path(self, rel_path):
        """Return the absolute path of ``rel_path`` inside the local cache."""
        return os.path.abspath(os.path.join(self.staging_path, rel_path))

    def _get_transfer_progress(self):
        """Return the progress, in percent, of the most recent transfer."""
        return self.transfer_progress

    def _get_size_in_azure(self, rel_path):
        """
        Return the size in bytes of blob ``rel_path``, or -1 if it cannot be
        determined.
        """
        try:
            properties = self.service.get_blob_properties(self.container_name, rel_path)
            # Currently this returns a blob and not a BlobProperties object
            # Similar issue for the ruby https://github.com/Azure/azure-storage-ruby/issues/13
            # The typecheck is an attempt at future-proofing this when/if the bug is fixed.
            if isinstance(properties, Blob):
                properties = properties.properties
            if properties:
                return properties.content_length
        except AzureHttpError:
            log.exception("Could not get size of blob '%s' from Azure", rel_path)
        # Always an int, so callers can compare the result numerically.
        return -1

    def _in_azure(self, rel_path):
        """Return whether blob ``rel_path`` exists; False if the check itself fails."""
        try:
            return self.service.exists(self.container_name, rel_path)
        except AzureHttpError:
            log.exception("Trouble checking existence of Azure blob '%s'", rel_path)
            return False

    def _in_cache(self, rel_path):
        """ Check if the given dataset is in the local cache. """
        cache_path = self._get_cache_path(rel_path)
        return os.path.exists(cache_path)

    def _pull_into_cache(self, rel_path):
        """
        Download blob ``rel_path`` into the cache, creating the containing cache
        directory if needed. Returns True if the download succeeded.
        """
        # Ensure the cache directory structure exists (e.g., dataset_#_files/)
        cache_dir = self._get_cache_path(os.path.dirname(rel_path))
        if not os.path.exists(cache_dir):
            os.makedirs(cache_dir)
        # Now pull in the file
        file_ok = self._download(rel_path)
        self._fix_permissions(cache_dir)
        return file_ok

    def _transfer_cb(self, complete, total):
        """
        Progress callback handed to the Azure client; records the progress in
        percent.
        """
        # A zero or unknown total would make the division fail inside the
        # client's callback; keep the previous progress value in that case.
        if not total:
            return
        self.transfer_progress = float(complete) / float(total) * 100  # in percent

    def _download(self, rel_path):
        """
        Download blob ``rel_path`` to its cache location.

        Returns False, without downloading, when a cache size is set and the
        blob is larger than the whole cache; also returns False on an Azure
        error. Returns True otherwise.
        """
        local_destination = self._get_cache_path(rel_path)
        try:
            log.debug("Pulling '%s' into cache to %s", rel_path, local_destination)
            if self.cache_size > 0:
                # Look the size up once; every call is a request to Azure.
                remote_size = self._get_size_in_azure(rel_path)
                if remote_size > self.cache_size:
                    log.critical("File %s is larger (%s) than the cache size (%s). Cannot download.",
                                 rel_path, remote_size, self.cache_size)
                    return False
            self.transfer_progress = 0  # Reset transfer progress counter
            self.service.get_blob_to_path(self.container_name, rel_path, local_destination, progress_callback=self._transfer_cb)
            return True
        except AzureHttpError:
            log.exception("Problem downloading '%s' from Azure", rel_path)
        return False

    def _push_to_os(self, rel_path, source_file=None, from_string=None):
        """
        Push the file pointed to by ``rel_path`` to the object store naming the blob
        ``rel_path``. If ``source_file`` is provided, push that file instead while
        still using ``rel_path`` as the blob name.
        If ``from_string`` is provided, set contents of the file to the value of
        the string.

        Nothing is uploaded when the source file is empty (True is returned) or
        missing (False is returned). Returns False on an Azure error.
        """
        try:
            source_file = source_file or self._get_cache_path(rel_path)

            if not os.path.exists(source_file):
                log.error("Tried updating blob '%s' from source file '%s', but source file does not exist.", rel_path, source_file)
                return False

            if os.path.getsize(source_file) == 0:
                log.debug("Wanted to push file '%s' to azure blob '%s' but its size is 0; skipping.", source_file, rel_path)
                return True

            if from_string:
                self.service.create_blob_from_text(self.container_name, rel_path, from_string, progress_callback=self._transfer_cb)
                log.debug("Pushed data from string '%s' to blob '%s'", from_string, rel_path)
            else:
                start_time = datetime.now()
                log.debug("Pushing cache file '%s' of size %s bytes to '%s'", source_file, os.path.getsize(source_file), rel_path)
                self.transfer_progress = 0  # Reset transfer progress counter
                self.service.create_blob_from_path(self.container_name, rel_path, source_file, progress_callback=self._transfer_cb)
                end_time = datetime.now()
                log.debug("Pushed cache file '%s' to blob '%s' (%s bytes transfered in %s sec)",
                          source_file, rel_path, os.path.getsize(source_file), end_time - start_time)
            return True

        except AzureHttpError:
            log.exception("Trouble pushing to Azure Blob '%s' from file '%s'", rel_path, source_file)
        return False

    ##################
    # Public Methods #
    ##################

    def exists(self, obj, **kwargs):
        """
        Return True if the object is present in the cache or in Azure.

        Side effects: a file found only in the cache is pushed to Azure, and a
        missing ``dir_only`` path under ``base_dir`` is created on disk.
        """
        rel_path = self._construct_path(obj, **kwargs)

        in_cache = self._in_cache(rel_path)
        in_azure = self._in_azure(rel_path)

        # dir_only does not get synced so shortcut the decision
        dir_only = kwargs.get('dir_only', False)
        base_dir = kwargs.get('base_dir', None)
        if dir_only:
            if in_cache or in_azure:
                return True
            # for JOB_WORK directory
            elif base_dir:
                if not os.path.exists(rel_path):
                    os.makedirs(rel_path)
                return True
            else:
                return False

        # TODO: Sync should probably not be done here. Add this to an async upload stack?
        if in_cache and not in_azure:
            self._push_to_os(rel_path, source_file=self._get_cache_path(rel_path))
            return True
        elif in_azure:
            return True
        else:
            return False

    def file_ready(self, obj, **kwargs):
        """
        A helper method that checks if a file corresponding to a dataset is
        ready and available to be used. Return ``True`` if so, ``False`` otherwise.
        """
        rel_path = self._construct_path(obj, **kwargs)
        # Make sure the size in cache is available in its entirety
        if self._in_cache(rel_path):
            local_size = os.path.getsize(self._get_cache_path(rel_path))
            remote_size = self._get_size_in_azure(rel_path)
            if local_size == remote_size:
                return True
            else:
                log.debug("Waiting for dataset %s to transfer from OS: %s/%s", rel_path, local_size, remote_size)

        return False

    def create(self, obj, **kwargs):
        """
        Create the cache directory for ``obj`` and, unless ``dir_only`` is set,
        an empty dataset file in it. Does nothing if the object already exists.
        """
        if not self.exists(obj, **kwargs):

            # Pull out locally used fields
            extra_dir = kwargs.get('extra_dir', None)
            extra_dir_at_root = kwargs.get('extra_dir_at_root', False)
            dir_only = kwargs.get('dir_only', False)
            alt_name = kwargs.get('alt_name', None)

            # Construct hashed path
            rel_path = os.path.join(*directory_hash_id(obj.id))

            # Optionally append extra_dir
            if extra_dir is not None:
                if extra_dir_at_root:
                    rel_path = os.path.join(extra_dir, rel_path)
                else:
                    rel_path = os.path.join(rel_path, extra_dir)

            # Create given directory in cache
            cache_dir = os.path.join(self.staging_path, rel_path)
            if not os.path.exists(cache_dir):
                os.makedirs(cache_dir)

            # No directory marker blob is pushed; only the cache directory is created.

            # If instructed, create the dataset in cache & in Azure
            if not dir_only:
                rel_path = os.path.join(rel_path, alt_name if alt_name else "dataset_%s.dat" % obj.id)
                open(os.path.join(self.staging_path, rel_path), 'w').close()
                self._push_to_os(rel_path, from_string='')

    def empty(self, obj, **kwargs):
        """
        Return True if the object exists and has a size of 0.

        :raises ObjectNotFound: if the object does not exist.
        """
        if self.exists(obj, **kwargs):
            # "empty" means no content; the previous ``> 0`` test reported the opposite.
            return bool(self.size(obj, **kwargs) == 0)
        else:
            raise ObjectNotFound('objectstore.empty, object does not exist: %s, kwargs: %s' % (str(obj), str(kwargs)))

    def size(self, obj, **kwargs):
        """
        Return the size of the object in bytes, preferring the cached copy and
        falling back to Azure. Returns 0 if the object is found in neither.
        """
        rel_path = self._construct_path(obj, **kwargs)
        if self._in_cache(rel_path):
            try:
                return os.path.getsize(self._get_cache_path(rel_path))
            except OSError as ex:
                log.info("Could not get size of file '%s' in local cache, will try Azure. Error: %s", rel_path, ex)
        # Reached on a cache miss and also when the cached file could not be
        # read, so the promised Azure fallback actually happens.
        if self.exists(obj, **kwargs):
            return self._get_size_in_azure(rel_path)
        log.warning("Did not find dataset '%s', returning 0 for size", rel_path)
        return 0

    def delete(self, obj, entire_dir=False, **kwargs):
        """
        Delete the object from the cache and from Azure.

        :param entire_dir: together with the ``extra_dir`` keyword argument,
            remove the whole directory and every blob under that prefix.
        :returns: True on success, False if an Azure or file system error
            occurred.
        """
        rel_path = self._construct_path(obj, **kwargs)
        extra_dir = kwargs.get('extra_dir', None)
        base_dir = kwargs.get('base_dir', None)
        dir_only = kwargs.get('dir_only', False)
        obj_dir = kwargs.get('obj_dir', False)
        try:
            if base_dir and dir_only and obj_dir:
                # Remove temporary data in JOB_WORK directory
                shutil.rmtree(os.path.abspath(rel_path))
                return True

            # The cached copy may already be gone (the cache monitor evicts
            # files). That must not stop the blob from being deleted in Azure.
            cache_path = self._get_cache_path(rel_path)

            # For the case of extra_files, because we don't have a reference to
            # individual files/blobs we need to remove the entire directory structure
            # with all the files in it. This is easy for the local file system,
            # but requires iterating through each individual blob in Azure and deleting it.
            if entire_dir and extra_dir:
                if os.path.exists(cache_path):
                    shutil.rmtree(cache_path)
                blobs = self.service.list_blobs(self.container_name, prefix=rel_path)
                for blob in blobs:
                    log.debug("Deleting from Azure: %s", blob)
                    self.service.delete_blob(self.container_name, blob.name)
                return True
            else:
                # Delete from cache first
                if os.path.exists(cache_path):
                    os.unlink(cache_path)
                # Delete from Azure as well
                if self._in_azure(rel_path):
                    log.debug("Deleting from Azure: %s", rel_path)
                    self.service.delete_blob(self.container_name, rel_path)
                    return True
        except AzureHttpError:
            log.exception("Could not delete blob '%s' from Azure", rel_path)
        except OSError:
            # Log the path directly; get_filename() could itself raise here.
            log.exception('%s delete error', rel_path)
        return False

    def get_data(self, obj, start=0, count=-1, **kwargs):
        """
        Return ``count`` characters of the object's content starting at offset
        ``start``, pulling the file into the cache first if necessary. With the
        default ``count`` of -1 everything from ``start`` on is returned.
        """
        rel_path = self._construct_path(obj, **kwargs)
        # Check cache first and get file if not there
        if not self._in_cache(rel_path):
            self._pull_into_cache(rel_path)
        # Read the file content from cache; the context manager closes the
        # file even if seek() or read() fails.
        with open(self._get_cache_path(rel_path), 'r') as data_file:
            data_file.seek(start)
            return data_file.read(count)

    def get_filename(self, obj, **kwargs):
        """
        Return the local path of the object, downloading it into the cache if
        it is only available in Azure.

        :raises ObjectNotFound: if the object is neither cached nor retrievable.
        """
        rel_path = self._construct_path(obj, **kwargs)
        base_dir = kwargs.get('base_dir', None)
        dir_only = kwargs.get('dir_only', False)
        obj_dir = kwargs.get('obj_dir', False)

        # for JOB_WORK directory
        if base_dir and dir_only and obj_dir:
            return os.path.abspath(rel_path)

        cache_path = self._get_cache_path(rel_path)
        # Check if the file exists in the cache first
        if self._in_cache(rel_path):
            return cache_path
        # Check if the file exists in persistent storage and, if it does, pull it into cache
        elif self.exists(obj, **kwargs):
            if dir_only:  # Directories do not get pulled into cache
                return cache_path
            else:
                if self._pull_into_cache(rel_path):
                    return cache_path
        raise ObjectNotFound('objectstore.get_filename, no cache_path: %s, kwargs: %s' % (str(obj), str(kwargs)))

    def update_from_file(self, obj, file_name=None, create=False, **kwargs):
        """
        Push new content for the object to Azure.

        :param file_name: file to copy into the cache and upload; when omitted,
            the object's existing cache file is uploaded.
        :param create: create the object first if it does not exist yet.
        :raises ObjectNotFound: if the object does not exist.
        """
        if create is True:
            self.create(obj, **kwargs)

        # Deliberately not ``elif``: a freshly created object still has to
        # receive the file content.
        if self.exists(obj, **kwargs):
            rel_path = self._construct_path(obj, **kwargs)
            # Chose whether to use the dataset file itself or an alternate file
            if file_name:
                source_file = os.path.abspath(file_name)
                # Copy into cache
                cache_file = self._get_cache_path(rel_path)
                try:
                    if source_file != cache_file:
                        # FIXME? Should this be a `move`?
                        shutil.copy2(source_file, cache_file)
                    # NOTE(review): _fix_permissions() walks a directory tree, so
                    # passing a file path looks like a no-op - confirm intent.
                    self._fix_permissions(cache_file)
                except OSError:
                    log.exception("Trouble copying source file '%s' to cache '%s'", source_file, cache_file)
            else:
                source_file = self._get_cache_path(rel_path)

            self._push_to_os(rel_path, source_file)

        else:
            raise ObjectNotFound('objectstore.update_from_file, object does not exist: %s, kwargs: %s' % (str(obj), str(kwargs)))

    def get_object_url(self, obj, **kwargs):
        """
        Return the URL of the object's blob, or None if the object does not
        exist or the URL cannot be generated.
        """
        if self.exists(obj, **kwargs):
            rel_path = self._construct_path(obj, **kwargs)
            try:
                url = self.service.make_blob_url(container_name=self.container_name, blob_name=rel_path)
                return url
            except AzureHttpError:
                log.exception("Trouble generating URL for dataset '%s'", rel_path)
        return None

    ##################
    # Secret Methods #
    ##################

    def __cache_monitor(self):
        """
        Thread target: while ``self.running`` is true, check the cache size
        every 30 seconds and delete the least recently accessed files once it
        exceeds 90% of the configured size.
        """
        time.sleep(2)  # Wait for things to load before starting the monitor
        while self.running:
            total_size = 0
            # Is this going to be too expensive of an operation to be done frequently?
            file_list = []
            for dirpath, _, filenames in os.walk(self.staging_path):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    try:
                        file_stat = os.stat(filepath)
                    except OSError:
                        # The file vanished between listing and stat; skip it
                        # rather than let the monitor thread die.
                        continue
                    total_size += file_stat.st_size
                    # Tuple of (last access time, path, size); access time
                    # comes first so that sorting puts the oldest files first.
                    file_list.append((file_stat.st_atime, filepath, file_stat.st_size))
            # Sort the file list (based on access time)
            file_list.sort()
            # Initiate cleaning once within 10% of the defined cache size?
            cache_limit = self.cache_size * 0.9
            if total_size > cache_limit:
                log.info("Initiating cache cleaning: current cache size: %s; clean until smaller than: %s",
                         convert_bytes(total_size), convert_bytes(cache_limit))
                # How much to delete? If simply deleting up to the cache-10% limit,
                # is likely to be deleting frequently and may run the risk of hitting
                # the limit - maybe delete additional #%?
                # For now, delete enough to leave at least 10% of the total cache free
                delete_this_much = total_size - cache_limit
                # Delete from the front of the list (oldest access time first)
                # until enough space has been freed.
                deleted_amount = 0
                for _atime, filepath, file_size in file_list:
                    if deleted_amount >= delete_this_much:
                        break
                    try:
                        os.remove(filepath)
                    except OSError:
                        log.exception("Could not remove cached file '%s'", filepath)
                        continue
                    deleted_amount += file_size
                log.debug("Cache cleaning done. Total space freed: %s", convert_bytes(deleted_amount))
            self.sleeper.sleep(30)  # Test cache size every 30 seconds?