Browse Source

celeryev now has support for taking snapshots of the cluster state.

django-celery already implements a camera that stores the snapshots
in the database.

(see
http://github.com/ask/django-celery/commit/9b1d1d3e7b0bbd6456834adb0403f2c9b4359a2a)
Ask Solem 14 years ago
parent
commit
2a7e6136e9
2 changed files with 63 additions and 8 deletions
  1. 32 8
      celery/bin/celeryev.py
  2. 31 0
      celery/events/snapshot.py

+ 32 - 8
celery/bin/celeryev.py

@@ -18,6 +18,7 @@ from celery.events import EventReceiver
 from celery.events.state import State
 from celery.messaging import establish_connection
 from celery.datastructures import LocalCache
+from celery.utils import instantiate
 
 TASK_NAMES = LocalCache(0xFFF)
 
@@ -29,6 +30,15 @@ OPTION_LIST = (
     optparse.make_option('-d', '--DUMP',
         action="store_true", dest="dump",
         help="Dump events to stdout."),
+    optparse.make_option('-c', '--camera',
+        action="store", dest="camera",
+        help="Camera class to take event snapshots with."),
+    optparse.make_option('-f', '--frequency', '--freq',
+        action="store", dest="frequency", type="float", default=1.0,
+        help="Recording: Snapshot frequency."),
+    optparse.make_option('-x', '--verbose',
+        action="store_true", dest="verbose",
+        help="Show more output.")
 )
 
 
@@ -456,9 +466,26 @@ class DisplayThread(threading.Thread):
             self.display.nap()
 
 
+def run_camera(camera, freq, verbose=False):
+    sys.stderr.write(
+        "-> celeryev: Taking snapshots with %s (every %s secs.)\n" % (
+            camera, freq))
+    state = State()
+    cam = instantiate(camera, state, freq=freq, verbose=verbose)
+    cam.install()
+    conn = establish_connection()
+    recv = EventReceiver(conn, handlers={"*": state.event})
+    try:
+        recv.capture(limit=None)
+    finally:
+        cam.shutter()
+        conn.close()
+
 def eventtop():
     sys.stderr.write("-> celeryev: starting capture...\n")
     state = State()
+    cam = ModelCamera(state)
+    cam.install()
     display = CursesMonitor(state)
     display.init_screen()
     refresher = DisplayThread(display)
@@ -466,13 +493,7 @@ def eventtop():
     conn = establish_connection()
     recv = EventReceiver(conn, handlers={"*": state.event})
     try:
-        consumer = recv.consumer()
-        consumer.consume()
-        while True:
-            try:
-                conn.connection.drain_events()
-            except socket.timeout:
-                pass
+        recv.capture(limit=None)
     except Exception:
         refresher.shutdown = True
         refresher.join()
@@ -496,9 +517,12 @@ def eventdump():
         conn and conn.close()
 
 
-def run_celeryev(dump=False, **kwargs):
+def run_celeryev(dump=False, camera=None, frequency=1.0, verbose=False,
+        **kwargs):
     if dump:
         return eventdump()
+    if camera:
+        return run_camera(camera, frequency, verbose=verbose)
     return eventtop()
 
 

+ 31 - 0
celery/events/snapshot.py

@@ -0,0 +1,31 @@
+import sys
+import time
+import timer2
+
+from celery.utils.dispatch import Signal
+
+
+class Polaroid(object):
+    shutter_signal = Signal(providing_args=("state", ))
+
+    def __init__(self, state, freq=1.0, verbose=False):
+        self.state = state
+        self.freq = freq
+        self.verbose = verbose
+
+    def install(self):
+        timer2.apply_interval(self.freq * 1000.0, self.capture)
+
+    def on_shutter(self, state):
+        pass
+
+    def shutter(self):
+        if self.verbose:
+            sys.stderr.write("[%s] Shutter: %s\n" % (
+                time.asctime(), self.state))
+        self.shutter_signal.send(self.state)
+        self.on_shutter(self.state)
+        self.state.clear()
+
+    def capture(self):
+        return self.state.freeze_while(self.shutter)