controllers.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. """
  2. Worker Controller Threads
  3. """
  4. from celery.backends import default_periodic_status_backend
  5. from Queue import Empty as QueueEmpty
  6. from datetime import datetime
  7. from multiprocessing import get_logger
  8. import traceback
  9. import threading
  10. import time
  11. class BackgroundThread(threading.Thread):
  12. """Thread running an infinite loop which for every iteration
  13. calls its :meth:`on_iteration` method.
  14. This also implements graceful shutdown of the thread by providing
  15. the :meth:`stop` method.
  16. """
  17. is_infinite = True
  18. def __init__(self):
  19. super(BackgroundThread, self).__init__()
  20. self._shutdown = threading.Event()
  21. self._stopped = threading.Event()
  22. self.setDaemon(True)
  23. def run(self):
  24. """This is the body of the thread.
  25. To start the thread use :meth:`start` instead.
  26. """
  27. self.on_start()
  28. while self.is_infinite:
  29. if self._shutdown.isSet():
  30. break
  31. self.on_iteration()
  32. self._stopped.set() # indicate that we are stopped
  33. def on_start(self):
  34. """This handler is run at thread start, just before the infinite
  35. loop."""
  36. pass
  37. def on_iteration(self):
  38. """This is the method called for every iteration and must be
  39. implemented by every subclass of :class:`BackgroundThread`."""
  40. raise NotImplementedError(
  41. "InfiniteThreads must implement on_iteration")
  42. def on_stop(self):
  43. """This handler is run when the thread is shutdown."""
  44. pass
  45. def stop(self):
  46. """Gracefully shutdown the thread."""
  47. self.on_stop()
  48. self._shutdown.set()
  49. self._stopped.wait() # block until this thread is done
  50. class Mediator(BackgroundThread):
  51. """Thread continuously sending tasks in the queue to the pool.
  52. .. attribute:: bucket_queue
  53. The task queue, a :class:`Queue.Queue` instance.
  54. .. attribute:: callback
  55. The callback used to process tasks retrieved from the
  56. :attr:`bucket_queue`.
  57. """
  58. def __init__(self, bucket_queue, callback):
  59. super(Mediator, self).__init__()
  60. self.bucket_queue = bucket_queue
  61. self.callback = callback
  62. def on_iteration(self):
  63. logger = get_logger()
  64. try:
  65. logger.debug("Mediator: Trying to get message from bucket_queue")
  66. # This blocks until there's a message in the queue.
  67. task = self.bucket_queue.get(timeout=1)
  68. except QueueEmpty:
  69. logger.debug("Mediator: Bucket queue is empty.")
  70. pass
  71. else:
  72. logger.debug("Mediator: Running callback for task: %s[%s]" % (
  73. task.task_name, task.task_id))
  74. self.callback(task)
  75. class PeriodicWorkController(BackgroundThread):
  76. """A thread that continuously checks if there are
  77. :class:`celery.task.PeriodicTask` tasks waiting for execution,
  78. and executes them. It also finds tasks in the hold queue that is
  79. ready for execution and moves them to the bucket queue.
  80. (Tasks in the hold queue are tasks waiting for retry, or with an
  81. ``eta``/``countdown``.)
  82. """
  83. def __init__(self, bucket_queue, hold_queue):
  84. super(PeriodicWorkController, self).__init__()
  85. self.hold_queue = hold_queue
  86. self.bucket_queue = bucket_queue
  87. def on_start(self):
  88. """Do backend-specific periodic task initialization."""
  89. default_periodic_status_backend.init_periodic_tasks()
  90. def on_iteration(self):
  91. logger = get_logger()
  92. logger.debug("PeriodicWorkController: Running periodic tasks...")
  93. try:
  94. self.run_periodic_tasks()
  95. except Exception, exc:
  96. logger.error(
  97. "PeriodicWorkController got exception: %s\n%s" % (
  98. exc, traceback.format_exc()))
  99. logger.debug("PeriodicWorkController: Processing hold queue...")
  100. self.process_hold_queue()
  101. logger.debug("PeriodicWorkController: Going to sleep...")
  102. time.sleep(1)
  103. def run_periodic_tasks(self):
  104. logger = get_logger()
  105. applied = default_periodic_status_backend.run_periodic_tasks()
  106. for task, task_id in applied:
  107. logger.debug(
  108. "PeriodicWorkController: Periodic task %s applied (%s)" % (
  109. task.name, task_id))
  110. def process_hold_queue(self):
  111. """Finds paused tasks that are ready for execution and move
  112. them to the :attr:`bucket_queue`."""
  113. logger = get_logger()
  114. try:
  115. logger.debug(
  116. "PeriodicWorkController: Getting next task from hold queue..")
  117. task, eta = self.hold_queue.get_nowait()
  118. except QueueEmpty:
  119. logger.debug("PeriodicWorkController: Hold queue is empty")
  120. return
  121. if datetime.now() >= eta:
  122. logger.debug(
  123. "PeriodicWorkController: Time to run %s[%s] (%s)..." % (
  124. task.task_name, task.task_id, eta))
  125. self.bucket_queue.put(task)
  126. else:
  127. logger.debug(
  128. "PeriodicWorkController: ETA not ready for %s[%s] (%s)..." % (
  129. task.task_name, task.task_id, eta))
  130. self.hold_queue.put((task, eta))