Browse Source

Statedb improvements

Ask Solem 11 years ago
parent
commit
2aae4e99f9
4 changed files with 92 additions and 39 deletions
  1. 18 7
      celery/tests/worker/test_state.py
  2. 1 1
      celery/worker/components.py
  3. 3 0
      celery/worker/pidbox.py
  4. 70 31
      celery/worker/state.py

+ 18 - 7
celery/tests/worker/test_state.py

@@ -1,5 +1,7 @@
 from __future__ import absolute_import
 
+import pickle
+
 from mock import Mock, patch
 from time import time
 
@@ -72,7 +74,7 @@ class test_maybe_shutdown(Case):
 class test_Persistent(StateResetCase):
 
     def on_setup(self):
-        self.p = MyPersistent(filename='celery-state')
+        self.p = MyPersistent(state, filename='celery-state')
 
     def test_close_twice(self):
         self.p._is_open = False
@@ -94,7 +96,7 @@ class test_Persistent(StateResetCase):
 
     def test_merge(self, data=['foo', 'bar', 'baz']):
         self.add_revoked(*data)
-        self.p.merge(self.p.db)
+        self.p.merge()
         for item in data:
             self.assertIn(item, state.revoked)
 
@@ -102,30 +104,39 @@ class test_Persistent(StateResetCase):
         self.p.clock = Mock()
         self.p.clock.adjust.return_value = 626
         d = {'revoked': {'abc': time()}, 'clock': 313}
-        self.p.merge(d)
+        self.p._merge_with(d)
         self.p.clock.adjust.assert_called_with(313)
         self.assertEqual(d['clock'], 626)
         self.assertIn('abc', state.revoked)
 
     def test_sync_clock_and_purge(self):
+        passthrough = Mock()
+        passthrough.side_effect = lambda x: x
         with patch('celery.worker.state.revoked') as revoked:
             d = {'clock': 0}
             self.p.clock = Mock()
             self.p.clock.forward.return_value = 627
-            self.p.sync(d)
+            self.p._dumps = passthrough
+            self.p.compress = passthrough
+            self.p._sync_with(d)
             revoked.purge.assert_called_with()
             self.assertEqual(d['clock'], 627)
-            self.assertIs(d['revoked'], revoked)
+            self.assertNotIn('revoked', d)
+            self.assertIs(d['zrevoked'], revoked)
 
     def test_sync(self, data1=['foo', 'bar', 'baz'],
                   data2=['baz', 'ini', 'koz']):
         self.add_revoked(*data1)
         for item in data2:
             state.revoked.add(item)
-        self.p.sync(self.p.db)
+        self.p.sync()
 
+        self.assertTrue(self.p.db['zrevoked'])
+        pickled = self.p.decompress(self.p.db['zrevoked'])
+        self.assertTrue(pickled)
+        saved = pickle.loads(pickled)
         for item in data2:
-            self.assertIn(item, self.p.db['revoked'])
+            self.assertIn(item, saved)
 
 
 class SimpleReq(object):

+ 1 - 1
celery/worker/components.py

@@ -183,7 +183,7 @@ class StateDB(bootsteps.Step):
         w._persistence = None
 
     def create(self, w):
-        w._persistence = w.state.Persistent(w.state_db, w.app.clock)
+        w._persistence = w.state.Persistent(w.state, w.state_db, w.app.clock)
         atexit.register(w._persistence.save)
 
 

+ 3 - 0
celery/worker/pidbox.py

@@ -26,8 +26,11 @@ class Pidbox(object):
             handlers=control.Panel.data,
             state=AttributeDict(app=c.app, hostname=c.hostname, consumer=c),
         )
+        self._forward_clock = self.c.app.clock.forward
 
     def on_message(self, body, message):
+        self._forward_clock()  # just increase clock as clients usually don't
+                               # have a valid clock to adjust with.
         try:
             self.node.handle_message(body, message)
         except KeyError as exc:

+ 70 - 31
celery/worker/state.py

@@ -15,8 +15,9 @@ import os
 import sys
 import platform
 import shelve
+import zlib
 
-from kombu.serialization import pickle_protocol
+from kombu.serialization import pickle, pickle_protocol
 from kombu.utils import cached_property
 
 from celery import __version__
@@ -30,11 +31,11 @@ SOFTWARE_INFO = {'sw_ident': 'py-celery',
                  'sw_sys': platform.system()}
 
 #: maximum number of revokes to keep in memory.
-REVOKES_MAX = 10000
+REVOKES_MAX = 50000
 
 #: how many seconds a revoke will be active before
 #: being expired when the max limit has been exceeded.
-REVOKE_EXPIRES = 3600
+REVOKE_EXPIRES = 10800
 
 #: set of all reserved :class:`~celery.worker.job.Request`'s.
 reserved_requests = set()
@@ -140,49 +141,87 @@ class Persistent(object):
     """
     storage = shelve
     protocol = pickle_protocol
+    compress = zlib.compress
+    decompress = zlib.decompress
     _is_open = False
 
-    def __init__(self, filename, clock=None):
+    def __init__(self, state, filename, clock=None):
+        self.state = state
         self.filename = filename
         self.clock = clock
-        self._load()
-
-    def save(self):
-        self.sync(self.db)
-        self.db.sync()
-        self.close()
-
-    def merge(self, d):
-        saved = d.get('revoked') or LimitedSet()
-        if isinstance(saved, LimitedSet):
-            revoked.update(saved)
-        else:
-            # (pre 3.0.18) used to be stored as dict
-            for item in saved:
-                revoked.add(item)
-        if self.clock:
-            d['clock'] = self.clock.adjust(d.get('clock') or 0)
-        return d
-
-    def sync(self, d):
-        revoked.purge()
-        d['revoked'] = revoked
-        if self.clock:
-            d['clock'] = self.clock.forward()
-        return d
+        self.merge()
 
     def open(self):
         return self.storage.open(
             self.filename, protocol=self.protocol, writeback=True,
         )
 
+    def merge(self):
+        self._merge_with(self.db)
+
+    def sync(self):
+        self._sync_with(self.db)
+        self.db.sync()
+
     def close(self):
         if self._is_open:
             self.db.close()
             self._is_open = False
 
-    def _load(self):
-        self.merge(self.db)
+    def save(self):
+        self.sync()
+        self.close()
+
+    def _merge_with(self, d):
+        self._merge_revoked(d)
+        self._merge_clock(d)
+        return d
+
+    def _sync_with(self, d):
+        self._revoked_tasks.purge()
+        d.update(
+            __proto__=3,
+            zrevoked=self.compress(self._dumps(self._revoked_tasks)),
+            clock=self.clock.forward() if self.clock else 0,
+        )
+        return d
+
+    def _merge_clock(self, d):
+        if self.clock:
+            d['clock'] = self.clock.adjust(d.get('clock') or 0)
+
+    def _merge_revoked(self, d):
+        try:
+            self._merge_revoked_v3(d['zrevoked'])
+        except KeyError:
+            try:
+                self._merge_revoked_v2(d.pop('revoked'))
+            except KeyError:
+                pass
+        # purge expired items at boot
+        self._revoked_tasks.purge()
+
+    def _merge_revoked_v3(self, zrevoked):
+        if zrevoked:
+            self._revoked_tasks.update(pickle.loads(self.decompress(zrevoked)))
+
+    def _merge_revoked_v2(self, saved):
+        if not isinstance(saved, LimitedSet):
+            # (pre 3.0.18) used to be stored as a dict
+            return self._merge_revoked_v1(saved)
+        self._revoked_tasks.update(saved)
+
+    def _merge_revoked_v1(self, saved):
+        add = self._revoked_tasks.add
+        for item in saved:
+            add(item)
+
+    def _dumps(self, obj):
+        return pickle.dumps(obj, protocol=self.protocol)
+
+    @property
+    def _revoked_tasks(self):
+        return self.state.revoked
 
     @cached_property
     def db(self):