Browse Source

Adds taskset callback examples to taskset userguide

Ask Solem 14 years ago
parent
commit
fef57c192c
1 changed files with 78 additions and 1 deletions
  1. 78 1
      docs/userguide/tasksets.rst

+ 78 - 1
docs/userguide/tasksets.rst

@@ -99,7 +99,7 @@ A task set takes a list of :class:`~celery.task.sets.subtask`'s::
     ... ])
 
     >>> result = job.apply_async()
-    
+
     >>> result.ready()  # have all subtasks completed?
     True
     >>> result.successful() # were all subtasks successful?
@@ -158,3 +158,80 @@ It supports the following operations:
     Gather the results for all of the subtasks
     and return a list with them ordered by the order of which they
     were called.
+
+
+Task set callbacks
+------------------
+
+
+Simple, but may take a long time before your callback is called:
+
+
+.. code-block:: python
+    from celery import current_app
+    from celery.task import subtask
+
+    def join_taskset(setid, subtasks, callback, interval=15, max_retries=None):
+        result = TaskSetResult(setid, subtasks)
+        if result.ready():
+            return subtask(callback).delay(result.join())
+        join_taskset.retry(countdown=interval, max_retries=max_retries)
+
+
+
+Using Redis and atomic counters:
+
+
+.. code-block:: python
+    from celery import current_app
+    from celery.task import Task, TaskSet
+    from celery.result import TaskSetResult
+    from celery.utils import gen_unique_id, cached_property
+    from redis import Redis
+    from time import sleep
+
+    class supports_taskset_callback(Task):
+        abstract = True
+        accept_magic_kwargs = False
+
+        def after_return(self, \*args, \*\*kwargs):
+            if self.request.taskset:
+                callback = self.request.kwargs.get("callback")
+                if callback:
+                    setid = self.request.taskset
+                    # task set must be saved in advance, so the task doesn't
+                    # try to restore it before that happens.  This is why we
+                    # use the `apply_presaved_taskset` below.
+                    result = TaskSetResult.restore(setid)
+                    current = self.redis.incr("taskset-" + setid)
+                    if current >= result.total:
+                        r = subtask(callback).delay(result.join())
+
+        @cached_property
+        def redis(self):
+            return Redis(host="localhost", port=6379)
+
+    @task(base=supports_taskset_callback)
+    def add(x, y, \*\*kwargs):
+        return x + y
+
+    @task
+    def sum_of(numbers):
+        print("TASKSET READY: %r" % (sum(numbers), ))
+
+    def apply_presaved_taskset(tasks):
+        r = []
+        setid = gen_unique_id()
+        for task in tasks:
+            uuid = gen_unique_id()
+            task.options["task_id"] = uuid
+            r.append((task, current_app.AsyncResult(uuid)))
+        ts = current_app.TaskSetResult(setid, [task[1] for task in r])
+        ts.save()
+        return TaskSet(task[0] for task in r).apply_async(taskset_id=setid)
+
+
+    # sum of 100 add tasks
+    result = apply_presaved_taskset(
+                add.subtask((i, i), {"callback": sum_of.subtask()})
+                    for i in xrange(100))