Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.web_stack.message
"""Web Application Stack worker messaging
"""
import json
import logging
import six
log = logging.getLogger(__name__)
[docs]class ApplicationStackMessageDispatcher:
def __func_name(self, func, name):
if not name:
name = func.__name__
return name
[docs] def register_func(self, func, name=None):
name = self.__func_name(func, name)
self.__funcs[name] = func
[docs] def deregister_func(self, func=None, name=None):
name = self.__func_name(func, name)
try:
del self.__funcs[name]
except KeyError:
pass
@property
def handler_count(self):
return len(self.__funcs)
[docs] def dispatch(self, msg_str):
msg = decode(msg_str)
try:
msg.validate()
except AssertionError as exc:
log.error('Invalid message received: %s, error: %s', msg_str, exc)
return
if msg.target not in self.__funcs:
log.error("Received message with target '%s' but no functions were registered with that name. Params were: %s", msg.target, msg.params)
else:
self.__funcs[msg.target](msg)
[docs]class ApplicationStackMessage(dict):
target = None
default_handler = None
_validate_kwargs = ('target',)
[docs] def __init__(self, target=None, **kwargs):
self['target'] = target or self.__class__.target
self._merge_class_tuples()
def _merge_class_tuples(self):
"""Locates any class-level tuples beginning with a single (but not double) underscore in the MRO and creates a
property on the instance with the same name (without the leading underscore) that will return the union of
those tuples.
"""
names = set()
for cls in reversed(self.__class__.mro()):
names.update([x for x in dir(cls) if x.startswith('_') and not x.startswith('__') and type(getattr(cls, x)) == tuple])
for name in names:
setattr(self.__class__, name.lstrip('_'), property(lambda self, name=name: self._get_list_from_mro(name)))
def _get_list_from_mro(self, name):
"""Locate all class-level tuples with the given `name` in the MRO and return their union.
"""
r = set()
for cls in reversed(self.__class__.mro()):
r.update(getattr(cls, name, []))
return r
def _validate_items(self, obj, items, name):
for item in items:
assert item in obj, "Missing '{}' message {}".format(item, name)
[docs] def bind_default_handler(self, obj, name):
"""Bind the default handler method to `obj` as attribute `name`.
This could also be implemented as a mixin class.
"""
assert self.default_handler is not None, '%s has no default handler method, cannot bind' % self.__class__.__name__
setattr(obj, name, six.create_bound_method(self.default_handler, obj))
log.debug("Bound default message handler '%s.%s' to %s", self.__class__.__name__, self.default_handler.__name__,
getattr(obj, name))
@property
def target(self):
return self['target']
@target.setter
def set_target(self, target):
self['target'] = target
[docs]class ParamMessage(ApplicationStackMessage):
_validate_kwargs = ('params',)
_validate_params = ()
_exclude_params = ()
[docs] def __init__(self, target=None, params=None, **kwargs):
super().__init__(target=target)
self['params'] = params or {}
for k, v in kwargs.items():
self['params'][k] = v
[docs] def validate(self):
super().validate()
self._validate_items(self['params'], self.validate_params, 'parameters')
@property
def params(self):
d = self['params'].copy()
for key in self.exclude_params:
d.pop(key, None)
return d
@params.setter
def set_params(self, params):
self['params'] = params
[docs]class TaskMessage(ParamMessage):
_validate_params = ('task',)
_exclude_params = ('task',)
[docs] @staticmethod
def default_handler(self, msg):
"""Can be bound to an instance of any class that has message handling methods named like `_handle_{task}_method`
"""
name = '_handle_{task}_msg'.format(task=msg.task)
assert name in dir(self), "{cls} has no method _handle_{task}_msg, cannot handle message: {msg}".format(
cls=self.__class__.__name__,
task=msg.task,
msg=msg)
getattr(self, '_handle_%s_msg' % msg.task)(**msg.params)
@property
def task(self):
return self['params']['task']
[docs]class WorkflowSchedulingMessage(TaskMessage):
target = 'workflow_scheduling'
_validate_params = ('workflow_invocation_id',)
[docs]def decode(msg_str):
d = json.loads(msg_str)
cls = d.pop('__classname__')
return globals()[cls](**d)