Browse Source

Add callback for consumer start

Rumyana Neykova 13 năm trước cách đây
mục cha
commit
40e02ed82d
1 tập tin đã thay đổi với 13 bổ sung3 xóa
  1. 13 3
      celery/worker/__init__.py

+ 13 - 3
celery/worker/__init__.py

@@ -71,7 +71,8 @@ class Namespace(bootsteps.Namespace):
                           'celery.worker.autoscale',
                           'celery.worker.autoreload',
                           'celery.worker.consumer',
-                          'celery.worker.mediator')
+                          'celery.worker.mediator', 
+                          'celery.worker.actorsbootstrap')
 
     def modules(self):
         return self.builtin_boot_steps + self.app.conf.CELERYD_BOOT_STEPS
@@ -90,6 +91,7 @@ class WorkController(configurated):
     send_events = from_config()
     pool_cls = from_config('pool')
     consumer_cls = from_config('consumer')
+    actors_manager_cls = from_config('actor_manager')
     mediator_cls = from_config('mediator')
     timer_cls = from_config('timer')
     timer_precision = from_config('timer_precision')
@@ -97,6 +99,7 @@ class WorkController(configurated):
     autoreloader_cls = from_config('autoreloader')
     schedule_filename = from_config()
     scheduler_cls = from_config('celerybeat_scheduler')
+    actors_manager_cls = from_config('actors_manager')
     task_time_limit = from_config()
     task_soft_time_limit = from_config()
     max_tasks_per_child = from_config()
@@ -110,9 +113,11 @@ class WorkController(configurated):
 
     _state = None
     _running = 0
+    on_consumer_ready_callbacks = None
     pidlock = None
 
     def __init__(self, app=None, hostname=None, **kwargs):
+        self.on_consumer_ready_callbacks = []
         self.app = app_or_default(app or self.app)
         # all new threads start without a current app, so if an app is not
         # passed on to the thread it will fall back to the "default app",
@@ -137,7 +142,9 @@ class WorkController(configurated):
         pass
 
     def on_consumer_ready(self, consumer):
-        pass
+        print 'In consumer_ready'
+        print self.on_consumer_ready_callbacks
+        [callback(consumer) for callback in self.on_consumer_ready_callbacks] 
 
     def setup_instance(self, queues=None, ready_callback=None,
             pidfile=None, include=None, **kwargs):
@@ -156,7 +163,9 @@ class WorkController(configurated):
 
         # Options
         self.loglevel = mlevel(self.loglevel)
-        self.ready_callback = ready_callback or self.on_consumer_ready
+        if ready_callback:
+            self.on_consumer_ready_callbacks.append(ready_callback)
+        self.ready_callback = self.on_consumer_ready
         self.use_eventloop = self.should_use_eventloop()
 
         signals.worker_init.send(sender=self)
@@ -201,6 +210,7 @@ class WorkController(configurated):
         if self.pidfile:
             self.pidlock = platforms.create_pidlock(self.pidfile)
         try:
+            print self.components
             for i, component in enumerate(self.components):
                 logger.debug('Starting %s...', qualname(component))
                 self._running = i + 1