# controllers.py
"""
Worker Controller Threads
"""
import threading
import time
from datetime import datetime
from multiprocessing import get_logger
from Queue import Empty as QueueEmpty

from celery.backends import default_periodic_status_backend


  10. class InfinityThread(threading.Thread):
  11. """Thread running an infinite loop which for every iteration
  12. calls its :meth:`on_iteration` method.
  13. This also implements graceful shutdown of the thread by providing
  14. the :meth:`stop` method.
  15. """
  16. is_infinite = True
  17. def __init__(self):
  18. super(InfinityThread, self).__init__()
  19. self._shutdown = threading.Event()
  20. self._stopped = threading.Event()
  21. self.setDaemon(True)
  22. def run(self):
  23. """This is the body of the thread.
  24. To start the thread use :meth:`start` instead.
  25. """
  26. while self.is_infinite:
  27. if self._shutdown.isSet():
  28. break
  29. self.on_iteration()
  30. self._stopped.set() # indicate that we are stopped
  31. def on_iteration(self):
  32. """This is the method called for every iteration and must be
  33. implemented by every subclass of :class:`InfinityThread`."""
  34. raise NotImplementedError(
  35. "InfiniteThreads must implement on_iteration")
  36. def stop(self):
  37. """Gracefully shutdown the thread."""
  38. self._shutdown.set()
  39. self._stopped.wait() # block until this thread is done
  40. class Mediator(InfinityThread):
  41. """Thread continuously sending tasks in the queue to the pool.
  42. .. attribute:: bucket_queue
  43. The task queue, a :class:`Queue.Queue` instance.
  44. .. attribute:: callback
  45. The callback used to process tasks retrieved from the
  46. :attr:`bucket_queue`.
  47. """
  48. def __init__(self, bucket_queue, callback):
  49. super(Mediator, self).__init__()
  50. self.bucket_queue = bucket_queue
  51. self.callback = callback
  52. def on_iteration(self):
  53. logger = get_logger()
  54. try:
  55. logger.debug("Mediator: Trying to get message from bucket_queue")
  56. # This blocks until there's a message in the queue.
  57. task = self.bucket_queue.get(timeout=1)
  58. except QueueEmpty:
  59. logger.debug("Mediator: Bucket queue is empty.")
  60. pass
  61. else:
  62. logger.debug("Mediator: Running callback for task: %s[%s]" % (
  63. task.task_name, task.task_id))
  64. self.callback(task)
  65. class PeriodicWorkController(InfinityThread):
  66. """A thread that continuously checks if there are
  67. :class:`celery.task.PeriodicTask` tasks waiting for execution,
  68. and executes them. It also finds tasks in the hold queue that is
  69. ready for execution and moves them to the bucket queue.
  70. (Tasks in the hold queue are tasks waiting for retry, or with an
  71. ``eta``/``countdown``.)
  72. """
  73. def __init__(self, bucket_queue, hold_queue):
  74. super(PeriodicWorkController, self).__init__()
  75. self.hold_queue = hold_queue
  76. self.bucket_queue = bucket_queue
  77. def on_iteration(self):
  78. logger = get_logger()
  79. logger.debug("PeriodicWorkController: Running periodic tasks...")
  80. self.run_periodic_tasks()
  81. logger.debug("PeriodicWorkController: Processing hold queue...")
  82. self.process_hold_queue()
  83. logger.debug("PeriodicWorkController: Going to sleep...")
  84. time.sleep(1)
  85. def run_periodic_tasks(self):
  86. default_periodic_status_backend.run_periodic_tasks()
  87. def process_hold_queue(self):
  88. """Finds paused tasks that are ready for execution and move
  89. them to the :attr:`bucket_queue`."""
  90. logger = get_logger()
  91. try:
  92. logger.debug(
  93. "PeriodicWorkController: Getting next task from hold queue..")
  94. task, eta = self.hold_queue.get_nowait()
  95. except QueueEmpty:
  96. logger.debug("PeriodicWorkController: Hold queue is empty")
  97. return
  98. if datetime.now() >= eta:
  99. logger.debug(
  100. "PeriodicWorkController: Time to run %s[%s] (%s)..." % (
  101. task.task_name, task.task_id, eta))
  102. self.bucket_queue.put(task)
  103. else:
  104. logger.debug(
  105. "PeriodicWorkController: ETA not ready for %s[%s] (%s)..." % (
  106. task.task_name, task.task_id, eta))
  107. self.hold_queue.put((task, eta))