|
@@ -429,29 +429,26 @@ class Consumer(object):
|
|
|
update_readers(on_poll_start())
|
|
|
if readers or writers:
|
|
|
connection.more_to_read = True
|
|
|
- while connection.more_to_read:
|
|
|
- for fileno, event in poll(poll_timeout) or ():
|
|
|
- try:
|
|
|
- if event & READ:
|
|
|
- readers[fileno](fileno, event)
|
|
|
- if event & WRITE:
|
|
|
- writers[fileno](fileno, event)
|
|
|
- if event & ERR:
|
|
|
- for handlermap in readers, writers:
|
|
|
- try:
|
|
|
- handlermap[fileno](fileno, event)
|
|
|
- except KeyError:
|
|
|
- pass
|
|
|
- except Empty:
|
|
|
- break
|
|
|
- except socket.error:
|
|
|
- if self._state != CLOSE: # pragma: no cover
|
|
|
- raise
|
|
|
- if not keep_draining:
|
|
|
- connection.more_to_read = False
|
|
|
- if connection.more_to_read:
|
|
|
- drain_nowait()
|
|
|
- poll_timeout = 0
|
|
|
+ for fileno, event in poll(poll_timeout) or ():
|
|
|
+ try:
|
|
|
+ if event & READ:
|
|
|
+ readers[fileno](fileno, event)
|
|
|
+ if event & WRITE:
|
|
|
+ writers[fileno](fileno, event)
|
|
|
+ if event & ERR:
|
|
|
+ for handlermap in readers, writers:
|
|
|
+ try:
|
|
|
+ handlermap[fileno](fileno, event)
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+ except Empty:
|
|
|
+ break
|
|
|
+ except socket.error:
|
|
|
+ if self._state != CLOSE: # pragma: no cover
|
|
|
+ raise
|
|
|
+ while keep_draining and connection.more_to_read:
|
|
|
+ drain_nowait()
|
|
|
+ poll_timeout = 0
|
|
|
else:
|
|
|
# no sockets yet, startup is probably not done.
|
|
|
sleep(min(poll_timeout, 0.1))
|