浏览代码

add.chunks(izip(1000, 1000), 100).group().skew(start=1, stop=30, step=2).apply_async(countdown=10)

Ask Solem 13 年之前
父节点
当前提交
7b90e456fd
共有 5 个文件被更改,包括 60 次插入6 次删除
  1. 1 1
      celery/__init__.py
  2. 14 1
      celery/app/builtins.py
  3. 4 0
      celery/app/task.py
  4. 37 4
      celery/canvas.py
  5. 4 0
      celery/local.py

+ 1 - 1
celery/__init__.py

@@ -21,7 +21,7 @@ old_module, new_module = recreate_module(__name__,  # pragma: no cover
     by_module={
         "celery.app":       ["Celery", "bugreport"],
         "celery.app.state": ["current_app", "current_task"],
-        "celery.canvas":    ["chain", "chord", "group", "subtask"],
+        "celery.canvas":    ["chain", "chord", "chunks", "group", "subtask"],
         "celery.utils":     ["uuid"],
     },
     direct={"task": "celery.task"},

+ 14 - 1
celery/app/builtins.py

@@ -66,10 +66,23 @@ def add_unlock_chord_task(app):
             subtask(callback).delay(j(propagate=propagate))
         else:
             unlock_chord.retry(countdown=interval, max_retries=max_retries)
-
     return unlock_chord
 
 
+@builtin_task
+def add_chunk_task(app):
+    from celery.canvas import chunks as _chunks, subtask
+
+    @app.task(name="celery.apply_chunk")
+    def apply_chunk(task, part):
+        task = subtask(task)
+        return [task.type(*item) for item in part]
+
+    @app.task(name="celery.chunks")
+    def chunks(task, it, n):
+        return _chunks.apply_chunks(task, it, n)
+
+
 @builtin_task
 def add_group_task(app):
     from celery.canvas import subtask

+ 4 - 0
celery/app/task.py

@@ -709,6 +709,10 @@ class BaseTask(object):
         """``.s(*a, **k) -> .subtask(a, k)``"""
         return self.subtask(args, kwargs)
 
+    def chunks(self, it, n):
+        from celery import chunks
+        return chunks(self.s(), it, n)
+
     def update_state(self, task_id=None, state=None, meta=None):
         """Update task state.
 

+ 37 - 4
celery/canvas.py

@@ -10,14 +10,15 @@
 """
 from __future__ import absolute_import
 
+from operator import itemgetter
 from itertools import chain as _chain
 
-from kombu.utils import kwdict, reprcall
+from kombu.utils import fxrange, kwdict, reprcall
 
 from celery import current_app
-from celery.local import Proxy
+from celery.local import Proxy, regen
 from celery.utils import cached_property, uuid
-from celery.utils.functional import maybe_list, is_list
+from celery.utils.functional import maybe_list, is_list, chunks as _chunks
 from celery.utils.compat import chain_from_iterable
 
 Chord = Proxy(lambda: current_app.tasks["celery.chord"])
@@ -207,10 +208,36 @@ class chain(Signature):
 Signature.register_type(chain)
 
 
+class chunks(Signature):
+    _unpack_args = itemgetter("task", "it", "n")
+
+    def __init__(self, task, it, n, **options):
+        Signature.__init__(self, "celery.chunks", (),
+                {"task": task, "it": it, "n": n}, **options)
+
+    @classmethod
+    def from_dict(self, d):
+        return chunks(*self._unpack_args(d["kwargs"]))
+
+    def __call__(self, **options):
+        return self.group()(**options)
+
+    def group(self):
+        task, it, n = self._unpack_args(self.kwargs)
+        return group(subtask("celery.apply_chunk", (task, part))
+                        for part in _chunks(iter(it), n))
+
+    @classmethod
+    def apply_chunks(cls, task, it, n):
+        return cls(task, it, n)()
+Signature.register_type(chunks)
+
+
 class group(Signature):
 
     def __init__(self, *tasks, **options):
-        tasks = tasks[0] if len(tasks) == 1 and is_list(tasks[0]) else tasks
+        tasks = regen(tasks[0] if len(tasks) == 1 and is_list(tasks[0])
+                               else tasks)
         Signature.__init__(self, "celery.group", (), {"tasks": tasks}, options)
         self.tasks, self.subtask_type = tasks, "group"
 
@@ -223,6 +250,12 @@ class group(Signature):
                                 map(Signature.clone, self.tasks))
         return self.type(tasks, result, gid)
 
+    def skew(self, start=1.0, stop=None, step=1.0):
+        _next_skew = fxrange(start, stop, step, repeatlast=True).next
+        for task in self.tasks:
+            task.set(countdown=_next_skew())
+        return self
+
     def __repr__(self):
         return repr(self.tasks)
 Signature.register_type(group)

+ 4 - 0
celery/local.py

@@ -198,3 +198,7 @@ def maybe_evaluate(obj):
         return obj.__maybe_evaluate__()
     except AttributeError:
         return obj
+
+
+def regen(it):
+    return PromiseProxy(list, (it, ))