Browse Source

Force using a fanout exchange for events when using Redis.

This ensures that the binding is deleted when the
event consumer exits, and also it will use LISTEN/SUBSCRIBE
for events rather than a list (which sounds more suitable).

Depends on:

    - ask/kombu@ee623816c9776b3266aff0e41ec7b57f9ce87cea

Closes #436
Ask Solem 13 years ago
parent
commit
844ba52315
1 changed files with 17 additions and 2 deletions
  1. 17 2
      celery/events/__init__.py

+ 17 - 2
celery/events/__init__.py

@@ -20,6 +20,7 @@ import threading
 
 from collections import deque
 from contextlib import contextmanager
+from copy import copy
 
 from kombu.common import eventloop
 from kombu.entity import Exchange, Queue
@@ -32,6 +33,14 @@ from celery.utils import uuid
 event_exchange = Exchange("celeryev", type="topic")
 
 
+def get_exchange(conn):
+    ex = copy(event_exchange)
+    if "redis" in conn.transport_cls:
+        # quick hack for #436
+        ex.type = "fanout"
+    return ex
+
+
 def Event(type, _fields=None, **fields):
     """Create an event.
 
@@ -91,9 +100,12 @@ class EventDispatcher(object):
     def __exit__(self, *exc_info):
         self.close()
 
+    def get_exchange(self):
+        return get_exchange(self.connection)
+
     def enable(self):
         self.publisher = Producer(self.channel or self.connection,
-                                  exchange=event_exchange,
+                                  exchange=self.get_exchange(),
                                   serializer=self.serializer)
         self.enabled = True
         for callback in self.on_enabled:
@@ -166,11 +178,14 @@ class EventReceiver(object):
         self.node_id = node_id or uuid()
         self.queue_prefix = queue_prefix
         self.queue = Queue('.'.join([self.queue_prefix, self.node_id]),
-                           exchange=event_exchange,
+                           exchange=self.get_exchange(),
                            routing_key=self.routing_key,
                            auto_delete=True,
                            durable=False)
 
+    def get_exchange(self):
+        return get_exchange(self.connection)
+
     def process(self, type, event):
         """Process the received event by dispatching it to the appropriate
         handler."""