|
@@ -79,6 +79,7 @@ import threading
|
|
|
from time import sleep
|
|
|
from Queue import Empty
|
|
|
|
|
|
+from kombu.syn import _detect_environment
|
|
|
from kombu.utils.encoding import safe_repr
|
|
|
from kombu.utils.eventio import READ, WRITE, ERR
|
|
|
|
|
@@ -357,6 +358,12 @@ class Consumer(object):
|
|
|
if not hub:
|
|
|
self.amqheartbeat = 0
|
|
|
|
|
|
+ if _detect_environment() == 'gevent':
|
|
|
+ # there's a gevent bug that causes timeouts to not be reset,
|
|
|
+ # so if the connection timeout is exceeded once, it can NEVER
|
|
|
+ # connect again.
|
|
|
+ self.app.conf.BROKER_CONNECTION_TIMEOUT = None
|
|
|
+
|
|
|
def update_strategies(self):
|
|
|
S = self.strategies
|
|
|
app = self.app
|
|
@@ -606,7 +613,7 @@ class Consumer(object):
|
|
|
debug('Closing broker connection...')
|
|
|
self.maybe_conn_error(connection.close)
|
|
|
|
|
|
- def stop_consumers(self, close_connection=True):
|
|
|
+ def stop_consumers(self, close_connection=True, join=True):
|
|
|
"""Stop consuming tasks and broadcast commands, also stops
|
|
|
the heartbeat thread and event dispatcher.
|
|
|
|
|
@@ -623,7 +630,7 @@ class Consumer(object):
|
|
|
self.heart = self.heart.stop()
|
|
|
|
|
|
debug('Cancelling task consumer...')
|
|
|
- if self.task_consumer:
|
|
|
+ if join and self.task_consumer:
|
|
|
self.maybe_conn_error(self.task_consumer.cancel)
|
|
|
|
|
|
if self.event_dispatcher:
|
|
@@ -632,7 +639,7 @@ class Consumer(object):
|
|
|
self.maybe_conn_error(self.event_dispatcher.close)
|
|
|
|
|
|
debug('Cancelling broadcast consumer...')
|
|
|
- if self.broadcast_consumer:
|
|
|
+ if join and self.broadcast_consumer:
|
|
|
self.maybe_conn_error(self.broadcast_consumer.cancel)
|
|
|
|
|
|
if close_connection:
|
|
@@ -707,7 +714,7 @@ class Consumer(object):
|
|
|
"""Re-establish the broker connection and set up consumers,
|
|
|
heartbeat and the event dispatcher."""
|
|
|
debug('Re-establishing connection to the broker...')
|
|
|
- self.stop_consumers()
|
|
|
+ self.stop_consumers(join=False)
|
|
|
|
|
|
# Clear internal queues to get rid of old messages.
|
|
|
# They can't be acked anyway, as a delivery tag is specific
|
|
@@ -794,7 +801,7 @@ class Consumer(object):
|
|
|
# anymore.
|
|
|
self.close()
|
|
|
debug('Stopping consumers...')
|
|
|
- self.stop_consumers(close_connection=False)
|
|
|
+ self.stop_consumers(close_connection=False, join=True)
|
|
|
|
|
|
def close(self):
|
|
|
self._state = CLOSE
|