Explorar o código

Merge pull request #2226 from aneilbaboo/master

Fix issue #2225
Omer Katz %!s(int64=9) %!d(string=hai) anos
pai
achega
9b2c3bfc2a
Modificáronse 3 ficheiros con 12 adicións e 7 borrados
  1. 1 0
      celery/app/amqp.py
  2. 7 3
      celery/backends/base.py
  3. 4 4
      celery/canvas.py

+ 1 - 0
celery/app/amqp.py

@@ -371,6 +371,7 @@ class AMQP(object):
                 'id': task_id,
                 'args': args,
                 'kwargs': kwargs,
+                'group': group_id,
                 'retries': retries,
                 'eta': eta,
                 'expires': expires,

+ 7 - 3
celery/backends/base.py

@@ -357,8 +357,8 @@ class BaseBackend(object):
 
     def apply_chord(self, header, partial_args, group_id, body,
                     options={}, **kwargs):
-        options['task_id'] = group_id
-        result = header(*partial_args, **options or {})
+        fixed_options = dict((k,v) for k,v in options.items() if k!='task_id')
+        result = header(*partial_args, task_id=group_id, **fixed_options or {})
         self.fallback_chord_unlock(group_id, body, **kwargs)
         return result
 
@@ -534,7 +534,11 @@ class KeyValueStoreBackend(BaseBackend):
     def _apply_chord_incr(self, header, partial_args, group_id, body,
                           result=None, options={}, **kwargs):
         self.save_group(group_id, self.app.GroupResult(group_id, result))
-        return header(*partial_args, task_id=group_id, **options or {})
+
+        fixed_options = dict((k,v) for k,v in options.items() if k != 'task_id')
+
+        return header(*partial_args, task_id=group_id, **fixed_options or {})
+
 
     def on_chord_part_return(self, task, state, result, propagate=None):
         if not self.implements_incr:

+ 4 - 4
celery/canvas.py

@@ -474,9 +474,9 @@ class chain(Signature):
             if link_error:
                 task.set(link_error=link_error)
 
-            if not isinstance(prev_task, chord):
-                results.append(res)
-                tasks.append(task)
+            tasks.append(task)
+            results.append(res)
+
             prev_task, prev_res = task, res
 
         return tasks, results
@@ -603,7 +603,7 @@ def _maybe_group(tasks):
     elif isinstance(tasks, Signature):
         tasks = [tasks]
     else:
-        tasks = regen(tasks)
+        tasks = map(signature, regen(tasks))
     return tasks