Browse Source

Consumer does not shutdown properly when embedded in gevent application (#3746)

fixes #3745
Arcadiy Ivanov 8 years ago
parent
commit
c1fa9af097
2 changed files with 35 additions and 3 deletions
  1. 4 2
      celery/worker/consumer/consumer.py
  2. 31 1
      t/unit/worker/test_consumer.py

+ 4 - 2
celery/worker/consumer/consumer.py

@@ -42,6 +42,8 @@ from celery.worker.state import (
 __all__ = ['Consumer', 'Evloop', 'dump_body']
 
 CLOSE = bootsteps.CLOSE
+TERMINATE = bootsteps.TERMINATE
+STOP_CONDITIONS = {CLOSE, TERMINATE}
 logger = get_logger(__name__)
 debug, info, warn, error, crit = (logger.debug, logger.info, logger.warning,
                                   logger.error, logger.critical)
@@ -305,7 +307,7 @@ class Consumer(object):
 
     def start(self):
         blueprint = self.blueprint
-        while blueprint.state != CLOSE:
+        while blueprint.state not in STOP_CONDITIONS:
             maybe_shutdown()
             if self.restart_count:
                 try:
@@ -324,7 +326,7 @@ class Consumer(object):
                 if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
                     raise  # Too many open files
                 maybe_shutdown()
-                if blueprint.state != CLOSE:
+                if blueprint.state not in STOP_CONDITIONS:
                     if self.connection:
                         self.on_connection_error_after_connected(exc)
                     else:

+ 31 - 1
t/unit/worker/test_consumer.py

@@ -10,7 +10,8 @@ from case import ContextMock, Mock, call, patch, skip
 from billiard.exceptions import RestartFreqExceeded
 
 from celery.worker.consumer.agent import Agent
-from celery.worker.consumer.consumer import CLOSE, Consumer, dump_body
+from celery.worker.consumer.consumer import (CLOSE, TERMINATE,
+                                             Consumer, dump_body)
 from celery.worker.consumer.gossip import Gossip
 from celery.worker.consumer.heart import Heart
 from celery.worker.consumer.mingle import Mingle
@@ -153,6 +154,35 @@ class test_Consumer:
             c.start()
             sleep.assert_called_with(1)
 
+    def test_do_not_restart_when_closed(self):
+        c = self.get_consumer()
+
+        c.blueprint.state = None
+
+        def bp_start(*args, **kwargs):
+            c.blueprint.state = CLOSE
+
+        c.blueprint.start.side_effect = bp_start
+        with patch('celery.worker.consumer.consumer.sleep'):
+            c.start()
+
+        c.blueprint.start.assert_called_once_with(c)
+
+    def test_do_not_restart_when_terminated(self):
+        c = self.get_consumer()
+
+        c.blueprint.state = None
+
+        def bp_start(*args, **kwargs):
+            c.blueprint.state = TERMINATE
+
+        c.blueprint.start.side_effect = bp_start
+
+        with patch('celery.worker.consumer.consumer.sleep'):
+            c.start()
+
+        c.blueprint.start.assert_called_once_with(c)
+
     def test_no_retry_raises_error(self):
         self.app.conf.broker_connection_retry = False
         c = self.get_consumer()