Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.objectstore.rods
"""
Object Store plugin for the Integrated Rule-Oriented Data Store (iRODS)
The module is named rods to avoid conflicting with the PyRods module, irods
"""
import logging
import os
import time
from posixpath import (
basename as path_basename,
dirname as path_dirname,
join as path_join
)
try:
import irods
except ImportError:
irods = None
from galaxy.exceptions import (
ObjectInvalid,
ObjectNotFound
)
from galaxy.util.path import safe_relpath
from ..objectstore import (
DiskObjectStore,
local_extra_dirs
)
# Assertion message used when the optional PyRods package (imported above as
# `irods`) could not be imported.
IRODS_IMPORT_MESSAGE = ('The Python irods package is required to use this '
                        'feature, please install it')
# Module-level logger for this object store plugin.
log = logging.getLogger(__name__)
class IRODSObjectStore(DiskObjectStore):
    """
    Galaxy object store based on iRODS.

    Data objects live beneath a single iRODS root collection. An object is
    stored as ``dataset_<id>`` (the ``.dat`` suffix used on disk is dropped
    unless ``strip_dat=False`` is requested), optionally inside a single
    ``extra_dir`` sub-collection. :meth:`get_filename` copies objects on
    demand into the local directory named by
    ``config.object_store_cache_path``.
    """

    def __init__(self, config, file_path=None, extra_dirs=None):
        """
        Connect to iRODS and make sure the root collection exists.

        :param config: Galaxy config; ``object_store_cache_path``,
            ``irods_default_resource`` and ``irods_root_collection_path``
            are read from it
        :param file_path: passed through to ``DiskObjectStore``
        :param extra_dirs: passed through to ``DiskObjectStore``
        :raises AssertionError: if PyRods is not installed, connecting
            fails, or the root collection cannot be determined or created
        """
        super(IRODSObjectStore, self).__init__(config, file_path=file_path, extra_dirs=extra_dirs)
        assert irods is not None, IRODS_IMPORT_MESSAGE
        self.cache_path = config.object_store_cache_path
        self.default_resource = config.irods_default_resource or None

        # Connect to iRODS (AssertionErrors will be raised if anything goes wrong)
        self.rods_env, self.rods_conn = rods_connect()

        # if the root collection path in the config is unset or relative, try
        # to use a sensible default under the user's iRODS home collection
        configured_root = config.irods_root_collection_path
        if configured_root is None or not configured_root.startswith('/'):
            rods_home = self.rods_env.rodsHome
            assert rods_home != '', "Unable to initialize iRODS Object Store: rodsHome cannot be determined and irods_root_collection_path in Galaxy config is unset or not absolute."
            if configured_root is None:
                self.root_collection_path = path_join(rods_home, 'galaxy_data')
            else:
                self.root_collection_path = path_join(rods_home, configured_root)
        else:
            self.root_collection_path = configured_root

        # will return a collection object regardless of whether it exists
        self.root_collection = irods.irodsCollection(self.rods_conn, self.root_collection_path)
        if self.root_collection.getId() == -1:
            log.warning("iRODS root collection does not exist, will attempt to create: %s", self.root_collection_path)
            self.root_collection.upCollection()
            # createCollection() is called outside the assert so that the
            # collection is still created when Python runs with -O (which
            # strips assert statements). iRODS paths are POSIX-style, hence
            # posixpath's basename rather than os.path's.
            status = self.root_collection.createCollection(path_basename(self.root_collection_path))
            assert status == 0, "iRODS root collection creation failed: %s" % self.root_collection_path
            self.root_collection = irods.irodsCollection(self.rods_conn, self.root_collection_path)
            assert self.root_collection.getId() != -1, "iRODS root collection creation claimed success but still does not exist"

        if self.default_resource is None:
            self.default_resource = self.rods_env.rodsDefResource
        log.info("iRODS data for this instance will be stored in collection: %s, resource: %s", self.root_collection_path, self.default_resource)

    def __get_rods_path(self, obj, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None, strip_dat=True, **kwargs):
        """
        Build the absolute iRODS path for `obj` under the root collection.

        :param base_dir: accepted for interface compatibility; not used here
        :param dir_only: return the (sub-)collection path rather than the
            data object path
        :param extra_dir: optional sub-collection below the root; must
            already be normalized
        :param extra_dir_at_root: ignored -- this plugin does not use the
            directory hash, so there is only one level of sub-collection
        :param alt_name: object name to use instead of ``dataset_<obj.id>``;
            it must not escape the collection
        :param strip_dat: drop a trailing ``.dat`` from `alt_name`, and leave
            it off the default name; when false the default name keeps
            ``.dat``
        :raises ObjectInvalid: if `extra_dir` is not normalized or `alt_name`
            would locate a path outside the collection
        """
        # extra_dir should never be constructed from provided data but just
        # make sure there are no shenanigans afoot
        if extra_dir and extra_dir != os.path.normpath(extra_dir):
            log.warning('extra_dir is not normalized: %s', extra_dir)
            raise ObjectInvalid("The requested object is invalid")
        # ensure that any parent directory references in alt_name would not
        # result in a path not contained in the directory path constructed here
        if alt_name:
            if not safe_relpath(alt_name):
                log.warning('alt_name would locate path outside dir: %s', alt_name)
                raise ObjectInvalid("The requested object is invalid")
            # alt_name can contain parent directory references, but iRODS will
            # not follow them, so if they are valid we normalize them out
            alt_name = os.path.normpath(alt_name)
        path = ""
        if extra_dir is not None:
            path = extra_dir
        if not dir_only:
            # the .dat extension is stripped when stored in iRODS
            # TODO: is the strip_dat kwarg the best way to implement this?
            if strip_dat and alt_name and alt_name.endswith('.dat'):
                alt_name = os.path.splitext(alt_name)[0]
            default_name = 'dataset_%s' % obj.id
            if not strip_dat:
                default_name += '.dat'
            path = path_join(path, alt_name if alt_name else default_name)
        path = path_join(self.root_collection_path, path)
        return path

    def __get_cache_path(self, obj, **kwargs):
        """
        Return the local cache file path for `obj`: the cache directory
        joined with the basename of the object's iRODS path (``.dat`` kept).
        """
        # FIXME: does not handle collections
        # FIXME: collisions could occur here
        return os.path.join(self.cache_path, path_basename(self.__get_rods_path(obj, strip_dat=False, **kwargs)))

    def __clean_cache_entry(self, obj, **kwargs):
        """Remove the cached local copy of `obj`, ignoring OS errors."""
        # FIXME: does not handle collections
        try:
            os.unlink(self.__get_cache_path(obj, **kwargs))
        except OSError:
            # it is expected that we'll call this method a lot regardless of
            # whether we think the cached file exists
            pass

    def __get_rods_handle(self, obj, mode='r', **kwargs):
        """
        Return a PyRods handle for `obj`: an ``irodsCollection`` when
        ``dir_only`` is set, otherwise the result of ``irodsOpen()`` in
        `mode`. Callers in this class treat a ``None`` result as "object
        does not exist".
        """
        if kwargs.get('dir_only', False):
            return irods.irodsCollection(self.rods_conn, self.__get_rods_path(obj, **kwargs))
        else:
            return irods.irodsOpen(self.rods_conn, self.__get_rods_path(obj, **kwargs), mode)

    def __mkcolls(self, rods_path):
        """
        An os.makedirs() for iRODS collections. `rods_path` is the desired
        collection to create; it must lie below the root collection.
        """
        assert rods_path.startswith(self.root_collection_path + '/'), '__mkcolls(): Creating collections outside the root collection is not allowed (requested path was: %s)' % rods_path
        # walk upwards collecting every missing collection...
        mkcolls = []
        c = irods.irodsCollection(self.rods_conn, rods_path)
        while c.getId() == -1:
            assert c.getCollName().startswith(self.root_collection_path + '/'), '__mkcolls(): Attempted to move above the root collection: %s' % c.getCollName()
            mkcolls.append(c.getCollName())
            c.upCollection()
        # ...then create them top-down so each parent exists before its child
        for collname in reversed(mkcolls):
            log.debug('Creating collection %s', collname)
            ci = irods.collInp_t()
            ci.collName = collname
            status = irods.rcCollCreate(self.rods_conn, ci)
            assert status == 0, '__mkcolls(): Failed to create collection: %s' % collname

    @local_extra_dirs
    def exists(self, obj, **kwargs):
        """Return True if ``rcObjStat()`` finds the iRODS path for `obj`."""
        doi = irods.dataObjInp_t()
        doi.objPath = self.__get_rods_path(obj, **kwargs)
        log.debug('exists(): checking: %s', doi.objPath)
        return irods.rcObjStat(self.rods_conn, doi) is not None

    @local_extra_dirs
    def create(self, obj, **kwargs):
        """
        Create the iRODS collection (``dir_only``) or empty data object for
        `obj` if it does not exist yet, creating parent collections as
        needed. Existing objects are left untouched.

        :raises AssertionError: if ``rcDataObjCreate()`` fails
        """
        if not self.exists(obj, **kwargs):
            rods_path = self.__get_rods_path(obj, **kwargs)
            log.debug('create(): %s', rods_path)
            dir_only = kwargs.get('dir_only', False)
            # short circuit collection creation since most of the time it will
            # be the root collection which already exists
            collection_path = rods_path if dir_only else path_dirname(rods_path)
            if collection_path != self.root_collection_path:
                self.__mkcolls(collection_path)
            if not dir_only:
                # rcDataObjCreate is used instead of the irodsOpen wrapper so
                # that we can prevent overwriting
                doi = irods.dataObjInp_t()
                doi.objPath = rods_path
                doi.createMode = 0o640
                doi.dataSize = 0  # 0 actually means "unknown", although literally 0 would be preferable
                irods.addKeyVal(doi.condInput, irods.DEST_RESC_NAME_KW, self.default_resource)
                status = irods.rcDataObjCreate(self.rods_conn, doi)
                assert status >= 0, 'create(): rcDataObjCreate() failed: %s: %s: %s' % (rods_path, status, irods.strerror(status))

    @local_extra_dirs
    def empty(self, obj, **kwargs):
        """
        Return True if the data object for `obj` has size 0.

        :raises ObjectNotFound: if the object cannot be opened
        """
        assert 'dir_only' not in kwargs, 'empty(): `dir_only` parameter is invalid here'
        h = self.__get_rods_handle(obj, **kwargs)
        try:
            return h.getSize() == 0
        except AttributeError:
            # h is None
            raise ObjectNotFound()

    def size(self, obj, **kwargs):
        """
        Return the size reported by ``getSize()`` for the data object of
        `obj`, or 0 if the object cannot be opened.
        """
        assert 'dir_only' not in kwargs, 'size(): `dir_only` parameter is invalid here'
        h = self.__get_rods_handle(obj, **kwargs)
        try:
            return h.getSize()
        except AttributeError:
            # h is None
            return 0

    @local_extra_dirs
    def delete(self, obj, entire_dir=False, **kwargs):
        """
        Delete the iRODS data object for `obj` and drop its cached copy.

        :param entire_dir: not implemented
        :returns: True if iRODS reported a successful delete, else False
        :raises NotImplementedError: if `entire_dir` is set
        """
        assert 'dir_only' not in kwargs, 'delete(): `dir_only` parameter is invalid here'
        rods_path = self.__get_rods_path(obj, **kwargs)
        # __get_rods_path prepends self.root_collection_path but we are going
        # to ensure that it's valid anyway for safety's sake
        assert rods_path.startswith(self.root_collection_path + '/'), 'ERROR: attempt to delete object outside root collection (path was: %s)' % rods_path
        if entire_dir:
            # TODO
            raise NotImplementedError()
        h = self.__get_rods_handle(obj, **kwargs)
        try:
            # note: PyRods' irodsFile.delete() does not set force, and it does
            # not raise on deletion failure -- it returns a nonzero status
            status = h.delete()
        except AttributeError:
            # h is None
            log.warning('delete(): operation failed: object does not exist: %s', rods_path)
            return False
        finally:
            # remove the cached entry whatever the outcome (finally is
            # executed even when the try/except returns)
            self.__clean_cache_entry(obj, **kwargs)
        # an explicit check rather than an assert, so that failures are still
        # reported when Python runs with -O
        if status != 0:
            log.error('delete(): operation failed: %d: %s', status, irods.strerror(status))
            return False
        return True

    @local_extra_dirs
    def get_data(self, obj, start=0, count=-1, **kwargs):
        """
        Read data from the iRODS data object for `obj`.

        :param start: offset passed to ``seek()`` before reading
        :param count: amount to read; -1 reads everything after `start`
        :raises ObjectNotFound: if the object cannot be opened
        """
        log.debug('get_data(): %s', self.__get_rods_path(obj, **kwargs))
        h = self.__get_rods_handle(obj, **kwargs)
        try:
            h.seek(start)
        except AttributeError:
            # h is None
            raise ObjectNotFound()
        # TODO: make sure implicit close is okay, DiskObjectStore actually
        # reads data into a var, closes, and returns the var
        if count == -1:
            return h.read()
        else:
            return h.read(count)

    @local_extra_dirs
    def get_filename(self, obj, **kwargs):
        """
        Return the absolute path of a local cached copy of `obj`, fetching
        it from iRODS (an iget) into the cache directory on a cache miss.

        The object is first downloaded to an ``__incoming_`` file and then
        renamed into place. If another process or thread is already fetching
        it, this call polls every 5 seconds until the cached file appears.

        :raises ObjectNotFound: if the object does not exist in iRODS
        :raises NotImplementedError: if ``dir_only`` is passed
        :raises AssertionError: if the iget fails
        """
        log.debug("get_filename(): called on %s %s. For better performance, avoid this method and use get_data() instead.", obj.__class__.__name__, obj.id)
        cached_path = self.__get_cache_path(obj, **kwargs)
        if not self.exists(obj, **kwargs):
            raise ObjectNotFound()
        # TODO: implement or define whether dir_only is valid
        if 'dir_only' in kwargs:
            raise NotImplementedError()
        # cache hit
        if os.path.exists(cached_path):
            return os.path.abspath(cached_path)
        # cache miss
        # TODO: thread this
        incoming_path = os.path.join(os.path.dirname(cached_path), "__incoming_%s" % os.path.basename(cached_path))
        doi = irods.dataObjInp_t()
        doi.objPath = self.__get_rods_path(obj, **kwargs)
        doi.dataSize = 0  # TODO: does this affect performance? should we get size?
        doi.numThreads = 0
        # TODO: might want to VERIFY_CHKSUM_KW
        log.debug('get_filename(): caching %s to %s', doi.objPath, incoming_path)
        # do the iget
        status = irods.rcDataObjGet(self.rods_conn, doi, incoming_path)
        # if incoming already exists, we'll wait for another process or thread
        # to finish caching
        if status != irods.OVERWRITE_WITHOUT_FORCE_FLAG:
            assert status == 0, 'get_filename(): iget %s failed (%s): %s' % (doi.objPath, status, irods.strerror(status))
            # POSIX rename is atomic
            # TODO: rename without clobbering
            os.rename(incoming_path, cached_path)
            log.debug('get_filename(): cached %s to %s', doi.objPath, cached_path)
        # another process or thread is caching, wait for it
        while not os.path.exists(cached_path):
            # TODO: force restart after mod time > some configurable, or
            # otherwise deal with this potential deadlock and interrupted
            # transfers
            time.sleep(5)
            log.debug("get_filename(): waiting on incoming '%s' for %s %s", incoming_path, obj.__class__.__name__, obj.id)
        return os.path.abspath(cached_path)

    @local_extra_dirs
    def update_from_file(self, obj, file_name=None, create=False, **kwargs):
        """
        Upload a local file into the iRODS data object for `obj` (a forced
        iput that overwrites existing content).

        :param file_name: local file to upload; defaults to the cache path
            of `obj`
        :param create: when true, a missing object may be created (parent
            collections are created first); when false, a missing object
            raises ObjectNotFound
        :raises ObjectNotFound: if the object does not exist and `create` is
            false
        :raises AssertionError: if the iput fails
        """
        assert 'dir_only' not in kwargs, 'update_from_file(): `dir_only` parameter is invalid here'
        rods_path = self.__get_rods_path(obj, **kwargs)
        if create:
            # the put below will create the data object itself, but
            # presumably not a missing parent collection (e.g. an extra_dir),
            # so make sure that exists -- short-circuiting the root as create() does
            collection_path = path_dirname(rods_path)
            if collection_path != self.root_collection_path:
                self.__mkcolls(collection_path)
        elif not self.exists(obj, **kwargs):
            # do not create if not requested
            raise ObjectNotFound()
        if file_name is None:
            file_name = self.__get_cache_path(obj, **kwargs)
        # put will create if necessary
        doi = irods.dataObjInp_t()
        doi.objPath = rods_path
        doi.createMode = 0o640
        doi.dataSize = os.stat(file_name).st_size
        doi.numThreads = 0
        irods.addKeyVal(doi.condInput, irods.DEST_RESC_NAME_KW, self.default_resource)
        irods.addKeyVal(doi.condInput, irods.FORCE_FLAG_KW, '')
        # TODO: might want to VERIFY_CHKSUM_KW
        log.debug('update_from_file(): updating %s to %s', file_name, doi.objPath)
        # do the iput
        status = irods.rcDataObjPut(self.rods_conn, doi, file_name)
        assert status == 0, 'update_from_file(): iput %s failed (%s): %s' % (doi.objPath, status, irods.strerror(status))
# monkeypatch an strerror method into the irods module
def _rods_strerror(errno):
    """
    Map an iRODS error code to its symbolic name -- the missing `strerror`.

    On first use, every negative ``int`` attribute of the irods module is
    collected into a code -> attribute-name table. The table is cached on
    the module under the attribute name ``__rods_strerror_map``. Codes that
    are not in the table yield 'GALAXY_NO_ERRNO_MAPPING_FOUND'.
    """
    try:
        code_names = getattr(irods, '__rods_strerror_map')
    except AttributeError:
        code_names = {}
        for attr_name in dir(irods):
            attr_value = getattr(irods, attr_name)
            # exact int type check: bools and other int subclasses are skipped
            if type(attr_value) is int and attr_value < 0:
                code_names[attr_value] = attr_name
        setattr(irods, '__rods_strerror_map', code_names)
    return code_names.get(errno, 'GALAXY_NO_ERRNO_MAPPING_FOUND')
# Install the strerror shim only when PyRods imported successfully; the
# import guard at the top of the module sets irods to None otherwise.
if irods is not None:
    irods.strerror = _rods_strerror
def rods_connect():
    """
    Open an authenticated iRODS connection from the current client environment.

    The environment comes from ``getRodsEnv()``. A connection is opened with
    ``rcConnect()`` to the host, port, user name and zone it names, and is
    then logged in with the obfuscated password from ``obfGetPw()``.

    :returns: ``(env, conn)`` -- the iRODS environment and the logged-in
        connection
    :raises AssertionError: if any of those steps reports a failure
    """
    rc, rods_env = irods.getRodsEnv()
    assert rc == 0, 'connect(): getRodsEnv() failed (%s): %s' % (rc, irods.strerror(rc))

    endpoint = (rods_env.rodsHost, rods_env.rodsPort, rods_env.rodsUserName, rods_env.rodsZone)
    connection, connect_err = irods.rcConnect(*endpoint)
    assert connect_err.status == 0, 'connect(): rcConnect() failed (%s): %s' % (connect_err.status, connect_err.msg)

    rc, obf_password = irods.obfGetPw()
    assert rc == 0, 'connect(): getting password with obfGetPw() failed (%s): %s' % (rc, irods.strerror(rc))

    rc = irods.clientLoginWithObfPassword(connection, obf_password)
    assert rc == 0, 'connect(): logging in with clientLoginWithObfPassword() failed (%s): %s' % (rc, irods.strerror(rc))

    return rods_env, connection