Parcourir la source

Adds built-in xmap task

Ask Solem il y a 13 ans
Parent
commit
03756079a0
1 fichiers modifiés avec 16 ajouts et 4 suppressions
  1. 16 4
      celery/canvas.py

+ 16 - 4
celery/canvas.py

@@ -208,24 +208,36 @@ class chain(Signature):
 Signature.register_type(chain)
 
 
+class xmap(Signature):
+    _unpack_args = itemgetter("task", "it")
+
+    def __init__(self, task, it, **options):
+        Signature.__init__(self, "celery.map", (),
+                {"task": task, "it": it, "n": n}, **options)
+
+    @classmethod
+    def from_dict(self, d)
+        return chunks(*self._unpack_args(d["kwargs"]), **d["options"])
+Signature.register_type(xmap)
+
+
 class chunks(Signature):
     _unpack_args = itemgetter("task", "it", "n")
 
     def __init__(self, task, it, n, **options):
         Signature.__init__(self, "celery.chunks", (),
-                {"task": task, "it": it, "n": n}, **options)
+                {"task": task, "it": regen(it), "n": n}, **options)
 
     @classmethod
     def from_dict(self, d):
-        return chunks(*self._unpack_args(d["kwargs"]))
+        return chunks(*self._unpack_args(d["kwargs"]), **d["options"])
 
     def __call__(self, **options):
         return self.group()(**options)
 
     def group(self):
         task, it, n = self._unpack_args(self.kwargs)
-        return group(subtask("celery.apply_chunk", (task, part))
-                        for part in _chunks(iter(it), n))
+        return group(xmap.s(task, part) for part in _chunks(iter(it), n))
 
     @classmethod
     def apply_chunks(cls, task, it, n):