Quellcode durchsuchen

Heart is now using timer2.Timer

Ask Solem vor 14 Jahren
Ursprung
Commit
01487be740
3 geänderte Dateien mit 27 neuen und 51 gelöschten Zeilen
  1. 3 0
      celery/worker/__init__.py
  2. 6 4
      celery/worker/consumer.py
  3. 18 47
      celery/worker/heartbeat.py

+ 3 - 0
celery/worker/__init__.py

@@ -183,6 +183,8 @@ class WorkController(object):
                                 timeout=self.task_time_limit,
                                 soft_timeout=self.task_soft_time_limit,
                                 putlocks=self.pool_putlocks)
+        self.priority_timer = instantiate(self.pool.Timer)
+
         if not self.eta_scheduler_cls:
             # Default Timer is set by the pool, as e.g. eventlet
             # needs a custom implementation.
@@ -223,6 +225,7 @@ class WorkController(object):
                                     init_callback=self.ready_callback,
                                     initial_prefetch_count=prefetch_count,
                                     pool=self.pool,
+                                    priority_timer=self.priority_timer,
                                     app=self.app)
 
         # The order is important here;

+ 6 - 4
celery/worker/consumer.py

@@ -80,8 +80,8 @@ from celery.app import app_or_default
 from celery.datastructures import AttributeDict
 from celery.exceptions import NotRegistered
 from celery.utils import noop
+from celery.utils import timer2
 from celery.utils.encoding import safe_repr, safe_str
-from celery.utils.timer2 import to_timestamp
 from celery.worker import state
 from celery.worker.job import TaskRequest, InvalidTaskError
 from celery.worker.control.registry import Panel
@@ -229,7 +229,8 @@ class Consumer(object):
 
     def __init__(self, ready_queue, eta_schedule, logger,
             init_callback=noop, send_events=False, hostname=None,
-            initial_prefetch_count=2, pool=None, app=None):
+            initial_prefetch_count=2, pool=None, app=None,
+            priority_timer=None):
         self.app = app_or_default(app)
         self.connection = None
         self.task_consumer = None
@@ -244,6 +245,7 @@ class Consumer(object):
         self.event_dispatcher = None
         self.heart = None
         self.pool = pool
+        self.priority_timer = priority_timer or timer2.Timer()
         pidbox_state = AttributeDict(app=self.app,
                                      logger=logger,
                                      hostname=self.hostname,
@@ -308,7 +310,7 @@ class Consumer(object):
 
         if task.eta:
             try:
-                eta = to_timestamp(task.eta)
+                eta = timer2.to_timestamp(task.eta)
             except OverflowError, exc:
                 self.logger.error(
                     "Couldn't convert eta %s to timestamp: %r. Task: %r" % (
@@ -505,7 +507,7 @@ class Consumer(object):
         self._state = RUN
 
     def restart_heartbeat(self):
-        self.heart = Heart(self.event_dispatcher)
+        self.heart = Heart(self.priority_timer, self.event_dispatcher)
         self.heart.start()
 
     def _open_connection(self):

+ 18 - 47
celery/worker/heartbeat.py

@@ -1,13 +1,11 @@
-import threading
-
-from time import time, sleep
-
 from celery.worker.state import SOFTWARE_INFO
+from celery.utils import timer2
 
 
-class Heart(threading.Thread):
-    """Thread sending heartbeats at regular intervals.
+class Heart(object):
+    """Timer sending heartbeats at regular intervals.
 
+    :param timer: Timer instance.
     :param eventer: Event dispatcher used to send the event.
     :keyword interval: Time in seconds between heartbeats.
                        Default is 2 minutes.
@@ -17,49 +15,22 @@ class Heart(threading.Thread):
     #: Beats per minute.
     bpm = 0.5
 
-    def __init__(self, eventer, interval=None):
-        super(Heart, self).__init__()
+    def __init__(self, timer, eventer, interval=None):
+        self.timer = timer
         self.eventer = eventer
-        self.bpm = interval and interval / 60.0 or self.bpm
-        self._shutdown = threading.Event()
-        self.setDaemon(True)
-        self.setName(self.__class__.__name__)
-        self._state = None
-
-    def run(self):
-        self._state = "RUN"
-        bpm = self.bpm
-        dispatch = self.eventer.send
-
-        dispatch("worker-online", **SOFTWARE_INFO)
-
-        # We can't sleep all of the interval, because then
-        # it takes 60 seconds (or value of interval) to shutdown
-        # the thread.
-
-        last_beat = None
-        while 1:
-            try:
-                now = time()
-            except TypeError:
-                # we lost the race at interpreter shutdown,
-                # so time() has been collected by gc.
-                return
+        self.interval = interval or 30
+        self.tref = None
 
-            if not last_beat or now > last_beat + (60.0 / bpm):
-                last_beat = now
-                dispatch("worker-heartbeat", **SOFTWARE_INFO)
-            if self._shutdown.isSet():
-                break
-            sleep(1)
+    def _send(self, event):
+        return self.eventer.send(event, **SOFTWARE_INFO)
 
-        dispatch("worker-offline", **SOFTWARE_INFO)
+    def start(self):
+        self._send("worker-online")
+        self.tref = self.timer.apply_interval(self.interval * 1000.0,
+                self._send, ("worker-heartbeat", ))
 
     def stop(self):
-        """Gracefully shutdown the thread."""
-        if not self._state == "RUN":
-            return
-        self._state = "CLOSE"
-        self._shutdown.set()
-        if self.isAlive():
-            self.join(1e10)
+        if self.tref is not None:
+            self.tref.cancel()
+            self.tref = None
+        self._send("worker-offline")