# celery/worker/__init__.py
"""

The Multiprocessing Worker Server

"""
import logging
import socket
import traceback

from multiprocessing.util import Finalize
from Queue import Queue

from celery import conf
from celery import platform
from celery import registry
from celery import signals
from celery.beat import EmbeddedClockService
from celery.log import setup_logger, _hijack_multiprocessing_logger
from celery.utils import noop, instantiate
from celery.worker.buckets import TaskBucket, FastQueue
from celery.worker.scheduler import Scheduler
  18. def process_initializer():
  19. # There seems to a bug in multiprocessing (backport?)
  20. # when detached, where the worker gets EOFErrors from time to time
  21. # and the logger is left from the parent process causing a crash.
  22. _hijack_multiprocessing_logger()
  23. platform.reset_signal("SIGTERM")
  24. platform.set_mp_process_title("celeryd")
  25. # This is for windows and other platforms not supporting
  26. # fork(). Note that init_worker makes sure it's only
  27. # run once per process.
  28. from celery.loaders import current_loader
  29. current_loader().init_worker()
  30. class WorkController(object):
  31. """Executes tasks waiting in the task queue.
  32. :param concurrency: see :attr:`concurrency`.
  33. :param logfile: see :attr:`logfile`.
  34. :param loglevel: see :attr:`loglevel`.
  35. :param embed_clockservice: see :attr:`run_clockservice`.
  36. :param send_events: see :attr:`send_events`.
  37. .. attribute:: concurrency
  38. The number of simultaneous processes doing work (default:
  39. :const:`celery.conf.CELERYD_CONCURRENCY`)
  40. .. attribute:: loglevel
  41. The loglevel used (default: :const:`logging.INFO`)
  42. .. attribute:: logfile
  43. The logfile used, if no logfile is specified it uses ``stderr``
  44. (default: :const:`celery.conf.CELERYD_LOG_FILE`).
  45. .. attribute:: embed_clockservice
  46. If ``True``, celerybeat is embedded, running in the main worker
  47. process as a thread.
  48. .. attribute:: send_events
  49. Enable the sending of monitoring events, these events can be captured
  50. by monitors (celerymon).
  51. .. attribute:: logger
  52. The :class:`logging.Logger` instance used for logging.
  53. .. attribute:: pool
  54. The :class:`multiprocessing.Pool` instance used.
  55. .. attribute:: ready_queue
  56. The :class:`Queue.Queue` that holds tasks ready for immediate
  57. processing.
  58. .. attribute:: hold_queue
  59. The :class:`Queue.Queue` that holds paused tasks. Reasons for holding
  60. back the task include waiting for ``eta`` to pass or the task is being
  61. retried.
  62. .. attribute:: schedule_controller
  63. Instance of :class:`celery.worker.controllers.ScheduleController`.
  64. .. attribute:: mediator
  65. Instance of :class:`celery.worker.controllers.Mediator`.
  66. .. attribute:: listener
  67. Instance of :class:`CarrotListener`.
  68. """
  69. loglevel = logging.ERROR
  70. concurrency = conf.CELERYD_CONCURRENCY
  71. logfile = conf.CELERYD_LOG_FILE
  72. _state = None
  73. def __init__(self, concurrency=None, logfile=None, loglevel=None,
  74. send_events=conf.SEND_EVENTS, hostname=None,
  75. ready_callback=noop, embed_clockservice=False,
  76. pool_cls=conf.CELERYD_POOL, listener_cls=conf.CELERYD_LISTENER,
  77. mediator_cls=conf.CELERYD_MEDIATOR,
  78. eta_scheduler_cls=conf.CELERYD_ETA_SCHEDULER,
  79. schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME):
  80. # Options
  81. self.loglevel = loglevel or self.loglevel
  82. self.concurrency = concurrency or self.concurrency
  83. self.logfile = logfile or self.logfile
  84. self.logger = setup_logger(loglevel, logfile)
  85. self.hostname = hostname or socket.gethostname()
  86. self.embed_clockservice = embed_clockservice
  87. self.ready_callback = ready_callback
  88. self.send_events = send_events
  89. self._finalize = Finalize(self, self.stop, exitpriority=20)
  90. # Queues
  91. if conf.DISABLE_RATE_LIMITS:
  92. self.ready_queue = FastQueue()
  93. else:
  94. self.ready_queue = TaskBucket(task_registry=registry.tasks)
  95. self.eta_schedule = Scheduler(self.ready_queue, logger=self.logger)
  96. self.logger.debug("Instantiating thread components...")
  97. # Threads + Pool + Consumer
  98. self.pool = instantiate(pool_cls, self.concurrency,
  99. logger=self.logger,
  100. initializer=process_initializer)
  101. self.mediator = instantiate(mediator_cls, self.ready_queue,
  102. callback=self.process_task,
  103. logger=self.logger)
  104. self.scheduler = instantiate(eta_scheduler_cls,
  105. self.eta_schedule, logger=self.logger)
  106. self.clockservice = None
  107. if self.embed_clockservice:
  108. self.clockservice = EmbeddedClockService(logger=self.logger,
  109. schedule_filename=schedule_filename)
  110. prefetch_count = self.concurrency * conf.CELERYD_PREFETCH_MULTIPLIER
  111. self.listener = instantiate(listener_cls,
  112. self.ready_queue,
  113. self.eta_schedule,
  114. logger=self.logger,
  115. hostname=self.hostname,
  116. send_events=self.send_events,
  117. init_callback=self.ready_callback,
  118. initial_prefetch_count=prefetch_count)
  119. # The order is important here;
  120. # the first in the list is the first to start,
  121. # and they must be stopped in reverse order.
  122. self.components = filter(None, (self.pool,
  123. self.mediator,
  124. self.scheduler,
  125. self.clockservice,
  126. self.listener))
  127. def start(self):
  128. """Starts the workers main loop."""
  129. self._state = "RUN"
  130. try:
  131. for component in self.components:
  132. self.logger.debug("Starting thread %s..." % \
  133. component.__class__.__name__)
  134. component.start()
  135. finally:
  136. self.stop()
  137. def process_task(self, wrapper):
  138. """Process task by sending it to the pool of workers."""
  139. try:
  140. try:
  141. wrapper.task.execute(wrapper, self.pool,
  142. self.loglevel, self.logfile)
  143. except Exception, exc:
  144. self.logger.critical("Internal error %s: %s\n%s" % (
  145. exc.__class__, exc, traceback.format_exc()))
  146. except (SystemExit, KeyboardInterrupt):
  147. self.stop()
  148. def stop(self):
  149. """Gracefully shutdown the worker server."""
  150. if self._state != "RUN":
  151. return
  152. signals.worker_shutdown.send(sender=self)
  153. for component in reversed(self.components):
  154. self.logger.debug("Stopping thread %s..." % (
  155. component.__class__.__name__))
  156. component.stop()
  157. self.listener.close_connection()
  158. self._state = "STOP"
  159. def terminate(self):
  160. """Not so gracefully shutdown the worker server."""
  161. if self._state != "RUN":
  162. return
  163. signals.worker_shutdown.send(sender=self)
  164. for component in reversed(self.components):
  165. self.logger.debug("Terminating thread %s..." % (
  166. component.__class__.__name__))
  167. terminate = getattr(component, "terminate", component.stop)
  168. terminate()
  169. self.listener.close_connection()
  170. self._state = "STOP"