controllers.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. """
  2. Worker Controller Threads
  3. """
  4. import time
  5. import threading
  6. from Queue import Empty as QueueEmpty
  7. from celery import log
  8. from celery.worker.revoke import revoked
  9. class BackgroundThread(threading.Thread):
  10. """Thread running an infinite loop which for every iteration
  11. calls its :meth:`on_iteration` method.
  12. This also implements graceful shutdown of the thread by providing
  13. the :meth:`stop` method.
  14. """
  15. def __init__(self):
  16. super(BackgroundThread, self).__init__()
  17. self._shutdown = threading.Event()
  18. self._stopped = threading.Event()
  19. self.setDaemon(True)
  20. def run(self):
  21. """This is the body of the thread.
  22. To start the thread use :meth:`start` instead.
  23. """
  24. self.on_start()
  25. while 1:
  26. if self._shutdown.isSet():
  27. break
  28. self.on_iteration()
  29. self._stopped.set() # indicate that we are stopped
  30. def on_start(self):
  31. """This handler is run at thread start, just before the infinite
  32. loop."""
  33. pass
  34. def on_iteration(self):
  35. """This is the method called for every iteration and must be
  36. implemented by every subclass of :class:`BackgroundThread`."""
  37. raise NotImplementedError(
  38. "InfiniteThreads must implement on_iteration")
  39. def on_stop(self):
  40. """This handler is run when the thread is shutdown."""
  41. pass
  42. def stop(self):
  43. """Gracefully shutdown the thread."""
  44. self.on_stop()
  45. self._shutdown.set()
  46. self._stopped.wait() # block until this thread is done
  47. class Mediator(BackgroundThread):
  48. """Thread continuously sending tasks in the queue to the pool.
  49. .. attribute:: ready_queue
  50. The task queue, a :class:`Queue.Queue` instance.
  51. .. attribute:: callback
  52. The callback used to process tasks retrieved from the
  53. :attr:`ready_queue`.
  54. """
  55. def __init__(self, ready_queue, callback, logger=None):
  56. super(Mediator, self).__init__()
  57. self.logger = logger or log.get_default_logger()
  58. self.ready_queue = ready_queue
  59. self.callback = callback
  60. def on_iteration(self):
  61. """Get tasks from bucket queue and apply the task callback."""
  62. try:
  63. # This blocks until there's a message in the queue.
  64. task = self.ready_queue.get(timeout=1)
  65. except QueueEmpty:
  66. time.sleep(0.2)
  67. else:
  68. if task.task_id in revoked: # task revoked
  69. task.on_ack()
  70. self.logger.warn("Mediator: Skipping revoked task: %s[%s]" % (
  71. task.task_name, task.task_id))
  72. return
  73. self.logger.debug(
  74. "Mediator: Running callback for task: %s[%s]" % (
  75. task.task_name, task.task_id))
  76. self.callback(task) # execute
  77. class ScheduleController(BackgroundThread):
  78. """Schedules tasks with an ETA by moving them to the bucket queue."""
  79. def __init__(self, eta_schedule, logger=None):
  80. super(ScheduleController, self).__init__()
  81. self.logger = logger or log.get_default_logger()
  82. self._scheduler = iter(eta_schedule)
  83. self.debug = log.SilenceRepeated(self.logger.debug, max_iterations=10)
  84. def on_iteration(self):
  85. """Wake-up scheduler"""
  86. delay = self._scheduler.next()
  87. if delay is None:
  88. delay = 1
  89. self.debug("ScheduleController: Scheduler wake-up",
  90. "ScheduleController: Next wake-up eta %s seconds..." % delay)
  91. time.sleep(delay)