Browse Source

Cleanup the mailbox's producer pool after forking (#4472)

If the mailbox is used before forking the workers will
not be able to broadcast messages.  This is because the producer pool
instance on the mail box will be `closed.`  This fix will cause the
mailbox to load the producer pool again after forking.


Fixes https://github.com/celery/celery/issues/3751
Nick Eaket 7 years ago
parent
commit
ef3237d272
2 changed files with 26 additions and 0 deletions
  1. 15 0
      celery/app/control.py
  2. 11 0
      t/unit/app/test_control.py

+ 15 - 0
celery/app/control.py

@@ -10,14 +10,18 @@ import warnings
 
 from billiard.common import TERM_SIGNAME
 from kombu.pidbox import Mailbox
+from kombu.utils.compat import register_after_fork
 from kombu.utils.functional import lazy
 from kombu.utils.objects import cached_property
 
 from celery.exceptions import DuplicateNodenameWarning
+from celery.utils.log import get_logger
 from celery.utils.text import pluralize
 
 __all__ = ('Inspect', 'Control', 'flatten_reply')
 
+logger = get_logger(__name__)
+
 W_DUPNODE = """\
 Received multiple replies from node {0}: {1}.
 Please make sure you give each node a unique nodename using
@@ -51,6 +55,13 @@ def flatten_reply(reply):
     return nodes
 
 
+def _after_fork_cleanup_control(control):
+    try:
+        control._after_fork()
+    except Exception as exc:  # pylint: disable=broad-except
+        logger.info('after fork raised exception: %r', exc, exc_info=1)
+
+
 class Inspect(object):
     """API for app.control.inspect."""
 
@@ -158,6 +169,10 @@ class Control(object):
             queue_expires=app.conf.control_queue_expires,
             reply_queue_expires=app.conf.control_queue_expires,
         )
+        register_after_fork(self, _after_fork_cleanup_control)
+
+    def _after_fork(self):
+        del self.mailbox.producer_pool
 
     @cached_property
     def inspect(self):

+ 11 - 0
t/unit/app/test_control.py

@@ -487,3 +487,14 @@ class test_Control:
             uuids,
             connection=None, reply=False, signal=None,
             terminate=False, timeout=None)
+
+    def test_after_fork_clears_mailbox_pool(self):
+        amqp = Mock(name='amqp')
+        self.app.amqp = amqp
+        closed_pool = Mock(name='closed pool')
+        amqp.producer_pool = closed_pool
+        assert closed_pool is self.app.control.mailbox.producer_pool
+        self.app.control._after_fork()
+        new_pool = Mock(name='new pool')
+        amqp.producer_pool = new_pool
+        assert new_pool is self.app.control.mailbox.producer_pool