@@ -50,7 +50,8 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
# consumer.consume() may have prefetched up to our
# limit - drain an event so we are in a clean state
# prior to starting our event loop.
- connection.drain_events()
+ if connection.transport.driver_type == 'amqp':
+ hub.call_soon(connection.drain_events)
# FIXME: Use loop.run_forever
# Tried and works, but no time to test properly before release.