|
@@ -88,24 +88,22 @@ class CassandraBackend(BaseDictBackend):
|
|
|
|
|
|
self._column_family = None
|
|
|
|
|
|
- def _retry_on_error(self, func):
|
|
|
- def wrapper(*args, **kwargs):
|
|
|
- self = args[0]
|
|
|
- ts = time.time() + self._retry_timeout
|
|
|
- while 1:
|
|
|
- try:
|
|
|
- return func(*args, **kwargs)
|
|
|
- except (pycassa.InvalidRequestException,
|
|
|
- pycassa.TimedOutException,
|
|
|
- pycassa.UnavailableException,
|
|
|
- socket.error,
|
|
|
- socket.timeout,
|
|
|
- Thrift.TException), exc:
|
|
|
- self.logger.warn('Cassandra error: %s. Retrying...' % exc)
|
|
|
- if time.time() > ts:
|
|
|
- raise
|
|
|
- time.sleep(self._retry_wait)
|
|
|
- return wrapper
|
|
|
+ def _retry_on_error(self, fun):
|
|
|
+ self = args[0]
|
|
|
+ ts = time.time() + self._retry_timeout
|
|
|
+ while 1:
|
|
|
+ try:
|
|
|
+ return fun(*args, **kwargs)
|
|
|
+ except (pycassa.InvalidRequestException,
|
|
|
+ pycassa.TimedOutException,
|
|
|
+ pycassa.UnavailableException,
|
|
|
+ socket.error,
|
|
|
+ socket.timeout,
|
|
|
+ Thrift.TException), exc:
|
|
|
+ if time.time() > ts:
|
|
|
+ raise
|
|
|
+ self.logger.warn('Cassandra error: %r. Retrying...' % (exc, ))
|
|
|
+ time.sleep(self._retry_wait)
|
|
|
|
|
|
def _get_column_family(self):
|
|
|
if self._column_family is None:
|
|
@@ -121,30 +119,37 @@ class CassandraBackend(BaseDictBackend):
|
|
|
if self._column_family is not None:
|
|
|
self._column_family = None
|
|
|
|
|
|
- @_retry_on_error
|
|
|
def _store_result(self, task_id, result, status, traceback=None):
|
|
|
"""Store return value and status of an executed task."""
|
|
|
- cf = self._get_column_family()
|
|
|
- date_done = datetime.utcnow()
|
|
|
- meta = {"status": status,
|
|
|
- "result": pickle.dumps(result),
|
|
|
- "date_done": date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
|
|
|
- "traceback": pickle.dumps(traceback)}
|
|
|
- cf.insert(task_id, meta, ttl=timedelta_seconds(self.result_expires))
|
|
|
-
|
|
|
- @_retry_on_error
|
|
|
+
|
|
|
+ def _do_store():
|
|
|
+ cf = self._get_column_family()
|
|
|
+ date_done = datetime.utcnow()
|
|
|
+ meta = {"status": status,
|
|
|
+ "result": pickle.dumps(result),
|
|
|
+ "date_done": date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
|
|
|
+ "traceback": pickle.dumps(traceback)}
|
|
|
+ cf.insert(task_id, meta,
|
|
|
+ ttl=timedelta_seconds(self.result_expires))
|
|
|
+
|
|
|
+ return self._retry_on_error(_do_store)
|
|
|
+
|
|
|
def _get_task_meta_for(self, task_id):
|
|
|
"""Get task metadata for a task by id."""
|
|
|
- cf = self._get_column_family()
|
|
|
- try:
|
|
|
- obj = cf.get(task_id)
|
|
|
- meta = {
|
|
|
- "task_id": task_id,
|
|
|
- "status": obj["status"],
|
|
|
- "result": pickle.loads(str(obj["result"])),
|
|
|
- "date_done": obj["date_done"],
|
|
|
- "traceback": pickle.loads(str(obj["traceback"])),
|
|
|
- }
|
|
|
- except (KeyError, pycassa.NotFoundException):
|
|
|
- meta = {"status": states.PENDING, "result": None}
|
|
|
- return meta
|
|
|
+
|
|
|
+ def _do_get():
|
|
|
+ cf = self._get_column_family()
|
|
|
+ try:
|
|
|
+ obj = cf.get(task_id)
|
|
|
+ meta = {
|
|
|
+ "task_id": task_id,
|
|
|
+ "status": obj["status"],
|
|
|
+ "result": pickle.loads(str(obj["result"])),
|
|
|
+ "date_done": obj["date_done"],
|
|
|
+ "traceback": pickle.loads(str(obj["traceback"])),
|
|
|
+ }
|
|
|
+ except (KeyError, pycassa.NotFoundException):
|
|
|
+ meta = {"status": states.PENDING, "result": None}
|
|
|
+ return meta
|
|
|
+
|
|
|
+ return self._retry_on_error(_do_get)
|