|
@@ -38,38 +38,37 @@ class Scheduler(object):
|
|
|
"""The iterator yields the time to sleep for between runs."""
|
|
|
|
|
|
# localize variable access
|
|
|
- heap = self._queue
|
|
|
nowfun = time.time
|
|
|
pop = heapq.heappop
|
|
|
ready_queue = self.ready_queue
|
|
|
|
|
|
while 1:
|
|
|
- if heap:
|
|
|
- eta, priority, item, callback = verify = heap[0]
|
|
|
+ if self._queue:
|
|
|
+ eta, priority, item, callback = verify = self._queue[0]
|
|
|
now = nowfun()
|
|
|
|
|
|
# FIXME: Need a generic hook for this
|
|
|
if item.task_id in revoked:
|
|
|
- event = pop(heap)
|
|
|
+ event = pop(self._queue)
|
|
|
if event is verify:
|
|
|
item.on_ack()
|
|
|
self.logger.warn(
|
|
|
"Mediator: Skipping revoked task: %s[%s]" % (
|
|
|
item.task_name, item.task_id))
|
|
|
else:
|
|
|
- heapq.heappush(heap, event)
|
|
|
+ heapq.heappush(self._queue, event)
|
|
|
|
|
|
if now < eta:
|
|
|
yield min(eta - now, self.max_interval)
|
|
|
else:
|
|
|
- event = pop(heap)
|
|
|
+ event = pop(self._queue)
|
|
|
|
|
|
if event is verify:
|
|
|
ready_queue.put(item)
|
|
|
callback and callback()
|
|
|
yield 0
|
|
|
else:
|
|
|
- heapq.heappush(heap, event)
|
|
|
+ heapq.heappush(self._queue, event)
|
|
|
yield None
|
|
|
|
|
|
def empty(self):
|