__init__.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. """
  2. The Multiprocessing Worker Server
  3. """
  4. import traceback
  5. import logging
  6. from Queue import Queue
  7. from celery import conf
  8. from celery import registry
  9. from celery import platform
  10. from celery import signals
  11. from celery.log import setup_logger, _hijack_multiprocessing_logger
  12. from celery.beat import ClockServiceThread
  13. from celery.worker.pool import TaskPool
  14. from celery.worker.buckets import TaskBucket
  15. from celery.worker.listener import CarrotListener
  16. from celery.worker.scheduler import Scheduler
  17. from celery.worker.controllers import Mediator, ScheduleController
  18. def process_initializer():
  19. # There seems to a bug in multiprocessing (backport?)
  20. # when detached, where the worker gets EOFErrors from time to time
  21. # and the logger is left from the parent process causing a crash.
  22. _hijack_multiprocessing_logger()
  23. platform.set_mp_process_title("celeryd")
  24. class WorkController(object):
  25. """Executes tasks waiting in the task queue.
  26. :param concurrency: see :attr:`concurrency`.
  27. :param logfile: see :attr:`logfile`.
  28. :param loglevel: see :attr:`loglevel`.
  29. :param embed_clockservice: see :attr:`run_clockservice`.
  30. :param send_events: see :attr:`send_events`.
  31. .. attribute:: concurrency
  32. The number of simultaneous processes doing work (default:
  33. :const:`celery.conf.CELERYD_CONCURRENCY`)
  34. .. attribute:: loglevel
  35. The loglevel used (default: :const:`logging.INFO`)
  36. .. attribute:: logfile
  37. The logfile used, if no logfile is specified it uses ``stderr``
  38. (default: :const:`celery.conf.CELERYD_LOG_FILE`).
  39. .. attribute:: embed_clockservice
  40. If ``True``, celerybeat is embedded, running in the main worker
  41. process as a thread.
  42. .. attribute:: send_events
  43. Enable the sending of monitoring events, these events can be captured
  44. by monitors (celerymon).
  45. .. attribute:: logger
  46. The :class:`logging.Logger` instance used for logging.
  47. .. attribute:: is_detached
  48. Flag describing if the worker is running as a daemon or not.
  49. .. attribute:: pool
  50. The :class:`multiprocessing.Pool` instance used.
  51. .. attribute:: ready_queue
  52. The :class:`Queue.Queue` that holds tasks ready for immediate
  53. processing.
  54. .. attribute:: hold_queue
  55. The :class:`Queue.Queue` that holds paused tasks. Reasons for holding
  56. back the task include waiting for ``eta`` to pass or the task is being
  57. retried.
  58. .. attribute:: schedule_controller
  59. Instance of :class:`celery.worker.controllers.ScheduleController`.
  60. .. attribute:: mediator
  61. Instance of :class:`celery.worker.controllers.Mediator`.
  62. .. attribute:: listener
  63. Instance of :class:`CarrotListener`.
  64. """
  65. loglevel = logging.ERROR
  66. concurrency = conf.CELERYD_CONCURRENCY
  67. logfile = conf.CELERYD_LOG_FILE
  68. _state = None
  69. def __init__(self, concurrency=None, logfile=None, loglevel=None,
  70. send_events=conf.SEND_EVENTS,
  71. is_detached=False, embed_clockservice=False):
  72. # Options
  73. self.loglevel = loglevel or self.loglevel
  74. self.concurrency = concurrency or self.concurrency
  75. self.logfile = logfile or self.logfile
  76. self.is_detached = is_detached
  77. self.logger = setup_logger(loglevel, logfile)
  78. self.embed_clockservice = embed_clockservice
  79. self.send_events = send_events
  80. # Queues
  81. if conf.DISABLE_RATE_LIMITS:
  82. self.ready_queue = Queue()
  83. else:
  84. self.ready_queue = TaskBucket(task_registry=registry.tasks)
  85. self.eta_schedule = Scheduler(self.ready_queue)
  86. self.logger.debug("Instantiating thread components...")
  87. # Threads + Pool + Consumer
  88. self.pool = TaskPool(self.concurrency,
  89. logger=self.logger,
  90. initializer=process_initializer)
  91. self.mediator = Mediator(self.ready_queue,
  92. callback=self.process_task,
  93. logger=self.logger)
  94. self.scheduler = ScheduleController(self.eta_schedule,
  95. logger=self.logger)
  96. # Need a tight loop interval when embedded so the program
  97. # can be stopped in a sensible short time.
  98. self.clockservice = self.embed_clockservice and ClockServiceThread(
  99. logger=self.logger,
  100. is_detached=self.is_detached,
  101. max_interval=1) or None
  102. self.listener = CarrotListener(self.ready_queue,
  103. self.eta_schedule,
  104. logger=self.logger,
  105. send_events=send_events,
  106. initial_prefetch_count=concurrency)
  107. # The order is important here;
  108. # the first in the list is the first to start,
  109. # and they must be stopped in reverse order.
  110. self.components = filter(None, (self.pool,
  111. self.mediator,
  112. self.scheduler,
  113. self.clockservice,
  114. self.listener))
  115. def start(self):
  116. """Starts the workers main loop."""
  117. self._state = "RUN"
  118. try:
  119. for component in self.components:
  120. self.logger.debug("Starting thread %s..." % \
  121. component.__class__.__name__)
  122. component.start()
  123. finally:
  124. self.stop()
  125. def process_task(self, task):
  126. """Process task by sending it to the pool of workers."""
  127. try:
  128. try:
  129. task.execute_using_pool(self.pool, self.loglevel,
  130. self.logfile)
  131. except Exception, exc:
  132. self.logger.critical("Internal error %s: %s\n%s" % (
  133. exc.__class__, exc, traceback.format_exc()))
  134. except (SystemExit, KeyboardInterrupt):
  135. self.stop()
  136. def stop(self):
  137. """Gracefully shutdown the worker server."""
  138. if self._state != "RUN":
  139. return
  140. signals.worker_shutdown.send(sender=self)
  141. [component.stop() for component in reversed(self.components)]
  142. self._state = "STOP"