mediator.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.mediator
  4. ~~~~~~~~~~~~~~~~~~~~~~
  5. The mediator is an internal thread that moves tasks
  6. from an internal :class:`Queue` to the worker pool.
  7. This is only used if rate limits are enabled, as it moves
  8. messages from the rate limited queue (which holds tasks
  9. that are allowed to be processed) to the pool. Disabling
  10. rate limits will also disable this machinery,
  11. and can improve performance.
  12. :copyright: (c) 2009 - 2011 by Ask Solem.
  13. :license: BSD, see LICENSE for more details.
  14. """
  15. from __future__ import absolute_import
  16. import traceback
  17. from Queue import Empty
  18. from ..app import app_or_default
  19. from ..utils.threads import bgThread
  20. class Mediator(bgThread):
  21. #: The task queue, a :class:`~Queue.Queue` instance.
  22. ready_queue = None
  23. #: Callback called when a task is obtained.
  24. callback = None
  25. def __init__(self, ready_queue, callback, logger=None, app=None):
  26. self.app = app_or_default(app)
  27. self.logger = logger or self.app.log.get_default_logger()
  28. self.ready_queue = ready_queue
  29. self.callback = callback
  30. super(Mediator, self).__init__()
  31. def next(self):
  32. try:
  33. task = self.ready_queue.get(timeout=1.0)
  34. except Empty:
  35. return
  36. if task.revoked():
  37. return
  38. self.logger.debug(
  39. "Mediator: Running callback for task: %s[%s]" % (
  40. task.task_name, task.task_id))
  41. try:
  42. self.callback(task)
  43. except Exception, exc:
  44. self.logger.error("Mediator callback raised exception %r\n%s",
  45. exc, traceback.format_exc(),
  46. exc_info=sys.exc_info(),
  47. extra={"data": {"id": task.task_id,
  48. "name": task.task_name,
  49. "hostname": task.hostname}})
  50. move = next # XXX compat