|
@@ -419,9 +419,9 @@ class Consumer(object):
|
|
|
elif state.should_terminate:
|
|
|
raise SystemTerminate()
|
|
|
|
|
|
- # fire any ready timers, this also determines
|
|
|
- # when we need to wake up next.
|
|
|
- time_to_sleep = fire_timers() if scheduled else 1
|
|
|
+ # fire any ready timers, this also returns
|
|
|
+ # the number of seconds until we need to fire timers again.
|
|
|
+ poll_timeout = fire_timers() if scheduled else 1
|
|
|
|
|
|
if qos.prev != qos.value:
|
|
|
update_qos()
|
|
@@ -430,7 +430,7 @@ class Consumer(object):
|
|
|
if readers or writers:
|
|
|
connection.more_to_read = True
|
|
|
while connection.more_to_read:
|
|
|
- for fileno, event in poll(time_to_sleep) or ():
|
|
|
+ for fileno, event in poll(poll_timeout) or ():
|
|
|
try:
|
|
|
if event & READ:
|
|
|
readers[fileno](fileno, event)
|
|
@@ -451,9 +451,10 @@ class Consumer(object):
|
|
|
connection.more_to_read = False
|
|
|
if connection.more_to_read:
|
|
|
drain_nowait()
|
|
|
- time_to_sleep = 0
|
|
|
+ poll_timeout = 0
|
|
|
else:
|
|
|
- sleep(min(time_to_sleep, 0.1))
|
|
|
+ # no sockets yet, startup is probably not done.
|
|
|
+ sleep(min(poll_timeout, 0.1))
|
|
|
|
|
|
def on_task(self, task, task_reserved=task_reserved):
|
|
|
"""Handle received task.
|