|
@@ -7,6 +7,7 @@ The consumers highly-optimized inner loop.
|
|
|
"""
|
|
|
from __future__ import absolute_import
|
|
|
|
|
|
+import errno
|
|
|
import socket
|
|
|
|
|
|
from celery.bootsteps import RUN
|
|
@@ -21,6 +22,15 @@ logger = get_logger(__name__)
|
|
|
error = logger.error
|
|
|
|
|
|
|
|
|
+def _quick_drain(connection, timeout=0.1):
|
|
|
+ try:
|
|
|
+ connection.drain_events(timeout=timeout)
|
|
|
+ except Exception as exc:
|
|
|
+ exc_errno = getattr(exc, 'errno', None)
|
|
|
+ if exc_errno is not None and exc_errno != errno.EAGAIN:
|
|
|
+ raise
|
|
|
+
|
|
|
+
|
|
|
def asynloop(obj, connection, consumer, blueprint, hub, qos,
|
|
|
heartbeat, clock, hbrate=2.0, RUN=RUN):
|
|
|
"""Non-blocking event loop consuming messages until connection is lost,
|
|
@@ -51,7 +61,7 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
|
|
|
# limit - drain an event so we are in a clean state
|
|
|
# prior to starting our event loop.
|
|
|
if connection.transport.driver_type == 'amqp':
|
|
|
- hub.call_soon(connection.drain_events)
|
|
|
+ hub.call_soon(_quick_drain, connection)
|
|
|
|
|
|
# FIXME: Use loop.run_forever
|
|
|
# Tried and works, but no time to test properly before release.
|