Browse Source

Use Cassandra's built-in TTL support for result cleanup.

David Strauss 14 years ago
parent
commit
28bde61e09
1 changed file with 2 additions and 37 deletions
  1. celery/backends/cassandra.py (+2 -37)

+ 2 - 37
celery/backends/cassandra.py

@@ -6,8 +6,6 @@ try:
 except ImportError:
     pycassa = None
 
-import itertools
-import random
 import socket
 import time
 
@@ -36,9 +34,6 @@ class CassandraBackend(BaseDictBackend):
     column_family = None
     _retry_timeout = 300
     _retry_wait = 3
-    _index_shards = 64
-    _index_keys = ["celery.results.index!%02x" % i
-                        for i in range(_index_shards)]
 
     def __init__(self, servers=None, keyspace=None, column_family=None,
             cassandra_options=None, **kwargs):
@@ -131,16 +126,12 @@ class CassandraBackend(BaseDictBackend):
         """Store return value and status of an executed task."""
         cf = self._get_column_family()
         date_done = datetime.utcnow()
-        index_key = 'celery.results.index!%02x' % (
-                random.randrange(self._index_shards))
-        index_column_name = '%8x!%s' % (time.mktime(date_done.timetuple()),
-                                        task_id)
+        lifetime = self.result_expires.days * 86400 + self.result_expires.seconds
         meta = {"status": status,
                 "result": pickle.dumps(result),
                 "date_done": date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
                 "traceback": pickle.dumps(traceback)}
-        cf.insert(task_id, meta)
-        cf.insert(index_key, {index_column_name: status})
+        cf.insert(task_id, meta, ttl=lifetime)
 
     @_retry_on_error
     def _get_task_meta_for(self, task_id):
@@ -158,29 +149,3 @@ class CassandraBackend(BaseDictBackend):
         except (KeyError, pycassa.NotFoundException):
             meta = {"status": states.PENDING, "result": None}
         return meta
-
-    def cleanup(self):
-        """Delete expired metadata."""
-        self.logger.debug('Running cleanup...')
-        expires = datetime.utcnow() - self.result_expires
-        end_column = '%8x"' % (time.mktime(expires.timetuple()))
-
-        cf = self._get_column_family()
-        column_parent = C.ColumnParent(cf.column_family)
-        slice_pred = C.SlicePredicate(
-                            slice_range=C.SliceRange('', end_column,
-                                                     count=2 ** 30))
-        columns = cf.client.multiget_slice(cf.keyspace, self._index_keys,
-                                           column_parent, slice_pred,
-                                           self.read_consistency)
-
-        index_cols = [c.column.name
-                        for c in itertools.chain(*columns.values())]
-        for k in self._index_keys:
-            cf.remove(k, index_cols)
-
-        task_ids = [c[9:] for c in index_cols]
-        for k in task_ids:
-            cf.remove(k)
-
-        self.logger.debug('Cleaned %i expired results' % len(task_ids))