|
@@ -18,7 +18,10 @@ from collections import defaultdict
|
|
|
from functools import partial
|
|
|
from heapq import heappush
|
|
|
from operator import itemgetter
|
|
|
+from time import sleep
|
|
|
|
|
|
+from billiard.common import restart_state
|
|
|
+from billiard.exceptions import RestartFreqExceeded
|
|
|
from kombu.common import QoS, ignore_errors
|
|
|
from kombu.syn import _detect_environment
|
|
|
from kombu.utils.encoding import safe_repr
|
|
@@ -150,6 +153,7 @@ class Consumer(object):
|
|
|
conninfo = self.app.connection()
|
|
|
self.connection_errors = conninfo.connection_errors
|
|
|
self.channel_errors = conninfo.channel_errors
|
|
|
+ self._restart_state = restart_state(maxR=5, maxT=1)
|
|
|
|
|
|
self._does_info = logger.isEnabledFor(logging.INFO)
|
|
|
self._quick_put = self.ready_queue.put
|
|
@@ -187,6 +191,11 @@ class Consumer(object):
|
|
|
ns.start(self)
|
|
|
except self.connection_errors:
|
|
|
maybe_shutdown()
|
|
|
+ try:
|
|
|
+ self._restart_state.step()
|
|
|
+ except RestartFreqExceeded as exc:
|
|
|
+ crit('Frequent restarts detected: %r', exc, exc_info=1)
|
|
|
+ sleep(1)
|
|
|
if ns.state != CLOSE and self.connection:
|
|
|
warn(CONNECTION_RETRY, exc_info=True)
|
|
|
ns.restart(self)
|