# mediator.py (2.2 KB)

import os
import sys
import threading
import traceback

try:
    from queue import Empty  # Python 3 module name
except ImportError:
    from Queue import Empty  # Python 2 module name

from celery.app import app_or_default
  7. class Mediator(threading.Thread):
  8. """Thread continuously moving tasks from the ready queue onto the pool."""
  9. #: The task queue, a :class:`~Queue.Queue` instance.
  10. ready_queue = None
  11. #: Callback called when a task is obtained.
  12. callback = None
  13. def __init__(self, ready_queue, callback, logger=None, app=None):
  14. threading.Thread.__init__(self)
  15. self.app = app_or_default(app)
  16. self.logger = logger or self.app.log.get_default_logger()
  17. self.ready_queue = ready_queue
  18. self.callback = callback
  19. self._shutdown = threading.Event()
  20. self._stopped = threading.Event()
  21. self.setDaemon(True)
  22. self.setName(self.__class__.__name__)
  23. def move(self):
  24. try:
  25. task = self.ready_queue.get(timeout=1.0)
  26. except Empty:
  27. return
  28. if task.revoked():
  29. return
  30. self.logger.debug(
  31. "Mediator: Running callback for task: %s[%s]" % (
  32. task.task_name, task.task_id))
  33. try:
  34. self.callback(task)
  35. except Exception, exc:
  36. self.logger.error("Mediator callback raised exception %r\n%s" % (
  37. exc, traceback.format_exc()),
  38. exc_info=sys.exc_info(),
  39. extra={"data": {"id": task.task_id,
  40. "name": task.task_name,
  41. "hostname": task.hostname}})
  42. def run(self):
  43. """Move tasks until :meth:`stop` is called."""
  44. while not self._shutdown.isSet():
  45. try:
  46. self.move()
  47. except Exception, exc:
  48. self.logger.error("Mediator crash: %r" % (exc, ),
  49. exc_info=sys.exc_info())
  50. # exiting by normal means does not work here, so force exit.
  51. os._exit(1)
  52. self._stopped.set()
  53. def stop(self):
  54. """Gracefully shutdown the thread."""
  55. self._shutdown.set()
  56. self._stopped.wait()
  57. self.join(1e10)