|
@@ -176,9 +176,9 @@ class Consumer(object):
|
|
|
self.pool = pool
|
|
|
self.timer = timer
|
|
|
self.strategies = self.Strategies()
|
|
|
- conninfo = self.app.connection()
|
|
|
- self.connection_errors = conninfo.connection_errors
|
|
|
- self.channel_errors = conninfo.channel_errors
|
|
|
+ self.conninfo = self.app.connection()
|
|
|
+ self.connection_errors = self.conninfo.connection_errors
|
|
|
+ self.channel_errors = self.conninfo.channel_errors
|
|
|
self._restart_state = restart_state(maxR=5, maxT=1)
|
|
|
|
|
|
self._does_info = logger.isEnabledFor(logging.INFO)
|
|
@@ -685,7 +685,8 @@ class Control(bootsteps.StartStopStep):
|
|
|
self.shutdown = self.box.shutdown
|
|
|
|
|
|
def include_if(self, c):
|
|
|
- return c.app.conf.CELERY_ENABLE_REMOTE_CONTROL
|
|
|
+ return (c.app.conf.CELERY_ENABLE_REMOTE_CONTROL and
|
|
|
+ 'fanout' in c.conninfo.transport.implements.exchange_type)
|
|
|
|
|
|
|
|
|
class Gossip(bootsteps.ConsumerStep):
|