Browse Source

Add countdown option for chord unlock. Closes #1146

Jun Sakai 12 years ago
parent
commit
8f4efe4499
2 changed files with 7 additions and 4 deletions
  1. 2 1
      celery/app/builtins.py
  2. 5 3
      celery/backends/base.py

+ 2 - 1
celery/app/builtins.py

@@ -273,7 +273,7 @@ def add_chord_task(app):
         accept_magic_kwargs = False
         ignore_result = False
 
-        def run(self, header, body, partial_args=(), interval=1,
+        def run(self, header, body, partial_args=(), interval=1, countdown=1,
                 max_retries=None, propagate=False, eager=False, **kwargs):
             group_id = uuid()
             AsyncResult = self.app.AsyncResult
@@ -292,6 +292,7 @@ def add_chord_task(app):
             # - fallback implementations schedules the chord_unlock task here
             app.backend.on_chord_apply(group_id, body,
                                        interval=interval,
+                                       countdown=countdown,
                                        max_retries=max_retries,
                                        propagate=propagate,
                                        result=results)

+ 5 - 3
celery/backends/base.py

@@ -229,10 +229,12 @@ class BaseBackend(object):
     def on_chord_part_return(self, task, propagate=False):
         pass
 
-    def fallback_chord_unlock(self, group_id, body, result=None, **kwargs):
+    def fallback_chord_unlock(self, group_id, body, result=None,
+                              countdown=1, **kwargs):
         kwargs['result'] = [r.id for r in result]
-        self.app.tasks['celery.chord_unlock'].apply_async((group_id, body, ),
-                                                          kwargs, countdown=1)
+        self.app.tasks['celery.chord_unlock'].apply_async(
+            (group_id, body, ), kwargs, countdown=countdown,
+        )
     on_chord_apply = fallback_chord_unlock
 
     def current_task_children(self):