Sfoglia il codice sorgente

Fix issue #2225

Creating a chord no longer results in "TypeError: group object got
multiple values for keyword argument 'task_id'".  Chords now
complete without hanging.
Aneil Mallavarapu 10 anni fa
parent
commit
3d00cc63c5
2 ha cambiato i file con 6 aggiunte e 1 eliminazioni
  1. 1 0
      celery/app/amqp.py
  2. 5 1
      celery/backends/base.py

+ 1 - 0
celery/app/amqp.py

@@ -371,6 +371,7 @@ class AMQP(object):
                 'id': task_id,
                 'id': task_id,
                 'args': args,
                 'args': args,
                 'kwargs': kwargs,
                 'kwargs': kwargs,
+                'group': group_id,
                 'retries': retries,
                 'retries': retries,
                 'eta': eta,
                 'eta': eta,
                 'expires': expires,
                 'expires': expires,

+ 5 - 1
celery/backends/base.py

@@ -534,7 +534,11 @@ class KeyValueStoreBackend(BaseBackend):
     def _apply_chord_incr(self, header, partial_args, group_id, body,
     def _apply_chord_incr(self, header, partial_args, group_id, body,
                           result=None, options={}, **kwargs):
                           result=None, options={}, **kwargs):
         self.save_group(group_id, self.app.GroupResult(group_id, result))
         self.save_group(group_id, self.app.GroupResult(group_id, result))
-        return header(*partial_args, task_id=group_id, **options or {})
+
+        fixed_options = dict((k,v) for k,v in options.items() if k != 'task_id')
+
+        return header(*partial_args, task_id=group_id, **fixed_options or {})
+
 
 
     def on_chord_part_return(self, task, state, result, propagate=None):
     def on_chord_part_return(self, task, state, result, propagate=None):
         if not self.implements_incr:
         if not self.implements_incr: