components.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.components
  4. ~~~~~~~~~~~~~~~~~~~~~~~~
  5. Default worker bootsteps.
  6. """
  7. from __future__ import absolute_import, unicode_literals
  8. import atexit
  9. import warnings
  10. from kombu.async import Hub as _Hub, get_event_loop, set_event_loop
  11. from kombu.async.semaphore import DummyLock, LaxBoundedSemaphore
  12. from kombu.async.timer import Timer as _Timer
  13. from celery import bootsteps
  14. from celery._state import _set_task_join_will_block
  15. from celery.exceptions import ImproperlyConfigured
  16. from celery.five import string_t
  17. from celery.platforms import IS_WINDOWS
  18. from celery.utils.log import worker_logger as logger
  19. __all__ = ['Timer', 'Hub', 'Pool', 'Beat', 'StateDB', 'Consumer']
  20. ERR_B_GREEN = """\
  21. -B option doesn't work with eventlet/gevent pools: \
  22. use standalone beat instead.\
  23. """
  24. W_POOL_SETTING = """
  25. The worker_pool setting should not be used to select the eventlet/gevent
  26. pools, instead you *must use the -P* argument so that patches are applied
  27. as early as possible.
  28. """
  29. class Timer(bootsteps.Step):
  30. """This step initializes the internal timer used by the worker."""
  31. def create(self, w):
  32. if w.use_eventloop:
  33. # does not use dedicated timer thread.
  34. w.timer = _Timer(max_interval=10.0)
  35. else:
  36. if not w.timer_cls:
  37. # Default Timer is set by the pool, as e.g. eventlet
  38. # needs a custom implementation.
  39. w.timer_cls = w.pool_cls.Timer
  40. w.timer = self.instantiate(w.timer_cls,
  41. max_interval=w.timer_precision,
  42. on_timer_error=self.on_timer_error,
  43. on_timer_tick=self.on_timer_tick)
  44. def on_timer_error(self, exc):
  45. logger.error('Timer error: %r', exc, exc_info=True)
  46. def on_timer_tick(self, delay):
  47. logger.debug('Timer wake-up! Next eta %s secs.', delay)
  48. class Hub(bootsteps.StartStopStep):
  49. requires = (Timer,)
  50. def __init__(self, w, **kwargs):
  51. w.hub = None
  52. def include_if(self, w):
  53. return w.use_eventloop
  54. def create(self, w):
  55. w.hub = get_event_loop()
  56. if w.hub is None:
  57. required_hub = getattr(w._conninfo, 'requires_hub', None)
  58. w.hub = set_event_loop((
  59. required_hub if required_hub else _Hub)(w.timer))
  60. self._patch_thread_primitives(w)
  61. return self
  62. def start(self, w):
  63. pass
  64. def stop(self, w):
  65. w.hub.close()
  66. def terminate(self, w):
  67. w.hub.close()
  68. def _patch_thread_primitives(self, w):
  69. # make clock use dummy lock
  70. w.app.clock.mutex = DummyLock()
  71. # multiprocessing's ApplyResult uses this lock.
  72. try:
  73. from billiard import pool
  74. except ImportError: # pragma: no cover
  75. pass
  76. else:
  77. pool.Lock = DummyLock
  78. class Pool(bootsteps.StartStopStep):
  79. """Bootstep managing the worker pool.
  80. Describes how to initialize the worker pool, and starts and stops
  81. the pool during worker startup/shutdown.
  82. Adds attributes:
  83. * autoscale
  84. * pool
  85. * max_concurrency
  86. * min_concurrency
  87. """
  88. requires = (Hub,)
  89. def __init__(self, w, autoscale=None, autoreload=None,
  90. no_execv=False, optimization=None, **kwargs):
  91. if isinstance(autoscale, string_t):
  92. max_c, _, min_c = autoscale.partition(',')
  93. autoscale = [int(max_c), min_c and int(min_c) or 0]
  94. w.autoscale = autoscale
  95. w.pool = None
  96. w.max_concurrency = None
  97. w.min_concurrency = w.concurrency
  98. w.no_execv = no_execv
  99. if w.autoscale:
  100. w.max_concurrency, w.min_concurrency = w.autoscale
  101. self.autoreload_enabled = autoreload
  102. self.optimization = optimization
  103. def close(self, w):
  104. if w.pool:
  105. w.pool.close()
  106. def terminate(self, w):
  107. if w.pool:
  108. w.pool.terminate()
  109. def create(self, w, semaphore=None, max_restarts=None,
  110. green_pools={'eventlet', 'gevent'}):
  111. if w.app.conf.worker_pool in green_pools: # pragma: no cover
  112. warnings.warn(UserWarning(W_POOL_SETTING))
  113. threaded = not w.use_eventloop or IS_WINDOWS
  114. procs = w.min_concurrency
  115. forking_enable = w.no_execv if w.force_execv else True
  116. w.process_task = w._process_task
  117. if not threaded:
  118. semaphore = w.semaphore = LaxBoundedSemaphore(procs)
  119. w._quick_acquire = w.semaphore.acquire
  120. w._quick_release = w.semaphore.release
  121. max_restarts = 100
  122. if w.pool_putlocks and w.pool_cls.uses_semaphore:
  123. w.process_task = w._process_task_sem
  124. allow_restart = self.autoreload_enabled or w.pool_restarts
  125. pool = w.pool = self.instantiate(
  126. w.pool_cls, w.min_concurrency,
  127. initargs=(w.app, w.hostname),
  128. maxtasksperchild=w.max_tasks_per_child,
  129. max_memory_per_child=w.max_memory_per_child,
  130. timeout=w.task_time_limit,
  131. soft_timeout=w.task_soft_time_limit,
  132. putlocks=w.pool_putlocks and threaded,
  133. lost_worker_timeout=w.worker_lost_wait,
  134. threads=threaded,
  135. max_restarts=max_restarts,
  136. allow_restart=allow_restart,
  137. forking_enable=forking_enable,
  138. semaphore=semaphore,
  139. sched_strategy=self.optimization,
  140. app=w.app,
  141. )
  142. _set_task_join_will_block(pool.task_join_will_block)
  143. return pool
  144. def info(self, w):
  145. return {'pool': w.pool.info if w.pool else 'N/A'}
  146. def register_with_event_loop(self, w, hub):
  147. w.pool.register_with_event_loop(hub)
  148. class Beat(bootsteps.StartStopStep):
  149. """Step used to embed a beat process.
  150. This will only be enabled if the ``beat``
  151. argument is set.
  152. """
  153. label = 'Beat'
  154. conditional = True
  155. def __init__(self, w, beat=False, **kwargs):
  156. self.enabled = w.beat = beat
  157. w.beat = None
  158. def create(self, w):
  159. from celery.beat import EmbeddedService
  160. if w.pool_cls.__module__.endswith(('gevent', 'eventlet')):
  161. raise ImproperlyConfigured(ERR_B_GREEN)
  162. b = w.beat = EmbeddedService(w.app,
  163. schedule_filename=w.schedule_filename,
  164. scheduler_cls=w.scheduler_cls)
  165. return b
  166. class StateDB(bootsteps.Step):
  167. """This bootstep sets up the workers state db if enabled."""
  168. def __init__(self, w, **kwargs):
  169. self.enabled = w.state_db
  170. w._persistence = None
  171. def create(self, w):
  172. w._persistence = w.state.Persistent(w.state, w.state_db, w.app.clock)
  173. atexit.register(w._persistence.save)
  174. class Consumer(bootsteps.StartStopStep):
  175. last = True
  176. def create(self, w):
  177. if w.max_concurrency:
  178. prefetch_count = max(w.min_concurrency, 1) * w.prefetch_multiplier
  179. else:
  180. prefetch_count = w.concurrency * w.prefetch_multiplier
  181. c = w.consumer = self.instantiate(
  182. w.consumer_cls, w.process_task,
  183. hostname=w.hostname,
  184. send_events=w.send_events,
  185. init_callback=w.ready_callback,
  186. initial_prefetch_count=prefetch_count,
  187. pool=w.pool,
  188. timer=w.timer,
  189. app=w.app,
  190. controller=w,
  191. hub=w.hub,
  192. worker_options=w.options,
  193. disable_rate_limits=w.disable_rate_limits,
  194. prefetch_multiplier=w.prefetch_multiplier,
  195. )
  196. return c