Browse Source

Task.replace changed, removes Task.replace_in_chord.

The two methods had almost the same functionality, but the old Task.replace
would force the new task to inherit the callbacks/errbacks of the existing
task.

If you replace a node in a tree, then you would not expect the new node to
inherit the children of the old node, so this seems like unexpected
behavior.

So self.replace(sig) now works for any task, in addition sig can now
be a group.

Groups are automatically converted to a chord, where the callback
will "accumulate" the results of the group tasks.

A new builtin task (`celery.accumulate` was added for this purpose)

Closes #817
Ask Solem 10 years ago
parent
commit
07ecd08a86
2 changed files with 25 additions and 18 deletions
  1. 10 0
      celery/app/builtins.py
  2. 15 18
      celery/app/task.py

+ 10 - 0
celery/app/builtins.py

@@ -33,6 +33,16 @@ def add_backend_cleanup_task(app):
     return backend_cleanup
 
 
+@connect_on_app_finalize
+def add_accumulate_task(app):
+    """This task is used by Task.replace when replacing a task with
+    a group, to "collect" results."""
+    @app.task(bind=True, name='celery.accumulate', shared=False, lazy=False)
+    def accumulate(self, *args, **kwargs):
+        index = kwargs.get('index')
+        return args[index] if index is not None else args
+
+
 @connect_on_app_finalize
 def add_unlock_chord_task(app):
     """This task is used by result backends without native chord support.

+ 15 - 18
celery/app/task.py

@@ -12,7 +12,7 @@ import sys
 
 from billiard.einfo import ExceptionInfo
 
-from celery import current_app
+from celery import current_app, group
 from celery import states
 from celery._state import _task_stack
 from celery.canvas import signature
@@ -687,30 +687,27 @@ class Task(object):
             return d.send(type_, uuid=req.id, **fields)
 
     def replace(self, sig):
-        request = self.request
-        sig.set_immutable(True)
-        chord_id, request.chord = request.chord, None
-        group_id, request.group = request.group, None
-        callbacks, request.callbacks = request.callbacks, [sig]
-        if group_id or chord_id:
-            sig.set(group=group_id, chord=chord_id)
-        sig |= callbacks[0]
-        return sig
-
-    def replace_in_chord(self, sig):
-        """Replace the current task (which must be a member of a chord)
-        with a new task.
-
-        Note that this will raise :exc:`~@Ignore`, so the best practice
-        is to always use ``return self.replace_in_chord(...)`` to convey
+        """Replace the current task, with a new task inheriting the
+        same task id.
+
+        :param sig: :class:`@signature`
+
+        Note: This will raise :exc:`~@Ignore`, so the best practice
+        is to always use ``raise self.replace_in_chord(...)`` to convey
         to the reader that the task will not continue after being replaced.
 
         :param: Signature of new task.
 
         """
+        chord = self.request.chord
+        if isinstance(sig, group):
+            sig |= self.app.tasks['celery.accumulate'].s(index=0).set(
+                chord=chord,
+            )
+            chord = None
         sig.freeze(self.request.id,
                    group_id=self.request.group,
-                   chord=self.request.chord,
+                   chord=chord,
                    root_id=self.request.root_id)
         sig.delay()
         raise Ignore('Chord member replaced by new task')