Browse Source

chord: Can now specify the execution options of the chord task, and the interval and max_retries arguments of the unlock task

Ask Solem 14 years ago
parent
commit
e4c7d30991
3 changed files with 12 additions and 8 deletions
  1. 3 2
      celery/backends/base.py
  2. 1 1
      celery/backends/pyredis.py
  3. 8 5
      celery/task/chord.py

+ 3 - 2
celery/backends/base.py

@@ -146,9 +146,10 @@ class BaseBackend(object):
     def on_chord_part_return(self, task):
         pass
 
-    def on_chord_apply(self, setid, body):
+    def on_chord_apply(self, setid, body, **kwargs):
         from celery.registry import tasks
-        tasks["celery.chord_unlock"].apply_async((setid, body, ), countdown=1)
+        tasks["celery.chord_unlock"].apply_async((setid, body, ), kwargs,
+                                                 countdown=1)
 
     def __reduce__(self):
         return (self.__class__, ())

+ 1 - 1
celery/backends/pyredis.py

@@ -81,7 +81,7 @@ class RedisBackend(KeyValueStoreBackend):
     def process_cleanup(self):
         self.close()
 
-    def on_chord_apply(self, setid, body):
+    def on_chord_apply(self, *args, **kwargs):
         pass
 
     def on_chord_part_return(self, task, keyprefix="chord-unlock-%s"):

+ 8 - 5
celery/task/chord.py

@@ -14,9 +14,10 @@ def _unlock_chord(setid, callback, interval=1, max_retries=None):
 
 
 class Chord(current_app.Task):
+    accept_magic_kwargs = False
     name = "celery.chord"
 
-    def run(self, set, body):
+    def run(self, set, body, max_retries=None, interval=1, **kwargs):
         if not isinstance(set, TaskSet):
             set = TaskSet(set)
         r = []
@@ -26,17 +27,19 @@ class Chord(current_app.Task):
             task.options.update(task_id=uuid, chord=body)
             r.append(current_app.AsyncResult(uuid))
         current_app.TaskSetResult(setid, r).save()
-        self.backend.on_chord_apply(setid, body)
+        self.backend.on_chord_apply(setid, body, max_retries, interval)
         return set.apply_async(taskset_id=setid)
 
 
 class chord(object):
     Chord = Chord
 
-    def __init__(self, tasks):
+    def __init__(self, tasks, **options):
         self.tasks = tasks
+        self.options = options
 
-    def __call__(self, body):
+    def __call__(self, body, **options):
         uuid = body.options.setdefault("task_id", gen_unique_id())
-        self.Chord.apply_async((list(self.tasks), body))
+        self.Chord.apply_async((list(self.tasks), body), self.options,
+                                **options)
         return body.type.app.AsyncResult(uuid)