Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.objectstore.rods
"""
Object Store plugin for the Integrated Rule-Oriented Data Store (iRODS)
The module is named rods to avoid conflicting with the PyRods module, irods
"""
import logging
import os
import time
from posixpath import (
    basename as path_basename,
    dirname as path_dirname,
    join as path_join
)
try:
    import irods
except ImportError:
    irods = None
from galaxy.exceptions import (
    ObjectInvalid,
    ObjectNotFound
)
from galaxy.util.path import safe_relpath
from ..objectstore import (
    DiskObjectStore,
    local_extra_dirs
)
# Assertion message shown by IRODSObjectStore.__init__ when the optional
# ``irods`` (PyRods) module could not be imported and was set to None.
IRODS_IMPORT_MESSAGE = ('The Python irods package is required to use this '
                        'feature, please install it')
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class IRODSObjectStore(DiskObjectStore):
    """
    Galaxy object store based on iRODS
    """
[docs]    def __init__(self, config, file_path=None, extra_dirs=None):
        super(IRODSObjectStore, self).__init__(config, file_path=file_path, extra_dirs=extra_dirs)
        assert irods is not None, IRODS_IMPORT_MESSAGE
        self.cache_path = config.object_store_cache_path
        self.default_resource = config.irods_default_resource or None
        # Connect to iRODS (AssertionErrors will be raised if anything goes wrong)
        self.rods_env, self.rods_conn = rods_connect()
        # if the root collection path in the config is unset or relative, try to use a sensible default
        if config.irods_root_collection_path is None or (config.irods_root_collection_path is not None and not config.irods_root_collection_path.startswith('/')):
            rods_home = self.rods_env.rodsHome
            assert rods_home != '', "Unable to initialize iRODS Object Store: rodsHome cannot be determined and irods_root_collection_path in Galaxy config is unset or not absolute."
            if config.irods_root_collection_path is None:
                self.root_collection_path = path_join(rods_home, 'galaxy_data')
            else:
                self.root_collection_path = path_join(rods_home, config.irods_root_collection_path)
        else:
            self.root_collection_path = config.irods_root_collection_path
        # will return a collection object regardless of whether it exists
        self.root_collection = irods.irodsCollection(self.rods_conn, self.root_collection_path)
        if self.root_collection.getId() == -1:
            log.warning("iRODS root collection does not exist, will attempt to create: %s", self.root_collection_path)
            self.root_collection.upCollection()
            assert self.root_collection.createCollection(os.path.basename(self.root_collection_path)) == 0, "iRODS root collection creation failed: %s" % self.root_collection_path
            self.root_collection = irods.irodsCollection(self.rods_conn, self.root_collection_path)
            assert self.root_collection.getId() != -1, "iRODS root collection creation claimed success but still does not exist"
        if self.default_resource is None:
            self.default_resource = self.rods_env.rodsDefResource
        log.info("iRODS data for this instance will be stored in collection: %s, resource: %s", self.root_collection_path, self.default_resource)
    def __get_rods_path(self, obj, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None, strip_dat=True, **kwargs):
        """
        Build the absolute iRODS path for `obj` below the root collection.

        `extra_dir`, when given, becomes the single subdirectory level.
        Unless `dir_only` is set, the final path component is `alt_name`
        (with a trailing ``.dat`` removed when `strip_dat` is true) or, if no
        `alt_name` is given, ``dataset_<obj.id>`` (with ``.dat`` appended when
        `strip_dat` is false).  `base_dir`, `extra_dir_at_root` and any other
        keyword arguments are accepted but not used.

        :raises ObjectInvalid: if `extra_dir` is not normalized or `alt_name`
            fails the `safe_relpath` check.
        """
        # extra_dir should never be built from user-provided data, but verify
        # that nothing unexpected was slipped in
        if extra_dir and os.path.normpath(extra_dir) != extra_dir:
            log.warning('extra_dir is not normalized: %s', extra_dir)
            raise ObjectInvalid("The requested object is invalid")

        # parent directory references in alt_name must not lead outside of
        # the directory constructed here
        if alt_name and not safe_relpath(alt_name):
            log.warning('alt_name would locate path outside dir: %s', alt_name)
            raise ObjectInvalid("The requested object is invalid")
        if alt_name:
            # iRODS will not follow parent directory references, so the
            # (valid) ones are normalized away
            alt_name = os.path.normpath(alt_name)

        # extra_dir_at_root is ignored: the iRODS plugin does not use the
        # directory hash, so there is only one level of subdirectory
        relative = "" if extra_dir is None else extra_dir

        if not dir_only:
            # the .dat extension is stripped when stored in iRODS
            # TODO: is the strip_dat kwarg the best way to implement this?
            if strip_dat and alt_name and alt_name.endswith('.dat'):
                alt_name = os.path.splitext(alt_name)[0]
            fallback_name = 'dataset_%s' % obj.id
            if not strip_dat:
                fallback_name += '.dat'
            relative = path_join(relative, alt_name or fallback_name)

        return path_join(self.root_collection_path, relative)
    def __get_cache_path(self, obj, **kwargs):
        """
        Return the local cache file path for `obj`: the final component of
        its iRODS path (computed with ``strip_dat=False``) placed directly
        inside ``self.cache_path``.
        """
        # FIXME: does not handle collections
        # FIXME: collisions could occur here
        rods_path = self.__get_rods_path(obj, strip_dat=False, **kwargs)
        cache_name = path_basename(rods_path)
        return os.path.join(self.cache_path, cache_name)
    def __clean_cache_entry(self, obj, **kwargs):
        """
        Remove the locally cached copy of `obj`, ignoring ``OSError`` (for
        example when no cached file is present).
        """
        # FIXME: does not handle collections
        try:
            stale_path = self.__get_cache_path(obj, **kwargs)
            os.unlink(stale_path)
        except OSError:
            # this method is called frequently, whether or not a cached file
            # is believed to exist, so failure here is expected
            return
    def __get_rods_handle(self, obj, mode='r', **kwargs):
        """
        Return a PyRods handle for `obj`: an ``irodsCollection`` when
        ``dir_only`` is truthy in `kwargs`, otherwise the result of
        ``irodsOpen`` using `mode`.
        """
        rods_path = self.__get_rods_path(obj, **kwargs)
        wants_collection = kwargs.get('dir_only', False)
        if wants_collection:
            return irods.irodsCollection(self.rods_conn, rods_path)
        return irods.irodsOpen(self.rods_conn, rods_path, mode)
    def __mkcolls(self, rods_path):
        """
        An os.makedirs() for iRODS collections.  `rods_path` is the desired collection to create.

        Walks upward from `rods_path` collecting every collection that does
        not exist yet (``getId() == -1``), then creates them from the topmost
        missing one down to `rods_path`.

        :raises AssertionError: if `rods_path` (or any ancestor visited) is
            not strictly below the root collection, or if a collection cannot
            be created.
        """
        assert rods_path.startswith(self.root_collection_path + '/'), '__mkcolls(): Creating collections outside the root collection is not allowed (requested path was: %s)' % rods_path
        mkcolls = []
        c = irods.irodsCollection(self.rods_conn, rods_path)
        # climb until an existing collection is found, remembering the
        # missing ones on the way
        while c.getId() == -1:
            assert c.getCollName().startswith(self.root_collection_path + '/'), '__mkcolls(): Attempted to move above the root collection: %s' % c.getCollName()
            mkcolls.append(c.getCollName())
            c.upCollection()
        # parents must be created before their children
        for collname in reversed(mkcolls):
            # logger arguments are passed lazily, like every other log call
            # in this module
            log.debug('Creating collection %s', collname)
            ci = irods.collInp_t()
            ci.collName = collname
            status = irods.rcCollCreate(self.rods_conn, ci)
            assert status == 0, '__mkcolls(): Failed to create collection: %s' % collname
    @local_extra_dirs
    def exists(self, obj, **kwargs):
        """
        Return True if ``rcObjStat`` returns something other than None for
        the iRODS path of `obj`.
        """
        stat_input = irods.dataObjInp_t()
        stat_input.objPath = self.__get_rods_path(obj, **kwargs)
        log.debug('exists(): checking: %s', stat_input.objPath)
        stat_result = irods.rcObjStat(self.rods_conn, stat_input)
        return stat_result is not None
    @local_extra_dirs
    def create(self, obj, **kwargs):
        """
        Create `obj` in iRODS if it does not exist yet.

        With ``dir_only`` in `kwargs` only the collection is created;
        otherwise any missing parent collections are created, followed by an
        empty data object on the default resource.

        :raises AssertionError: if ``rcDataObjCreate`` reports a negative status.
        """
        if self.exists(obj, **kwargs):
            return

        rods_path = self.__get_rods_path(obj, **kwargs)
        log.debug('create(): %s', rods_path)
        dir_only = kwargs.get('dir_only', False)

        # Skip collection creation in the common case where the target
        # collection is the (already existing) root collection.
        if dir_only:
            collection_path = rods_path
        else:
            collection_path = path_dirname(rods_path)
        if collection_path != self.root_collection_path:
            self.__mkcolls(collection_path)

        if dir_only:
            return

        # rcDataObjCreate is used rather than the irodsOpen wrapper so that
        # overwriting can be prevented
        create_input = irods.dataObjInp_t()
        create_input.objPath = rods_path
        create_input.createMode = 0o640
        create_input.dataSize = 0  # 0 actually means "unknown", although literally 0 would be preferable
        irods.addKeyVal(create_input.condInput, irods.DEST_RESC_NAME_KW, self.default_resource)
        status = irods.rcDataObjCreate(self.rods_conn, create_input)
        assert status >= 0, 'create(): rcDataObjCreate() failed: %s: %s: %s' % (rods_path, status, irods.strerror(status))
    @local_extra_dirs
    def empty(self, obj, **kwargs):
        """
        Return True if the data object for `obj` has a size of 0.

        :raises ObjectNotFound: if no handle could be obtained for the object.
        :raises AssertionError: if ``dir_only`` is passed.
        """
        assert 'dir_only' not in kwargs, 'empty(): `dir_only` parameter is invalid here'
        handle = self.__get_rods_handle(obj, **kwargs)
        try:
            object_size = handle.getSize()
        except AttributeError:
            # handle is None
            raise ObjectNotFound()
        return object_size == 0
[docs]    def size(self, obj, **kwargs):
        assert 'dir_only' not in kwargs, 'size(): `dir_only` parameter is invalid here'
        h = self.__get_rods_handle(obj, **kwargs)
        try:
            return h.getSize()
        except AttributeError:
            # h is None
            return 0
    @local_extra_dirs
    def delete(self, obj, entire_dir=False, **kwargs):
        """
        Delete the data object for `obj` from iRODS and drop its cached copy.

        :param entire_dir: not supported; a truthy value raises
            ``NotImplementedError``.
        :returns: True if iRODS reported a successful delete, False if the
            object does not exist or the delete failed (both are logged).
        :raises AssertionError: if ``dir_only`` is passed or the computed
            path is not below the root collection.
        """
        assert 'dir_only' not in kwargs, 'delete(): `dir_only` parameter is invalid here'
        rods_path = self.__get_rods_path(obj, **kwargs)
        # __get_rods_path prepends self.root_collection_path but we are going
        # to ensure that it's valid anyway for safety's sake
        assert rods_path.startswith(self.root_collection_path + '/'), 'ERROR: attempt to delete object outside root collection (path was: %s)' % rods_path
        if entire_dir:
            # TODO
            raise NotImplementedError()
        h = self.__get_rods_handle(obj, **kwargs)
        try:
            # note: PyRods' irodsFile.delete() does not set force
            status = h.delete()
            # delete() does not raise on deletion failure, so the status is
            # checked explicitly (an assert would be stripped under -O)
            if status == 0:
                return True
            log.error('delete(): operation failed: %s', '%d: %s' % (status, irods.strerror(status)))
        except AttributeError:
            # h is None
            log.warning('delete(): operation failed: object does not exist: %s', rods_path)
        finally:
            # remove the cached entry (finally is executed even when the try
            # contains a return).  `self` is bound implicitly; passing it
            # again would raise TypeError on every call.
            self.__clean_cache_entry(obj, **kwargs)
        return False
    @local_extra_dirs
    def get_data(self, obj, start=0, count=-1, **kwargs):
        """
        Read data for `obj` directly from iRODS.

        :param start: offset to seek to before reading.
        :param count: number of bytes to read; ``-1`` reads to the end.
        :returns: whatever the handle's ``read()`` returns.
        :raises ObjectNotFound: if no handle could be obtained for the object.
        """
        # the placeholder previously had no argument and was logged literally
        log.debug('get_data(): %s', self.__get_rods_path(obj, **kwargs))
        h = self.__get_rods_handle(obj, **kwargs)
        try:
            h.seek(start)
        except AttributeError:
            # h is None
            raise ObjectNotFound()
        # Read into a variable and close the handle explicitly rather than
        # relying on an implicit close when the handle is garbage collected.
        try:
            if count == -1:
                return h.read()
            return h.read(count)
        finally:
            h.close()
    @local_extra_dirs
    def get_filename(self, obj, **kwargs):
        """
        Return the absolute path of a local cached copy of `obj`, fetching it
        from iRODS into the cache directory first if it is not cached yet.

        The download goes to an ``__incoming_``-prefixed sibling file which is
        renamed into place on success.  If iRODS refuses the download because
        that file already exists, this call polls until the cached file
        appears.

        :raises ObjectNotFound: if the object does not exist in iRODS.
        :raises NotImplementedError: if ``dir_only`` is passed.
        :raises AssertionError: if the download fails.
        """
        log.debug("get_filename(): called on %s %s. For better performance, avoid this method and use get_data() instead.", obj.__class__.__name__, obj.id)
        cached_path = self.__get_cache_path(obj, **kwargs)

        if not self.exists(obj, **kwargs):
            raise ObjectNotFound()

        # TODO: implement or define whether dir_only is valid
        if 'dir_only' in kwargs:
            raise NotImplementedError()

        # cache hit
        if os.path.exists(cached_path):
            return os.path.abspath(cached_path)

        # cache miss
        # TODO: thread this
        cache_dir, cache_name = os.path.split(cached_path)
        incoming_path = os.path.join(cache_dir, "__incoming_%s" % cache_name)

        get_input = irods.dataObjInp_t()
        get_input.objPath = self.__get_rods_path(obj, **kwargs)
        get_input.dataSize = 0  # TODO: does this affect performance? should we get size?
        get_input.numThreads = 0
        # TODO: might want to VERIFY_CHKSUM_KW
        log.debug('get_filename(): caching %s to %s', get_input.objPath, incoming_path)

        # do the iget
        status = irods.rcDataObjGet(self.rods_conn, get_input, incoming_path)

        # OVERWRITE_WITHOUT_FORCE_FLAG means the incoming file already exists,
        # i.e. another process or thread is caching; in every other case the
        # transfer was ours and must have succeeded
        if status != irods.OVERWRITE_WITHOUT_FORCE_FLAG:
            assert status == 0, 'get_filename(): iget %s failed (%s): %s' % (get_input.objPath, status, irods.strerror(status))
            # POSIX rename is atomic
            # TODO: rename without clobbering
            os.rename(incoming_path, cached_path)
            log.debug('get_filename(): cached %s to %s', get_input.objPath, cached_path)

        # wait for whoever is caching to finish
        while not os.path.exists(cached_path):
            # TODO: force restart after mod time > some configurable, or
            # otherwise deal with this potential deadlock and interrupted
            # transfers
            time.sleep(5)
            log.debug("get_filename(): waiting on incoming '%s' for %s %s", incoming_path, obj.__class__.__name__, obj.id)

        return os.path.abspath(cached_path)
    @local_extra_dirs
    def update_from_file(self, obj, file_name=None, create=False, **kwargs):
        """
        Upload a local file to iRODS as the contents of `obj`.

        :param file_name: local file to upload; defaults to the cache path
            for `obj`.
        :param create: when false, the object must already exist in iRODS;
            when true, the upload creates it if necessary.
        :raises ObjectNotFound: if `create` is false and the object does not
            exist.
        :raises AssertionError: if ``dir_only`` is passed or the upload fails.
        """
        assert 'dir_only' not in kwargs, 'update_from_file(): `dir_only` parameter is invalid here'
        # do not create if not requested (the test was previously inverted:
        # it raised when creation WAS requested and silently created the
        # object when it was not)
        if not create and not self.exists(obj, **kwargs):
            raise ObjectNotFound()
        if file_name is None:
            file_name = self.__get_cache_path(obj, **kwargs)
        # put will create if necessary
        doi = irods.dataObjInp_t()
        doi.objPath = self.__get_rods_path(obj, **kwargs)
        doi.createMode = 0o640
        doi.dataSize = os.stat(file_name).st_size
        doi.numThreads = 0
        irods.addKeyVal(doi.condInput, irods.DEST_RESC_NAME_KW, self.default_resource)
        # force flag: overwrite an existing data object
        irods.addKeyVal(doi.condInput, irods.FORCE_FLAG_KW, '')
        # TODO: might want to VERIFY_CHKSUM_KW
        log.debug('update_from_file(): updating %s to %s', file_name, doi.objPath)
        # do the iput
        status = irods.rcDataObjPut(self.rods_conn, doi, file_name)
        assert status == 0, 'update_from_file(): iput %s failed (%s): %s' % (doi.objPath, status, irods.strerror(status))
# monkeypatch an strerror method into the irods module
def _rods_strerror(errno):
    """
    The missing `strerror` for iRODS error codes

    Returns the name of the negative integer constant in the ``irods``
    module whose value equals `errno`, or
    ``'GALAXY_NO_ERRNO_MAPPING_FOUND'`` if there is none.  The value-to-name
    table is built on first use and stored on the ``irods`` module.
    """
    try:
        code_names = irods.__rods_strerror_map
    except AttributeError:
        # first call: build the table and cache it on the module
        code_names = {}
        irods.__rods_strerror_map = code_names
        for attr_name in dir(irods):
            value = getattr(irods, attr_name)
            if type(value) is int and value < 0:
                code_names[value] = attr_name
    return code_names.get(errno, 'GALAXY_NO_ERRNO_MAPPING_FOUND')
# Expose the helper as ``irods.strerror``; skipped when the ``irods`` import
# failed and the name was set to None.
if irods is not None:
    irods.strerror = _rods_strerror
def rods_connect():
    """
    Open an authenticated iRODS connection from the current iRODS environment.

    Reads the environment with ``getRodsEnv()``, connects with its host, port,
    user name and zone, then logs in with the password returned by
    ``obfGetPw()``.

    :returns: tuple of ``(environment, connection)``.
    :raises AssertionError: if any of the steps reports a non-zero status.
    """
    env_status, rods_env = irods.getRodsEnv()
    assert env_status == 0, 'connect(): getRodsEnv() failed (%s): %s' % (env_status, irods.strerror(env_status))

    connection, conn_err = irods.rcConnect(rods_env.rodsHost, rods_env.rodsPort, rods_env.rodsUserName, rods_env.rodsZone)
    assert conn_err.status == 0, 'connect(): rcConnect() failed (%s): %s' % (conn_err.status, conn_err.msg)

    pw_status, password = irods.obfGetPw()
    assert pw_status == 0, 'connect(): getting password with obfGetPw() failed (%s): %s' % (pw_status, irods.strerror(pw_status))

    login_status = irods.clientLoginWithObfPassword(connection, password)
    assert login_status == 0, 'connect(): logging in with clientLoginWithObfPassword() failed (%s): %s' % (login_status, irods.strerror(login_status))

    return rods_env, connection