Selaa lähdekoodia

Only start timer threads when needed

Ask Solem 13 vuotta sitten
vanhempi
commit
73a7d10b56
3 muutettua tiedostoa jossa 17 lisäystä ja 5 poistoa
  1. 6 0
      celery/events/__init__.py
  2. 0 1
      celery/worker/__init__.py
  3. 11 4
      celery/worker/heartbeat.py

+ 6 - 0
celery/events/__init__.py

@@ -77,6 +77,8 @@ class EventDispatcher(object):
         self.publisher = None
         self._outbound_buffer = deque()
         self.serializer = serializer or self.app.conf.CELERY_EVENT_SERIALIZER
+        self.on_enabled = set()
+        self.on_disabled = set()
 
         self.enabled = enabled
         if self.enabled:
@@ -93,11 +95,15 @@ class EventDispatcher(object):
                                   exchange=event_exchange,
                                   serializer=self.serializer)
         self.enabled = True
+        for callback in self.on_enabled:
+            callback()
 
     def disable(self):
         if self.enabled:
             self.enabled = False
             self.close()
+            for callback in self.on_disabled:
+                callback()
 
     def send(self, type, **fields):
         """Send event.

+ 0 - 1
celery/worker/__init__.py

@@ -252,7 +252,6 @@ class WorkController(object):
         # and they must be stopped in reverse order.
         self.components = filter(None, (self.pool,
                                         self.mediator,
-                                        self.scheduler,
                                         self.beat,
                                         self.autoscaler,
                                         self.consumer))

+ 11 - 4
celery/worker/heartbeat.py

@@ -31,16 +31,23 @@ class Heart(object):
         self.interval = interval or 30
         self.tref = None
 
+        # Make event dispatcher start/stop us when it's
+        # enabled/disabled.
+        self.eventer.on_enabled.add(self.start)
+        self.eventer.on_disabled.add(self.stop)
+
     def _send(self, event):
         return self.eventer.send(event, **SOFTWARE_INFO)
 
     def start(self):
-        self._send("worker-online")
-        self.tref = self.timer.apply_interval(self.interval * 1000.0,
-                self._send, ("worker-heartbeat", ))
+        if self.eventer.enabled:
+            self._send("worker-online")
+            self.tref = self.timer.apply_interval(self.interval * 1000.0,
+                    self._send, ("worker-heartbeat", ))
 
     def stop(self):
         if self.tref is not None:
             self.timer.cancel(self.tref)
             self.tref = None
-        self._send("worker-offline")
+        if self.eventer.enabled:
+            self._send("worker-offline")