|
@@ -2,6 +2,7 @@ import time
|
|
import socket
|
|
import socket
|
|
import threading
|
|
import threading
|
|
|
|
|
|
|
|
+from collections import deque
|
|
from itertools import count
|
|
from itertools import count
|
|
|
|
|
|
from celery.messaging import EventPublisher, EventConsumer
|
|
from celery.messaging import EventPublisher, EventConsumer
|
|
@@ -43,6 +44,7 @@ class EventDispatcher(object):
|
|
self.enabled = enabled
|
|
self.enabled = enabled
|
|
self._lock = threading.Lock()
|
|
self._lock = threading.Lock()
|
|
self.publisher = None
|
|
self.publisher = None
|
|
|
|
+ self._outbound_buffer = deque()
|
|
|
|
|
|
if self.enabled:
|
|
if self.enabled:
|
|
self.enable()
|
|
self.enable()
|
|
@@ -68,11 +70,20 @@ class EventDispatcher(object):
|
|
return
|
|
return
|
|
|
|
|
|
self._lock.acquire()
|
|
self._lock.acquire()
|
|
|
|
+ event = Event(type, hostname=self.hostname, **fields)
|
|
try:
|
|
try:
|
|
- self.publisher.send(Event(type, hostname=self.hostname, **fields))
|
|
|
|
|
|
+ try:
|
|
|
|
+ self.publisher.send(event)
|
|
|
|
+ except Exception, exc:
|
|
|
|
+ self._outbound_buffer.append((event, exc))
|
|
finally:
|
|
finally:
|
|
self._lock.release()
|
|
self._lock.release()
|
|
|
|
|
|
|
|
+ def flush(self):
|
|
|
|
+ while self._outbound_buffer:
|
|
|
|
+ event, _ = self._outbound_buffer.popleft()
|
|
|
|
+ self.publisher.send(event)
|
|
|
|
+
|
|
def close(self):
|
|
def close(self):
|
|
"""Close the event dispatcher."""
|
|
"""Close the event dispatcher."""
|
|
self._lock.locked() and self._lock.release()
|
|
self._lock.locked() and self._lock.release()
|