mediator.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.mediator
  4. ~~~~~~~~~~~~~~~~~~~~~~
  5. The mediator is an internal thread that moves tasks
  6. from an internal :class:`Queue` to the worker pool.
  7. This is only used if rate limits are enabled, as it moves
  8. messages from the rate limited queue (which holds tasks
  9. that are allowed to be processed) to the pool. Disabling
  10. rate limits will also disable this machinery,
  11. and can improve performance.
  12. """
  13. from __future__ import absolute_import
  14. import logging
  15. from Queue import Empty
  16. from celery.app import app_or_default
  17. from celery.bootsteps import StartStopStep
  18. from celery.utils.threads import bgThread
  19. from celery.utils.log import get_logger
  20. from . import components
  # Module-level logger shared by the classes defined in this module.
  logger = get_logger(__name__)
  class WorkerComponent(StartStopStep):
      """Worker step that creates the mediator and attaches it to the worker.

      The step depends on the ``Pool`` and ``Queues`` components and is
      only included when the worker's ``start_mediator`` flag is true.
      """

      requires = (components.Pool, components.Queues)

      def __init__(self, w, **kwargs):
          """Start with no mediator attached to the worker *w*."""
          w.mediator = None

      def include_if(self, w):
          """Return the worker's ``start_mediator`` flag."""
          return w.start_mediator

      def create(self, w):
          """Instantiate ``w.mediator_cls`` and store it as ``w.mediator``.

          The mediator is given the worker's ready queue, its app and
          ``w.process_task`` as the callback.  The new instance is
          returned.
          """
          mediator = self.instantiate(
              w.mediator_cls,
              w.ready_queue,
              app=w.app,
              callback=w.process_task
          )
          w.mediator = mediator
          return mediator
  class Mediator(bgThread):
      """Mediator thread.

      Takes tasks off ``ready_queue`` one at a time and passes each of
      them to ``callback``.
      """

      #: The task queue, a :class:`~Queue.Queue` instance.
      ready_queue = None

      #: Callback called when a task is obtained.
      callback = None

      def __init__(self, ready_queue, callback, app=None, **kw):
          """Store the queue, the callback and the (defaulted) app.

          Extra keyword arguments in *kw* are accepted but not used.
          """
          self.ready_queue = ready_queue
          self.callback = callback
          self.app = app_or_default(app)
          # Evaluated once here so ``body`` only has to test a boolean.
          self._does_debug = logger.isEnabledFor(logging.DEBUG)
          super(Mediator, self).__init__()

      def body(self):
          """Fetch at most one task and run the callback on it.

          Waits up to one second for a task; returns without doing
          anything if the queue stays empty.  An exception raised by the
          callback is logged and not propagated.
          NOTE(review): presumably invoked repeatedly by the ``bgThread``
          run loop -- confirm against ``celery.utils.threads``.
          """
          try:
              job = self.ready_queue.get(timeout=1.0)
          except Empty:
              return None

          if self._does_debug:
              logger.debug('Mediator: Running callback for task: %s[%s]',
                           job.name, job.id)

          try:
              self.callback(job)
          except Exception as exc:
              # Attach identifying task details to the log record.
              details = {'id': job.id,
                         'name': job.name,
                         'hostname': job.hostname}
              logger.error('Mediator callback raised exception %r',
                           exc, exc_info=True,
                           extra={'data': details})
  59. 'hostname': task.hostname}})