Source code for galaxy.objectstore.pithos

# This work was originally developed by Stavros Sachtouris <saxtouri@grnet.gr>
# as part of the effort committed by GRNET S.A. (Greek Research and Technology
# Network) in the context of the OpenMinTeD project (openminted.eu)

import logging
import os

try:
    from kamaki.clients import (
        astakos,
        Client as KamakiClient,
        ClientError,
        pithos,
        utils,
    )
except ImportError:
    KamakiClient = None

from galaxy.util import directory_hash_id
from ._caching_base import CachingConcreteObjectStore

# Raised when the optional kamaki.clients dependency could not be imported.
# Each fragment ends with a space so that the implicitly concatenated
# sentences do not run together ("available.Please" in the original).
NO_KAMAKI_ERROR_MESSAGE = (
    "ObjectStore configured, but no kamaki.clients dependency available. "
    "Please install and properly configure kamaki.clients or modify Object "
    "Store configuration."
)

# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)


def parse_config_xml(config_xml):
    """Parse and validate config_xml, return dict for convenience

    The returned dict has the keys ``auth`` (``url``, ``token``, ``ca_certs``,
    ``ignore_ssl``), ``container`` (``name``, ``project``), ``extra_dirs``
    (list of ``{"type": ..., "path": ...}``) and ``private``. Optional
    attributes that are absent from the XML are stored as ``None``.

    :param config_xml: (lxml.etree.Element) root of XML subtree
    :returns: (dict) according to syntax
    :raises IndexError: if the ``auth`` or ``container`` element is missing
    :raises Exception: if a required attribute is empty/missing, if there is
        no ``extra_dir`` element, or if no ``extra_dir`` has type ``job_work``
    """
    # (tag, required attributes, optional attributes)
    sections = (
        ("auth", ("url", "token"), ("ca_certs", "ignore_ssl")),
        ("container", ("name",), ("project",)),
    )
    r = {}
    try:
        for tag, required_attrs, optional_attrs in sections:
            elements = config_xml.findall(tag)
            if not elements:
                # Previously this surfaced as a bare "list index out of range";
                # the exception type is kept so existing handlers still match.
                msg = f"No {tag} element in XML tree"
                log.error(msg)
                raise IndexError(msg)
            element = elements[0]

            required = tuple((k, element.get(k)) for k in required_attrs)
            for k, v in required:
                if not v:
                    msg = f"No value for {tag}:{k} in XML tree"
                    log.error(msg)
                    raise Exception(msg)
            optional = tuple((k, element.get(k)) for k in optional_attrs)
            r[tag] = dict(required + optional)

        # Extract extra_dir
        tag, attrs = "extra_dir", ("type", "path")
        extra_dirs = config_xml.findall(tag)
        if not extra_dirs:
            msg = f"No {tag} element in XML tree"
            log.error(msg)
            raise Exception(msg)
        r["extra_dirs"] = [{k: e.get(k) for k in attrs} for e in extra_dirs]
        r["private"] = CachingConcreteObjectStore.parse_private_from_config_xml(config_xml)
        # At least one extra_dir must be declared with type="job_work"
        if "job_work" not in (d["type"] for d in r["extra_dirs"]):
            msg = f'No value for {tag}:type="job_work" in XML tree'
            log.error(msg)
            raise Exception(msg)
    except Exception:
        log.exception("Malformed PithosObjectStore Configuration XML, unable to continue")
        raise
    return r
class PithosObjectStore(CachingConcreteObjectStore):
    """
    Object store that stores objects as items in a Pithos+ container.
    Cache is ignored for the time being.
    """

    store_type = "pithos"

    def __init__(self, config, config_dict):
        """Store the configuration and connect to Astakos and Pithos+.

        :param config: Galaxy configuration object; ``file_path`` is used as
            the local staging path
        :param config_dict: (dict) store configuration, in the shape produced
            by :func:`parse_config_xml`
        :raises Exception: if kamaki.clients is not installed
        """
        super().__init__(config, config_dict)
        self.staging_path = self.config.file_path
        log.info("Parse config_xml for pithos object store")
        self.config_dict = config_dict

        self._initialize()

    def _initialize(self):
        """Check dependencies, then authenticate and set up the Pithos+ client."""
        if KamakiClient is None:
            raise Exception(NO_KAMAKI_ERROR_MESSAGE)

        self._ensure_staging_path_writable()
        log.info("Authenticate Synnefo account")
        self._authenticate()
        log.info("Initialize Pithos+ client")
        self._init_pithos()

    @classmethod
    def parse_xml(clazz, config_xml):
        """Delegate to the module-level :func:`parse_config_xml`."""
        return parse_config_xml(config_xml)

    def to_dict(self):
        """Return the parent's dict representation merged with ``config_dict``."""
        as_dict = super().to_dict()
        as_dict.update(self.config_dict)
        return as_dict

    def _authenticate(self):
        """Configure SSL handling and create the Astakos client.

        ``ca_certs`` takes precedence; otherwise SSL verification is disabled
        only when ``ignore_ssl`` is one of "true", "yes" or "on"
        (case-insensitive).
        """
        auth = self.config_dict["auth"]
        url, token = auth["url"], auth["token"]
        ca_certs = auth.get("ca_certs")
        # ignore_ssl is optional and is None when absent from the XML, so it
        # must be normalised before calling .lower() on it.
        ignore_ssl = str(auth.get("ignore_ssl") or "").lower()
        if ca_certs:
            utils.https.patch_with_certs(ca_certs)
        elif ignore_ssl in ("true", "yes", "on"):
            utils.https.patch_ignore_ssl()
        self.astakos = astakos.AstakosClient(url, token)

    def _init_pithos(self):
        """Create the Pithos+ client and make sure the container is usable.

        The container is created when it does not exist (HTTP 404). An
        existing container is reassigned when a project is configured and
        differs from the container's current project.
        """
        uuid, token = self.astakos.user_term("id"), self.astakos.token
        service_type = pithos.PithosClient.service_type
        pithos_url = self.astakos.get_endpoint_url(service_type)
        container = self.config_dict["container"]["name"]
        self.pithos = pithos.PithosClient(pithos_url, token, uuid, container)

        # Create container if not exist, or reassign to named project
        project = self.config_dict["container"].get("project", None)
        try:
            container_info = self.pithos.get_container_info()
        except ClientError as ce:
            if ce.status not in (404,):
                raise
            # A freshly created container is already bound to the project
            self.pithos.create_container(project_id=project)
            return
        if project and container_info.get("x-container-policy-project") != project:
            self.pithos.reassign_container(project)

    def _download(self, rel_path):
        """Download the remote object ``rel_path`` to its cache location."""
        local_destination = self._get_cache_path(rel_path)
        # NOTE(review): kamaki documents the second argument of
        # download_object as an open file object (wb+), while a path string
        # is passed here - confirm against the installed kamaki version.
        self.pithos.download_object(rel_path, local_destination)

    # No need to overwrite "shutdown"

    def _exists(self, obj, **kwargs):
        """Check if file exists, fix if file in cache and not on Pithos+

        A file that is found in the cache but not on Pithos+ is uploaded.
        With ``dir_only`` and a ``base_dir``, a missing directory is created
        locally instead.

        :returns: whether the file exists remotely or in cache
        :raises ClientError: for any remote failure other than HTTP 404
        """
        path = self._construct_path(obj, **kwargs)
        try:
            self.pithos.get_object_info(path)
            return True
        except ClientError as ce:
            if ce.status not in (404,):
                raise

        in_cache = self._in_cache(path)
        dir_only = kwargs.get("dir_only", False)
        if dir_only:
            base_dir = kwargs.get("base_dir", None)
            if in_cache:
                return True
            elif base_dir:  # for JOB_WORK directory
                if not os.path.exists(path):
                    os.makedirs(path, exist_ok=True)
                return True
            return False
        if in_cache:
            cache_path = self._get_cache_path(path)
            # Maybe the upload should have happened in some thread elsewhere?
            # Binary mode: dataset content is arbitrary bytes, so it must not
            # go through text decoding on its way to the remote store.
            with open(cache_path, "rb") as f:
                self.pithos.upload_object(path, f)
            return True
        return False

    def _create(self, obj, **kwargs):
        """Touch a file (aka create empty), if it doesn't exist

        Recognised kwargs: ``extra_dir``, ``extra_dir_at_root``, ``dir_only``
        and ``alt_name``.

        :returns: this object store
        """
        if not self._exists(obj, **kwargs):
            # Pull out locally used fields
            extra_dir = kwargs.get("extra_dir", None)
            extra_dir_at_root = kwargs.get("extra_dir_at_root", False)
            dir_only = kwargs.get("dir_only", False)
            alt_name = kwargs.get("alt_name", None)

            # Construct hashed path
            rel_path = os.path.join(*directory_hash_id(self._get_object_id(obj)))

            # Optionally append extra_dir
            if extra_dir is not None:
                if extra_dir_at_root:
                    rel_path = os.path.join(extra_dir, rel_path)
                else:
                    rel_path = os.path.join(rel_path, extra_dir)

            # Create given directory in cache
            cache_dir = os.path.join(self.staging_path, rel_path)
            if not os.path.exists(cache_dir):
                os.makedirs(cache_dir, exist_ok=True)

            if dir_only:
                self.pithos.upload_from_string(rel_path, "", content_type="application/directory")
            else:
                rel_path = os.path.join(
                    rel_path,
                    alt_name if alt_name else f"dataset_{self._get_object_id(obj)}.dat",
                )
                new_file = os.path.join(self.staging_path, rel_path)
                # Create (or truncate) the local file, then mirror it remotely
                with open(new_file, "w"):
                    pass
                self.pithos.upload_from_string(rel_path, "")
        return self

    def _get_remote_size(self, path):
        """
        :returns: remote object's ``content-length`` as int, 0 if the object
            is not found (HTTP 404)
        """
        try:
            info = self.pithos.get_object_info(path)
        except ClientError as ce:
            if ce.status not in (404,):
                raise
            return 0
        return int(info["content-length"])

    def _delete_remote_all(self, path: str) -> bool:
        """Delete ``path`` with delimiter "/" (logged as ``delete -r``).

        :returns: True on success, False if kamaki raised a ClientError
        """
        try:
            log.debug(f"On Pithos: delete -r {path}/")
            self.pithos.del_object(path, delimiter="/")
            return True
        except ClientError:
            log.exception(f"Could not delete path '{path}' from Pithos")
            return False

    def _delete_existing_remote(self, path: str) -> bool:
        """Delete the single remote object ``path``.

        :returns: True on success, False if kamaki raised a ClientError
        """
        try:
            self.pithos.del_object(path)
            return True
        except ClientError:
            log.exception(f"Could not delete path '{path}' from Pithos")
            return False

    def _get_object_url(self, obj, **kwargs):
        """
        :returns: URL for direct access, None if no object
        """
        if self._exists(obj, **kwargs):
            path = self._construct_path(obj, **kwargs)
            try:
                return self.pithos.publish_object(path)
            except ClientError as ce:
                log.exception(f'Trouble generating URL for dataset "{path}"')
                log.exception(f"Kamaki: {ce}")
        return None

    def _get_store_usage_percent(self):
        """
        :returns: percentage indicating how full the store is
        """
        quotas = self.astakos.get_quotas()
        project = self.config_dict["container"]["project"]
        pithos_quotas = quotas[project]["pithos.diskspace"]
        usage = pithos_quotas["usage"]
        # The effective limit is the stricter of the two reported limits
        limit = min(pithos_quotas["limit"], pithos_quotas["project_limit"])
        return (100.0 * usage) / limit