Browse Source

Chords now calls callback.link_error

Ask Solem 11 years ago
parent
commit
d7bb6fc60f
2 changed files with 29 additions and 21 deletions
  1. 4 6
      celery/app/builtins.py
  2. 25 15
      celery/backends/base.py

+ 4 - 6
celery/app/builtins.py

@@ -105,16 +105,14 @@ def add_unlock_chord_task(app):
                     )
                 except StopIteration:
                     reason = repr(exc)
-
-                app._tasks[callback.task].backend.fail_from_current_stack(
-                    callback.id, exc=ChordError(reason),
-                )
+                app.backend.chord_error_from_stack(callback,
+                                                    ChordError(reason))
             else:
                 try:
                     callback.delay(ret)
                 except Exception as exc:
-                    app._tasks[callback.task].backend.fail_from_current_stack(
-                        callback.id,
+                    app.backend.chord_error_from_stack(
+                        callback,
                         exc=ChordError('Callback error: {0!r}'.format(exc)),
                     )
         else:

+ 25 - 15
celery/backends/base.py

@@ -111,6 +111,21 @@ class BaseBackend(object):
         return self.store_result(task_id, exc, status=states.FAILURE,
                                  traceback=traceback, request=request)
 
+    def chord_error_from_stack(self, callback, exc=None):
+        from celery import group
+        app = self.app
+        backend = app._tasks[callback.task].backend
+        try:
+            group(
+                [app.signature(errback)
+                 for errback in callback.options.get('link_error') or []],
+                app=app,
+            ).apply_async((callback.id, ))
+        except Exception as eb_exc:
+            return backend.fail_from_current_stack(callback.id, exc=eb_exc)
+        else:
+            return backend.fail_from_current_stack(callback.id, exc=exc)
+
     def fail_from_current_stack(self, task_id, exc=None):
         type_, real_exc, tb = sys.exc_info()
         try:
@@ -502,21 +517,18 @@ class KeyValueStoreBackend(BaseBackend):
             deps = GroupResult.restore(gid, backend=task.backend)
         except Exception as exc:
             callback = maybe_signature(task.request.chord, app=self.app)
-            return app._tasks[callback.task].backend.fail_from_current_stack(
-                callback.id,
-                exc=ChordError('Cannot restore group: {0!r}'.format(exc)),
+            return self.chord_error_from_stack(
+                callback,
+                ChordError('Cannot restore group: {0!r}'.format(exc)),
             )
         if deps is None:
             try:
                 raise ValueError(gid)
             except ValueError as exc:
                 callback = maybe_signature(task.request.chord, app=self.app)
-                task = app._tasks[callback.task]
-                return task.backend.fail_from_current_stack(
-                    callback.id,
-                    exc=ChordError('GroupResult {0} no longer exists'.format(
-                        gid,
-                    ))
+                return self.chord_error_from_stack(
+                    callback,
+                    ChordError('GroupResult {0} no longer exists'.format(gid)),
                 )
         val = self.incr(key)
         if val >= len(deps):
@@ -534,16 +546,14 @@ class KeyValueStoreBackend(BaseBackend):
                 except StopIteration:
                     reason = repr(exc)
 
-                app._tasks[callback.task].backend.fail_from_current_stack(
-                    callback.id, exc=ChordError(reason),
-                )
+                self.chord_error_from_stack(callback, ChordError(reason))
             else:
                 try:
                     callback.delay(ret)
                 except Exception as exc:
-                    app._tasks[callback.task].backend.fail_from_current_stack(
-                        callback.id,
-                        exc=ChordError('Callback error: {0!r}'.format(exc)),
+                    self.chord_error_from_stack(
+                        callback,
+                        ChordError('Callback error: {0!r}'.format(exc)),
                     )
             finally:
                 deps.delete()