mediator.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.mediator
  4. ~~~~~~~~~~~~~~~~~~~~~~
  5. The mediator is an internal thread that moves tasks
  6. from an internal :class:`Queue` to the worker pool.
  7. This is only used if rate limits are enabled, as it moves
  8. messages from the rate limited queue (which holds tasks
  9. that are allowed to be processed) to the pool. Disabling
  10. rate limits will also disable this machinery,
  11. and can improve performance.
  12. :copyright: (c) 2009 - 2012 by Ask Solem.
  13. :license: BSD, see LICENSE for more details.
  14. """
  15. from __future__ import absolute_import
  16. import logging
  17. import sys
  18. import traceback
  19. from Queue import Empty
  20. from ..abstract import StartStopComponent
  21. from ..app import app_or_default
  22. from ..utils.threads import bgThread
  23. class WorkerComponent(StartStopComponent):
  24. name = "worker.mediator"
  25. requires = ("pool", "queues", )
  26. def __init__(self, w, **kwargs):
  27. w.mediator = None
  28. def include_if(self, w):
  29. return not w.disable_rate_limits or w.pool_cls.requires_mediator
  30. def create(self, w):
  31. m = w.mediator = self.instantiate(w.mediator_cls, w.ready_queue,
  32. app=w.app, callback=w.process_task,
  33. logger=w.logger)
  34. return m
  35. class Mediator(bgThread):
  36. #: The task queue, a :class:`~Queue.Queue` instance.
  37. ready_queue = None
  38. #: Callback called when a task is obtained.
  39. callback = None
  40. def __init__(self, ready_queue, callback, logger=None, app=None):
  41. self.app = app_or_default(app)
  42. self.logger = logger or self.app.log.get_default_logger()
  43. self.ready_queue = ready_queue
  44. self.callback = callback
  45. self._does_debug = self.logger.isEnabledFor(logging.DEBUG)
  46. super(Mediator, self).__init__()
  47. def body(self):
  48. try:
  49. task = self.ready_queue.get(timeout=1.0)
  50. except Empty:
  51. return
  52. if task.revoked():
  53. return
  54. if self._does_debug:
  55. self.logger.debug(
  56. "Mediator: Running callback for task: %s[%s]" % (
  57. task.task_name, task.task_id))
  58. try:
  59. self.callback(task)
  60. except Exception, exc:
  61. self.logger.error("Mediator callback raised exception %r\n%s",
  62. exc, traceback.format_exc(),
  63. exc_info=sys.exc_info(),
  64. extra={"data": {"id": task.task_id,
  65. "name": task.task_name,
  66. "hostname": task.hostname}})
  67. move = body # XXX compat