|
@@ -146,6 +146,7 @@ class Consumer(object):
|
|
|
'celery.worker.consumer:Events',
|
|
|
'celery.worker.consumer:Gossip',
|
|
|
'celery.worker.consumer:Heart',
|
|
|
+ 'celery.worker.consumer:Control',
|
|
|
'celery.worker.consumer:Tasks',
|
|
|
'celery.worker.consumer:Evloop',
|
|
|
'celery.worker.consumer:Agent',
|
|
@@ -200,15 +201,10 @@ class Consumer(object):
|
|
|
# so if the connection timeout is exceeded once, it can NEVER
|
|
|
# connect again.
|
|
|
self.app.conf.BROKER_CONNECTION_TIMEOUT = None
|
|
|
-
|
|
|
-
|
|
|
- additional_steps = None
|
|
|
- if self.app.conf.CELERY_ENABLE_REMOTE_CONTROL:
|
|
|
- additional_steps = ['celery.worker.consumer:Control']
|
|
|
|
|
|
self.steps = []
|
|
|
self.blueprint = self.Blueprint(
|
|
|
- app=self.app, on_close=self.on_close, steps=additional_steps,
|
|
|
+ app=self.app, on_close=self.on_close,
|
|
|
)
|
|
|
self.blueprint.apply(self, **dict(worker_options or {}, **kwargs))
|
|
|
|
|
@@ -488,6 +484,9 @@ class Control(bootsteps.StartStopStep):
|
|
|
self.start = self.box.start
|
|
|
self.stop = self.box.stop
|
|
|
self.shutdown = self.box.shutdown
|
|
|
+
|
|
|
+ def include_if(self, c):
|
|
|
+ return c.app.conf.CELERY_ENABLE_REMOTE_CONTROL
|
|
|
|
|
|
|
|
|
class Tasks(bootsteps.StartStopStep):
|