Browse Source

Mingle+Gossip should only be enabled for amqp+redis transports

Closes #1664
Ask Solem 11 years ago
parent
commit
3c2db4ef7b
1 changed files with 19 additions and 7 deletions
  1. 19 7
      celery/worker/consumer.py

+ 19 - 7
celery/worker/consumer.py

@@ -193,13 +193,12 @@ class Consumer(object):
         self.task_buckets = defaultdict(lambda: None)
         self.reset_rate_limits()
 
-        if hub:
+        self.hub = hub
+        if self.hub:
             self.amqheartbeat = amqheartbeat
             if self.amqheartbeat is None:
                 self.amqheartbeat = self.app.conf.BROKER_HEARTBEAT
-            self.hub = hub
         else:
-            self.hub = None
             self.amqheartbeat = 0
 
         if not hasattr(self, 'loop'):
@@ -588,9 +587,10 @@ class Gossip(bootsteps.ConsumerStep):
     _cons_stamp_fields = itemgetter(
         'id', 'clock', 'hostname', 'pid', 'topic', 'action', 'cver',
     )
+    compatible_transports = set(['amqp', 'redis'])
 
     def __init__(self, c, without_gossip=False, interval=5.0, **kwargs):
-        self.enabled = not without_gossip
+        self.enabled = not without_gossip and self.compatible_transport(c.app)
         self.app = c.app
         c.gossip = self
         self.Receiver = c.app.events.Receiver
@@ -598,12 +598,15 @@ class Gossip(bootsteps.ConsumerStep):
         self.full_hostname = '.'.join([self.hostname, str(c.pid)])
 
         self.timer = c.timer
-        self.state = c.app.events.State()
+        if self.enabled:
+            self.state = c.app.events.State()
+            if c.hub:
+                c._mutex = DummyLock()
+            self.update_state = self.state.worker_event
         self.interval = interval
         self._tref = None
         self.consensus_requests = defaultdict(list)
         self.consensus_replies = {}
-        self.update_state = self.state.worker_event
         self.event_handlers = {
             'worker.elect': self.on_elect,
             'worker.elect.ack': self.on_elect_ack,
@@ -614,6 +617,10 @@ class Gossip(bootsteps.ConsumerStep):
             'task': self.call_task
         }
 
+    def compatible_transport(self, app):
+        with app.connection() as conn:
+            return conn.transport.driver_type in self.compatible_transports
+
     def election(self, id, topic, action=None):
         self.consensus_replies[id] = []
         self.dispatcher.send(
@@ -732,9 +739,14 @@ class Gossip(bootsteps.ConsumerStep):
 class Mingle(bootsteps.StartStopStep):
     label = 'Mingle'
     requires = (Gossip, )
+    compatible_transports = set(['amqp', 'redis'])
 
     def __init__(self, c, without_mingle=False, **kwargs):
-        self.enabled = not without_mingle
+        self.enabled = not without_mingle and self.compatible_transport(c.app)
+
+    def compatible_transport(self, app):
+        with app.connection() as conn:
+            return conn.transport.driver_type in self.compatible_transports
 
     def start(self, c):
         info('mingle: searching for neighbors')