Browse Source

Allows to have a "detailed" mode for the Cassandra backend.
Basically the idea is to keep all states using Cassandra wide columns.
New states are then appened to the row as new columns, the last state being the last column.

To use this feature, the Column Family MUST use a TimeUUID as the column name (comparator_type).
For instance, using cassandra-cli:
create column family task_states with comparator = TimeUUIDType;

Because it uses TimeUUIDs, the last column is guaranteed to be the last state.

Steeve Morin 13 years ago
parent
commit
21cdf82fc4
1 changed files with 26 additions and 12 deletions
  1. 26 12
      celery/backends/cassandra.py

+ 26 - 12
celery/backends/cassandra.py

@@ -35,11 +35,12 @@ class CassandraBackend(BaseDictBackend):
     servers = []
     keyspace = None
     column_family = None
+    detailed_mode = False
     _retry_timeout = 300
     _retry_wait = 3
 
     def __init__(self, servers=None, keyspace=None, column_family=None,
-            cassandra_options=None, **kwargs):
+            cassandra_options=None, detailed_mode=False, **kwargs):
         """Initialize Cassandra backend.
 
         Raises :class:`celery.exceptions.ImproperlyConfigured` if
@@ -69,6 +70,9 @@ class CassandraBackend(BaseDictBackend):
         self.cassandra_options = dict(cassandra_options or {},
                                    **self.app.conf.get("CASSANDRA_OPTIONS",
                                                        {}))
+        self.detailed_mode = detailed_mode or \
+                                self.app.conf.get("CASSANDRA_DETAILED_MODE",
+                                                  self.detailed_mode)
         read_cons = self.app.conf.get("CASSANDRA_READ_CONSISTENCY",
                                       "LOCAL_QUORUM")
         write_cons = self.app.conf.get("CASSANDRA_WRITE_CONSISTENCY",
@@ -127,11 +131,16 @@ class CassandraBackend(BaseDictBackend):
             cf = self._get_column_family()
             date_done = self.app.now()
             meta = {"status": status,
-                    "result": self.encode(result),
                     "date_done": date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
                     "traceback": self.encode(traceback)}
-            cf.insert(task_id, meta,
-                      ttl=timedelta_seconds(self.expires))
+            if self.detailed_mode:
+                meta["result"] = result
+                cf.insert(task_id, {date_done: self.encode(meta)},
+                          ttl=timedelta_seconds(self.expires))
+            else:
+                meta["result"] = self.encode(result)
+                cf.insert(task_id, meta,
+                          ttl=timedelta_seconds(self.expires))
 
         return self._retry_on_error(_do_store)
 
@@ -141,14 +150,19 @@ class CassandraBackend(BaseDictBackend):
         def _do_get():
             cf = self._get_column_family()
             try:
-                obj = cf.get(task_id)
-                meta = {
-                    "task_id": task_id,
-                    "status": obj["status"],
-                    "result": self.decode(obj["result"]),
-                    "date_done": obj["date_done"],
-                    "traceback": self.decode(obj["traceback"]),
-                }
+                if self.detailed_mode:
+                    row = cf.get(task_id, column_reversed=True, column_count=1)
+                    meta = self.decode(row.values()[0])
+                    meta["task_id"] = task_id
+                else:
+                    obj = cf.get(task_id)
+                    meta = {
+                        "task_id": task_id,
+                        "status": obj["status"],
+                        "result": self.decode(obj["result"]),
+                        "date_done": obj["date_done"],
+                        "traceback": self.decode(obj["traceback"]),
+                    }
             except (KeyError, pycassa.NotFoundException):
                 meta = {"status": states.PENDING, "result": None}
             return meta