ソースを参照

Better support for link and link_error tasks for chords

Steeve Morin 12 年 前
コミット
173d6f307b
1 ファイル変更15 行追加7 行削除
  1. 15 7
      celery/app/builtins.py

+ 15 - 7
celery/app/builtins.py

@@ -229,7 +229,7 @@ def add_chain_task(app):
             return tasks, results
 
         def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
-                task_id=None, **options):
+                task_id=None, link=None, link_error=None, **options):
             if self.app.conf.CELERY_ALWAYS_EAGER:
                 return self.apply(args, kwargs, **options)
             options.pop('publisher', None)
@@ -242,6 +242,13 @@ def add_chain_task(app):
             if task_id:
                 tasks[-1].set(task_id=task_id)
                 result = tasks[-1].type.AsyncResult(task_id)
+            # make sure we can do a link() and link_error() on a chain object.
+            if link:
+                tasks[-1].set(link=link)
+            # and if any task in the chain fails, call the errbacks
+            if link_error:
+                for task in tasks:
+                    task.set(link_error=link_error)
             tasks[0].apply_async()
             return result
 
@@ -307,16 +314,17 @@ def add_chord_task(app):
         def apply_async(self, args=(), kwargs={}, task_id=None, **options):
             if self.app.conf.CELERY_ALWAYS_EAGER:
                 return self.apply(args, kwargs, **options)
-            group_id = options.pop('group_id', None)
-            chord = options.pop('chord', None)
             header = kwargs.pop('header')
             body = kwargs.pop('body')
             header, body = (list(maybe_subtask(header)),
                             maybe_subtask(body))
-            if group_id:
-                body.set(group_id=group_id)
-            if chord:
-                body.set(chord=chord)
+            # forward certain options to body
+            for opt_name in ['group_id', 'chord']:
+                opt_value = options.pop(opt_name, None)
+                if opt_value:
+                    body.set(**{opt_name: opt_value})
+            map(body.link, options.pop('link', []))
+            map(body.link_error, options.pop('link_error', []))
             callback_id = body.options.setdefault('task_id', task_id or uuid())
             parent = super(Chord, self).apply_async((header, body, args),
                                                      kwargs, **options)