Browse Source

[worker] Now sleeps for 1 second if consumer restarted more than 5 times in 1 second. Closes #1053

Ask Solem 12 years ago
parent
commit
7c83829272
1 changed files with 8 additions and 0 deletions
  1. 8 0
      celery/worker/consumer.py

+ 8 - 0
celery/worker/consumer.py

@@ -80,6 +80,8 @@ import threading
 from time import sleep
 from time import sleep
 from Queue import Empty
 from Queue import Empty
 
 
+from billiard.common import restart_state
+from billiard.exceptions import RestartFreqExceeded
 from kombu.syn import _detect_environment
 from kombu.syn import _detect_environment
 from kombu.utils.encoding import safe_repr, safe_str, bytes_t
 from kombu.utils.encoding import safe_repr, safe_str, bytes_t
 from kombu.utils.eventio import READ, WRITE, ERR
 from kombu.utils.eventio import READ, WRITE, ERR
@@ -355,6 +357,7 @@ class Consumer(object):
         conninfo = self.app.connection()
         conninfo = self.app.connection()
         self.connection_errors = conninfo.connection_errors
         self.connection_errors = conninfo.connection_errors
         self.channel_errors = conninfo.channel_errors
         self.channel_errors = conninfo.channel_errors
+        self._restart_state = restart_state(maxR=5, maxT=1)
 
 
         self._does_info = logger.isEnabledFor(logging.INFO)
         self._does_info = logger.isEnabledFor(logging.INFO)
         self.strategies = {}
         self.strategies = {}
@@ -397,6 +400,11 @@ class Consumer(object):
         while self._state != CLOSE:
         while self._state != CLOSE:
             self.restart_count += 1
             self.restart_count += 1
             self.maybe_shutdown()
             self.maybe_shutdown()
+            try:
+                self._restart_state.step()
+            except RestartFreqExceeded as exc:
+                crit('Frequent restarts detected: %r', exc, exc_info=1)
+                sleep(1)
             try:
             try:
                 self.reset_connection()
                 self.reset_connection()
                 self.consume_messages()
                 self.consume_messages()