|
@@ -1,6 +1,8 @@
|
|
import time
|
|
import time
|
|
import heapq
|
|
import heapq
|
|
|
|
|
|
|
|
+from celery.worker.revoke import revoked
|
|
|
|
+
|
|
|
|
|
|
class Scheduler(object):
|
|
class Scheduler(object):
|
|
"""ETA scheduler.
|
|
"""ETA scheduler.
|
|
@@ -40,12 +42,22 @@ class Scheduler(object):
|
|
eta, priority, item, callback = verify = heap[0]
|
|
eta, priority, item, callback = verify = heap[0]
|
|
now = nowfun()
|
|
now = nowfun()
|
|
|
|
|
|
|
|
+ # FIXME: Need a generic hook for this
|
|
|
|
+ if item.task_id in revoked:
|
|
|
|
+ event = pop(heap)
|
|
|
|
+ if event is verify:
|
|
|
|
+ item.on_ack()
|
|
|
|
+ self.logger.warn("Mediator: Skipping revoked task: %s[%s]" % (
|
|
|
|
+ item.task_name, item.task_id))
|
|
|
|
+ else:
|
|
|
|
+ heapq.heappush(heap, event)
|
|
|
|
+
|
|
if now < eta:
|
|
if now < eta:
|
|
yield eta - now
|
|
yield eta - now
|
|
else:
|
|
else:
|
|
event = pop(heap)
|
|
event = pop(heap)
|
|
|
|
|
|
- if event is verify: # pragma: no cover
|
|
|
|
|
|
+ if event is verify:
|
|
ready_queue.put(item)
|
|
ready_queue.put(item)
|
|
callback and callback()
|
|
callback and callback()
|
|
yield 0
|
|
yield 0
|