__init__.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. """
  2. The Multiprocessing Worker Server
  3. """
  4. import traceback
  5. import logging
  6. from Queue import Queue
  7. from celery import conf
  8. from celery import registry
  9. from celery.log import setup_logger
  10. from celery.beat import ClockServiceThread
  11. from celery.worker.pool import TaskPool
  12. from celery.worker.buckets import TaskBucket
  13. from celery.worker.listener import CarrotListener
  14. from celery.worker.scheduler import Scheduler
  15. from celery.worker.controllers import Mediator, ScheduleController
  16. class WorkController(object):
  17. """Executes tasks waiting in the task queue.
  18. :param concurrency: see :attr:`concurrency`.
  19. :param logfile: see :attr:`logfile`.
  20. :param loglevel: see :attr:`loglevel`.
  21. :param embed_clockservice: see :attr:`run_clockservice`.
  22. :param send_events: see :attr:`send_events`.
  23. .. attribute:: concurrency
  24. The number of simultaneous processes doing work (default:
  25. :const:`celery.conf.DAEMON_CONCURRENCY`)
  26. .. attribute:: loglevel
  27. The loglevel used (default: :const:`logging.INFO`)
  28. .. attribute:: logfile
  29. The logfile used, if no logfile is specified it uses ``stderr``
  30. (default: :const:`celery.conf.DAEMON_LOG_FILE`).
  31. .. attribute:: embed_clockservice
  32. If ``True``, celerybeat is embedded, running in the main worker
  33. process as a thread.
  34. .. attribute:: send_events
  35. Enable the sending of monitoring events, these events can be captured
  36. by monitors (celerymon).
  37. .. attribute:: logger
  38. The :class:`logging.Logger` instance used for logging.
  39. .. attribute:: is_detached
  40. Flag describing if the worker is running as a daemon or not.
  41. .. attribute:: pool
  42. The :class:`multiprocessing.Pool` instance used.
  43. .. attribute:: ready_queue
  44. The :class:`Queue.Queue` that holds tasks ready for immediate
  45. processing.
  46. .. attribute:: hold_queue
  47. The :class:`Queue.Queue` that holds paused tasks. Reasons for holding
  48. back the task include waiting for ``eta`` to pass or the task is being
  49. retried.
  50. .. attribute:: schedule_controller
  51. Instance of :class:`celery.worker.controllers.ScheduleController`.
  52. .. attribute:: mediator
  53. Instance of :class:`celery.worker.controllers.Mediator`.
  54. .. attribute:: broker_listener
  55. Instance of :class:`CarrotListener`.
  56. """
  57. loglevel = logging.ERROR
  58. concurrency = conf.DAEMON_CONCURRENCY
  59. logfile = conf.DAEMON_LOG_FILE
  60. _state = None
  61. def __init__(self, concurrency=None, logfile=None, loglevel=None,
  62. send_events=conf.CELERY_SEND_EVENTS,
  63. is_detached=False, embed_clockservice=False):
  64. # Options
  65. self.loglevel = loglevel or self.loglevel
  66. self.concurrency = concurrency or self.concurrency
  67. self.logfile = logfile or self.logfile
  68. self.is_detached = is_detached
  69. self.logger = setup_logger(loglevel, logfile)
  70. self.embed_clockservice = embed_clockservice
  71. self.send_events = send_events
  72. # Queues
  73. if conf.DISABLE_RATE_LIMITS:
  74. self.ready_queue = Queue()
  75. else:
  76. self.ready_queue = TaskBucket(task_registry=registry.tasks)
  77. self.eta_scheduler = Scheduler(self.ready_queue)
  78. self.logger.debug("Instantiating thread components...")
  79. # Threads+Pool
  80. self.schedule_controller = ScheduleController(self.eta_scheduler)
  81. self.pool = TaskPool(self.concurrency, logger=self.logger)
  82. self.broker_listener = CarrotListener(self.ready_queue,
  83. self.eta_scheduler,
  84. logger=self.logger,
  85. send_events=send_events,
  86. initial_prefetch_count=concurrency)
  87. self.mediator = Mediator(self.ready_queue, self.safe_process_task)
  88. self.clockservice = None
  89. if self.embed_clockservice:
  90. self.clockservice = ClockServiceThread(logger=self.logger,
  91. is_detached=self.is_detached)
  92. # The order is important here;
  93. # the first in the list is the first to start,
  94. # and they must be stopped in reverse order.
  95. self.components = filter(None, (self.pool,
  96. self.mediator,
  97. self.schedule_controller,
  98. self.clockservice,
  99. self.broker_listener))
  100. def start(self):
  101. """Starts the workers main loop."""
  102. self._state = "RUN"
  103. try:
  104. for component in self.components:
  105. self.logger.debug("Starting thread %s..." % \
  106. component.__class__.__name__)
  107. component.start()
  108. finally:
  109. self.stop()
  110. def safe_process_task(self, task):
  111. """Same as :meth:`process_task`, but catches all exceptions
  112. the task raises and log them as errors, to make sure the
  113. worker doesn't die."""
  114. try:
  115. try:
  116. self.process_task(task)
  117. except Exception, exc:
  118. self.logger.critical("Internal error %s: %s\n%s" % (
  119. exc.__class__, exc, traceback.format_exc()))
  120. except (SystemExit, KeyboardInterrupt):
  121. self.stop()
  122. def process_task(self, task):
  123. """Process task by sending it to the pool of workers."""
  124. task.execute_using_pool(self.pool, self.loglevel, self.logfile)
  125. def stop(self):
  126. """Gracefully shutdown the worker server."""
  127. if self._state != "RUN":
  128. return
  129. [component.stop() for component in reversed(self.components)]
  130. self._state = "STOP"