Browse Source

Merge branch 'pantheon-systems/master'

Ask Solem 14 years ago
parent
commit
94b8975872
2 changed files with 4 additions and 39 deletions
  1. 1 1
      celery/backends/base.py
  2. 3 38
      celery/backends/cassandra.py

+ 1 - 1
celery/backends/base.py

@@ -26,7 +26,7 @@ class BaseBackend(object):
         else:
             return self.prepare_value(result)
 
-    def store_result(self, task_id, result, status):
+    def store_result(self, task_id, result, status, traceback=None):
         """Store the result and status of a task."""
         raise NotImplementedError(
                 "store_result is not supported by this backend.")

+ 3 - 38
celery/backends/cassandra.py

@@ -6,8 +6,6 @@ try:
 except ImportError:
     pycassa = None
 
-import itertools
-import random
 import socket
 import time
 
@@ -36,9 +34,6 @@ class CassandraBackend(BaseDictBackend):
     column_family = None
     _retry_timeout = 300
     _retry_wait = 3
-    _index_shards = 64
-    _index_keys = ["celery.results.index!%02x" % i
-                        for i in range(_index_shards)]
 
     def __init__(self, servers=None, keyspace=None, column_family=None,
             cassandra_options=None, **kwargs):
@@ -93,7 +88,7 @@ class CassandraBackend(BaseDictBackend):
 
         self._column_family = None
 
-    def _retry_on_error(func):
+    def _retry_on_error(self, func):
         def wrapper(*args, **kwargs):
             self = args[0]
             ts = time.time() + self._retry_timeout
@@ -131,16 +126,12 @@ class CassandraBackend(BaseDictBackend):
         """Store return value and status of an executed task."""
         cf = self._get_column_family()
         date_done = datetime.utcnow()
-        index_key = 'celery.results.index!%02x' % (
-                random.randrange(self._index_shards))
-        index_column_name = '%8x!%s' % (time.mktime(date_done.timetuple()),
-                                        task_id)
+        lifetime = self.result_expires.days * 86400 + self.result_expires.seconds
         meta = {"status": status,
                 "result": pickle.dumps(result),
                 "date_done": date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
                 "traceback": pickle.dumps(traceback)}
-        cf.insert(task_id, meta)
-        cf.insert(index_key, {index_column_name: status})
+        cf.insert(task_id, meta, ttl=lifetime)
 
     @_retry_on_error
     def _get_task_meta_for(self, task_id):
@@ -158,29 +149,3 @@ class CassandraBackend(BaseDictBackend):
         except (KeyError, pycassa.NotFoundException):
             meta = {"status": states.PENDING, "result": None}
         return meta
-
-    def cleanup(self):
-        """Delete expired metadata."""
-        self.logger.debug('Running cleanup...')
-        expires = datetime.utcnow() - self.result_expires
-        end_column = '%8x"' % (time.mktime(expires.timetuple()))
-
-        cf = self._get_column_family()
-        column_parent = C.ColumnParent(cf.column_family)
-        slice_pred = C.SlicePredicate(
-                            slice_range=C.SliceRange('', end_column,
-                                                     count=2 ** 30))
-        columns = cf.client.multiget_slice(cf.keyspace, self._index_keys,
-                                           column_parent, slice_pred,
-                                           self.read_consistency)
-
-        index_cols = [c.column.name
-                        for c in itertools.chain(*columns.values())]
-        for k in self._index_keys:
-            cf.remove(k, index_cols)
-
-        task_ids = [c[9:] for c in index_cols]
-        for k in task_ids:
-            cf.remove(k)
-
-        self.logger.debug('Cleaned %i expired results' % len(task_ids))