__init__.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. """
  2. The Multiprocessing Worker Server
  3. """
  import socket
  import logging
  import sys
  import traceback
  from multiprocessing.util import Finalize

  from celery import beat
  from celery import conf
  from celery import log
  from celery import registry
  from celery import platforms
  from celery import signals
  from celery.utils import noop, instantiate
  from celery.worker import state
  from celery.worker.buckets import TaskBucket, FastQueue
  # Worker lifecycle states stored in ``WorkController._state``
  # (compared for equality only, never combined as bit flags).
  RUN, CLOSE, TERMINATE = 0x1, 0x2, 0x3

  # Signals that process_initializer() resets in a pool process.
  WORKER_SIGRESET = frozenset(("SIGTERM", "SIGHUP", "SIGTTIN", "SIGTTOU"))

  # Signals that process_initializer() ignores in a pool process.
  WORKER_SIGIGNORE = frozenset(("SIGINT",))
  def process_initializer():
      """Initialize a pool process so it can be used to process tasks.

      :class:`WorkController` passes this function to its pool as the
      ``initializer`` argument (presumably called once in each child
      process, as :class:`multiprocessing.Pool` does). It:

      * resets every signal named in :data:`WORKER_SIGRESET` and ignores
        every signal named in :data:`WORKER_SIGIGNORE`,
      * sets the process title to ``"celeryd"``,
      * calls ``init_worker()`` on the current loader, and
      * sends the :data:`celery.signals.worker_process_init` signal.
      """
      # Explicit loops instead of map(): these calls are made purely for
      # their side effects, and on Python 3 map() is lazy, so nothing would
      # ever be reset or ignored.
      for signame in WORKER_SIGRESET:
          platforms.reset_signal(signame)
      for signame in WORKER_SIGIGNORE:
          platforms.ignore_signal(signame)
      platforms.set_mp_process_title("celeryd")

      # This is for windows and other platforms not supporting
      # fork(). Note that init_worker makes sure it's only
      # run once per process.
      from celery.loaders import current_loader
      current_loader().init_worker()

      signals.worker_process_init.send(sender=None)
  class WorkController(object):
      """Executes tasks waiting in the task queue.

      :param concurrency: see :attr:`concurrency`.
      :param logfile: see :attr:`logfile`.
      :param loglevel: see :attr:`loglevel`.
      :param embed_clockservice: see :attr:`embed_clockservice`.
      :param send_events: see :attr:`send_events`.

      The remaining keyword arguments select the component classes
      (``pool_cls``, ``listener_cls``, ``mediator_cls``,
      ``eta_scheduler_cls``) and the options handed to them; each one
      defaults to the matching ``celery.conf`` setting.

      .. attribute:: concurrency

          The number of simultaneous processes doing work (default:
          ``conf.CELERYD_CONCURRENCY``)

      .. attribute:: loglevel

          The loglevel used (default: :const:`logging.ERROR`)

      .. attribute:: logfile

          The logfile used, if no logfile is specified it uses ``stderr``
          (default: ``celery.conf.CELERYD_LOG_FILE``).

      .. attribute:: embed_clockservice

          If ``True``, celerybeat is embedded, running in the main worker
          process as a thread.

      .. attribute:: send_events

          Enable the sending of monitoring events, these events can be
          captured by monitors (celerymon).

      .. attribute:: logger

          The :class:`logging.Logger` instance used for logging.

      .. attribute:: pool

          The execution pool, an instance of ``pool_cls`` (default
          ``conf.CELERYD_POOL``).

      .. attribute:: ready_queue

          The queue that holds tasks ready for immediate processing: a
          :class:`~celery.worker.buckets.FastQueue` when
          ``conf.DISABLE_RATE_LIMITS`` is set, otherwise a
          :class:`~celery.worker.buckets.TaskBucket`.

      .. attribute:: scheduler

          The ETA scheduler, an instance of ``eta_scheduler_cls`` (default
          ``conf.CELERYD_ETA_SCHEDULER``).

      .. attribute:: beat

          The embedded :class:`celery.beat.EmbeddedService` when
          :attr:`embed_clockservice` is enabled, otherwise :const:`None`.

      .. attribute:: mediator

          Instance of ``mediator_cls`` (default ``conf.CELERYD_MEDIATOR``),
          constructed with :attr:`ready_queue` and :meth:`process_task` as
          its callback.

      .. attribute:: listener

          Instance of ``listener_cls`` (default ``conf.CELERYD_LISTENER``).

      .. attribute:: components

          The components to run, in start order (:const:`None` entries such
          as a disabled :attr:`beat` are left out). They are stopped in
          reverse order.
      """
      # Class-level defaults, used when the matching constructor argument
      # is not given (or is falsy).
      loglevel = logging.ERROR
      concurrency = conf.CELERYD_CONCURRENCY
      logfile = conf.CELERYD_LOG_FILE
      # Lifecycle state: None until start(), then RUN, CLOSE, TERMINATE.
      _state = None
      # Number of components whose start() has been called (see start()).
      _running = 0

      def __init__(self, concurrency=None, logfile=None, loglevel=None,
              send_events=conf.SEND_EVENTS, hostname=None,
              ready_callback=noop, embed_clockservice=False,
              pool_cls=conf.CELERYD_POOL, listener_cls=conf.CELERYD_LISTENER,
              mediator_cls=conf.CELERYD_MEDIATOR,
              eta_scheduler_cls=conf.CELERYD_ETA_SCHEDULER,
              schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME,
              task_time_limit=conf.CELERYD_TASK_TIME_LIMIT,
              task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
              max_tasks_per_child=conf.CELERYD_MAX_TASKS_PER_CHILD,
              pool_putlocks=conf.CELERYD_POOL_PUTLOCKS,
              db=conf.CELERYD_STATE_DB):

          # Options
          self.loglevel = loglevel or self.loglevel
          self.concurrency = concurrency or self.concurrency
          self.logfile = logfile or self.logfile
          self.logger = log.get_default_logger()
          self.hostname = hostname or socket.gethostname()
          self.embed_clockservice = embed_clockservice
          self.ready_callback = ready_callback
          self.send_events = send_events
          self.task_time_limit = task_time_limit
          self.task_soft_time_limit = task_soft_time_limit
          self.max_tasks_per_child = max_tasks_per_child
          self.pool_putlocks = pool_putlocks
          self.timer_debug = log.SilenceRepeated(self.logger.debug,
                                                 max_iterations=10)
          self.db = db

          # Register stop() to run as a finalizer at interpreter exit.
          self._finalize = Finalize(self, self.stop, exitpriority=1)
          if self.db:
              persistence = state.Persistent(self.db)
              # multiprocessing.util runs finalizers with a higher
              # exitpriority first, so this save runs before the stop()
              # registered above.
              Finalize(persistence, persistence.save, exitpriority=5)

          # Queues
          if conf.DISABLE_RATE_LIMITS:
              self.ready_queue = FastQueue()
          else:
              self.ready_queue = TaskBucket(task_registry=registry.tasks)

          self.logger.debug("Instantiating thread components...")

          # Threads + Pool + Consumer
          self.pool = instantiate(pool_cls, self.concurrency,
                                  logger=self.logger,
                                  initializer=process_initializer,
                                  maxtasksperchild=self.max_tasks_per_child,
                                  timeout=self.task_time_limit,
                                  soft_timeout=self.task_soft_time_limit,
                                  putlocks=self.pool_putlocks)
          self.mediator = instantiate(mediator_cls, self.ready_queue,
                                      callback=self.process_task,
                                      logger=self.logger)
          self.scheduler = instantiate(eta_scheduler_cls,
                                  precision=conf.CELERYD_ETA_SCHEDULER_PRECISION,
                                  on_error=self.on_timer_error,
                                  on_tick=self.on_timer_tick)

          self.beat = None
          if self.embed_clockservice:
              self.beat = beat.EmbeddedService(logger=self.logger,
                                      schedule_filename=schedule_filename)

          prefetch_count = self.concurrency * conf.CELERYD_PREFETCH_MULTIPLIER
          self.listener = instantiate(listener_cls,
                                      self.ready_queue,
                                      self.scheduler,
                                      logger=self.logger,
                                      hostname=self.hostname,
                                      send_events=self.send_events,
                                      init_callback=self.ready_callback,
                                      initial_prefetch_count=prefetch_count,
                                      pool=self.pool)

          # The order is important here;
          # the first in the list is the first to start,
          # and they must be stopped in reverse order.
          # A real list (not a lazy filter object): _shutdown() needs both
          # len() and reversed() on it.
          self.components = [component for component in (self.pool,
                                                          self.mediator,
                                                          self.scheduler,
                                                          self.beat,
                                                          self.listener)
                                       if component]

      def start(self):
          """Start the worker's main loop by starting every component.

          Components are started in :attr:`components` order. ``_running``
          is bumped *before* each ``component.start()`` call, so a component
          whose ``start()`` does not return (presumably the listener, which
          is last) still counts as started when :meth:`_shutdown` checks
          whether the worker was fully started.
          """
          self._state = RUN
          for i, component in enumerate(self.components):
              self.logger.debug("Starting thread %s..." % (
                                  component.__class__.__name__))
              self._running = i + 1
              component.start()

      def process_task(self, wrapper):
          """Process task by sending it to the pool of workers.

          Any :exc:`Exception` raised while doing so is logged as critical
          (with traceback) and swallowed. :exc:`SystemExit` and
          :exc:`KeyboardInterrupt` cause :meth:`stop` to be called instead.
          """
          try:
              try:
                  wrapper.task.execute(wrapper, self.pool,
                                       self.loglevel, self.logfile)
              except (SystemExit, KeyboardInterrupt):
                  # Before Python 2.5 these subclass Exception; re-raise so
                  # the outer handler stops the worker instead of them being
                  # logged as internal errors below.
                  raise
              except Exception:
                  # sys.exc_info() instead of ``except ... as exc`` /
                  # ``except ..., exc`` keeps this valid on Python 2 and 3.
                  exc = sys.exc_info()[1]
                  self.logger.critical("Internal error %s: %s\n%s" % (
                                  exc.__class__, exc, traceback.format_exc()))
          except (SystemExit, KeyboardInterrupt):
              self.stop()

      def stop(self):
          """Graceful shutdown of the worker server."""
          self._shutdown(warm=True)

      def terminate(self):
          """Not so graceful shutdown of the worker server."""
          self._shutdown(warm=False)

      def _shutdown(self, warm=True):
          """Shut the worker down by stopping its components in reverse order.

          :keyword warm: If true, each component's ``stop()`` is called;
              otherwise its ``terminate()`` is used when it has one, falling
              back to ``stop()``.

          Does nothing unless the worker is in the ``RUN`` state and
          :meth:`start` reached the last component (so a second call is a
          no-op). Otherwise sends :data:`celery.signals.worker_shutdown`,
          stops the components, then closes the listener's connection.
          """
          if warm:
              what = "Stopping"
          else:
              what = "Terminating"

          if self._state != RUN or self._running != len(self.components):
              # Not fully started, can safely exit.
              # NOTE(review): if start() raised part-way, the components
              # that were already started are not stopped here.
              return

          self._state = CLOSE
          signals.worker_shutdown.send(sender=self)

          for component in reversed(self.components):
              self.logger.debug("%s thread %s..." % (
                      what, component.__class__.__name__))
              stop = component.stop
              if not warm:
                  stop = getattr(component, "terminate", stop)
              stop()

          self.listener.close_connection()
          self._state = TERMINATE

      def on_timer_error(self, exc_info):
          """Log an error reported by the ETA scheduler.

          :param exc_info: a 3-item sequence (presumably the result of
              :func:`sys.exc_info`); only the exception value is logged.
          """
          _, exc, _ = exc_info
          self.logger.error("Timer error: %r" % (exc, ))

      def on_timer_tick(self, delay):
          """Debug-log the ETA scheduler's next wake-up delay.

          Logged through :attr:`timer_debug`, a :class:`log.SilenceRepeated`
          wrapper (max_iterations=10) that presumably only emits some calls.
          """
          self.timer_debug("Scheduler wake-up! Next eta %s secs." % delay)