瀏覽代碼

Chord unlock: Retry if deps.ready() raises. Closes #2404

Conflicts:
	celery/app/builtins.py
Ask Solem 10 年之前
父節點
當前提交
139285d8a4
共有 1 個文件被更改,包括 36 次插入29 次删除
  1. 36 29
      celery/app/builtins.py

+ 36 - 29
celery/app/builtins.py

@@ -51,8 +51,9 @@ def add_unlock_chord_task(app):
     default_propagate = app.conf.CELERY_CHORD_PROPAGATES
 
     @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
-              default_retry_delay=1, ignore_result=True, _force_evaluate=True)
-    def unlock_chord(group_id, callback, interval=None, propagate=None,
+              default_retry_delay=1, ignore_result=True, _force_evaluate=True,
+              bind=True)
+    def unlock_chord(self, group_id, callback, interval=None, propagate=None,
                      max_retries=None, result=None,
                      Result=app.AsyncResult, GroupResult=app.GroupResult,
                      result_from_tuple=result_from_tuple):
@@ -63,44 +64,50 @@ def add_unlock_chord_task(app):
         # exception set to ChordError.
         propagate = default_propagate if propagate is None else propagate
         if interval is None:
-            interval = unlock_chord.default_retry_delay
+            interval = self.default_retry_delay
 
         # check if the task group is ready, and if so apply the callback.
         deps = GroupResult(
             group_id,
             [result_from_tuple(r, app=app) for r in result],
+            app=app,
         )
         j = deps.join_native if deps.supports_native_join else deps.join
 
-        if deps.ready():
-            callback = signature(callback, app=app)
+        try:
+            ready = deps.ready()
+        except Exception as exc:
+            raise self.retry(
+                exc=exc, countdown=interval, max_retries=max_retries,
+            )
+        else:
+            if not ready:
+                raise self.retry(countdown=interval, max_retries=max_retries)
+
+        callback = signature(callback, app=app)
+        try:
+            with allow_join_result():
+                ret = j(timeout=3.0, propagate=propagate)
+        except Exception as exc:
             try:
-                with allow_join_result():
-                    ret = j(timeout=3.0, propagate=propagate)
+                culprit = next(deps._failed_join_report())
+                reason = 'Dependency {0.id} raised {1!r}'.format(
+                    culprit, exc,
+                )
+            except StopIteration:
+                reason = repr(exc)
+            logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
+            app.backend.chord_error_from_stack(callback,
+                                               ChordError(reason))
+        else:
+            try:
+                callback.delay(ret)
             except Exception as exc:
-                try:
-                    culprit = next(deps._failed_join_report())
-                    reason = 'Dependency {0.id} raised {1!r}'.format(
-                        culprit, exc,
-                    )
-                except StopIteration:
-                    reason = repr(exc)
                 logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
-                app.backend.chord_error_from_stack(callback,
-                                                   ChordError(reason))
-            else:
-                try:
-                    callback.delay(ret)
-                except Exception as exc:
-                    logger.error('Chord %r raised: %r', group_id, exc,
-                                 exc_info=1)
-                    app.backend.chord_error_from_stack(
-                        callback,
-                        exc=ChordError('Callback error: {0!r}'.format(exc)),
-                    )
-        else:
-            raise unlock_chord.retry(countdown=interval,
-                                     max_retries=max_retries)
+                app.backend.chord_error_from_stack(
+                    callback,
+                    exc=ChordError('Callback error: {0!r}'.format(exc)),
+                )
     return unlock_chord