Browse Source

Implemented Heartbeats (may not be thread safe yet, must be tested)

Ask Solem 15 years ago
parent
commit
169d299483

+ 6 - 0
celery/monitoring/handlers/api.py

@@ -53,6 +53,11 @@ def list_task_types(request):
     return monitor_state.tasks_by_type()
 
 
+@api_handler
+def list_workers(request):
+    return monitor_state.list_workers()
+
+
 class RevokeTaskHandler(APIHandler):
 
     SUPPORTED_METHODS = ["POST"]
@@ -70,4 +75,5 @@ API = [
        (r"/task/$", list_tasks),
        (r"/revoke/task/", RevokeTaskHandler),
        (r"/task/(.+)", task_state),
+       (r"/worker/", list_workers),
 ]

+ 11 - 4
celery/monitoring/state.py

@@ -29,7 +29,6 @@ class MonitorState(object):
 
         task_events = []
         for event in self.task_events[task_id]:
-            print("EVENT >>>>>> %s" % event)
             if event["state"] in ("task-failed", "task-retried"):
                 task_info["exception"] = event["exception"]
                 task_info["traceback"] = event["traceback"]
@@ -40,13 +39,12 @@ class MonitorState(object):
 
         return task_info
 
-
     def receive_task_event(self, event):
         event["state"] = event.pop("type")
         event["when"] = self.timestamp_to_isoformat(event["timestamp"])
         self.task_events[event["uuid"]].append(event)
 
-    def timestamp_to_datetime(self, timestamp):
+    def timestamp_to_isoformat(self, timestamp):
         return datetime.fromtimestamp(timestamp).isoformat()
 
     def receive_heartbeat(self, event):
@@ -61,8 +59,17 @@ class MonitorState(object):
         self.tasks[task_info["uuid"]] = task_info
         self.task_events[event["uuid"]].append(event)
 
+    def list_workers(self):
+        """List ``{hostname: when}`` for workers whose last event is online."""
+        return [
+            {hostname: events[-1]["when"]}
+            for hostname, events in self.workers.items()
+            if events[-1]["state"] == "worker-online"]
+
     def receive_worker_event(self, event):
-        self.workers[event["hostname"]].append(event["type"])
+        event["state"] = event.pop("type")
+        event["when"] = self.timestamp_to_isoformat(event["timestamp"])
+        self.workers[event["hostname"]].append(event)
 
     def worker_is_alive(self, hostname):
         last_worker_event = self.workers[hostname][-1]

+ 10 - 3
celery/worker/__init__.py

@@ -17,12 +17,13 @@ from celery import registry
 from celery.log import setup_logger
 from celery.beat import ClockServiceThread
 from celery.utils import retry_over_time
-from celery.worker.pool import TaskPool
 from celery.worker.job import TaskWrapper
+from celery.worker.pool import TaskPool
+from celery.worker.revoke import revoked
+from celery.worker.buckets import TaskBucket
+from celery.worker.heartbeat import Heart
 from celery.worker.scheduler import Scheduler
 from celery.worker.controllers import Mediator, ScheduleController
-from celery.worker.buckets import TaskBucket
-from celery.worker.revoke import revoked
 from celery.messaging import get_consumer_set, BroadcastConsumer
 from celery.exceptions import NotRegistered
 from celery.datastructures import SharedCounter
@@ -60,6 +61,8 @@ class CarrotListener(object):
         self.logger = logger
         self.prefetch_count = SharedCounter(initial_prefetch_count)
         self.event_dispatcher = None
+        self.hostname = socket.gethostname()
+        self.heart = None
 
     def start(self):
         """Start the consumer.
@@ -146,6 +149,8 @@ class CarrotListener(object):
 
     def close_connection(self):
         """Close the AMQP connection."""
+        if self.heart:
+            self.heart.stop()
         if self.task_consumer:
             self.task_consumer.close()
             self.task_consumer = None
@@ -172,6 +177,8 @@ class CarrotListener(object):
         self.task_consumer.add_consumer(self.broadcast_consumer)
         self.task_consumer.register_callback(self.receive_message)
         self.event_dispatcher = EventDispatcher(self.amqp_connection)
+        self.heart = Heart(self.event_dispatcher, hostname=self.hostname)
+        self.heart.start()
 
     def _open_connection(self):
         """Retries connecting to the AMQP broker over time.

+ 38 - 0
celery/worker/heartbeat.py

@@ -0,0 +1,38 @@
+import socket
+import threading
+from time import sleep
+
+
+class Heart(threading.Thread):
+    """Thread that sends worker-online, then a worker-heartbeat event every
+    ``interval`` seconds, and finally worker-offline once stopped."""
+    interval = 60
+
+    def __init__(self, eventer, hostname=None, interval=None):
+        super(Heart, self).__init__()
+        self.eventer = eventer
+        self.interval = interval or self.interval
+        self.hostname = hostname or socket.gethostname()
+        self._shutdown = threading.Event()
+        self._stopped = threading.Event()
+        self.setDaemon(True)
+
+    def run(self):
+        """Send events through ``eventer.send`` until stop() is called."""
+        hostname, send = self.hostname, self.eventer.send
+        try:
+            send("worker-online", hostname=hostname)
+            while not self._shutdown.isSet():
+                send("worker-heartbeat", hostname=hostname)
+                # Interruptible sleep: returns as soon as stop() is called.
+                self._shutdown.wait(self.interval)
+            send("worker-offline", hostname=hostname)
+        finally:
+            # Set last (even on error) so stop() never returns early or hangs.
+            self._stopped.set()
+
+    def stop(self):
+        """Gracefully shut down the thread; must be called after start()."""
+        # Wake run() out of its interval wait, then wait for it to finish.
+        self._shutdown.set()
+        self._stopped.wait()  # block until this thread is done