Browse Source

Group chains are now automatically upgraded to chords. E.g. group(...) | s -> chord(group(...), body=s)

Ask Solem 12 years ago
parent
commit
18327a37b9
2 changed files with 31 additions and 15 deletions
  1. 28 14
      celery/app/builtins.py
  2. 3 1
      celery/canvas.py

+ 28 - 14
celery/app/builtins.py

@@ -116,7 +116,7 @@ def add_chunk_task(app):
 @shared_task
 def add_group_task(app):
     _app = app
-    from celery.canvas import subtask
+    from celery.canvas import maybe_subtask, subtask
     from celery.result import from_serializable
 
     class Group(app.Task):
@@ -141,18 +141,21 @@ def add_group_task(app):
             return result
 
         def prepare(self, options, tasks, **kwargs):
-            r = []
             options['group_id'] = group_id = \
                     options.setdefault('task_id', uuid())
-            for task in tasks:
+
+            def prepare_member(task):
+                task = maybe_subtask(task)
                 opts = task.options
                 opts['group_id'] = group_id
                 try:
                     tid = opts['task_id']
                 except KeyError:
                     tid = opts['task_id'] = uuid()
-                r.append(self.AsyncResult(tid))
-            return tasks, self.app.GroupResult(group_id, r), group_id
+                return task, self.AsyncResult(tid)
+
+            tasks, results = zip(*[prepare_member(task) for task in tasks])
+            return tasks, self.app.GroupResult(group_id, results), group_id
 
         def apply_async(self, args=(), kwargs={}, **options):
             if self.app.conf.CELERY_ALWAYS_EAGER:
@@ -170,7 +173,7 @@ def add_group_task(app):
 
 @shared_task
 def add_chain_task(app):
-    from celery.canvas import maybe_subtask
+    from celery.canvas import chord, group, maybe_subtask
     _app = app
 
     class Chain(app.Task):
@@ -183,17 +186,28 @@ def add_chain_task(app):
                 return self.apply(args, kwargs, **options)
             options.pop('publisher', None)
             group_id = options.pop('group_id', None)
-            chord = options.pop('chord', None)
-            tasks = [maybe_subtask(t).clone(
-                        task_id=options.pop('task_id', uuid()),
-                        **options
-                    )
-                    for t in kwargs['tasks']]
+            chord_id = options.pop('chord', None)
+
+            def prepare_steps(tasks, opts):
+                i, size = 0, len(tasks);
+                while i < size:
+                    sig = maybe_subtask(tasks[i])
+                    task_id = sig.options.get('task_id')
+                    if isinstance(sig, group) and i + 1 < size:
+                        i += 1
+                        sig = chord(sig, body=tasks[i],
+                                    task_id=task_id or uuid(), **opts)
+                    else:
+                        sig = sig.clone(task_id=task_id or uuid(), **opts)
+                    yield sig
+                    i += 1
+
+            tasks = list(prepare_steps(kwargs['tasks'], options))
             reduce(lambda a, b: a.link(b), tasks)
             if group_id:
                 tasks[-1].set(group_id=group_id)
-            if chord:
-                tasks[-1].set(chord=chord)
+            if chord_id:
+                tasks[-1].set(chord=chord_id)
             tasks[0].apply_async()
             results = [task.type.AsyncResult(task.options['task_id'])
                             for task in tasks]

+ 3 - 1
celery/canvas.py

@@ -390,4 +390,6 @@ def subtask(varies, *args, **kwargs):
 
 
 def maybe_subtask(d):
-    return subtask(d) if d is not None and not isinstance(d, Signature) else d
+    if d is not None and isinstance(d, dict) and not isinstance(d, Signature):
+        return subtask(d)
+    return d