Browse Source

Adds xmap and xstarmap tasks

Ask Solem 13 years ago
parent
commit
e64d8b7aa0
4 changed files with 48 additions and 8 deletions
  1. 2 1
      celery/__init__.py
  2. 15 2
      celery/app/builtins.py
  3. 3 0
      celery/bin/celery.py
  4. 28 5
      celery/canvas.py

+ 2 - 1
celery/__init__.py

@@ -21,7 +21,8 @@ old_module, new_module = recreate_module(__name__,  # pragma: no cover
     by_module={
         "celery.app":       ["Celery", "bugreport"],
         "celery.app.state": ["current_app", "current_task"],
-        "celery.canvas":    ["chain", "chord", "chunks", "group", "subtask"],
+        "celery.canvas":    ["chain", "chord", "chunks",
+                             "group", "subtask", "xmap", "xstarmap"],
         "celery.utils":     ["uuid"],
     },
     direct={"task": "celery.task"},

+ 15 - 2
celery/app/builtins.py

@@ -2,6 +2,8 @@
 from __future__ import absolute_import
 from __future__ import with_statement
 
+from itertools import starmap
+
 from celery.utils import uuid
 
 #: global list of functions defining a built-in task.
@@ -75,8 +77,19 @@ def add_map_task(app):
 
     @app.task(name="celery.map")
     def xmap(task, it):
-        task = subtask(task)
-        return [task.type(*item) for item in it]
+        task = subtask(task).type
+        return list(map(task, it))
+
+
+
+@builtin_task
+def add_starmap_task(app):
+    from celery.canvas import subtask
+
+    @app.task(name="celery.starmap")
+    def xstarmap(task, it):
+        task = subtask(task).type
+        return list(starmap(task, it))
 
 
 @builtin_task

+ 3 - 0
celery/bin/celery.py

@@ -415,6 +415,9 @@ class shell(Command):  # pragma: no cover
                        "chord": celery.chord,
                        "group": celery.group,
                        "chain": celery.chain,
+                       "chunks": celery.chunks,
+                       "xmap": celery.xmap,
+                       "xstarmap": celery.xstarmap,
                        "subtask": celery.subtask}
 
         if not without_tasks:

+ 28 - 5
celery/canvas.py

@@ -208,19 +208,42 @@ class chain(Signature):
 Signature.register_type(chain)
 
 
-class xmap(Signature):
+class _basemap(Signature):
+    _task_name = None
     _unpack_args = itemgetter("task", "it")
 
     def __init__(self, task, it, **options):
-        Signature.__init__(self, "celery.map", (),
-                {"task": task, "it": it, "n": n}, **options)
+        Signature.__init__(self, self._task_name, (),
+                {"task": task, "it": regen(it)}, **options)
+
+    def apply_async(self, *args, **kwargs):
+        # need to evaluate generators
+        task, it = self._unpack_args(self.kwargs)
+        return self.type.apply_async((), {"task": task, "it": list(it)})
 
     @classmethod
     def from_dict(self, d):
         return chunks(*self._unpack_args(d["kwargs"]), **d["options"])
+
+
+class xmap(_basemap):
+    _task_name = "celery.map"
+
+    def __repr__(self):
+        task, it = self._unpack_args(self.kwargs)
+        return "[%s(x) for x in %r]" % (task.name, it)
 Signature.register_type(xmap)
 
 
+class xstarmap(_basemap):
+    _task_name = "celery.starmap"
+
+    def __repr__(self):
+        task, it = self._unpack_args(self.kwargs)
+        return "[%s(*x) for x in %r]" % (task.name, it)
+Signature.register_type(xstarmap)
+
+
 class chunks(Signature):
     _unpack_args = itemgetter("task", "it", "n")
 
@@ -237,7 +260,7 @@ class chunks(Signature):
 
     def group(self):
         task, it, n = self._unpack_args(self.kwargs)
-        return group(xmap.s(task, part) for part in _chunks(iter(it), n))
+        return group(xstarmap.s(task, part) for part in _chunks(iter(it), n))
 
     @classmethod
     def apply_chunks(cls, task, it, n):
@@ -278,7 +301,7 @@ class chord(Signature):
 
     def __init__(self, header, body=None, **options):
         Signature.__init__(self, "celery.chord", (),
-                         {"header": list(header),
+                         {"header": regen(header),
                           "body": maybe_subtask(body)}, options)
         self.subtask_type = "chord"