__init__.py

  1. """
  2. The Multiprocessing Worker Server
  3. """
  4. import socket
  5. import logging
  6. import traceback
  7. from multiprocessing.util import Finalize
  8. from celery import beat
  9. from celery import registry
  10. from celery import platforms
  11. from celery import signals
  12. from celery.app import app_or_default
  13. from celery.log import SilenceRepeated
  14. from celery.utils import noop, instantiate
  15. from celery.worker import state
  16. from celery.worker.buckets import TaskBucket, FastQueue

# Internal worker states (see WorkController._state).
RUN = 0x1
CLOSE = 0x2
TERMINATE = 0x3

# Signals reset or ignored in each pool child process (see process_initializer).
WORKER_SIGRESET = frozenset(["SIGTERM",
                             "SIGHUP",
                             "SIGTTIN",
                             "SIGTTOU"])
WORKER_SIGIGNORE = frozenset(["SIGINT"])


def process_initializer(app, hostname):
    """Initializes the process so it can be used to process tasks.

    Used for multiprocessing environments.

    """
    app = app_or_default(app)
    [platforms.reset_signal(signal) for signal in WORKER_SIGRESET]
    [platforms.ignore_signal(signal) for signal in WORKER_SIGIGNORE]
    platforms.set_mp_process_title("celeryd", hostname=hostname)

    # This is for Windows and other platforms not supporting
    # fork().  Note that init_worker makes sure it's only
    # run once per process.
    app.loader.init_worker()

    signals.worker_process_init.send(sender=None)
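
# How the initializer is used: the pool created in WorkController.__init__()
# below passes ``initializer=process_initializer`` and
# ``initargs=(self.app, self.hostname)``, so every child process runs roughly
# the equivalent of the following before it executes any task
# (the hostname shown is only an illustrative value):
#
#     process_initializer(app, "worker1.example.com")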


class WorkController(object):
    """Executes tasks waiting in the task queue.

    :param concurrency: see :attr:`concurrency`.
    :param logfile: see :attr:`logfile`.
    :param loglevel: see :attr:`loglevel`.
    :param embed_clockservice: see :attr:`embed_clockservice`.
    :param send_events: see :attr:`send_events`.

    .. attribute:: concurrency

        The number of simultaneous processes doing work (default:
        :setting:`CELERYD_CONCURRENCY`).

    .. attribute:: loglevel

        The loglevel used (default: :const:`logging.INFO`).

    .. attribute:: logfile

        The logfile used; if no logfile is specified, `stderr` is used
        (default: :setting:`CELERYD_LOG_FILE`).

    .. attribute:: embed_clockservice

        If :const:`True`, celerybeat is embedded, running in the main worker
        process as a thread.

    .. attribute:: send_events

        Enable the sending of monitoring events; these events can be captured
        by monitors (celerymon).

    .. attribute:: logger

        The :class:`logging.Logger` instance used for logging.

    .. attribute:: pool

        The :class:`multiprocessing.Pool` instance used.

    .. attribute:: ready_queue

        The :class:`Queue.Queue` that holds tasks ready for immediate
        processing.

    .. attribute:: schedule_controller

        Instance of :class:`celery.worker.controllers.ScheduleController`.

    .. attribute:: mediator

        Instance of :class:`celery.worker.controllers.Mediator`.

    .. attribute:: listener

        Instance of :class:`CarrotListener`.

    """
    loglevel = logging.ERROR
    _state = None
    _running = 0

    def __init__(self, concurrency=None, logfile=None, loglevel=None,
            send_events=None, hostname=None, ready_callback=noop,
            embed_clockservice=False, pool_cls=None, listener_cls=None,
            mediator_cls=None, eta_scheduler_cls=None,
            schedule_filename=None, task_time_limit=None,
            task_soft_time_limit=None, max_tasks_per_child=None,
            pool_putlocks=None, db=None, prefetch_multiplier=None,
            eta_scheduler_precision=None, queues=None,
            disable_rate_limits=None, autoscale=None,
            autoscaler_cls=None, scheduler_cls=None, app=None):
        self.app = app_or_default(app)
        conf = self.app.conf

        # Options
        self.loglevel = loglevel or self.loglevel
        self.concurrency = concurrency or conf.CELERYD_CONCURRENCY
        self.logfile = logfile or conf.CELERYD_LOG_FILE
        self.logger = self.app.log.get_default_logger()
        if send_events is None:
            send_events = conf.CELERY_SEND_EVENTS
        self.send_events = send_events
        self.pool_cls = pool_cls or conf.CELERYD_POOL
        self.listener_cls = listener_cls or conf.CELERYD_LISTENER
        self.mediator_cls = mediator_cls or conf.CELERYD_MEDIATOR
        self.eta_scheduler_cls = eta_scheduler_cls or \
                conf.CELERYD_ETA_SCHEDULER
        self.autoscaler_cls = autoscaler_cls or \
                conf.CELERYD_AUTOSCALER
        self.schedule_filename = schedule_filename or \
                conf.CELERYBEAT_SCHEDULE_FILENAME
        self.scheduler_cls = scheduler_cls or conf.CELERYBEAT_SCHEDULER
        self.hostname = hostname or socket.gethostname()
        self.embed_clockservice = embed_clockservice
        self.ready_callback = ready_callback
        self.task_time_limit = task_time_limit or \
                conf.CELERYD_TASK_TIME_LIMIT
        self.task_soft_time_limit = task_soft_time_limit or \
                conf.CELERYD_TASK_SOFT_TIME_LIMIT
        self.max_tasks_per_child = max_tasks_per_child or \
                conf.CELERYD_MAX_TASKS_PER_CHILD
        self.pool_putlocks = pool_putlocks or \
                conf.CELERYD_POOL_PUTLOCKS
        self.eta_scheduler_precision = eta_scheduler_precision or \
                conf.CELERYD_ETA_SCHEDULER_PRECISION
        self.prefetch_multiplier = prefetch_multiplier or \
                conf.CELERYD_PREFETCH_MULTIPLIER
        self.timer_debug = SilenceRepeated(self.logger.debug,
                                           max_iterations=10)
        self.db = db or conf.CELERYD_STATE_DB
        self.disable_rate_limits = disable_rate_limits or \
                conf.CELERY_DISABLE_RATE_LIMITS
        self.queues = queues

        self._finalize = Finalize(self, self.stop, exitpriority=1)

        if self.db:
            persistence = state.Persistent(self.db)
            Finalize(persistence, persistence.save, exitpriority=5)

        # Queues
        if disable_rate_limits:
            self.ready_queue = FastQueue()
            self.ready_queue.put = self.process_task
        else:
            self.ready_queue = TaskBucket(task_registry=registry.tasks)

        self.logger.debug("Instantiating thread components...")

        # Threads + Pool + Consumer
        self.autoscaler = None
        max_concurrency = None
        min_concurrency = concurrency
        if autoscale:
            max_concurrency, min_concurrency = autoscale

        self.pool = instantiate(self.pool_cls, min_concurrency,
                                logger=self.logger,
                                initializer=process_initializer,
                                initargs=(self.app, self.hostname),
                                maxtasksperchild=self.max_tasks_per_child,
                                timeout=self.task_time_limit,
                                soft_timeout=self.task_soft_time_limit,
                                putlocks=self.pool_putlocks)

        if autoscale:
            self.autoscaler = instantiate(self.autoscaler_cls, self.pool,
                                          max_concurrency=max_concurrency,
                                          min_concurrency=min_concurrency,
                                          logger=self.logger)

        self.mediator = None
        if not disable_rate_limits:
            self.mediator = instantiate(self.mediator_cls, self.ready_queue,
                                        app=self.app,
                                        callback=self.process_task,
                                        logger=self.logger)

        self.scheduler = instantiate(self.eta_scheduler_cls,
                                     precision=eta_scheduler_precision,
                                     on_error=self.on_timer_error,
                                     on_tick=self.on_timer_tick)

        self.beat = None
        if self.embed_clockservice:
            self.beat = beat.EmbeddedService(app=self.app,
                                logger=self.logger,
                                schedule_filename=self.schedule_filename,
                                scheduler_cls=self.scheduler_cls)

        prefetch_count = self.concurrency * self.prefetch_multiplier
        self.listener = instantiate(self.listener_cls,
                                    self.ready_queue,
                                    self.scheduler,
                                    logger=self.logger,
                                    hostname=self.hostname,
                                    send_events=self.send_events,
                                    init_callback=self.ready_callback,
                                    initial_prefetch_count=prefetch_count,
                                    pool=self.pool,
                                    queues=self.queues,
                                    app=self.app)

        # The order is important here;
        # the first in the list is the first to start,
        # and they must be stopped in reverse order.
        self.components = filter(None, (self.pool,
                                        self.mediator,
                                        self.scheduler,
                                        self.beat,
                                        self.autoscaler,
                                        self.listener))
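
    # Task flow, as wired above: the listener receives messages and puts task
    # wrappers on ready_queue.  With rate limits disabled, ready_queue.put is
    # rebound directly to process_task(); otherwise the Mediator thread moves
    # tasks off the TaskBucket and calls process_task() as its callback, which
    # hands the task to the pool.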

    def start(self):
        """Starts the worker's main loop."""
        self._state = RUN

        for i, component in enumerate(self.components):
            self.logger.debug("Starting thread %s..." % (
                    component.__class__.__name__))
            self._running = i + 1
            component.start()

    def process_task(self, wrapper):
        """Process task by sending it to the pool of workers."""
        try:
            try:
                wrapper.task.execute(wrapper, self.pool,
                                     self.loglevel, self.logfile)
            except Exception, exc:
                self.logger.critical("Internal error %s: %s\n%s" % (
                                exc.__class__, exc, traceback.format_exc()))
        except (SystemExit, KeyboardInterrupt):
            self.stop()

    def stop(self):
        """Graceful shutdown of the worker server."""
        self._shutdown(warm=True)
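
    # Note: stop() is also registered as a multiprocessing.util.Finalize
    # callback in __init__ (exitpriority=1), so a warm shutdown is attempted
    # automatically when the controller is finalized at interpreter exit.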

    def terminate(self):
        """Not so graceful shutdown of the worker server."""
        self._shutdown(warm=False)

    def _shutdown(self, warm=True):
        """Shut down the worker server: gracefully if `warm` is true,
        otherwise by terminating the components."""
        what = (warm and "stopping" or "terminating").capitalize()

        if self._state != RUN or self._running != len(self.components):
            # Not fully started, can safely exit.
            return

        self._state = CLOSE
        signals.worker_shutdown.send(sender=self)

        for component in reversed(self.components):
            self.logger.debug("%s thread %s..." % (
                    what, component.__class__.__name__))
            stop = component.stop
            if not warm:
                stop = getattr(component, "terminate", stop)
            stop()

        self.listener.close_connection()
        self._state = TERMINATE

    def on_timer_error(self, exc_info):
        _, exc, _ = exc_info
        self.logger.error("Timer error: %r" % (exc, ))

    def on_timer_tick(self, delay):
        self.timer_debug("Scheduler wake-up! Next eta %s secs." % delay)