Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.objectstore.pulsar
from __future__ import absolute_import # Need to import pulsar_client absolutely.
from ..objectstore import ObjectStore
try:
from pulsar.client.manager import ObjectStoreClientManager
except ImportError:
ObjectStoreClientManager = None
class PulsarObjectStore(ObjectStore):
    """
    Object store that forwards every operation to a remote Pulsar server.

    For now this is more aspirational than practical. The longer-term goal
    is for Galaxy to be able to run a handler thread that never touches the
    disk files handed back by an object store such as this one and instead
    passes them to Pulsar unmodified. Together with this implementation and
    Pulsar job destinations, that change would let Galaxy fully manage jobs
    on remote servers whose mount points differ completely from its own.

    Consider this implementation beta: it may be significantly modified or
    removed from Galaxy in a future release.
    """

    def __init__(self, config, config_xml):
        """
        Create the Pulsar client described by ``config_xml``.

        ``config`` is not used by this constructor.
        """
        self.pulsar_client = self.__build_pulsar_client(config_xml)

    def exists(self, obj, **kwds):
        """Return the result of the Pulsar client's ``exists`` for ``obj``."""
        return self.__remote_call("exists", obj, kwds)

    def file_ready(self, obj, **kwds):
        """Return the result of the Pulsar client's ``file_ready`` for ``obj``."""
        return self.__remote_call("file_ready", obj, kwds)

    def create(self, obj, **kwds):
        """Return the result of the Pulsar client's ``create`` for ``obj``."""
        return self.__remote_call("create", obj, kwds)

    def empty(self, obj, **kwds):
        """Return the result of the Pulsar client's ``empty`` for ``obj``."""
        return self.__remote_call("empty", obj, kwds)

    def size(self, obj, **kwds):
        """Return the result of the Pulsar client's ``size`` for ``obj``."""
        return self.__remote_call("size", obj, kwds)

    def delete(self, obj, **kwds):
        """Return the result of the Pulsar client's ``delete`` for ``obj``."""
        return self.__remote_call("delete", obj, kwds)

    # TODO: Optimize get_data.
    def get_data(self, obj, **kwds):
        """Return the result of the Pulsar client's ``get_data`` for ``obj``."""
        return self.__remote_call("get_data", obj, kwds)

    def get_filename(self, obj, **kwds):
        """Return the result of the Pulsar client's ``get_filename`` for ``obj``."""
        return self.__remote_call("get_filename", obj, kwds)

    def update_from_file(self, obj, **kwds):
        """Return the result of the Pulsar client's ``update_from_file`` for ``obj``."""
        return self.__remote_call("update_from_file", obj, kwds)

    def get_object_url(self, obj, extra_dir=None, extra_dir_at_root=False, alt_name=None):
        """Always return ``None``; all arguments are ignored."""
        return None

    def __remote_call(self, operation, obj, kwds):
        """
        Invoke the Pulsar client method named ``operation`` for ``obj``.

        ``kwds`` is the caller's keyword dictionary; it is forwarded to the
        client together with ``object_id`` (see ``__build_kwds``).
        """
        # Resolve the client method before building the arguments so that
        # attribute lookups happen in the same order as a direct call would.
        remote_method = getattr(self.pulsar_client, operation)
        return remote_method(**self.__build_kwds(obj, **kwds))

    def __build_kwds(self, obj, **kwds):
        """Return ``kwds`` with ``object_id`` set to ``obj.id``."""
        return dict(kwds, object_id=obj.id)

    def __build_pulsar_client(self, config_xml):
        """
        Construct a Pulsar object store client from ``config_xml``.

        Reads the ``url``, ``private_token`` and ``transport`` values via
        ``config_xml.get``; the latter two default to ``None``.

        Raises ``Exception`` if the Pulsar client library could not be
        imported (``ObjectStoreClientManager`` is ``None``).
        """
        if ObjectStoreClientManager is None:
            raise Exception("Pulsar client code not available, cannot use this module.")

        endpoint = config_xml.get("url")
        token = config_xml.get("private_token", None)
        transport_type = config_xml.get("transport", None)

        manager = ObjectStoreClientManager(transport=transport_type)
        return manager.get_client({"url": endpoint, "private_token": token})