scheduler.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. from __future__ import generators
  2. import time
  3. import heapq
  4. from celery.worker.revoke import revoked
  5. DEFAULT_MAX_INTERVAL = 2
  6. class Scheduler(object):
  7. """ETA scheduler.
  8. :param ready_queue: Queue to move items ready for processing.
  9. :keyword max_interval: Maximum sleep interval between iterations.
  10. Default is 2 seconds.
  11. """
  12. def __init__(self, ready_queue, max_interval=DEFAULT_MAX_INTERVAL):
  13. self.max_interval = float(max_interval)
  14. self.ready_queue = ready_queue
  15. self._queue = []
  16. def enter(self, item, eta=None, priority=0, callback=None):
  17. """Enter item into the scheduler.
  18. :param item: Item to enter.
  19. :param eta: Scheduled time as a :class:`datetime.datetime` object.
  20. :param priority: Unused.
  21. :param callback: Callback to call when the item is scheduled.
  22. This callback takes no arguments.
  23. """
  24. eta = eta and time.mktime(eta.timetuple()) or time.time()
  25. heapq.heappush(self._queue, (eta, priority, item, callback))
  26. def __iter__(self):
  27. """The iterator yields the time to sleep for between runs."""
  28. # localize variable access
  29. nowfun = time.time
  30. pop = heapq.heappop
  31. ready_queue = self.ready_queue
  32. while 1:
  33. if self._queue:
  34. eta, priority, item, callback = verify = self._queue[0]
  35. now = nowfun()
  36. # FIXME: Need a generic hook for this
  37. if item.task_id in revoked:
  38. event = pop(self._queue)
  39. if event is verify:
  40. item.on_ack()
  41. self.logger.warn(
  42. "Mediator: Skipping revoked task: %s[%s]" % (
  43. item.task_name, item.task_id))
  44. else:
  45. heapq.heappush(self._queue, event)
  46. if now < eta:
  47. yield min(eta - now, self.max_interval)
  48. else:
  49. event = pop(self._queue)
  50. if event is verify:
  51. ready_queue.put(item)
  52. callback and callback()
  53. yield 0
  54. else:
  55. heapq.heappush(self._queue, event)
  56. yield None
  57. def empty(self):
  58. """Is the schedule empty?"""
  59. return not self._queue
  60. def clear(self):
  61. self._queue = []
  62. @property
  63. def queue(self):
  64. events = list(self._queue)
  65. return map(heapq.heappop, [events]*len(events))