"""
Object Store plugin for the Integrated Rule-Oriented Data System (iRODS)
"""
import logging
import os
import shutil
import threading
from datetime import datetime
from pathlib import Path
try:
import irods
import irods.keywords as kw
from irods.exception import (
CollectionDoesNotExist,
DataObjectDoesNotExist,
)
from irods.session import iRODSSession
except ImportError:
irods = None
from galaxy.util import (
ExecutionTimer,
string_as_bool,
unlink,
)
from ._caching_base import CachingConcreteObjectStore
IRODS_IMPORT_MESSAGE = "The Python irods package is required to use this feature; please install it"
# 1 MB
CHUNK_SIZE = 2**20
log = logging.getLogger(__name__)
logging.getLogger("irods.connection").setLevel(logging.INFO) # irods logging generates gigabytes of logs
def _config_xml_error(tag):
msg = f"No {tag} element in config XML tree"
raise Exception(msg)
def _config_dict_error(key):
msg = f"No {key} key in config dictionary"
raise Exception(msg)
def parse_config_xml(config_xml):
try:
a_xml = config_xml.findall("auth")
if not a_xml:
_config_xml_error("auth")
username = a_xml[0].get("username")
password = a_xml[0].get("password")
r_xml = config_xml.findall("resource")
if not r_xml:
_config_xml_error("resource")
resource_name = r_xml[0].get("name")
z_xml = config_xml.findall("zone")
if not z_xml:
_config_xml_error("zone")
zone_name = z_xml[0].get("name")
c_xml = config_xml.findall("connection")
if not c_xml:
_config_xml_error("connection")
host = c_xml[0].get("host", None)
port = int(c_xml[0].get("port", 0))
timeout = int(c_xml[0].get("timeout", 30))
refresh_time = int(c_xml[0].get("refresh_time", 300))
connection_pool_monitor_interval = int(c_xml[0].get("connection_pool_monitor_interval", -1))
c_xml = config_xml.findall("cache")
if not c_xml:
_config_xml_error("cache")
cache_size = float(c_xml[0].get("size", -1))
staging_path = c_xml[0].get("path", None)
cache_updated_data = string_as_bool(c_xml[0].get("cache_updated_data", "True"))
attrs = ("type", "path")
e_xml = config_xml.findall("extra_dir")
if not e_xml:
_config_xml_error("extra_dir")
extra_dirs = [{k: e.get(k) for k in attrs} for e in e_xml]
return {
"auth": {
"username": username,
"password": password,
},
"resource": {
"name": resource_name,
},
"zone": {
"name": zone_name,
},
"connection": {
"host": host,
"port": port,
"timeout": timeout,
"refresh_time": refresh_time,
"connection_pool_monitor_interval": connection_pool_monitor_interval,
},
"cache": {
"size": cache_size,
"path": staging_path,
"cache_updated_data": cache_updated_data,
},
"extra_dirs": extra_dirs,
"private": CachingConcreteObjectStore.parse_private_from_config_xml(config_xml),
}
except Exception:
# Toss it back up after logging; we can't continue loading at this point.
log.exception("Malformed iRODS ObjectStore Configuration XML -- unable to continue.")
raise
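# A minimal sketch of the configuration XML that parse_config_xml consumes.
# Element and attribute names are taken from the parser above; the hosts,
# names, and paths shown are illustrative placeholders, not defaults:
#
# <object_store type="irods" private="false">
#     <auth username="rods" password="secret" />
#     <resource name="demoResc" />
#     <zone name="tempZone" />
#     <connection host="irods.example.org" port="1247" timeout="30"
#                 refresh_time="300" connection_pool_monitor_interval="-1" />
#     <cache path="database/object_store_cache" size="1000" cache_updated_data="True" />
#     <extra_dir type="job_work" path="database/job_working_directory_irods" />
#     <extra_dir type="temp" path="database/tmp_irods" />
# </object_store>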
class IRODSObjectStore(CachingConcreteObjectStore):
"""
Object store that stores files as data objects in an iRODS Zone. A local cache
exists that is used as an intermediate location for files between Galaxy and iRODS.
"""
store_type = "irods"
def __init__(self, config, config_dict):
ipt_timer = ExecutionTimer()
super().__init__(config, config_dict)
auth_dict = config_dict.get("auth")
if auth_dict is None:
_config_dict_error("auth")
self.username = auth_dict.get("username")
if self.username is None:
_config_dict_error("auth->username")
self.password = auth_dict.get("password")
if self.password is None:
_config_dict_error("auth->password")
resource_dict = config_dict.get("resource")
if resource_dict is None:
_config_dict_error("resource")
self.resource = resource_dict.get("name")
if self.resource is None:
_config_dict_error("resource->name")
zone_dict = config_dict.get("zone")
if zone_dict is None:
_config_dict_error("zone")
self.zone = zone_dict.get("name")
if self.zone is None:
_config_dict_error("zone->name")
connection_dict = config_dict.get("connection")
if connection_dict is None:
_config_dict_error("connection")
self.host = connection_dict.get("host")
if self.host is None:
_config_dict_error("connection->host")
self.port = connection_dict.get("port")
if self.port is None:
_config_dict_error("connection->port")
self.timeout = connection_dict.get("timeout")
if self.timeout is None:
_config_dict_error("connection->timeout")
self.refresh_time = connection_dict.get("refresh_time")
if self.refresh_time is None:
_config_dict_error("connection->refresh_time")
self.connection_pool_monitor_interval = connection_dict.get("connection_pool_monitor_interval")
if self.connection_pool_monitor_interval is None:
_config_dict_error("connection->connection_pool_monitor_interval")
cache_dict = config_dict.get("cache") or {}
self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size
if self.cache_size is None:
_config_dict_error("cache->size")
self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path
if self.staging_path is None:
_config_dict_error("cache->path")
self.cache_updated_data = cache_dict.get("cache_updated_data", True)
extra_dirs = {e["type"]: e["path"] for e in config_dict.get("extra_dirs", [])}
if not extra_dirs:
_config_dict_error("extra_dirs")
self.extra_dirs.update(extra_dirs)
if irods is None:
raise Exception(IRODS_IMPORT_MESSAGE)
self.home = f"/{self.zone}/home/{self.username}"
self.session = iRODSSession(
host=self.host,
port=self.port,
user=self.username,
password=self.password,
zone=self.zone,
refresh_time=self.refresh_time,
)
# Set connection timeout
self.session.connection_timeout = self.timeout
if self.connection_pool_monitor_interval != -1:
# This Event object is initialized to False
# It is set to True in shutdown(), causing
# the connection pool monitor thread to return/terminate
self.stop_connection_pool_monitor_event = threading.Event()
self.connection_pool_monitor_thread = None
log.debug("irods_pt __init__: %s", ipt_timer)
def shutdown(self):
# This call will cleanup all the connections in the connection pool
# OSError sometimes happens on GitHub Actions after the test has successfully completed; ignore it if it does.
ipt_timer = ExecutionTimer()
try:
self.session.cleanup()
except OSError:
pass
if self.connection_pool_monitor_interval != -1:
# Set to True so the connection pool monitor thread will return/terminate
self.stop_connection_pool_monitor_event.set()
if self.connection_pool_monitor_thread is not None:
self.connection_pool_monitor_thread.join(5)
log.debug("irods_pt shutdown: %s", ipt_timer)
@classmethod
def parse_xml(cls, config_xml):
return parse_config_xml(config_xml)
def start_connection_pool_monitor(self):
self.connection_pool_monitor_thread = threading.Thread(
target=self._connection_pool_monitor,
args=(),
kwargs={
"refresh_time": self.refresh_time,
"connection_pool_monitor_interval": self.connection_pool_monitor_interval,
"stop_connection_pool_monitor_event": self.stop_connection_pool_monitor_event,
},
name="ConnectionPoolMonitorThread",
daemon=True,
)
self.connection_pool_monitor_thread.start()
log.info("Connection pool monitor started")
def start(self):
if self.connection_pool_monitor_interval != -1:
self.start_connection_pool_monitor()
def _connection_pool_monitor(self, *args, **kwargs):
refresh_time = kwargs["refresh_time"]
connection_pool_monitor_interval = kwargs["connection_pool_monitor_interval"]
stop_connection_pool_monitor_event = kwargs["stop_connection_pool_monitor_event"]
while not stop_connection_pool_monitor_event.is_set():
curr_time = datetime.now()
idle_connection_set = self.session.pool.idle.copy()
for conn in idle_connection_set:
# If the connection was created more than 'refresh_time'
# seconds ago, release the connection (as it's stale)
if (curr_time - conn.create_time).total_seconds() > refresh_time:
log.debug(
"Idle connection with id %s was created more than %s seconds ago. Releasing the connection.",
id(conn),
refresh_time,
)
self.session.pool.release_connection(conn, True)
stop_connection_pool_monitor_event.wait(connection_pool_monitor_interval)
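# Illustrative timing (hypothetical values): with refresh_time=300 and
# connection_pool_monitor_interval=60, the monitor wakes roughly once a
# minute and releases any idle connection created more than five minutes
# earlier; an interval of -1 disables the monitor entirely (see start()).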
def to_dict(self):
as_dict = super().to_dict()
as_dict.update(self._config_to_dict())
return as_dict
def _config_to_dict(self):
return {
"auth": {
"username": self.username,
"password": self.password,
},
"resource": {
"name": self.resource,
},
"zone": {
"name": self.zone,
},
"connection": {
"host": self.host,
"port": self.port,
"timeout": self.timeout,
"refresh_time": self.refresh_time,
"connection_pool_monitor_interval": self.connection_pool_monitor_interval,
},
"cache": {
"size": self.cache_size,
"path": self.staging_path,
"cache_updated_data": self.cache_updated_data,
},
}
# rel_path is file or folder?
def _get_remote_size(self, rel_path):
ipt_timer = ExecutionTimer()
p = Path(rel_path)
data_object_name = p.stem + p.suffix
subcollection_name = p.parent
collection_path = f"{self.home}/{subcollection_name}"
data_object_path = f"{collection_path}/{data_object_name}"
options = {kw.DEST_RESC_NAME_KW: self.resource}
try:
data_obj = self.session.data_objects.get(data_object_path, **options)
# Use the catalog-reported size of the data object; __sizeof__() would
# report the in-memory size of the Python wrapper instead
return data_obj.size
except (DataObjectDoesNotExist, CollectionDoesNotExist):
log.warning("Collection or data object (%s) does not exist", data_object_path)
return -1
finally:
log.debug("irods_pt _get_remote_size: %s", ipt_timer)
# rel_path is file or folder?
def _exists_remotely(self, rel_path):
ipt_timer = ExecutionTimer()
p = Path(rel_path)
data_object_name = p.stem + p.suffix
subcollection_name = p.parent
collection_path = f"{self.home}/{subcollection_name}"
data_object_path = f"{collection_path}/{data_object_name}"
options = {kw.DEST_RESC_NAME_KW: self.resource}
try:
self.session.data_objects.get(data_object_path, **options)
return True
except (DataObjectDoesNotExist, CollectionDoesNotExist):
log.debug("Collection or data object (%s) does not exist", data_object_path)
return False
finally:
log.debug("irods_pt _exists_remotely: %s", ipt_timer)
def _download(self, rel_path):
ipt_timer = ExecutionTimer()
cache_path = self._get_cache_path(rel_path)
log.debug("Pulling data object '%s' into cache to %s", rel_path, cache_path)
p = Path(rel_path)
data_object_name = p.stem + p.suffix
subcollection_name = p.parent
collection_path = f"{self.home}/{subcollection_name}"
data_object_path = f"{collection_path}/{data_object_name}"
# we need to allow irods to override already existing zero-size output files created
# in object store cache during job setup (see also https://github.com/galaxyproject/galaxy/pull/17025#discussion_r1394517033)
# TODO: get rid of this flag when Galaxy stops pre-creating those output files in cache
options = {kw.FORCE_FLAG_KW: "", kw.DEST_RESC_NAME_KW: self.resource}
try:
self.session.data_objects.get(data_object_path, cache_path, **options)
log.debug("Pulled data object '%s' into cache to %s", rel_path, cache_path)
return True
except (DataObjectDoesNotExist, CollectionDoesNotExist):
log.warning("Collection or data object (%s) does not exist", data_object_path)
return False
finally:
log.debug("irods_pt _download: %s", ipt_timer)
def _push_to_storage(self, rel_path, source_file=None, from_string=None):
"""
Push the file pointed to by ``rel_path`` to iRODS. The folder name in
``rel_path`` becomes the iRODS collection name and the file name becomes
the iRODS data object name.
If ``source_file`` is provided, push that file instead, while still
deriving the collection and data object names from ``rel_path``.
If ``from_string`` is provided, set the contents of the data object to
the value of the string.
"""
ipt_timer = ExecutionTimer()
p = Path(rel_path)
data_object_name = p.stem + p.suffix
subcollection_name = p.parent
source_file = source_file if source_file else self._get_cache_path(rel_path)
options = {kw.FORCE_FLAG_KW: "", kw.DEST_RESC_NAME_KW: self.resource}
if not os.path.exists(source_file):
log.error(
"Tried updating key '%s' from source file '%s', but source file does not exist.", rel_path, source_file
)
return False
# Check if the data object exists in iRODS
collection_path = f"{self.home}/{subcollection_name}"
data_object_path = f"{collection_path}/{data_object_name}"
exists = False
try:
exists = self.session.data_objects.exists(data_object_path)
if os.path.getsize(source_file) == 0 and exists:
log.debug(
"Wanted to push file '%s' to iRODS collection '%s' but its size is 0; skipping.",
source_file,
rel_path,
)
return True
# Create sub-collection first
self.session.collections.create(collection_path, recurse=True, **options)
if from_string:
# Create data object
data_obj = self.session.data_objects.create(data_object_path, self.resource, **options)
# Save 'from_string' as a file
with data_obj.open("w") as data_obj_fp:
data_obj_fp.write(from_string)
# Add file containing 'from_string' to the irods collection, since
# put() expects a file as input. Get file name from data object's 'desc' field
self.session.data_objects.put(data_obj.desc, f"{collection_path}/", **options)
log.debug("Pushed data from string '%s' to collection '%s'", from_string, data_object_path)
else:
start_time = datetime.now()
log.debug(
"Pushing cache file '%s' of size %s bytes to collection '%s'",
source_file,
os.path.getsize(source_file),
rel_path,
)
# Add the source file to the irods collection
self.session.data_objects.put(source_file, data_object_path, **options)
end_time = datetime.now()
log.debug(
"Pushed cache file '%s' to collection '%s' (%s bytes transfered in %s sec)",
source_file,
rel_path,
os.path.getsize(source_file),
(end_time - start_time).total_seconds(),
)
return True
finally:
log.debug("irods_pt _push_to_storage: %s", ipt_timer)
def _delete(self, obj, entire_dir=False, **kwargs):
ipt_timer = ExecutionTimer()
rel_path = self._construct_path(obj, **kwargs)
extra_dir = kwargs.get("extra_dir", None)
base_dir = kwargs.get("base_dir", None)
dir_only = kwargs.get("dir_only", False)
obj_dir = kwargs.get("obj_dir", False)
options = {kw.DEST_RESC_NAME_KW: self.resource}
try:
# Remove temporary data in JOB_WORK directory
if base_dir and dir_only and obj_dir:
shutil.rmtree(os.path.abspath(rel_path))
return True
# For the case of extra_files, because we don't have a reference to
# individual files we need to remove the entire directory structure
# with all the files in it. This is easy for the local file system,
# but requires iterating through each individual key in irods and deleting it.
if entire_dir and extra_dir:
shutil.rmtree(self._get_cache_path(rel_path), ignore_errors=True)
col_path = f"{self.home}/{rel_path}"
col = None
try:
col = self.session.collections.get(col_path)
except CollectionDoesNotExist:
log.warning("Collection (%s) does not exist!", col_path)
return False
cols = col.walk()
# Traverse the tree only one level deep
for _ in range(2):
# get next result
_, _, data_objects = next(cols)
# Delete data objects
for data_object in data_objects:
data_object.unlink(force=True)
return True
else:
# Delete from cache first
unlink(self._get_cache_path(rel_path), ignore_errors=True)
# Delete from irods as well
p = Path(rel_path)
data_object_name = p.stem + p.suffix
subcollection_name = p.parent
collection_path = f"{self.home}/{subcollection_name}"
data_object_path = f"{collection_path}/{data_object_name}"
try:
data_obj = self.session.data_objects.get(data_object_path, **options)
# remove object
data_obj.unlink(force=True)
return True
except (DataObjectDoesNotExist, CollectionDoesNotExist):
log.info("Collection or data object (%s) does not exist", data_object_path)
return True
except OSError:
log.exception("%s delete error", self._get_filename(obj, **kwargs))
finally:
log.debug("irods_pt _delete: %s", ipt_timer)
return False
# Unlike S3, a URL is not really applicable to iRODS; return the logical data object path instead
def _get_object_url(self, obj, **kwargs):
if self._exists(obj, **kwargs):
rel_path = self._construct_path(obj, **kwargs)
p = Path(rel_path)
data_object_name = p.stem + p.suffix
subcollection_name = p.parent
collection_path = f"{self.home}/{subcollection_name}"
data_object_path = f"{collection_path}/{data_object_name}"
return data_object_path
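# e.g. "/tempZone/home/rods/000/dataset_1.dat" (hypothetical values) --
# a logical iRODS path rather than an HTTP URL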
def _get_store_usage_percent(self):
return 0.0