소스 검색

Fix failure when task has multiple callbacks

Exception was:

File "celery/app/trace.py", line 276, in trace_task
    group.apply_async((retval, ))
TypeError: unbound method apply_async() must be called with group instance as first argument (got tuple instance instead)
NotSqrt 10 년 전
부모
커밋
825b4e4c37
2개의 변경된 파일26개의 추가작업 그리고 1개의 파일을 삭제
  1. 1 1
      celery/app/trace.py
  2. 25 0
      celery/tests/tasks/test_trace.py

+ 1 - 1
celery/app/trace.py

@@ -273,7 +273,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                                     else:
                                         sigs.append(sig)
                                 for group_ in groups:
-                                    group.apply_async((retval, ))
+                                    group_.apply_async((retval, ))
                                 if sigs:
                                     group(sigs).apply_async((retval, ))
                             else:

+ 25 - 0
celery/tests/tasks/test_trace.py

@@ -93,6 +93,31 @@ class test_trace(TraceCase):
         finally:
             signals.task_success.receivers[:] = []
 
+    def test_multiple_callbacks(self):
+        """
+        Regression test on trace with multiple callbacks
+
+        Uses the signature of the following canvas:
+            chain(
+                empty.subtask(link=empty.subtask()),
+                group(empty.subtask(), empty.subtask())
+            )
+        """
+
+        @self.app.task(shared=False)
+        def empty(*args, **kwargs):
+            pass
+        empty.backend = Mock()
+
+        sig = {'chord_size': None, 'task': 'empty', 'args': (), 'options': {}, 'subtask_type': None, 'kwargs': {}, 'immutable': False}
+        callbacks = [
+            sig,
+            {'chord_size': None, 'task': 'celery.group', 'args': (), 'options': {}, 'subtask_type': 'group', 'kwargs': {'tasks': (empty(), empty())}, 'immutable': False}
+        ]
+
+        # should not raise an exception
+        self.trace(empty, [], {}, request={'callbacks': callbacks})
+
     def test_when_chord_part(self):
 
         @self.app.task(shared=False)