scheduler.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. from __future__ import generators
  2. import time
  3. import heapq
  4. from celery.worker.revoke import revoked
  5. from celery import log
  6. DEFAULT_MAX_INTERVAL = 2
  7. class Scheduler(object):
  8. """ETA scheduler.
  9. :param ready_queue: Queue to move items ready for processing.
  10. :keyword max_interval: Maximum sleep interval between iterations.
  11. Default is 2 seconds.
  12. """
  13. def __init__(self, ready_queue, logger=None,
  14. max_interval=DEFAULT_MAX_INTERVAL):
  15. self.max_interval = float(max_interval)
  16. self.ready_queue = ready_queue
  17. self.logger = logger or log.get_default_logger()
  18. self._queue = []
  19. def enter(self, item, eta=None, priority=0, callback=None):
  20. """Enter item into the scheduler.
  21. :param item: Item to enter.
  22. :param eta: Scheduled time as a :class:`datetime.datetime` object.
  23. :param priority: Unused.
  24. :param callback: Callback to call when the item is scheduled.
  25. This callback takes no arguments.
  26. """
  27. eta = eta and time.mktime(eta.timetuple()) or time.time()
  28. heapq.heappush(self._queue, (eta, priority, item, callback))
  29. def __iter__(self):
  30. """The iterator yields the time to sleep for between runs."""
  31. # localize variable access
  32. nowfun = time.time
  33. pop = heapq.heappop
  34. ready_queue = self.ready_queue
  35. while 1:
  36. if self._queue:
  37. eta, priority, item, callback = verify = self._queue[0]
  38. now = nowfun()
  39. # FIXME: Need a generic hook for this
  40. if item.task_id in revoked:
  41. event = pop(self._queue)
  42. if event is verify:
  43. item.on_ack()
  44. self.logger.warn(
  45. "Mediator: Skipping revoked task: %s[%s]" % (
  46. item.task_name, item.task_id))
  47. else:
  48. heapq.heappush(self._queue, event)
  49. if now < eta:
  50. yield min(eta - now, self.max_interval)
  51. else:
  52. event = pop(self._queue)
  53. if event is verify:
  54. ready_queue.put(item)
  55. if callback is not None:
  56. callback()
  57. yield 0
  58. else:
  59. heapq.heappush(self._queue, event)
  60. yield None
  61. def empty(self):
  62. """Is the schedule empty?"""
  63. return not self._queue
  64. def clear(self):
  65. self._queue = []
  66. def info(self):
  67. return ({"eta": eta, "priority": priority, "item": item}
  68. for eta, priority, item, _ in self.queue)
  69. @property
  70. def queue(self):
  71. events = list(self._queue)
  72. return map(heapq.heappop, [events]*len(events))