controllers.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. """
  2. Worker Controller Threads
  3. """
  4. import time
  5. import threading
  6. from Queue import Empty as QueueEmpty
  7. from celery import conf
  8. from celery import log
  9. class Mediator(threading.Thread):
  10. """Thread continuously sending tasks in the queue to the pool.
  11. .. attribute:: ready_queue
  12. The task queue, a :class:`Queue.Queue` instance.
  13. .. attribute:: callback
  14. The callback used to process tasks retrieved from the
  15. :attr:`ready_queue`.
  16. """
  17. def __init__(self, ready_queue, callback, logger=None):
  18. threading.Thread.__init__(self)
  19. self.logger = logger or log.get_default_logger()
  20. self.ready_queue = ready_queue
  21. self.callback = callback
  22. self._shutdown = threading.Event()
  23. self._stopped = threading.Event()
  24. self.setDaemon(True)
  25. def move(self):
  26. try:
  27. # This blocks until there's a message in the queue.
  28. task = self.ready_queue.get(timeout=1)
  29. except QueueEmpty:
  30. time.sleep(0.2)
  31. else:
  32. if task.revoked():
  33. return
  34. self.logger.debug(
  35. "Mediator: Running callback for task: %s[%s]" % (
  36. task.task_name, task.task_id))
  37. self.callback(task) # execute
  38. def run(self):
  39. while not self._shutdown.isSet():
  40. self.move()
  41. self._stopped.set() # indicate that we are stopped
  42. def stop(self):
  43. """Gracefully shutdown the thread."""
  44. self._shutdown.set()
  45. self._stopped.wait() # block until this thread is done
  46. self.join(1e100)