Browse Source

chord/group now handles generators properly

Ask Solem 13 years ago
parent
commit
678e3ee5c8
2 changed files with 15 additions and 9 deletions
  1. 14 8
      celery/app/builtins.py
  2. 1 1
      celery/canvas.py

+ 14 - 8
celery/app/builtins.py

@@ -70,13 +70,18 @@ def add_unlock_chord_task(app):
 
 
 @builtin_task
-def add_chunk_task(app):
-    from celery.canvas import chunks as _chunks, subtask
+def add_map_task(app):
+    from celery.canvas import subtask
 
-    @app.task(name="celery.apply_chunk")
-    def apply_chunk(task, part):
+    @app.task(name="celery.map")
+    def xmap(task, it):
         task = subtask(task)
-        return [task.type(*item) for item in part]
+        return [task.type(*item) for item in it]
+
+
+@builtin_task
+def add_chunk_task(app):
+    from celery.canvas import chunks as _chunks
 
     @app.task(name="celery.chunks")
     def chunks(task, it, n):
@@ -126,7 +131,7 @@ def add_group_task(app):
             if self.app.conf.CELERY_ALWAYS_EAGER:
                 return self.apply(args, kwargs, **options)
             tasks, result, gid = self.prepare(options, **kwargs)
-            super(Group, self).apply_async((tasks, result, gid), **options)
+            super(Group, self).apply_async((list(tasks), result, gid), **options)
             return result
 
         def apply(self, args=(), kwargs={}, **options):
@@ -205,10 +210,11 @@ def add_chord_task(app):
         def apply_async(self, args=(), kwargs={}, task_id=None, **options):
             if self.app.conf.CELERY_ALWAYS_EAGER:
                 return self.apply(args, kwargs, **options)
-            body = maybe_subtask(kwargs["body"])
+            header, body = (list(kwargs["header"]),
+                            maybe_subtask(kwargs["body"]))
 
             callback_id = body.options.setdefault("task_id", task_id or uuid())
-            parent = super(Chord, self).apply_async(args, kwargs, **options)
+            parent = super(Chord, self).apply_async((header, body), **options)
             body_result = self.AsyncResult(callback_id)
             body_result.parent = parent
             return body_result

+ 1 - 1
celery/canvas.py

@@ -216,7 +216,7 @@ class xmap(Signature):
                 {"task": task, "it": it, "n": n}, **options)
 
     @classmethod
-    def from_dict(self, d)
+    def from_dict(self, d):
         return chunks(*self._unpack_args(d["kwargs"]), **d["options"])
 Signature.register_type(xmap)