Source code for galaxy.objectstore.pithos

# This work originally developed by Stavros Sachtouris <saxtouri@grnet.gr>
# as part of the effort committed by GRNET S.A. (Greek Research and Technology
# Network) in the context of the OpenMinTeD project (openminted.eu)

import logging
import os
import shutil

try:
    from kamaki.clients import (
        astakos,
        Client as KamakiClient,
        ClientError,
        pithos,
        utils,
    )
except ImportError:
    KamakiClient = None

from galaxy.exceptions import (
    ObjectInvalid,
    ObjectNotFound,
)
from galaxy.util import (
    directory_hash_id,
    umask_fix_perms,
)
from galaxy.util.path import safe_relpath
from . import ConcreteObjectStore

NO_KAMAKI_ERROR_MESSAGE = (
    "ObjectStore configured, but no kamaki.clients dependency available."
    "Please install and properly configure kamaki.clients or modify Object "
    "Store configuration."
)

log = logging.getLogger(__name__)


def parse_config_xml(config_xml):
    """Parse and validate config_xml, return dict for convenience

    :param config_xml: (lxml.etree.Element) root of XML subtree
    :returns: (dict) according to syntax
    :raises: various XML parse errors
    """
    r = dict()
    try:
        for tag, required_attrs, optional_attrs in (
            ("auth", ("url", "token"), ("ca_certs", "ignore_ssl")),
            ("container", ("name",), ("project",)),
        ):
            element = config_xml.findall(tag)[0]
            required = tuple((k, element.get(k)) for k in required_attrs)
            for k, v in required:
                if not v:
                    msg = f"No value for {tag}:{k} in XML tree"
                    log.error(msg)
                    raise Exception(msg)
            optional = tuple((k, element.get(k)) for k in optional_attrs)
            r[tag] = dict(required + optional)

        # Extract extra_dir
        tag, attrs = "extra_dir", ("type", "path")
        extra_dirs = config_xml.findall(tag)
        if not extra_dirs:
            msg = f"No {tag} element in XML tree"
            log.error(msg)
            raise Exception(msg)
        r["extra_dirs"] = [{k: e.get(k) for k in attrs} for e in extra_dirs]
        r["private"] = ConcreteObjectStore.parse_private_from_config_xml(config_xml)
        if "job_work" not in (d["type"] for d in r["extra_dirs"]):
            msg = f'No value for {tag}:type="job_work" in XML tree'
            log.error(msg)
            raise Exception(msg)
    except Exception:
        log.exception("Malformed PithosObjectStore Configuration XML -- unable to continue")
        raise
    return r
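
# A minimal sketch of the XML subtree this parser accepts. Tag and attribute
# names come from the loop above; all concrete values (URLs, tokens, paths)
# are hypothetical placeholders:
#
#   <object_store type="pithos">
#       <auth url="https://astakos.example.org/identity/v2.0" token="s3cr3t"
#             ca_certs="/etc/ssl/certs/ca-certificates.crt" ignore_ssl="false"/>
#       <container name="galaxy" project="my-project"/>
#       <extra_dir type="job_work" path="database/job_working_directory_pithos"/>
#       <extra_dir type="temp" path="database/tmp_pithos"/>
#   </object_store>
#
# At least one extra_dir with type="job_work" must be present, or parsing fails.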
class PithosObjectStore(ConcreteObjectStore):
    """
    Object store that stores objects as items in a Pithos+ container.
    Cache is ignored for the time being.
    """

    store_type = "pithos"
    def __init__(self, config, config_dict):
        super().__init__(config, config_dict)
        self.staging_path = self.config.file_path
        log.info("Parse config_xml for pithos object store")
        self.config_dict = config_dict
        log.debug(self.config_dict)

        self._initialize()
    def _initialize(self):
        if KamakiClient is None:
            raise Exception(NO_KAMAKI_ERROR_MESSAGE)

        log.info("Authenticate Synnefo account")
        self._authenticate()
        log.info("Initialize Pithos+ client")
        self._init_pithos()
    @classmethod
    def parse_xml(clazz, config_xml):
        return parse_config_xml(config_xml)
    def to_dict(self):
        as_dict = super().to_dict()
        as_dict.update(self.config_dict)
        return as_dict
    def _authenticate(self):
        auth = self.config_dict["auth"]
        url, token = auth["url"], auth["token"]
        ca_certs = auth.get("ca_certs")
        if ca_certs:
            utils.https.patch_with_certs(ca_certs)
        elif (auth.get("ignore_ssl") or "").lower() in ("true", "yes", "on"):
            # ignore_ssl is optional, so guard against it being absent
            utils.https.patch_ignore_ssl()
        self.astakos = astakos.AstakosClient(url, token)

    def _init_pithos(self):
        uuid, token = self.astakos.user_term("id"), self.astakos.token
        service_type = pithos.PithosClient.service_type
        pithos_url = self.astakos.get_endpoint_url(service_type)
        container = self.config_dict["container"]["name"]
        self.pithos = pithos.PithosClient(pithos_url, token, uuid, container)

        # Create the container if it does not exist, or reassign it to the
        # named project
        project = self.config_dict["container"].get("project", None)
        try:
            c = self.pithos.get_container_info()
        except ClientError as ce:
            if ce.status not in (404,):
                raise
            c = self.pithos.create_container(project_id=project)
            return
        if project and c.get("x-container-policy-project") != project:
            self.pithos.reassign_container(project)

    def _construct_path(
        self,
        obj,
        base_dir=None,
        dir_only=None,
        extra_dir=None,
        extra_dir_at_root=False,
        alt_name=None,
        obj_dir=False,
        in_cache=False,
        **kwargs,
    ):
        """Construct path from object and parameters"""
        # param extra_dir: should never be constructed from provided data but
        # just make sure there are no shenanigans afoot
        if extra_dir and extra_dir != os.path.normpath(extra_dir):
            log.warning(f"extra_dir is not normalized: {extra_dir}")
            raise ObjectInvalid("The requested object is invalid")
        # ensure that any parent directory references in alt_name would not
        # result in a path not contained in the directory path constructed here
        if alt_name:
            if not safe_relpath(alt_name):
                log.warning(f"alt_name would locate path outside dir: {alt_name}")
                raise ObjectInvalid("The requested object is invalid")
            # alt_name can contain parent directory references, but the object
            # store will not follow them, so if they are valid we normalize
            # them out
            alt_name = os.path.normpath(alt_name)
        rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj)))
        if extra_dir is not None:
            if extra_dir_at_root:
                rel_path = os.path.join(extra_dir, rel_path)
            else:
                rel_path = os.path.join(rel_path, extra_dir)

        # for JOB_WORK directory
        if obj_dir:
            rel_path = os.path.join(rel_path, str(self._get_object_id(obj)))
        if base_dir:
            base = self.extra_dirs.get(base_dir)
            return os.path.join(base, rel_path)

        # Pithos+ folders are marked by a trailing '/', so add it now
        rel_path = f"{rel_path}/"

        if not dir_only:
            an = alt_name if alt_name else f"dataset_{self._get_object_id(obj)}.dat"
            rel_path = os.path.join(rel_path, an)
        if in_cache:
            return self._get_cache_path(rel_path)

        return rel_path

    def _get_cache_path(self, rel_path):
        return os.path.abspath(os.path.join(self.staging_path, rel_path))
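
    # A rough illustration of the paths _construct_path() builds, assuming
    # Galaxy's directory_hash_id() groups numeric ids in thousands (e.g., a
    # hypothetical id 12345 hashing to "012"):
    #
    #   _construct_path(obj_12345)                 -> "012/dataset_12345.dat"
    #   _construct_path(obj_12345, dir_only=True)  -> "012/"
    #   _construct_path(obj_12345, in_cache=True)  -> "<staging_path>/012/dataset_12345.dat"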
""" cache_path = self._get_cache_path(rel_path) return os.path.exists(cache_path) def _fix_permissions(self, rel_path): """Set permissions on rel_path""" for basedir, _, files in os.walk(rel_path): umask_fix_perms(basedir, self.config.umask, 0o777, self.config.gid) for filename in files: path = os.path.join(basedir, filename) # Ignore symlinks if os.path.islink(path): continue umask_fix_perms(path, self.config.umask, 0o666, self.config.gid) def _pull_into_cache(self, rel_path): # Ensure the cache directory structure exists (e.g., dataset_#_files/) rel_path_dir = os.path.dirname(rel_path) rel_cache_path_dir = self._get_cache_path(rel_path_dir) if not os.path.exists(rel_cache_path_dir): os.makedirs(self._get_cache_path(rel_path_dir), exist_ok=True) # Now pull in the file cache_path = self._get_cache_path(rel_path_dir) self.pithos.download_object(rel_path, cache_path) self._fix_permissions(cache_path) return cache_path # No need to overwrite "shutdown" def _exists(self, obj, **kwargs): """Check if file exists, fix if file in cache and not on Pithos+ :returns: weather the file exists remotely or in cache """ path = self._construct_path(obj, **kwargs) try: self.pithos.get_object_info(path) return True except ClientError as ce: if ce.status not in (404,): raise in_cache = self._in_cache(path) dir_only = kwargs.get("dir_only", False) if dir_only: base_dir = kwargs.get("base_dir", None) if in_cache: return True elif base_dir: # for JOB_WORK directory if not os.path.exists(path): os.makedirs(path, exist_ok=True) return True return False if in_cache: cache_path = self._get_cache_path(path) # Maybe the upload should have happened in some thread elsewhere? with open(cache_path) as f: self.pithos.upload_object(path, f) return True return False def _create(self, obj, **kwargs): """Touch a file (aka create empty), if it doesn't exist""" if not self._exists(obj, **kwargs): # Pull out locally used fields extra_dir = kwargs.get("extra_dir", None) extra_dir_at_root = kwargs.get("extra_dir_at_root", False) dir_only = kwargs.get("dir_only", False) alt_name = kwargs.get("alt_name", None) # Construct hashed path rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj))) # Optionally append extra_dir if extra_dir is not None: if extra_dir_at_root: rel_path = os.path.join(extra_dir, rel_path) else: rel_path = os.path.join(rel_path, extra_dir) # Create given directory in cache cache_dir = os.path.join(self.staging_path, rel_path) if not os.path.exists(cache_dir): os.makedirs(cache_dir, exist_ok=True) if dir_only: self.pithos.upload_from_string(rel_path, "", content_type="application/directory") else: rel_path = os.path.join(rel_path, alt_name if alt_name else f"dataset_{self._get_object_id(obj)}.dat") new_file = os.path.join(self.staging_path, rel_path) open(new_file, "w").close() self.pithos.upload_from_string(rel_path, "") return self def _empty(self, obj, **kwargs): """ :returns: weather the object has content :raises ObjectNotFound: """ if not self._exists(obj, **kwargs): raise ObjectNotFound(f"objectstore.empty, object does not exist: {obj}, kwargs: {kwargs}") return bool(self._size(obj, **kwargs)) def _size(self, obj, **kwargs) -> int: """ :returns: The size of the object, or 0 if it doesn't exist (sorry for that, not our fault, the ObjectStore interface is like that some times) """ path = self._construct_path(obj, **kwargs) if self._in_cache(path): try: return os.path.getsize(self._get_cache_path(path)) except OSError as ex: log.warning(f"Could not get size of file {path} in local cache," f"will 
        try:
            info = self.pithos.get_object_info(path)
        except ClientError as ce:
            if ce.status not in (404,):
                raise
            return 0
        return int(info["content-length"])

    def _delete(self, obj, **kwargs):
        """Delete the object

        :returns: whether the object was deleted
        """
        path = self._construct_path(obj, **kwargs)
        base_dir = kwargs.get("base_dir", None)
        dir_only = kwargs.get("dir_only", False)
        obj_dir = kwargs.get("obj_dir", False)
        try:
            if all((base_dir, dir_only, obj_dir)):
                shutil.rmtree(os.path.abspath(path))
                return True
            cache_path = self._get_cache_path(path)

            entire_dir = kwargs.get("entire_dir", False)
            extra_dir = kwargs.get("extra_dir", False)
            if entire_dir and extra_dir:
                shutil.rmtree(cache_path)
                log.debug(f"On Pithos: delete -r {path}/")
                self.pithos.del_object(path, delimiter="/")
                return True
            else:
                os.unlink(cache_path)
                self.pithos.del_object(path)
        except OSError:
            log.exception(f"{self._get_filename(obj, **kwargs)} delete error")
        except ClientError as ce:
            log.exception(f"Could not delete {path} from Pithos, {ce}")
        return False

    def _get_data(self, obj, start=0, count=-1, **kwargs):
        """Fetch (e.g., download) data

        :param start: Chunk of data starts here
        :param count: Fetch at most as many data, fetch all if negative
        """
        path = self._construct_path(obj, **kwargs)
        if self._in_cache(path):
            cache_path = self._get_cache_path(path)
        else:
            # Not cached yet, so pull the object down from Pithos+ first
            cache_path = self._pull_into_cache(path)
        with open(cache_path) as data_file:
            data_file.seek(start)
            return data_file.read(count)

    def _get_filename(self, obj, **kwargs):
        """Get the expected filename with absolute path"""
        base_dir = kwargs.get("base_dir", None)
        dir_only = kwargs.get("dir_only", False)
        obj_dir = kwargs.get("obj_dir", False)
        path = self._construct_path(obj, **kwargs)

        # for JOB_WORK directory
        if base_dir and dir_only and obj_dir:
            return os.path.abspath(path)
        cache_path = self._get_cache_path(path)
        if dir_only:
            if not os.path.exists(cache_path):
                os.makedirs(cache_path, exist_ok=True)
            return cache_path
        if self._in_cache(path):
            return cache_path
        elif self._exists(obj, **kwargs):
            if not dir_only:
                self._pull_into_cache(path)
                return cache_path
        raise ObjectNotFound(f"objectstore.get_filename, no cache_path: {obj}, kwargs: {kwargs}")

    def _update_from_file(self, obj, **kwargs):
        """Update the store when a file is updated"""
        if kwargs.get("create"):
            self._create(obj, **kwargs)
        if not self._exists(obj, **kwargs):
            raise ObjectNotFound(f"objectstore.update_from_file, object does not exist: {obj}, kwargs: {kwargs}")

        path = self._construct_path(obj, **kwargs)
        cache_path = self._get_cache_path(path)
        file_name = kwargs.get("file_name")
        if file_name:
            source_path = os.path.abspath(file_name)
            try:
                if source_path != cache_path:
                    shutil.copy2(source_path, cache_path)
                self._fix_permissions(cache_path)
            except OSError:
                log.exception(f'Trouble copying source file "{source_path}" to cache "{cache_path}"')
        else:
            with open(cache_path, "rb") as f:
                # upload by object path, not by the dataset object itself
                self.pithos.upload_object(path, f)

    def _get_object_url(self, obj, **kwargs):
        """
        :returns: URL for direct access, None if no object
        """
        if self._exists(obj, **kwargs):
            path = self._construct_path(obj, **kwargs)
            try:
                return self.pithos.publish_object(path)
            except ClientError as ce:
                log.exception(f'Trouble generating URL for dataset "{path}"')
                log.exception(f"Kamaki: {ce}")
        return None

    def _get_store_usage_percent(self):
        """
        :returns: percentage indicating how full the store is
        """
        quotas = self.astakos.get_quotas()
        project = self.config_dict["container"]["project"]
        pithos_quotas = quotas[project]["pithos.diskspace"]
        usage = pithos_quotas["usage"]
        limit = min(pithos_quotas["limit"], pithos_quotas["project_limit"])
        return (100.0 * usage) / limit
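

# A minimal usage sketch, assuming a Galaxy config object (with file_path,
# umask, and gid attributes) and a config_dict of the shape produced by
# parse_config_xml() above; all values are hypothetical placeholders:
#
#   store = PithosObjectStore(
#       config,
#       {
#           "auth": {"url": "https://astakos.example.org/identity/v2.0", "token": "s3cr3t"},
#           "container": {"name": "galaxy", "project": None},
#           "extra_dirs": [{"type": "job_work", "path": "database/job_working_directory_pithos"}],
#           "private": False,
#       },
#   )
#   if not store.exists(dataset):
#       store.create(dataset)
#   print(store.get_store_usage_percent())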