Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.objectstore.irods

"""
Object Store plugin for the Integrated Rule-Oriented Data Store (iRODS)
"""
import logging
import os
import shutil
from contextlib import contextmanager
from datetime import datetime
from functools import partial
try:
    from pathlib import Path
except ImportError:
    # Use backport on python 2
    from pathlib2 import Path

try:
    import irods
    import irods.keywords as kw
    from irods.exception import CollectionDoesNotExist
    from irods.exception import DataObjectDoesNotExist
    from irods.exception import NetworkException
    from irods.session import iRODSSession
except ImportError:
    irods = None

from galaxy.exceptions import ObjectInvalid, ObjectNotFound
from galaxy.util import directory_hash_id, ExecutionTimer, umask_fix_perms
from galaxy.util.path import safe_relpath
from ..objectstore import DiskObjectStore

# Message used for the exception raised by IRODSObjectStore.__init__ when the
# optional ``irods`` package could not be imported (``irods`` is None).
IRODS_IMPORT_MESSAGE = ('The Python irods package is required to use this feature, please install it')
# 1 MB -- number of bytes read per iteration when streaming files between the
# local cache and iRODS data objects.
CHUNK_SIZE = 2**20
# Module-level logger named after this module.
log = logging.getLogger(__name__)


def _config_xml_error(tag):
    """Raise an ``Exception`` reporting that element ``tag`` is absent from the config XML."""
    raise Exception('No {tag} element in config XML tree'.format(tag=tag))


def _config_dict_error(key):
    """Raise an ``Exception`` reporting that ``key`` is absent from the config dictionary.

    ``key`` is interpolated verbatim into the message; callers in this module
    pass either a top-level key (``'auth'``) or an arrow-separated path
    (``'auth->username'``).
    """
    # Fixed: the original called the non-existent ``str.forma`` and therefore
    # raised AttributeError instead of the intended configuration error.
    msg = 'No {key} key in config dictionary'.format(key=key)
    raise Exception(msg)


def parse_config_xml(config_xml):
    """Translate an iRODS object store XML element into a configuration dict.

    :param config_xml: element exposing ``findall(tag)``; it must contain at
        least one each of ``auth``, ``resource``, ``zone``, ``connection``,
        ``cache`` and ``extra_dir`` children. Only the first match of each
        is read, except ``extra_dir`` where every match is collected.
    :returns: dict with the keys ``auth``, ``resource``, ``zone``,
        ``connection``, ``cache`` and ``extra_dirs``.
    :raises Exception: when a required element is missing or a numeric
        attribute cannot be converted; the failure is logged and re-raised.
    """
    def first(tag):
        # Return the first ``tag`` child or fail with the module's standard error.
        found = config_xml.findall(tag)
        if not found:
            _config_xml_error(tag)
        return found[0]

    try:
        auth = first('auth')
        username = auth.get('username')
        password = auth.get('password')

        resource_name = first('resource').get('name')
        zone_name = first('zone').get('name')

        connection = first('connection')
        host = connection.get('host', None)
        port = int(connection.get('port', 0))
        timeout = int(connection.get('timeout', 30))
        poolsize = int(connection.get('poolsize', 3))

        cache = first('cache')
        cache_size = float(cache.get('size', -1))
        staging_path = cache.get('path', None)

        attrs = ('type', 'path')
        extra_dir_elements = config_xml.findall('extra_dir')
        if not extra_dir_elements:
            _config_xml_error('extra_dir')
        extra_dirs = [{k: e.get(k) for k in attrs} for e in extra_dir_elements]

        return {
            'auth': {
                'username': username,
                'password': password,
            },
            'resource': {
                'name': resource_name,
            },
            'zone': {
                'name': zone_name,
            },
            'connection': {
                'host': host,
                'port': port,
                'timeout': timeout,
                'poolsize': poolsize
            },
            'cache': {
                'size': cache_size,
                'path': staging_path,
            },
            'extra_dirs': extra_dirs,
        }
    except Exception:
        # Toss it back up after logging, we can't continue loading at this point.
        log.exception("Malformed iRODS ObjectStore Configuration XML -- unable to continue.")
        raise
def acquire_session(host='localhost', port='1247', user='rods', password='rods', zone='tempZone', timeout='30'):
    """Create an ``iRODSSession`` and set its connection timeout.

    :param host: iRODS server host name.
    :param port: iRODS server port, passed through to ``iRODSSession``.
    :param user: account name used to authenticate.
    :param password: password for ``user``.
    :param zone: iRODS zone name.
    :param timeout: connection timeout; a string value (such as the default)
        is converted to ``int`` so it matches the integers produced by
        ``parse_config_xml``.
    :returns: the new session. The caller is responsible for handing it to
        ``release_session`` when done.
    """
    session = iRODSSession(host=host, port=port, user=user, password=password, zone=zone)
    # Set connection timeout
    if isinstance(timeout, str):
        timeout = int(timeout)
    session.connection_timeout = timeout
    return session
def release_session(session):
    """Clean up ``session`` by calling its ``cleanup()`` method.

    An ``OSError`` raised by the cleanup is deliberately ignored; any other
    exception propagates.
    """
    # This call will cleanup all the connections in the connection pool
    # OSError sometimes happens on GitHub Actions, after the test has successfully completed.
    # Ignore it if it happens.
    try:
        session.cleanup()
    except OSError:
        pass
@contextmanager
def managed_session(host='localhost', port='1247', user='rods', password='rods', zone='tempZone', timeout='30'):
    """Context manager that yields a session and always releases it afterwards.

    The arguments are forwarded unchanged to ``acquire_session``. The session
    is passed to ``release_session`` when the ``with`` body exits, whether
    normally or through an exception.
    """
    session = acquire_session(host=host, port=port, user=user, password=password, zone=zone, timeout=timeout)
    try:
        yield session
    finally:
        release_session(session)
class CloudConfigMixin(object):
    """Mixin that serialises the connection-related attributes of a store."""

    def _config_to_dict(self):
        """Return the store's settings as a nested dict.

        Reads ``username``, ``password``, ``resource``, ``zone``, ``host``,
        ``port``, ``timeout``, ``poolsize``, ``cache_size`` and
        ``staging_path`` from ``self`` and groups them under the keys
        ``auth``, ``resource``, ``zone``, ``connection`` and ``cache``.
        """
        return {
            'auth': {
                'username': self.username,
                'password': self.password,
            },
            'resource': {
                'name': self.resource,
            },
            'zone': {
                'name': self.zone,
            },
            'connection': {
                'host': self.host,
                'port': self.port,
                'timeout': self.timeout,
                'poolsize': self.poolsize,
            },
            'cache': {
                'size': self.cache_size,
                'path': self.staging_path,
            }
        }
class IRODSObjectStore(DiskObjectStore, CloudConfigMixin):
    """
    Object store that stores objects as data objects in iRODS collections. A local cache
    exists that is used as an intermediate location for files between Galaxy and iRODS.
    """
    store_type = 'irods'

    def __init__(self, config, config_dict):
        """Validate ``config_dict`` and copy its settings onto the instance.

        :param config: configuration object forwarded to the parent class; its
            ``object_store_cache_path`` is the fallback cache location.
        :param config_dict: dict shaped like the return value of
            ``parse_config_xml``.
        :raises Exception: when a required section or value is missing, or
            when the ``irods`` package could not be imported.
        """
        reload_timer = ExecutionTimer()
        super().__init__(config, config_dict)

        def require(mapping, key, label):
            # Fetch mapping[key]; a missing key or a None value is reported
            # through the module's standard configuration error.
            value = mapping.get(key)
            if value is None:
                _config_dict_error(label)
            return value

        auth_dict = require(config_dict, 'auth', 'auth')
        self.username = require(auth_dict, 'username', 'auth->username')
        self.password = require(auth_dict, 'password', 'auth->password')

        resource_dict = require(config_dict, 'resource', 'resource')
        self.resource = require(resource_dict, 'name', 'resource->name')

        zone_dict = require(config_dict, 'zone', 'zone')
        self.zone = require(zone_dict, 'name', 'zone->name')

        connection_dict = require(config_dict, 'connection', 'connection')
        self.host = require(connection_dict, 'host', 'connection->host')
        self.port = require(connection_dict, 'port', 'connection->port')
        self.timeout = require(connection_dict, 'timeout', 'connection->timeout')
        self.poolsize = require(connection_dict, 'poolsize', 'connection->poolsize')

        cache_dict = require(config_dict, 'cache', 'cache')
        # A missing size defaults to -1; only an explicit None is rejected.
        self.cache_size = cache_dict.get('size', -1)
        if self.cache_size is None:
            _config_dict_error('cache->size')
        self.staging_path = cache_dict.get('path') or self.config.object_store_cache_path
        if self.staging_path is None:
            _config_dict_error('cache->path')

        extra_dirs = {e['type']: e['path'] for e in config_dict.get('extra_dirs', [])}
        if not extra_dirs:
            _config_dict_error('extra_dirs')
        self.extra_dirs.update(extra_dirs)

        if irods is None:
            raise Exception(IRODS_IMPORT_MESSAGE)

        # Every data object is addressed relative to the user's home collection.
        self.home = "/" + self.zone + "/home/" + self.username
        log.debug("irods __init__ %s", reload_timer)

    @classmethod
    def parse_xml(cls, config_xml):
        """Return the configuration dict for ``config_xml`` (see ``parse_config_xml``)."""
        return parse_config_xml(config_xml)

    def to_dict(self):
        """Return the parent's dict representation extended with the iRODS settings."""
        as_dict = super().to_dict()
        as_dict.update(self._config_to_dict())
        return as_dict

    def _managed_session(self):
        """Return a ``managed_session`` context manager built from this store's settings."""
        return managed_session(host=self.host, port=self.port, user=self.username,
                               password=self.password, zone=self.zone, timeout=self.timeout)

    def _get_irods_paths(self, rel_path):
        """Map ``rel_path`` to ``(collection_path, data_object_path)`` under ``self.home``.

        The parent directory of ``rel_path`` becomes the sub-collection and its
        final component the data object name.
        """
        p = Path(rel_path)
        collection_path = self.home + "/" + str(p.parent)
        data_object_path = collection_path + "/" + p.name
        return collection_path, data_object_path

    def _fix_permissions(self, rel_path):
        """ Set permissions on rel_path"""
        for basedir, _, files in os.walk(rel_path):
            umask_fix_perms(basedir, self.config.umask, 0o777, self.config.gid)
            for filename in files:
                path = os.path.join(basedir, filename)
                # Ignore symlinks
                if os.path.islink(path):
                    continue
                umask_fix_perms(path, self.config.umask, 0o666, self.config.gid)

    def _construct_path(self, obj, base_dir=None, dir_only=None, extra_dir=None, extra_dir_at_root=False,
                        alt_name=None, obj_dir=False, **kwargs):
        """Build the path of ``obj`` relative to the cache root / home collection.

        When ``base_dir`` is given the result is instead rooted at the matching
        entry of ``self.extra_dirs`` and never gets a file name appended.

        :raises ObjectInvalid: if ``extra_dir`` is not normalized or
            ``alt_name`` would escape the constructed directory.
        """
        # extra_dir should never be constructed from provided data but just
        # make sure there are no shenannigans afoot
        if extra_dir and extra_dir != os.path.normpath(extra_dir):
            log.warning('extra_dir is not normalized: %s', extra_dir)
            raise ObjectInvalid("The requested object is invalid")
        # ensure that any parent directory references in alt_name would not
        # result in a path not contained in the directory path constructed here
        if alt_name:
            if not safe_relpath(alt_name):
                log.warning('alt_name would locate path outside dir: %s', alt_name)
                raise ObjectInvalid("The requested object is invalid")
            # alt_name can contain parent directory references; if they are
            # valid we normalize them out
            alt_name = os.path.normpath(alt_name)
        rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj)))
        if extra_dir is not None:
            if extra_dir_at_root:
                rel_path = os.path.join(extra_dir, rel_path)
            else:
                rel_path = os.path.join(rel_path, extra_dir)

        # for JOB_WORK directory
        if obj_dir:
            rel_path = os.path.join(rel_path, str(self._get_object_id(obj)))
        if base_dir:
            base = self.extra_dirs.get(base_dir)
            return os.path.join(base, rel_path)

        if not dir_only:
            rel_path = os.path.join(rel_path, alt_name if alt_name else "dataset_%s.dat" % self._get_object_id(obj))
        return rel_path

    def _get_cache_path(self, rel_path):
        """Return the absolute path of ``rel_path`` inside the local cache."""
        return os.path.abspath(os.path.join(self.staging_path, rel_path))

    # rel_path is file or folder?
    def _get_size_in_irods(self, rel_path):
        """Return the size of the data object for ``rel_path``, or -1 on failure."""
        with self._managed_session() as session:
            _, data_object_path = self._get_irods_paths(rel_path)
            try:
                data_obj = session.data_objects.get(data_object_path)
                # ``size`` is the stored size of the data object; the previous
                # ``__sizeof__()`` measured the Python wrapper object instead.
                return data_obj.size
            except (DataObjectDoesNotExist, CollectionDoesNotExist):
                log.warning("Collection or data object (%s) does not exist", data_object_path)
                return -1
            except NetworkException as e:
                log.exception(e)
                return -1

    # rel_path is file or folder?
    def _data_object_exists(self, rel_path):
        """Return True if the data object for ``rel_path`` can be fetched from iRODS."""
        with self._managed_session() as session:
            _, data_object_path = self._get_irods_paths(rel_path)
            try:
                session.data_objects.get(data_object_path)
                return True
            except (DataObjectDoesNotExist, CollectionDoesNotExist):
                log.debug("Collection or data object (%s) does not exist", data_object_path)
                return False
            except NetworkException as e:
                log.exception(e)
                return False

    def _in_cache(self, rel_path):
        """ Check if the given dataset is in the local cache and return True if so. """
        cache_path = self._get_cache_path(rel_path)
        return os.path.exists(cache_path)

    def _pull_into_cache(self, rel_path):
        """Download ``rel_path`` into the cache; return whether the download succeeded."""
        # Ensure the cache directory structure exists (e.g., dataset_#_files/)
        cache_dir = self._get_cache_path(os.path.dirname(rel_path))
        if not os.path.exists(cache_dir):
            # exist_ok guards against another process creating it in between
            os.makedirs(cache_dir, exist_ok=True)
        # Now pull in the file
        file_ok = self._download(rel_path)
        self._fix_permissions(cache_dir)
        return file_ok

    def _download(self, rel_path):
        """Stream the data object for ``rel_path`` into its cache file.

        :returns: True once the file has been written; False if the object is
            missing, a network error occurs, or the size guard rejects it.
        """
        with self._managed_session() as session:
            cache_path = self._get_cache_path(rel_path)
            log.debug("Pulling data object '%s' into cache to %s", rel_path, cache_path)

            _, data_object_path = self._get_irods_paths(rel_path)
            try:
                data_obj = session.data_objects.get(data_object_path)
            except (DataObjectDoesNotExist, CollectionDoesNotExist):
                log.warning("Collection or data object (%s) does not exist", data_object_path)
                return False
            except NetworkException as e:
                log.exception(e)
                return False

            # NOTE(review): ``__sizeof__()`` is the in-memory size of the Python
            # wrapper, not the stored byte count, so this guard is effectively
            # inert. It is left untouched because the unit of ``cache_size`` is
            # not established in this module -- confirm before switching to
            # ``data_obj.size``.
            if self.cache_size > 0 and data_obj.__sizeof__() > self.cache_size:
                log.critical("File %s is larger (%s) than the cache size (%s). Cannot download.",
                             rel_path, data_obj.__sizeof__(), self.cache_size)
                return False

            with data_obj.open('r') as data_obj_fp, open(cache_path, "wb") as cache_fp:
                for chunk in iter(partial(data_obj_fp.read, CHUNK_SIZE), b''):
                    cache_fp.write(chunk)
            # Only report success after the bytes have actually been copied.
            log.debug("Pulled data object '%s' into cache to %s", rel_path, cache_path)
            return True

    def _push_to_irods(self, rel_path, source_file=None, from_string=None):
        """
        Push the file pointed to by ``rel_path`` to the iRODS. Extract folder name
        from rel_path as iRODS collection name, and extract file name from rel_path
        as iRODS data object name.

        If ``source_file`` is provided, push that file instead while still using
        ``rel_path`` for collection and object store names.

        If ``from_string`` is provided, set contents of the file to the value of the string.

        Returns True when the data was pushed (or the push was skipped because an
        empty source already has a data object), False on a network error or
        when the source file does not exist.
        """
        with self._managed_session() as session:
            source_file = source_file if source_file else self._get_cache_path(rel_path)
            options = {kw.FORCE_FLAG_KW: ''}

            if not os.path.exists(source_file):
                log.error("Tried updating key '%s' from source file '%s', but source file does not exist.",
                          rel_path, source_file)
                return False

            collection_path, data_object_path = self._get_irods_paths(rel_path)
            try:
                # Check if the data object exists in iRODS
                exists = session.data_objects.exists(data_object_path)

                if os.path.getsize(source_file) == 0 and exists:
                    log.debug("Wanted to push file '%s' to iRODS collection '%s' but its size is 0; skipping.",
                              source_file, rel_path)
                    return True

                # An empty ``from_string`` is falsy and intentionally takes the
                # file branch, which also creates the collection.
                if from_string:
                    data_obj = session.data_objects.create(data_object_path, self.resource, **options)
                    with data_obj.open('w') as data_obj_fp:
                        data_obj_fp.write(from_string)
                    log.debug("Pushed data from string '%s' to collection '%s'", from_string, data_object_path)
                else:
                    start_time = datetime.now()
                    log.debug("Pushing cache file '%s' of size %s bytes to collection '%s'",
                              source_file, os.path.getsize(source_file), rel_path)

                    # Create sub-collection first
                    session.collections.create(collection_path, recurse=True)
                    data_obj = session.data_objects.create(data_object_path, self.resource, **options)

                    # Write to file in subcollection created above
                    with open(source_file, 'rb') as content_file, data_obj.open('w') as data_obj_fp:
                        for chunk in iter(partial(content_file.read, CHUNK_SIZE), b''):
                            data_obj_fp.write(chunk)

                    end_time = datetime.now()
                    log.debug("Pushed cache file '%s' to collection '%s' (%s bytes transfered in %s sec)",
                              source_file, rel_path, os.path.getsize(source_file), end_time - start_time)
                return True
            except NetworkException as e:
                log.exception(e)
                return False

    def file_ready(self, obj, **kwargs):
        """
        A helper method that checks if a file corresponding to a dataset is
        ready and available to be used. Return ``True`` if so, ``False`` otherwise.
        """
        rel_path = self._construct_path(obj, **kwargs)
        # Make sure the size in cache is available in its entirety
        if self._in_cache(rel_path):
            # Measure each side once: the remote lookup opens an iRODS session.
            local_size = os.path.getsize(self._get_cache_path(rel_path))
            remote_size = self._get_size_in_irods(rel_path)
            if local_size == remote_size:
                return True
            log.debug("Waiting for dataset %s to transfer from OS: %s/%s", rel_path, local_size, remote_size)
        return False

    def _exists(self, obj, **kwargs):
        """Return True if ``obj`` is in the cache or in iRODS.

        For a ``dir_only`` request with a ``base_dir`` the directory is created
        on demand and reported as existing.
        """
        rel_path = self._construct_path(obj, **kwargs)

        # Check cache and irods
        if self._in_cache(rel_path) or self._data_object_exists(rel_path):
            return True

        # dir_only does not get synced so shortcut the decision
        dir_only = kwargs.get('dir_only', False)
        base_dir = kwargs.get('base_dir', None)
        if dir_only and base_dir:
            # for JOB_WORK directory
            if not os.path.exists(rel_path):
                os.makedirs(rel_path, exist_ok=True)
            return True
        return False

    def _create(self, obj, **kwargs):
        """Create ``obj`` in the cache and push it to iRODS unless it already exists."""
        if self._exists(obj, **kwargs):
            return

        # Pull out locally used fields
        extra_dir = kwargs.get('extra_dir', None)
        extra_dir_at_root = kwargs.get('extra_dir_at_root', False)
        dir_only = kwargs.get('dir_only', False)
        alt_name = kwargs.get('alt_name', None)

        # Construct hashed path
        rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj)))

        # Optionally append extra_dir
        if extra_dir is not None:
            if extra_dir_at_root:
                rel_path = os.path.join(extra_dir, rel_path)
            else:
                rel_path = os.path.join(rel_path, extra_dir)

        # Create given directory in cache
        cache_dir = os.path.join(self.staging_path, rel_path)
        if not os.path.exists(cache_dir):
            os.makedirs(cache_dir, exist_ok=True)

        if not dir_only:
            rel_path = os.path.join(rel_path, alt_name if alt_name else "dataset_%s.dat" % self._get_object_id(obj))
            # Touch an empty file in the cache, then mirror it to iRODS.
            open(os.path.join(self.staging_path, rel_path), 'w').close()
            self._push_to_irods(rel_path, from_string='')

    def _empty(self, obj, **kwargs):
        """Return True if ``obj`` exists and its size is zero.

        :raises ObjectNotFound: if the object does not exist.
        """
        if self._exists(obj, **kwargs):
            # Fixed: the original tested ``> 0`` and so reported non-empty
            # objects as empty and vice versa.
            return self._size(obj, **kwargs) == 0
        raise ObjectNotFound('objectstore.empty, object does not exist: %s, kwargs: %s' % (str(obj), str(kwargs)))

    def _size(self, obj, **kwargs):
        """Return the size of ``obj``, preferring the cached copy over iRODS.

        Returns 0 when the object cannot be found anywhere.
        """
        rel_path = self._construct_path(obj, **kwargs)
        if self._in_cache(rel_path):
            try:
                return os.path.getsize(self._get_cache_path(rel_path))
            except OSError as ex:
                # Fall through to the remote lookup below instead of giving up.
                log.info("Could not get size of file '%s' in local cache, will try iRODS. Error: %s", rel_path, ex)
        if self._exists(obj, **kwargs):
            return self._get_size_in_irods(rel_path)
        log.warning("Did not find dataset '%s', returning 0 for size", rel_path)
        return 0

    def _delete(self, obj, entire_dir=False, **kwargs):
        """Delete ``obj`` from the local cache and from iRODS.

        :param entire_dir: together with an ``extra_dir`` keyword, remove the
            whole directory / collection contents instead of a single file.
        :returns: True on success (including when the data object was already
            absent from iRODS), False otherwise.
        """
        with self._managed_session() as session:
            rel_path = self._construct_path(obj, **kwargs)
            extra_dir = kwargs.get('extra_dir', None)
            base_dir = kwargs.get('base_dir', None)
            dir_only = kwargs.get('dir_only', False)
            obj_dir = kwargs.get('obj_dir', False)

            try:
                # Remove temporary data in JOB_WORK directory
                if base_dir and dir_only and obj_dir:
                    shutil.rmtree(os.path.abspath(rel_path))
                    return True

                # For the case of extra_files, because we don't have a reference to
                # individual files we need to remove the entire directory structure
                # with all the files in it. This is easy for the local file system,
                # but requires iterating through each individual key in irods and deleting it.
                if entire_dir and extra_dir:
                    cache_dir = self._get_cache_path(rel_path)
                    # A directory that was never cached must not block the remote delete.
                    if os.path.exists(cache_dir):
                        shutil.rmtree(cache_dir)

                    col_path = self.home + "/" + str(rel_path)
                    try:
                        col = session.collections.get(col_path)
                    except CollectionDoesNotExist:
                        log.warning("Collection (%s) does not exist!", col_path)
                        return False
                    except NetworkException as e:
                        log.exception(e)
                        return False

                    walker = col.walk()
                    # Traverse the tree only one level deep
                    for _ in range(2):
                        # get next result; the walk may yield fewer than two entries
                        entry = next(walker, None)
                        if entry is None:
                            break
                        # Delete data objects
                        for data_object in entry[2]:
                            data_object.unlink(force=True)
                    return True

                # Delete from cache first
                try:
                    os.unlink(self._get_cache_path(rel_path))
                except FileNotFoundError:
                    # Not cached locally; still remove the copy held in iRODS.
                    pass

                # Delete from irods as well
                _, data_object_path = self._get_irods_paths(rel_path)
                try:
                    data_obj = session.data_objects.get(data_object_path)
                    # remove object
                    data_obj.unlink(force=True)
                    return True
                except (DataObjectDoesNotExist, CollectionDoesNotExist):
                    log.info("Collection or data object (%s) does not exist", data_object_path)
                    return True
                except NetworkException as e:
                    log.exception(e)
                    return False
            except OSError:
                log.exception('%s delete error', self._get_filename(obj, **kwargs))
            except NetworkException as e:
                log.exception(e)
            return False

    def _get_data(self, obj, start=0, count=-1, **kwargs):
        """Return ``count`` characters of ``obj`` starting at offset ``start``.

        The file is pulled into the cache first if it is not already there.
        """
        rel_path = self._construct_path(obj, **kwargs)
        # Check cache first and get file if not there
        if not self._in_cache(rel_path):
            self._pull_into_cache(rel_path)
        # Read the file content from cache; ``with`` closes the handle even if
        # seek/read raises.
        with open(self._get_cache_path(rel_path)) as data_file:
            data_file.seek(start)
            return data_file.read(count)

    def _get_filename(self, obj, **kwargs):
        """Return a local path for ``obj``, pulling it into the cache if needed.

        :raises ObjectNotFound: if the object is neither cached nor retrievable.
        """
        base_dir = kwargs.get('base_dir', None)
        dir_only = kwargs.get('dir_only', False)
        obj_dir = kwargs.get('obj_dir', False)
        rel_path = self._construct_path(obj, **kwargs)

        # for JOB_WORK directory
        if base_dir and dir_only and obj_dir:
            return os.path.abspath(rel_path)

        cache_path = self._get_cache_path(rel_path)
        # Check if the file exists in the cache first
        if self._in_cache(rel_path):
            return cache_path
        # Check if the file exists in persistent storage and, if it does, pull it into cache
        if self._exists(obj, **kwargs):
            if dir_only:
                # Directories do not get pulled into cache
                return cache_path
            if self._pull_into_cache(rel_path):
                return cache_path
        raise ObjectNotFound('objectstore.get_filename, no cache_path: %s, kwargs: %s' % (str(obj), str(kwargs)))

    def _update_from_file(self, obj, file_name=None, create=False, **kwargs):
        """Update ``obj`` in iRODS from ``file_name`` or from its cached copy.

        :param file_name: optional source file; it is copied into the cache first.
        :param create: create the object beforehand if it does not exist.
        :raises ObjectNotFound: if the object does not exist.
        """
        if create:
            self._create(obj, **kwargs)
        if not self._exists(obj, **kwargs):
            raise ObjectNotFound('objectstore.update_from_file, object does not exist: %s, kwargs: %s'
                                 % (str(obj), str(kwargs)))

        rel_path = self._construct_path(obj, **kwargs)
        # Chose whether to use the dataset file itself or an alternate file
        if file_name:
            source_file = os.path.abspath(file_name)
            # Copy into cache
            cache_file = self._get_cache_path(rel_path)
            try:
                if source_file != cache_file:
                    # FIXME? Should this be a `move`?
                    shutil.copy2(source_file, cache_file)
                self._fix_permissions(cache_file)
            except OSError:
                log.exception("Trouble copying source file '%s' to cache '%s'", source_file, cache_file)
        else:
            source_file = self._get_cache_path(rel_path)
        # Update the file on iRODS
        self._push_to_irods(rel_path, source_file)

    # Unlike S3, url is not really applicable to iRODS
    def _get_object_url(self, obj, **kwargs):
        """Return the iRODS data object path of ``obj``, or None if it does not exist."""
        if self._exists(obj, **kwargs):
            rel_path = self._construct_path(obj, **kwargs)
            _, data_object_path = self._get_irods_paths(rel_path)
            return data_object_path
        return None

    def _get_store_usage_percent(self):
        """Return the store usage percentage; this implementation always reports 0.0."""
        return 0.0