Prechádzať zdrojové kódy

Propagate connection errors occurring in pool callbacks. Closes #1226 (also depends on celery/billiard@5f6cba7200565d846e3bbffbc89716292f448738

Ask Solem 12 rokov pred
rodič
commit
4a728d18a6

+ 3 - 1
celery/concurrency/base.py

@@ -57,11 +57,12 @@ class BasePool(object):
     uses_semaphore = False
 
     def __init__(self, limit=None, putlocks=True,
-                 forking_enable=True, **options):
+                 forking_enable=True, callbacks_propagate=(), **options):
         self.limit = limit
         self.putlocks = putlocks
         self.options = options
         self.forking_enable = forking_enable
+        self.callbacks_propagate = callbacks_propagate
         self._does_debug = logger.isEnabledFor(logging.DEBUG)
 
     def on_start(self):
@@ -134,6 +135,7 @@ class BasePool(object):
 
         return self.on_apply(target, args, kwargs,
                              waitforslot=self.putlocks,
+                             callbacks_propagate=self.callbacks_propagate,
                              **options)
 
     def _get_info(self):

+ 0 - 21
celery/tests/concurrency/test_processes.py

@@ -12,7 +12,6 @@ from celery.utils.functional import noop
 from celery.tests.utils import Case
 try:
     from celery.concurrency import processes as mp
-    from billiard.pool import safe_apply_callback
 except ImportError:
 
     class _mp(object):
@@ -33,7 +32,6 @@ except ImportError:
             def apply_async(self, *args, **kwargs):
                 pass
     mp = _mp()  # noqa
-    safe_apply_callback = None  # noqa
 
 
 class Object(object):   # for writeable attributes.
@@ -135,25 +133,6 @@ class test_TaskPool(Case):
         pool.terminate()
         self.assertTrue(_pool.terminated)
 
-    def test_safe_apply_callback(self):
-        if safe_apply_callback is None:
-            raise SkipTest('multiprocessig not supported')
-        _good_called = [0]
-        _evil_called = [0]
-
-        def good(x):
-            _good_called[0] = 1
-            return x
-
-        def evil(x):
-            _evil_called[0] = 1
-            raise KeyError(x)
-
-        self.assertIsNone(safe_apply_callback(good, 10))
-        self.assertIsNone(safe_apply_callback(evil, 10))
-        self.assertTrue(_good_called[0])
-        self.assertTrue(_evil_called[0])
-
     def test_apply_async(self):
         pool = TaskPool(10)
         pool.start()

+ 6 - 1
celery/worker/__init__.py

@@ -168,6 +168,9 @@ class Pool(bootsteps.StartStopComponent):
             allow_restart=allow_restart,
             forking_enable=forking_enable,
             semaphore=semaphore,
+            callbacks_propagate=(
+                w._conninfo.connection_errors + w._conninfo.channel_errors
+            ),
         )
         if w.hub:
             w.hub.on_init.append(partial(self.on_poll_init, pool, w))
@@ -323,6 +326,8 @@ class WorkController(configurated):
         self._finalize = Finalize(self, self.stop, exitpriority=1)
         self.pidfile = pidfile
         self.pidlock = None
+        # this connection is not established, only used for params
+        self._conninfo = self.app.connection()
         self.use_eventloop = self.should_use_eventloop()
 
         # Update celery_include to have all known task modules, so that we
@@ -393,7 +398,7 @@ class WorkController(configurated):
 
     def should_use_eventloop(self):
         return (detect_environment() == 'default' and
-                self.app.connection().is_evented and not self.app.IS_WINDOWS)
+                self._conninfo.is_evented and not self.app.IS_WINDOWS)
 
     def stop(self, in_sighandler=False):
         """Graceful shutdown of the worker server."""