Browse Source

EventReceiver now sends heartbeat request to find workers, and celeryev displays them immediately when it starts

Ask Solem 14 years ago
parent
commit
bdd8499b1c
3 changed files with 51 additions and 23 deletions
  1. 36 17
      celery/events/__init__.py
  2. 8 6
      celery/events/cursesmon.py
  3. 7 0
      celery/worker/control/builtins.py

+ 36 - 17
celery/events/__init__.py

@@ -163,30 +163,49 @@ class EventReceiver(object):
         consumer.register_callback(self._receive)
         return consumer
 
-    def capture(self, limit=None, timeout=None):
-        """Open up a consumer capturing events.
-
-        This has to run in the main process, and it will never
-        stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
-
-        """
+    def itercapture(self, limit=None, timeout=None, wakeup=True):
         consumer = self.consumer()
         consumer.consume()
+        if wakeup:
+            self.wakeup_workers(channel=consumer.channel)
+
+        yield consumer
+
         try:
-            for iteration in count(0):
-                if limit and iteration > limit:
-                    break
-                try:
-                    self.connection.drain_events(timeout=timeout)
-                except socket.timeout:
-                    if timeout:
-                        raise
-                except socket.error:
-                    pass
+            self.drain_events(limit=limit, timeout=timeout)
         finally:
             consumer.cancel()
             consumer.channel.close()
 
+    def capture(self, limit=None, timeout=None, wakeup=True):
+        """Open up a consumer capturing events.
+
+        This has to run in the main process, and it will never
+        stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
+
+        """
+        list(self.itercapture(limit=limit,
+                              timeout=timeout,
+                              wakeup=wakeup))
+
+
+    def wakeup_workers(self, channel=None):
+        self.app.control.broadcast("heartbeat",
+                                   connection=self.connection,
+                                   channel=channel)
+
+    def drain_events(self, limit=None, timeout=None):
+        for iteration in count(0):
+            if limit and iteration > limit:
+                break
+            try:
+                self.connection.drain_events(timeout=timeout)
+            except socket.timeout:
+                if timeout:
+                    raise
+            except socket.error:
+                pass
+
     def _receive(self, message_data, message):
         type = message_data.pop("type").lower()
         self.process(type, create_event(type, message_data))

+ 8 - 6
celery/events/cursesmon.py

@@ -17,7 +17,7 @@ class CursesMonitor(object):
     keymap = {}
     win = None
     screen_width = None
-    screen_delay = 0.1
+    screen_delay = 10
     selected_task = None
     selected_position = 0
     selected_str = "Selected: "
@@ -305,7 +305,7 @@ class CursesMonitor(object):
         if self.workers:
             win.addstr(my - 4, x, self.online_str, curses.A_BOLD)
             win.addstr(my - 4, x + len(self.online_str),
-                    ", ".join(self.workers), curses.A_NORMAL)
+                    ", ".join(sorted(self.workers)), curses.A_NORMAL)
         else:
             win.addstr(my - 4, x, "No workers discovered.")
 
@@ -356,7 +356,7 @@ class CursesMonitor(object):
         curses.endwin()
 
     def nap(self):
-        curses.napms(int(self.screen_delay * 1000))
+        curses.napms(self.screen_delay)
 
     @property
     def tasks(self):
@@ -386,14 +386,16 @@ def evtop(app=None):
     sys.stderr.write("-> evtop: starting capture...\n")
     app = app_or_default(app)
     state = app.events.State()
+    conn = app.broker_connection()
+    recv = app.events.Receiver(conn, handlers={"*": state.event})
+    capture = recv.itercapture()
+    consumer = capture.next()
     display = CursesMonitor(state, app=app)
     display.init_screen()
     refresher = DisplayThread(display)
     refresher.start()
-    conn = app.broker_connection()
-    recv = app.events.Receiver(conn, handlers={"*": state.event})
     try:
-        recv.capture(limit=None)
+        capture.next()
     except Exception:
         refresher.shutdown = True
         refresher.join()

+ 7 - 0
celery/worker/control/builtins.py

@@ -40,6 +40,13 @@ def disable_events(panel):
     return {"ok": "events already disabled"}
 
 
+@Panel.register
+def heartbeat(panel):
+    panel.logger.debug("Heartbeat requested by remote.")
+    dispatcher = panel.listener.event_dispatcher
+    dispatcher.send("worker-heartbeat")
+
+
 @Panel.register
 def set_loglevel(panel, loglevel=None):
     if loglevel is not None: