|
@@ -3,15 +3,20 @@ import heapq
|
|
|
|
|
|
from celery.worker.revoke import revoked
|
|
|
|
|
|
+DEFAULT_MAX_INTERVAL = 2
|
|
|
+
|
|
|
|
|
|
class Scheduler(object):
|
|
|
"""ETA scheduler.
|
|
|
|
|
|
:param ready_queue: Queue to move items ready for processing.
|
|
|
+ :keyword max_interval: Maximum sleep interval between iterations.
|
|
|
+ Default is 2 seconds.
|
|
|
|
|
|
"""
|
|
|
|
|
|
- def __init__(self, ready_queue):
|
|
|
+ def __init__(self, ready_queue, max_interval=DEFAULT_MAX_INTERVAL):
|
|
|
+ self.max_interval = max_interval
|
|
|
self.ready_queue = ready_queue
|
|
|
self._queue = []
|
|
|
|
|
@@ -53,7 +58,7 @@ class Scheduler(object):
|
|
|
heapq.heappush(heap, event)
|
|
|
|
|
|
if now < eta:
|
|
|
- yield eta - now
|
|
|
+ yield min(eta - now, float(self.max_interval))
|
|
|
else:
|
|
|
event = pop(heap)
|
|
|
|