Browse Source

Gossip/ConsumerStep must not use the same channel as the task consumer... Closes #1580

Ask Solem 11 years ago
parent
commit
c9e1efed9d
2 changed files with 14 additions and 8 deletions
  1. 7 6
      celery/bootsteps.py
  2. 7 2
      celery/events/__init__.py

+ 7 - 6
celery/bootsteps.py

@@ -404,16 +404,17 @@ class ConsumerStep(StartStopStep):
         raise NotImplementedError('missing get_consumers')
 
     def start(self, c):
-        self.consumers = self.get_consumers(c.connection)
+        channel = c.connection.channel()
+        self.consumers = self.get_consumers(channel)
         for consumer in self.consumers or []:
             consumer.consume()
 
     def stop(self, c):
+        channels = set()
         for consumer in self.consumers or []:
             ignore_errors(c.connection, consumer.cancel)
-
-    def shutdown(self, c):
-        self.stop(c)
-        for consumer in self.consumers or []:
             if consumer.channel:
-                ignore_errors(c.connection, consumer.channel.close)
+                channels.add(consumer.channel)
+        for channel in channels:
+            ignore_errors(c.connection, channel.close)
+    shutdown = stop

+ 7 - 2
celery/events/__init__.py

@@ -21,6 +21,7 @@ from copy import copy
 from operator import itemgetter
 
 from kombu import Exchange, Queue, Producer
+from kombu.connection import maybe_channel
 from kombu.mixins import ConsumerMixin
 from kombu.utils import cached_property
 
@@ -262,10 +263,10 @@ class EventReceiver(ConsumerMixin):
     """
     app = None
 
-    def __init__(self, connection, handlers=None, routing_key='#',
+    def __init__(self, channel, handlers=None, routing_key='#',
                  node_id=None, app=None, queue_prefix='celeryev'):
         self.app = app_or_default(app or self.app)
-        self.connection = connection
+        self.channel = maybe_channel(channel)
         self.handlers = {} if handlers is None else handlers
         self.routing_key = routing_key
         self.node_id = node_id or uuid()
@@ -339,6 +340,10 @@ class EventReceiver(ConsumerMixin):
     def _receive(self, body, message):
         self.process(*self.event_from_message(body))
 
+    @property
+    def connection(self):
+        return self.channel.connection.client if self.channel else None
+
 
 class Events(object):