Переглянути джерело

Chord unlock: Retry if deps.ready() raises. Closes #2404

Ask Solem 10 роки тому
батько
коміт
63dcb10179
1 змінених файлів з 35 додано та 29 видалено
  1. 35 29
      celery/app/builtins.py

+ 35 - 29
celery/app/builtins.py

@@ -57,8 +57,8 @@ def add_unlock_chord_task(app):
     default_propagate = app.conf.CELERY_CHORD_PROPAGATES
 
     @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
-              default_retry_delay=1, ignore_result=True, lazy=False)
-    def unlock_chord(group_id, callback, interval=None, propagate=None,
+              default_retry_delay=1, ignore_result=True, lazy=False, bind=True)
+    def unlock_chord(self, group_id, callback, interval=None, propagate=None,
                      max_retries=None, result=None,
                      Result=app.AsyncResult, GroupResult=app.GroupResult,
                      result_from_tuple=result_from_tuple):
@@ -69,45 +69,51 @@ def add_unlock_chord_task(app):
         # exception set to ChordError.
         propagate = default_propagate if propagate is None else propagate
         if interval is None:
-            interval = unlock_chord.default_retry_delay
+            interval = self.default_retry_delay
 
         # check if the task group is ready, and if so apply the callback.
         callback = maybe_signature(callback, app)
         deps = GroupResult(
             group_id,
             [result_from_tuple(r, app=app) for r in result],
+            app=app,
         )
         j = deps.join_native if deps.supports_native_join else deps.join
 
-        if deps.ready():
-            callback = maybe_signature(callback, app=app)
+        try:
+            ready = deps.ready()
+        except Exception as exc:
+            raise self.retry(
+                    exc=exc, countdown=interval, max_retries=max_retries,
+                )
+        else:
+            if not ready:
+                raise self.retry(countdown=interval, max_retries=max_retries)
+
+        callback = maybe_signature(callback, app=app)
+        try:
+            with allow_join_result():
+                ret = j(timeout=3.0, propagate=propagate)
+        except Exception as exc:
+            try:
+                culprit = next(deps._failed_join_report())
+                reason = 'Dependency {0.id} raised {1!r}'.format(
+                    culprit, exc,
+                )
+            except StopIteration:
+                reason = repr(exc)
+            logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
+            app.backend.chord_error_from_stack(callback,
+                                               ChordError(reason))
+        else:
             try:
-                with allow_join_result():
-                    ret = j(timeout=3.0, propagate=propagate)
+                callback.delay(ret)
             except Exception as exc:
-                try:
-                    culprit = next(deps._failed_join_report())
-                    reason = 'Dependency {0.id} raised {1!r}'.format(
-                        culprit, exc,
-                    )
-                except StopIteration:
-                    reason = repr(exc)
                 logger.error('Chord %r raised: %r', group_id, exc, exc_info=1)
-                app.backend.chord_error_from_stack(callback,
-                                                   ChordError(reason))
-            else:
-                try:
-                    callback.delay(ret)
-                except Exception as exc:
-                    logger.error('Chord %r raised: %r', group_id, exc,
-                                 exc_info=1)
-                    app.backend.chord_error_from_stack(
-                        callback,
-                        exc=ChordError('Callback error: {0!r}'.format(exc)),
-                    )
-        else:
-            raise unlock_chord.retry(countdown=interval,
-                                     max_retries=max_retries)
+                app.backend.chord_error_from_stack(
+                    callback,
+                    exc=ChordError('Callback error: {0!r}'.format(exc)),
+                )
     return unlock_chord