Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.objectstore.rods

"""
Object Store plugin for the Integrated Rule-Oriented Data Store (iRODS)

The module is named rods to avoid conflicting with the PyRods module, irods
"""

import logging
import os
import time
from posixpath import (
    basename as path_basename,
    dirname as path_dirname,
    join as path_join
)

try:
    import irods
except ImportError:
    irods = None

from galaxy.exceptions import (
    ObjectInvalid,
    ObjectNotFound
)
from galaxy.util.path import safe_relpath
from ..objectstore import (
    DiskObjectStore,
    local_extra_dirs
)

IRODS_IMPORT_MESSAGE = ('The Python irods package is required to use this '
                        'feature, please install it')

log = logging.getLogger(__name__)


[docs]class IRODSObjectStore(DiskObjectStore): """ Galaxy object store based on iRODS """
[docs] def __init__(self, config, file_path=None, extra_dirs=None): super(IRODSObjectStore, self).__init__(config, file_path=file_path, extra_dirs=extra_dirs) assert irods is not None, IRODS_IMPORT_MESSAGE self.cache_path = config.object_store_cache_path self.default_resource = config.irods_default_resource or None # Connect to iRODS (AssertionErrors will be raised if anything goes wrong) self.rods_env, self.rods_conn = rods_connect() # if the root collection path in the config is unset or relative, try to use a sensible default if config.irods_root_collection_path is None or (config.irods_root_collection_path is not None and not config.irods_root_collection_path.startswith('/')): rods_home = self.rods_env.rodsHome assert rods_home != '', "Unable to initialize iRODS Object Store: rodsHome cannot be determined and irods_root_collection_path in Galaxy config is unset or not absolute." if config.irods_root_collection_path is None: self.root_collection_path = path_join(rods_home, 'galaxy_data') else: self.root_collection_path = path_join(rods_home, config.irods_root_collection_path) else: self.root_collection_path = config.irods_root_collection_path # will return a collection object regardless of whether it exists self.root_collection = irods.irodsCollection(self.rods_conn, self.root_collection_path) if self.root_collection.getId() == -1: log.warning("iRODS root collection does not exist, will attempt to create: %s", self.root_collection_path) self.root_collection.upCollection() assert self.root_collection.createCollection(os.path.basename(self.root_collection_path)) == 0, "iRODS root collection creation failed: %s" % self.root_collection_path self.root_collection = irods.irodsCollection(self.rods_conn, self.root_collection_path) assert self.root_collection.getId() != -1, "iRODS root collection creation claimed success but still does not exist" if self.default_resource is None: self.default_resource = self.rods_env.rodsDefResource log.info("iRODS data for this instance will be stored in collection: %s, resource: %s", self.root_collection_path, self.default_resource)
def __get_rods_path(self, obj, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None, strip_dat=True, **kwargs): # extra_dir should never be constructed from provided data but just # make sure there are no shenannigans afoot if extra_dir and extra_dir != os.path.normpath(extra_dir): log.warning('extra_dir is not normalized: %s', extra_dir) raise ObjectInvalid("The requested object is invalid") # ensure that any parent directory references in alt_name would not # result in a path not contained in the directory path constructed here if alt_name: if not safe_relpath(alt_name): log.warning('alt_name would locate path outside dir: %s', alt_name) raise ObjectInvalid("The requested object is invalid") # alt_name can contain parent directory references, but iRODS will # not follow them, so if they are valid we normalize them out alt_name = os.path.normpath(alt_name) path = "" if extra_dir is not None: path = extra_dir # extra_dir_at_root is ignored - since the iRODS plugin does not use # the directory hash, there is only one level of subdirectory. if not dir_only: # the .dat extension is stripped when stored in iRODS # TODO: is the strip_dat kwarg the best way to implement this? if strip_dat and alt_name and alt_name.endswith('.dat'): alt_name = os.path.splitext(alt_name)[0] default_name = 'dataset_%s' % obj.id if not strip_dat: default_name += '.dat' path = path_join(path, alt_name if alt_name else default_name) path = path_join(self.root_collection_path, path) return path def __get_cache_path(self, obj, **kwargs): # FIXME: does not handle collections # FIXME: collisions could occur here return os.path.join(self.cache_path, path_basename(self.__get_rods_path(obj, strip_dat=False, **kwargs))) def __clean_cache_entry(self, obj, **kwargs): # FIXME: does not handle collections try: os.unlink(self.__get_cache_path(obj, **kwargs)) except OSError: # it is expected that we'll call this method a lot regardless of # whether we think the cached file exists pass def __get_rods_handle(self, obj, mode='r', **kwargs): if kwargs.get('dir_only', False): return irods.irodsCollection(self.rods_conn, self.__get_rods_path(obj, **kwargs)) else: return irods.irodsOpen(self.rods_conn, self.__get_rods_path(obj, **kwargs), mode) def __mkcolls(self, rods_path): """ An os.makedirs() for iRODS collections. `rods_path` is the desired collection to create. """ assert rods_path.startswith(self.root_collection_path + '/'), '__mkcolls(): Creating collections outside the root collection is not allowed (requested path was: %s)' % rods_path mkcolls = [] c = irods.irodsCollection(self.rods_conn, rods_path) while c.getId() == -1: assert c.getCollName().startswith(self.root_collection_path + '/'), '__mkcolls(): Attempted to move above the root collection: %s' % c.getCollName() mkcolls.append(c.getCollName()) c.upCollection() for collname in reversed(mkcolls): log.debug('Creating collection %s' % collname) ci = irods.collInp_t() ci.collName = collname status = irods.rcCollCreate(self.rods_conn, ci) assert status == 0, '__mkcolls(): Failed to create collection: %s' % collname @local_extra_dirs def exists(self, obj, **kwargs): doi = irods.dataObjInp_t() doi.objPath = self.__get_rods_path(obj, **kwargs) log.debug('exists(): checking: %s', doi.objPath) return irods.rcObjStat(self.rods_conn, doi) is not None @local_extra_dirs def create(self, obj, **kwargs): if not self.exists(obj, **kwargs): rods_path = self.__get_rods_path(obj, **kwargs) log.debug('create(): %s', rods_path) dir_only = kwargs.get('dir_only', False) # short circuit collection creation since most of the time it will # be the root collection which already exists collection_path = rods_path if dir_only else path_dirname(rods_path) if collection_path != self.root_collection_path: self.__mkcolls(collection_path) if not dir_only: # rcDataObjCreate is used instead of the irodsOpen wrapper so # that we can prevent overwriting doi = irods.dataObjInp_t() doi.objPath = rods_path doi.createMode = 0o640 doi.dataSize = 0 # 0 actually means "unknown", although literally 0 would be preferable irods.addKeyVal(doi.condInput, irods.DEST_RESC_NAME_KW, self.default_resource) status = irods.rcDataObjCreate(self.rods_conn, doi) assert status >= 0, 'create(): rcDataObjCreate() failed: %s: %s: %s' % (rods_path, status, irods.strerror(status)) @local_extra_dirs def empty(self, obj, **kwargs): assert 'dir_only' not in kwargs, 'empty(): `dir_only` parameter is invalid here' h = self.__get_rods_handle(obj, **kwargs) try: return h.getSize() == 0 except AttributeError: # h is None raise ObjectNotFound()
[docs] def size(self, obj, **kwargs): assert 'dir_only' not in kwargs, 'size(): `dir_only` parameter is invalid here' h = self.__get_rods_handle(obj, **kwargs) try: return h.getSize() except AttributeError: # h is None return 0
@local_extra_dirs def delete(self, obj, entire_dir=False, **kwargs): assert 'dir_only' not in kwargs, 'delete(): `dir_only` parameter is invalid here' rods_path = self.__get_rods_path(obj, **kwargs) # __get_rods_path prepends self.root_collection_path but we are going # to ensure that it's valid anyway for safety's sake assert rods_path.startswith(self.root_collection_path + '/'), 'ERROR: attempt to delete object outside root collection (path was: %s)' % rods_path if entire_dir: # TODO raise NotImplementedError() h = self.__get_rods_handle(obj, **kwargs) try: # note: PyRods' irodsFile.delete() does not set force status = h.delete() assert status == 0, '%d: %s' % (status, irods.strerror(status)) return True except AttributeError: log.warning('delete(): operation failed: object does not exist: %s', rods_path) except AssertionError as e: # delete() does not raise on deletion failure log.error('delete(): operation failed: %s', e) finally: # remove the cached entry (finally is executed even when the try # contains a return) self.__clean_cache_entry(self, obj, **kwargs) return False @local_extra_dirs def get_data(self, obj, start=0, count=-1, **kwargs): log.debug('get_data(): %s') h = self.__get_rods_handle(obj, **kwargs) try: h.seek(start) except AttributeError: raise ObjectNotFound() if count == -1: return h.read() else: return h.read(count) # TODO: make sure implicit close is okay, DiskObjectStore actually # reads data into a var, closes, and returns the var @local_extra_dirs def get_filename(self, obj, **kwargs): log.debug("get_filename(): called on %s %s. For better performance, avoid this method and use get_data() instead.", obj.__class__.__name__, obj.id) cached_path = self.__get_cache_path(obj, **kwargs) if not self.exists(obj, **kwargs): raise ObjectNotFound() # TODO: implement or define whether dir_only is valid if 'dir_only' in kwargs: raise NotImplementedError() # cache hit if os.path.exists(cached_path): return os.path.abspath(cached_path) # cache miss # TODO: thread this incoming_path = os.path.join(os.path.dirname(cached_path), "__incoming_%s" % os.path.basename(cached_path)) doi = irods.dataObjInp_t() doi.objPath = self.__get_rods_path(obj, **kwargs) doi.dataSize = 0 # TODO: does this affect performance? should we get size? doi.numThreads = 0 # TODO: might want to VERIFY_CHKSUM_KW log.debug('get_filename(): caching %s to %s', doi.objPath, incoming_path) # do the iget status = irods.rcDataObjGet(self.rods_conn, doi, incoming_path) # if incoming already exists, we'll wait for another process or thread # to finish caching if status != irods.OVERWRITE_WITHOUT_FORCE_FLAG: assert status == 0, 'get_filename(): iget %s failed (%s): %s' % (doi.objPath, status, irods.strerror(status)) # POSIX rename is atomic # TODO: rename without clobbering os.rename(incoming_path, cached_path) log.debug('get_filename(): cached %s to %s', doi.objPath, cached_path) # another process or thread is caching, wait for it while not os.path.exists(cached_path): # TODO: force restart after mod time > some configurable, or # otherwise deal with this potential deadlock and interrupted # transfers time.sleep(5) log.debug("get_filename(): waiting on incoming '%s' for %s %s", incoming_path, obj.__class__.__name__, obj.id) return os.path.abspath(cached_path) @local_extra_dirs def update_from_file(self, obj, file_name=None, create=False, **kwargs): assert 'dir_only' not in kwargs, 'update_from_file(): `dir_only` parameter is invalid here' # do not create if not requested if create and not self.exists(obj, **kwargs): raise ObjectNotFound() if file_name is None: file_name = self.__get_cache_path(obj, **kwargs) # put will create if necessary doi = irods.dataObjInp_t() doi.objPath = self.__get_rods_path(obj, **kwargs) doi.createMode = 0o640 doi.dataSize = os.stat(file_name).st_size doi.numThreads = 0 irods.addKeyVal(doi.condInput, irods.DEST_RESC_NAME_KW, self.default_resource) irods.addKeyVal(doi.condInput, irods.FORCE_FLAG_KW, '') # TODO: might want to VERIFY_CHKSUM_KW log.debug('update_from_file(): updating %s to %s', file_name, doi.objPath) # do the iput status = irods.rcDataObjPut(self.rods_conn, doi, file_name) assert status == 0, 'update_from_file(): iput %s failed (%s): %s' % (doi.objPath, status, irods.strerror(status))
[docs] def get_object_url(self, obj, **kwargs): return None
[docs] def get_store_usage_percent(self): return 0.0
# monkeypatch an strerror method into the irods module def _rods_strerror(errno): """ The missing `strerror` for iRODS error codes """ if not hasattr(irods, '__rods_strerror_map'): irods.__rods_strerror_map = {} for name in dir(irods): v = getattr(irods, name) if type(v) == int and v < 0: irods.__rods_strerror_map[v] = name return irods.__rods_strerror_map.get(errno, 'GALAXY_NO_ERRNO_MAPPING_FOUND') if irods is not None: irods.strerror = _rods_strerror
[docs]def rods_connect(): """ A basic iRODS connection mechanism that connects using the current iRODS environment """ status, env = irods.getRodsEnv() assert status == 0, 'connect(): getRodsEnv() failed (%s): %s' % (status, irods.strerror(status)) conn, err = irods.rcConnect(env.rodsHost, env.rodsPort, env.rodsUserName, env.rodsZone) assert err.status == 0, 'connect(): rcConnect() failed (%s): %s' % (err.status, err.msg) status, pw = irods.obfGetPw() assert status == 0, 'connect(): getting password with obfGetPw() failed (%s): %s' % (status, irods.strerror(status)) status = irods.clientLoginWithObfPassword(conn, pw) assert status == 0, 'connect(): logging in with clientLoginWithObfPassword() failed (%s): %s' % (status, irods.strerror(status)) return env, conn