controllers.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. """
  2. Worker Controller Threads
  3. """
  4. import time
  5. import threading
  6. from datetime import datetime
  7. from Queue import Empty as QueueEmpty
  8. from celery import log
  9. from celery.worker.revoke import revoked
  10. class BackgroundThread(threading.Thread):
  11. """Thread running an infinite loop which for every iteration
  12. calls its :meth:`on_iteration` method.
  13. This also implements graceful shutdown of the thread by providing
  14. the :meth:`stop` method.
  15. """
  16. is_infinite = True
  17. def __init__(self):
  18. super(BackgroundThread, self).__init__()
  19. self._shutdown = threading.Event()
  20. self._stopped = threading.Event()
  21. self.setDaemon(True)
  22. def run(self):
  23. """This is the body of the thread.
  24. To start the thread use :meth:`start` instead.
  25. """
  26. self.on_start()
  27. while self.is_infinite:
  28. if self._shutdown.isSet():
  29. break
  30. self.on_iteration()
  31. self._stopped.set() # indicate that we are stopped
  32. def on_start(self):
  33. """This handler is run at thread start, just before the infinite
  34. loop."""
  35. pass
  36. def on_iteration(self):
  37. """This is the method called for every iteration and must be
  38. implemented by every subclass of :class:`BackgroundThread`."""
  39. raise NotImplementedError(
  40. "InfiniteThreads must implement on_iteration")
  41. def on_stop(self):
  42. """This handler is run when the thread is shutdown."""
  43. pass
  44. def stop(self):
  45. """Gracefully shutdown the thread."""
  46. self.on_stop()
  47. self._shutdown.set()
  48. self._stopped.wait() # block until this thread is done
  49. class Mediator(BackgroundThread):
  50. """Thread continuously sending tasks in the queue to the pool.
  51. .. attribute:: ready_queue
  52. The task queue, a :class:`Queue.Queue` instance.
  53. .. attribute:: callback
  54. The callback used to process tasks retrieved from the
  55. :attr:`ready_queue`.
  56. """
  57. def __init__(self, ready_queue, callback, logger=None):
  58. super(Mediator, self).__init__()
  59. self.logger = logger or log.get_default_logger()
  60. self.ready_queue = ready_queue
  61. self.callback = callback
  62. def on_iteration(self):
  63. """Get tasks from bucket queue and apply the task callback."""
  64. try:
  65. # This blocks until there's a message in the queue.
  66. task = self.ready_queue.get(timeout=1)
  67. except QueueEmpty:
  68. time.sleep(1)
  69. else:
  70. if task.task_id in revoked: # task revoked
  71. task.on_ack()
  72. self.logger.warn("Mediator: Skipping revoked task: %s[%s]" % (
  73. task.task_name, task.task_id))
  74. return
  75. self.logger.debug(
  76. "Mediator: Running callback for task: %s[%s]" % (
  77. task.task_name, task.task_id))
  78. self.callback(task) # execute
  79. class ScheduleController(BackgroundThread):
  80. """Schedules tasks with an ETA by moving them to the bucket queue."""
  81. def __init__(self, eta_schedule, logger=None):
  82. super(ScheduleController, self).__init__()
  83. self.logger = logger or log.get_default_logger()
  84. self._scheduler = iter(eta_schedule)
  85. self.debug = log.SilenceRepeated(self.logger.debug, max_iterations=10)
  86. def on_iteration(self):
  87. """Wake-up scheduler"""
  88. delay = self._scheduler.next()
  89. if delay is None:
  90. delay = 1
  91. self.debug("ScheduleController: Scheduler wake-up",
  92. "ScheduleController: Next wake-up eta %s seconds..." % delay)
  93. time.sleep(delay)