Ask Solem 9 lat temu
rodzic
commit
8866282482

+ 3 - 1
celery/canvas.py

@@ -21,6 +21,7 @@ from operator import itemgetter
 from itertools import chain as _chain
 
 from kombu.utils import cached_property, fxrange, reprcall, uuid
+from kombu.utils.functional import maybe_list
 
 from celery._state import current_app
 from celery.local import try_import
@@ -527,7 +528,8 @@ class chain(Signature):
                 task.set_parent_id(parent_id)
 
             if link_error:
-                task.link_error(link_error)
+                for errback in maybe_list(link_error):
+                    task.link_error(errback)
 
             tasks.append(task)
             results.append(res)

+ 1 - 1
celery/tests/worker/test_loops.py

@@ -61,7 +61,7 @@ class X(object):
         self.Hub = self.hub
         self.blueprint.state = RUN
         # need this for create_task_handler
-        _consumer = Consumer(Mock(), timer=Mock(), app=app)
+        _consumer = Consumer(Mock(), timer=Mock(), controller=Mock(), app=app)
         _consumer.on_task_message = on_task_message or []
         self.obj.create_task_handler = _consumer.create_task_handler
         self.on_unknown_message = self.obj.on_unknown_message = Mock(

+ 2 - 0
celery/tests/worker/test_worker.py

@@ -62,6 +62,7 @@ class Consumer(__Consumer):
         kwargs.setdefault('without_mingle', True)  # disable Mingle step
         kwargs.setdefault('without_gossip', True)  # disable Gossip step
         kwargs.setdefault('without_heartbeat', True)  # disable Heart step
+        kwargs.setdefault('controller', Mock())
         super(Consumer, self).__init__(*args, **kwargs)
 
 
@@ -71,6 +72,7 @@ class _MyKombuConsumer(Consumer):
 
     def __init__(self, *args, **kwargs):
         kwargs.setdefault('pool', BasePool(2))
+        kwargs.setdefault('controller', Mock())
         super(_MyKombuConsumer, self).__init__(*args, **kwargs)
 
     def restart_heartbeat(self):