Browse Source

Better chord unlock error handling for kvstores

Ask Solem 12 years ago
parent
commit
9193ca3e5b
2 changed files with 17 additions and 6 deletions
  1. 2 1
      celery/app/builtins.py
  2. 15 5
      celery/backends/base.py

+ 2 - 1
celery/app/builtins.py

@@ -112,7 +112,8 @@ def add_unlock_chord_task(app):
                 except Exception, exc:
                     app._tasks[callback.task].backend.fail_from_current_stack(
                         callback.id,
-                        exc=ChordError('Call callback error: %r' % (exc, )))
+                        exc=ChordError('Callback error: %r' % (exc, )),
+                    )
         else:
             return unlock_chord.retry(countdown=interval,
                                       max_retries=max_retries)

+ 15 - 5
celery/backends/base.py

@@ -485,6 +485,7 @@ class KeyValueStoreBackend(BaseDictBackend):
             return
         from celery import subtask
         from celery.result import GroupResult
+        app = self.app
         if propagate is None:
             propagate = self.app.conf.CELERY_CHORD_PROPAGATES
         gid = task.request.group
@@ -499,13 +500,22 @@ class KeyValueStoreBackend(BaseDictBackend):
             try:
                 ret = j(propagate=propagate)
             except Exception, exc:
-                culprit = deps._failed_join_report().next()
-                self.app._tasks[callback.task].backend.fail_from_current_stack(
-                    callback.id, exc=ChordError('Dependency %s raised %r' % (
-                        culprit.id, exc))
+                try:
+                    culprit = deps._failed_join_report().next()
+                    reason = 'Dependency %s raised %r' % (culprit.id, exc)
+                except StopIteration:
+                    reason = repr(exc)
+                app._tasks[callback.task].backend.fail_from_current_stack(
+                    callback.id, exc=ChordError(reason),
                 )
             else:
-                callback.delay(ret)
+                try:
+                    callback.delay(ret)
+                except Exception, exc:
+                    app._tasks[callback.task].backend.fail_from_current_stack(
+                        callback.id,
+                        exc=ChordError('Callback error: %r' % (exc, )),
+                    )
             finally:
                 deps.delete()
                 self.client.delete(key)