Browse Source

Event Snapshots: Camera.cleanup + Shutter rate limits

Cleanup is used by the Django model camera to delete expired
task/worker state.

Shutter rate limits added (-r|--maxrate option to celeryev).
--frequency controls how often the camera thread wakes up,
while --maxrate controls how often it will actually take the snapshot.
This setting can be an integer (snapshots/s), or a rate limit string
which has the same syntax as the task rate limit strings ("200/m", "10/s",
"1/h", etc)

In the Django camera case, this rate limit can be used to control
how often the snapshots are written to database.

The rate limit is off by default, which means it will take a snapshot
for every --frequency seconds.
Ask Solem 14 years ago
parent
commit
88fad5666b
2 changed files with 42 additions and 15 deletions
  1. 7 4
      celery/bin/celeryev.py
  2. 35 11
      celery/events/snapshot.py

+ 7 - 4
celery/bin/celeryev.py

@@ -18,16 +18,19 @@ OPTION_LIST = (
         help="Recording: Snapshot frequency."),
     optparse.make_option('-x', '--verbose',
         action="store_true", dest="verbose",
-        help="Show more output.")
+        help="Show more output."),
+    optparse.make_option('-r', '--maxrate',
+        action="store", dest="maxrate", default=None,
+        help="Recording: Shutter rate limit (e.g. 10/m)"),
 )
 
 
-def run_celeryev(dump=False, camera=None, frequency=1.0, verbose=False,
-        **kwargs):
+def run_celeryev(dump=False, camera=None, frequency=1.0, maxrate=None,
+        verbose=None, **kwargs):
     if dump:
         return evdump()
     if camera:
-        return evcam(camera, frequency, verbose=verbose)
+        return evcam(camera, frequency, maxrate, verbose=verbose)
     return evtop()
 
 

+ 35 - 11
celery/events/snapshot.py

@@ -2,35 +2,56 @@ import sys
 import time
 import timer2
 
+from celery.datastructures import TokenBucket
 from celery.events import EventReceiver
 from celery.events.state import State
 from celery.messaging import establish_connection
 from celery.utils import instantiate
 from celery.utils.dispatch import Signal
+from celery.utils.timeutils import rate
 
 
 class Polaroid(object):
-    _tref = None
     shutter_signal = Signal(providing_args=("state", ))
+    cleanup_signal = Signal()
+
+    _tref = None
 
-    def __init__(self, state, freq=1.0, verbose=False):
+    def __init__(self, state, freq=1.0, maxrate=None, cleanup_freq=60.0,
+            verbose=False):
         self.state = state
         self.freq = freq
+        self.cleanup_freq = cleanup_freq
         self.verbose = verbose
+        self.maxrate = maxrate and TokenBucket(rate(maxrate))
 
     def install(self):
-        self._tref = timer2.apply_interval(self.freq * 1000.0, self.capture)
+        self._tref = timer2.apply_interval(self.freq * 1000.0,
+                                           self.capture)
+        self._ctref = timer2.apply_interval(self.cleanup_freq * 1000.0,
+                                            self.cleanup)
 
     def on_shutter(self, state):
         pass
 
-    def shutter(self):
+    def on_cleanup(self):
+        pass
+
+    def cleanup(self):
+        self.debug("Cleanup: Running...")
+        self.cleanup_signal.send(None)
+        self.on_cleanup()
+
+    def debug(self, msg):
         if self.verbose:
-            sys.stderr.write("[%s] Shutter: %s\n" % (
-                time.asctime(), self.state))
-        self.shutter_signal.send(self.state)
-        self.on_shutter(self.state)
-        self.state.clear()
+            sys.stderr.write("[%s] %s\n" % (time.asctime(), msg, ))
+
+    def shutter(self):
+        if self.maxrate is None or self.maxrate.can_consume():
+            self.debug("Shutter: %s" % (self.state, ))
+            self.shutter_signal.send(self.state)
+            self.on_shutter(self.state)
+            self.state.clear()
 
     def capture(self):
         return self.state.freeze_while(self.shutter)
@@ -39,6 +60,8 @@ class Polaroid(object):
         if self._tref:
             self._tref()
             self._tref.cancel()
+        if self._ctref:
+            self._ctref.cancel()
 
     def __enter__(self):
         self.install()
@@ -48,12 +71,13 @@ class Polaroid(object):
         self.cancel()
 
 
-def evcam(camera, freq, verbose=False):
+def evcam(camera, freq=1.0, maxrate=None, verbose=False):
     sys.stderr.write(
         "-> evcam: Taking snapshots with %s (every %s secs.)\n" % (
             camera, freq))
     state = State()
-    cam = instantiate(camera, state, freq=freq, verbose=verbose)
+    cam = instantiate(camera, state,
+                      freq=freq, maxrate=maxrate, verbose=verbose)
     cam.install()
     conn = establish_connection()
     recv = EventReceiver(conn, handlers={"*": state.event})