Browse Source

Calling a chord/group no longer modifies state so task_ids are reused on subsequent invocations

Ask Solem 12 years ago
parent
commit
67ee615c2e
3 changed files with 17 additions and 7 deletions
  1. 13 3
      Changelog
  2. 2 2
      celery/app/builtins.py
  3. 2 2
      celery/canvas.py

+ 13 - 3
Changelog

@@ -34,6 +34,19 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
     - Pool now sets a ``current_process().index`` attribute that can be used to create
       as many log files as there are processes in the pool.
 
+- Canvas: chord/group/chain no longer modifies the state when called
+
+    Previously calling a chord/group/chain would modify the ids of subtasks
+    so that::
+
+        >>> c = chord([add.s(2, 2), add.s(4, 4)], xsum.s())
+        >>> c()
+        >>> c() <-- call again
+
+    at the second time the ids for the tasks would be the same as in the
+    previous invocation.  This is now fixed, so that calling a subtask
+    won't mutate any options.
+
 - Worker: Fixed a bug where the request stack could be corrupted if
   relative imports are used.
 
@@ -42,9 +55,6 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
 
     Fix contributed by Sam Cooke.
 
-- ``subtask.clone`` now deepcopies options so that original subtask
-  is not modified when used in chains/groups etc.
-
 - Because of many bugs the fast local optimization has been disabled,
   and can only be enabled by setting the :envvar:`USE_FAST_LOCALS` attribute.
 

+ 2 - 2
celery/app/builtins.py

@@ -276,8 +276,8 @@ def add_chord_task(app):
             prepare_member = self._prepare_member
 
             # - convert back to group if serialized
-            if not isinstance(header, group):
-                header = group(map(maybe_subtask, header))
+            tasks = header.tasks if isinstance(header, group) else header
+            header = group(maybe_subtask(s).clone() for s in tasks)
             # - eager applies the group inline
             if eager:
                 return header.apply(args=partial_args, task_id=group_id)

+ 2 - 2
celery/canvas.py

@@ -352,11 +352,11 @@ class chord(Signature):
 
     def __call__(self, body=None, **kwargs):
         _chord = self.Chord
-        body = self.kwargs['body'] = body or self.kwargs['body']
+        body = (body or self.kwargs['body']).clone()
         if _chord.app.conf.CELERY_ALWAYS_EAGER:
             return self.apply((), kwargs)
         callback_id = body.options.setdefault('task_id', uuid())
-        _chord(**dict(self.kwargs, **kwargs))
+        _chord(**dict(self.kwargs, body=body, **kwargs))
         return _chord.AsyncResult(callback_id)
 
     def clone(self, *args, **kwargs):