|
@@ -384,22 +384,12 @@ class Consumer(object):
|
|
|
poll = hub.poller.poll
|
|
|
fire_timers = hub.fire_timers
|
|
|
scheduled = hub.timer._queue
|
|
|
- on_poll_start = self.connection.transport.on_poll_start
|
|
|
- strategies = self.strategies
|
|
|
connection = self.connection
|
|
|
+ on_poll_start = connection.transport.on_poll_start
|
|
|
+ strategies = self.strategies
|
|
|
drain_nowait = connection.drain_nowait
|
|
|
on_task_callbacks = hub.on_task
|
|
|
- buffer = []
|
|
|
-
|
|
|
- def flush_buffer():
|
|
|
- for name, body, message in buffer:
|
|
|
- try:
|
|
|
- strategies[name](message, body, message.ack_log_error)
|
|
|
- except KeyError, exc:
|
|
|
- self.handle_unknown_task(body, message, exc)
|
|
|
- except InvalidTaskError, exc:
|
|
|
- self.handle_invalid_task(body, message, exc)
|
|
|
- buffer[:] = []
|
|
|
+ keep_draining = connection.transport.nb_keep_draining
|
|
|
|
|
|
def on_task_received(body, message):
|
|
|
if on_task_callbacks:
|
|
@@ -414,12 +404,6 @@ class Consumer(object):
|
|
|
self.handle_unknown_task(body, message, exc)
|
|
|
except InvalidTaskError, exc:
|
|
|
self.handle_invalid_task(body, message, exc)
|
|
|
- #bufferlen = len(buffer)
|
|
|
- #buffer.append((name, body, message))
|
|
|
- #if bufferlen + 1 >= 4:
|
|
|
- # flush_buffer()
|
|
|
- #if bufferlen:
|
|
|
- # fire_timers()
|
|
|
|
|
|
self.task_consumer.callbacks = [on_task_received]
|
|
|
self.task_consumer.consume()
|
|
@@ -458,6 +442,8 @@ class Consumer(object):
|
|
|
except socket.error:
|
|
|
if self._state != CLOSE: # pragma: no cover
|
|
|
raise
|
|
|
+ if not keep_draining:
|
|
|
+ connection.more_to_read = False
|
|
|
if connection.more_to_read:
|
|
|
drain_nowait()
|
|
|
time_to_sleep = 0
|