Browse Source

Fixes event dispatcher bug

Ask Solem 12 years ago
parent
commit
9bee487420
1 changed files with 8 additions and 13 deletions
  1. 8 13
      celery/events/__init__.py

+ 8 - 13
celery/events/__init__.py

@@ -105,28 +105,25 @@ class EventDispatcher(object):
         if not connection and channel:
             self.connection = channel.connection.client
         self.enabled = enabled
-        if self.connection.transport.driver_type in self.DISABLED_TRANSPORTS:
+        conninfo = self.connection or self.app.connection()
+        self.exchange = get_exchange(conninfo)
+        if conninfo.transport.driver_type in self.DISABLED_TRANSPORTS:
             self.enabled = False
         if self.enabled:
             self.enable()
         self.headers = {'hostname': self.hostname}
         self.pid = os.getpid()
 
+
     def __enter__(self):
         return self
 
     def __exit__(self, *exc_info):
         self.close()
 
-    def get_exchange(self):
-        if self.connection:
-            return get_exchange(self.connection)
-        else:
-            return get_exchange(self.channel.connection.client)
-
     def enable(self):
         self.producer = Producer(self.channel or self.connection,
-                                 exchange=self.get_exchange(),
+                                 exchange=self.exchange,
                                  serializer=self.serializer)
         self.enabled = True
         for callback in self.on_enabled:
@@ -146,7 +143,7 @@ class EventDispatcher(object):
             clock = None if blind else self.clock.forward()
             event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
                           pid=self.pid, clock=clock, **fields)
-            exchange = get_exchange(producer.connection)
+            exchange = self.exchange
             producer.publish(
                 event,
                 routing_key=type.replace('-', '.'),
@@ -223,14 +220,12 @@ class EventReceiver(ConsumerMixin):
         self.node_id = node_id or uuid()
         self.queue_prefix = queue_prefix
         self.queue = Queue('.'.join([self.queue_prefix, self.node_id]),
-                           exchange=self.get_exchange(),
+                           exchange=self.exchange,
                            routing_key=self.routing_key,
                            auto_delete=True,
                            durable=False)
         self.adjust_clock = self.app.clock.adjust
-
-    def get_exchange(self):
-        return get_exchange(self.connection)
+        self.exchange = get_exchange(self.connection or self.app.connection())
 
     def process(self, type, event):
         """Process the received event by dispatching it to the appropriate