controllers.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. """
  2. Worker Controller Threads
  3. """
  4. import logging
  5. import sys
  6. import threading
  7. import traceback
  8. from Queue import Empty
  9. from celery.app import app_or_default
  10. from celery.utils.compat import log_with_extra
  11. class Mediator(threading.Thread):
  12. """Thread continuously moving tasks from the ready queue into the pool."""
  13. #: The task queue, a :class:`~Queue.Queue` instance.
  14. ready_queue = None
  15. #: Callback called when a task is obtained.
  16. callback = None
  17. def __init__(self, ready_queue, callback, logger=None, app=None):
  18. threading.Thread.__init__(self)
  19. self.app = app_or_default(app)
  20. self.logger = logger or self.app.log.get_default_logger()
  21. self.ready_queue = ready_queue
  22. self.callback = callback
  23. self._shutdown = threading.Event()
  24. self._stopped = threading.Event()
  25. self.setDaemon(True)
  26. self.setName(self.__class__.__name__)
  27. def move(self):
  28. try:
  29. task = self.ready_queue.get(timeout=1.0)
  30. except Empty:
  31. return
  32. if task.revoked():
  33. return
  34. self.logger.debug(
  35. "Mediator: Running callback for task: %s[%s]" % (
  36. task.task_name, task.task_id))
  37. try:
  38. self.callback(task)
  39. except Exception, exc:
  40. log_with_extra(self.logger, logging.ERROR,
  41. "Mediator callback raised exception %r\n%s" % (
  42. exc, traceback.format_exc()),
  43. exc_info=sys.exc_info(),
  44. extra={"data": {"hostname": task.hostname,
  45. "id": task.task_id,
  46. "name": task.task_name}})
  47. def run(self):
  48. """Move tasks forver or until :meth:`stop` is called."""
  49. while not self._shutdown.isSet():
  50. self.move()
  51. self._stopped.set()
  52. def stop(self):
  53. """Gracefully shutdown the thread."""
  54. self._shutdown.set()
  55. self._stopped.wait()
  56. self.join(1e10)