Procházet zdrojové kódy

Merge pull request #2515 from NotSqrt/multiple-callbacks

Fix failure when task has multiple callbacks
PMickael před 10 roky
rodič
revize
11b8576fcc
2 změnil soubory, kde provedl 31 přidání a 1 odebrání
  1. 1 1
      celery/app/trace.py
  2. 30 0
      celery/tests/tasks/test_trace.py

+ 1 - 1
celery/app/trace.py

@@ -273,7 +273,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                                     else:
                                         sigs.append(sig)
                                 for group_ in groups:
-                                    group.apply_async((retval, ))
+                                    group_.apply_async((retval, ))
                                 if sigs:
                                     group(sigs).apply_async((retval, ))
                             else:

+ 30 - 0
celery/tests/tasks/test_trace.py

@@ -93,6 +93,36 @@ class test_trace(TraceCase):
         finally:
             signals.task_success.receivers[:] = []
 
+    def test_multiple_callbacks(self):
+        """
+        Regression test on trace with multiple callbacks
+
+        Uses the signature of the following canvas:
+            chain(
+                empty.subtask(link=empty.subtask()),
+                group(empty.subtask(), empty.subtask())
+            )
+        """
+
+        @self.app.task(shared=False)
+        def empty(*args, **kwargs):
+            pass
+        empty.backend = Mock()
+
+        sig = {
+            'chord_size': None, 'task': 'empty', 'args': (), 'options': {},
+            'subtask_type': None, 'kwargs': {}, 'immutable': False
+        }
+        group_sig = {
+            'chord_size': None, 'task': 'celery.group', 'args': (),
+            'options': {}, 'subtask_type': 'group',
+            'kwargs': {'tasks': (empty(), empty())}, 'immutable': False
+        }
+        callbacks = [sig, group_sig]
+
+        # should not raise an exception
+        self.trace(empty, [], {}, request={'callbacks': callbacks})
+
     def test_when_chord_part(self):
 
         @self.app.task(shared=False)