瀏覽代碼

Chords should not propagate errors. Closes #421. Thanks to John Garbutt

Ask Solem 14 年之前
父節點
當前提交
c2a4bbecdc
共有 2 個文件被更改,包括 10 次插入6 次删除
  1. 3 2
      celery/backends/redis.py
  2. 7 4
      celery/task/chords.py

+ 3 - 2
celery/backends/redis.py

@@ -75,14 +75,15 @@ class RedisBackend(KeyValueStoreBackend):
     def on_chord_apply(self, *args, **kwargs):
         pass
 
-    def on_chord_part_return(self, task, keyprefix="chord-unlock-%s"):
+    def on_chord_part_return(self, task, propagate=False,
+            keyprefix="chord-unlock-%s"):
         from celery.task.sets import subtask
         from celery.result import TaskSetResult
         setid = task.request.taskset
         key = keyprefix % setid
         deps = TaskSetResult.restore(setid, backend=task.backend)
         if self.client.incr(key) >= deps.total:
-            subtask(task.request.chord).delay(deps.join())
+            subtask(task.request.chord).delay(deps.join(propagate=propagate))
             deps.delete()
         self.client.expire(key, 86400)
 

+ 7 - 4
celery/task/chords.py

@@ -6,10 +6,11 @@ from celery.task.sets import TaskSet, subtask
 
 
 @current_app.task(name="celery.chord_unlock", max_retries=None)
-def _unlock_chord(setid, callback, interval=1, max_retries=None):
+def _unlock_chord(setid, callback, interval=1, propagate=False,
+        max_retries=None):
     result = TaskSetResult.restore(setid)
     if result.ready():
-        subtask(callback).delay(result.join())
+        subtask(callback).delay(result.join(propagate=propagate))
         result.delete()
     else:
         _unlock_chord.retry(countdown=interval, max_retries=max_retries)
@@ -19,7 +20,8 @@ class Chord(current_app.Task):
     accept_magic_kwargs = False
     name = "celery.chord"
 
-    def run(self, set, body, interval=1, max_retries=None, **kwargs):
+    def run(self, set, body, interval=1, max_retries=None,
+            propagate=False, **kwargs):
         if not isinstance(set, TaskSet):
             set = TaskSet(set)
         r = []
@@ -30,7 +32,8 @@ class Chord(current_app.Task):
             r.append(current_app.AsyncResult(uuid))
         current_app.TaskSetResult(setid, r).save()
         self.backend.on_chord_apply(setid, body, interval,
-                                    max_retries=max_retries)
+                                    max_retries=max_retries,
+                                    propagate=propagate)
         return set.apply_async(taskset_id=setid)