|
@@ -33,11 +33,12 @@ class CassandraBackend(BaseDictBackend):
|
|
|
servers = []
|
|
|
keyspace = None
|
|
|
column_family = None
|
|
|
+ detailed_mode = False
|
|
|
_retry_timeout = 300
|
|
|
_retry_wait = 3
|
|
|
|
|
|
def __init__(self, servers=None, keyspace=None, column_family=None,
|
|
|
- cassandra_options=None, **kwargs):
|
|
|
+ cassandra_options=None, detailed_mode=False, **kwargs):
|
|
|
"""Initialize Cassandra backend.
|
|
|
|
|
|
Raises :class:`celery.exceptions.ImproperlyConfigured` if
|
|
@@ -67,6 +68,9 @@ class CassandraBackend(BaseDictBackend):
|
|
|
self.cassandra_options = dict(cassandra_options or {},
|
|
|
**self.app.conf.get("CASSANDRA_OPTIONS",
|
|
|
{}))
|
|
|
+ self.detailed_mode = detailed_mode or \
|
|
|
+ self.app.conf.get("CASSANDRA_DETAILED_MODE",
|
|
|
+ self.detailed_mode)
|
|
|
read_cons = self.app.conf.get("CASSANDRA_READ_CONSISTENCY",
|
|
|
"LOCAL_QUORUM")
|
|
|
write_cons = self.app.conf.get("CASSANDRA_WRITE_CONSISTENCY",
|
|
@@ -126,11 +130,16 @@ class CassandraBackend(BaseDictBackend):
|
|
|
cf = self._get_column_family()
|
|
|
date_done = self.app.now()
|
|
|
meta = {"status": status,
|
|
|
- "result": self.encode(result),
|
|
|
"date_done": date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
|
|
|
"traceback": self.encode(traceback)}
|
|
|
- cf.insert(task_id, meta,
|
|
|
- ttl=timedelta_seconds(self.expires))
|
|
|
+ if self.detailed_mode:
|
|
|
+ meta["result"] = result
|
|
|
+ cf.insert(task_id, {date_done: self.encode(meta)},
|
|
|
+ ttl=timedelta_seconds(self.expires))
|
|
|
+ else:
|
|
|
+ meta["result"] = self.encode(result)
|
|
|
+ cf.insert(task_id, meta,
|
|
|
+ ttl=timedelta_seconds(self.expires))
|
|
|
|
|
|
return self._retry_on_error(_do_store)
|
|
|
|
|
@@ -140,14 +149,19 @@ class CassandraBackend(BaseDictBackend):
|
|
|
def _do_get():
|
|
|
cf = self._get_column_family()
|
|
|
try:
|
|
|
- obj = cf.get(task_id)
|
|
|
- meta = {
|
|
|
- "task_id": task_id,
|
|
|
- "status": obj["status"],
|
|
|
- "result": self.decode(obj["result"]),
|
|
|
- "date_done": obj["date_done"],
|
|
|
- "traceback": self.decode(obj["traceback"]),
|
|
|
- }
|
|
|
+ if self.detailed_mode:
|
|
|
+ row = cf.get(task_id, column_reversed=True, column_count=1)
|
|
|
+ meta = self.decode(row.values()[0])
|
|
|
+ meta["task_id"] = task_id
|
|
|
+ else:
|
|
|
+ obj = cf.get(task_id)
|
|
|
+ meta = {
|
|
|
+ "task_id": task_id,
|
|
|
+ "status": obj["status"],
|
|
|
+ "result": self.decode(obj["result"]),
|
|
|
+ "date_done": obj["date_done"],
|
|
|
+ "traceback": self.decode(obj["traceback"]),
|
|
|
+ }
|
|
|
except (KeyError, pycassa.NotFoundException):
|
|
|
meta = {"status": states.PENDING, "result": None}
|
|
|
return meta
|