mediator.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.mediator
  4. ~~~~~~~~~~~~~~~~~~~~~~
  5. The mediator is an internal thread that moves tasks
  6. from an internal :class:`Queue` to the worker pool.
  7. This is only used if rate limits are enabled, as it moves
  8. messages from the rate limited queue (which holds tasks
  9. that are allowed to be processed) to the pool. Disabling
  10. rate limits will also disable this machinery,
  11. and can improve performance.
  12. """
  13. from __future__ import absolute_import
  14. import logging
  15. from Queue import Empty
  16. from celery.app import app_or_default
  17. from celery.bootsteps import StartStopStep
  18. from celery.utils.threads import bgThread
  19. from celery.utils.log import get_logger
  20. from . import components
  21. logger = get_logger(__name__)
  22. class WorkerComponent(StartStopStep):
  23. label = 'Mediator'
  24. conditional = True
  25. requires = (components.Pool, components.Queues, )
  26. def __init__(self, w, **kwargs):
  27. w.mediator = None
  28. def include_if(self, w):
  29. return w.start_mediator
  30. def create(self, w):
  31. m = w.mediator = self.instantiate(w.mediator_cls, w.ready_queue,
  32. app=w.app, callback=w.process_task)
  33. return m
  34. class Mediator(bgThread):
  35. """Mediator thread."""
  36. #: The task queue, a :class:`~Queue.Queue` instance.
  37. ready_queue = None
  38. #: Callback called when a task is obtained.
  39. callback = None
  40. def __init__(self, ready_queue, callback, app=None, **kw):
  41. self.app = app_or_default(app)
  42. self.ready_queue = ready_queue
  43. self.callback = callback
  44. self._does_debug = logger.isEnabledFor(logging.DEBUG)
  45. super(Mediator, self).__init__()
  46. def body(self):
  47. try:
  48. task = self.ready_queue.get(timeout=1.0)
  49. except Empty:
  50. return
  51. if self._does_debug:
  52. logger.debug('Mediator: Running callback for task: %s[%s]',
  53. task.name, task.id)
  54. try:
  55. self.callback(task)
  56. except Exception as exc:
  57. logger.error('Mediator callback raised exception %r',
  58. exc, exc_info=True,
  59. extra={'data': {'id': task.id,
  60. 'name': task.name,
  61. 'hostname': task.hostname}})