|
@@ -52,7 +52,8 @@ class EventDispatcher(object):
|
|
|
"""
|
|
|
|
|
|
def __init__(self, connection=None, hostname=None, enabled=True,
|
|
|
- channel=None, buffer_while_offline=True, app=None):
|
|
|
+ channel=None, buffer_while_offline=True, app=None,
|
|
|
+ serializer=None):
|
|
|
self.app = app_or_default(app)
|
|
|
self.connection = connection
|
|
|
self.channel = channel
|
|
@@ -62,6 +63,7 @@ class EventDispatcher(object):
|
|
|
self._lock = threading.Lock()
|
|
|
self.publisher = None
|
|
|
self._outbound_buffer = deque()
|
|
|
+ self.serializer = serializer or self.app.conf.CELERY_EVENT_SERIALIZER
|
|
|
|
|
|
if self.enabled:
|
|
|
self.enable()
|
|
@@ -70,9 +72,8 @@ class EventDispatcher(object):
|
|
|
conf = self.app.conf
|
|
|
self.enabled = True
|
|
|
channel = self.channel or self.connection.channel()
|
|
|
- self.publisher = Producer(channel,
|
|
|
- exchange=event_exchange,
|
|
|
- serializer=conf.CELERY_EVENT_SERIALIZER)
|
|
|
+ self.publisher = Producer(channel, exchange=event_exchange,
|
|
|
+ serializer=self.serializer)
|
|
|
|
|
|
def disable(self):
|
|
|
self.enabled = False
|
|
@@ -106,7 +107,10 @@ class EventDispatcher(object):
|
|
|
|
|
|
def flush(self):
|
|
|
while self._outbound_buffer:
|
|
|
- type, fields, _ = self._outbound_buffer.popleft()
|
|
|
+ try:
|
|
|
+ type, fields, _ = self._outbound_buffer.popleft()
|
|
|
+ except IndexError:
|
|
|
+ return
|
|
|
self.send(type, **fields)
|
|
|
|
|
|
def copy_buffer(self, other):
|