components.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. # -*- coding: utf-8 -*-
  2. """Worker-level Bootsteps."""
  3. from __future__ import absolute_import, unicode_literals
  4. import atexit
  5. import warnings
  6. from kombu.async import Hub as _Hub, get_event_loop, set_event_loop
  7. from kombu.async.semaphore import DummyLock, LaxBoundedSemaphore
  8. from kombu.async.timer import Timer as _Timer
  9. from celery import bootsteps
  10. from celery._state import _set_task_join_will_block
  11. from celery.exceptions import ImproperlyConfigured
  12. from celery.platforms import IS_WINDOWS
  13. from celery.utils.log import worker_logger as logger
  14. __all__ = ['Timer', 'Hub', 'Pool', 'Beat', 'StateDB', 'Consumer']
# Error text used by ``Beat.create`` when an embedded beat is requested
# together with an eventlet/gevent pool.
ERR_B_GREEN = """\
-B option doesn't work with eventlet/gevent pools: \
use standalone beat instead.\
"""

# Warning text used by ``Pool.create`` when the ``worker_pool`` setting
# names one of the green pools.
W_POOL_SETTING = """
The worker_pool setting shouldn't be used to select the eventlet/gevent
pools, instead you *must use the -P* argument so that patches are applied
as early as possible.
"""
  24. class Timer(bootsteps.Step):
  25. """This step initializes the internal timer used by the worker."""
  26. def create(self, w):
  27. if w.use_eventloop:
  28. # does not use dedicated timer thread.
  29. w.timer = _Timer(max_interval=10.0)
  30. else:
  31. if not w.timer_cls:
  32. # Default Timer is set by the pool, as e.g. eventlet
  33. # needs a custom implementation.
  34. w.timer_cls = w.pool_cls.Timer
  35. w.timer = self.instantiate(w.timer_cls,
  36. max_interval=w.timer_precision,
  37. on_error=self.on_timer_error,
  38. on_tick=self.on_timer_tick)
  39. def on_timer_error(self, exc):
  40. logger.error('Timer error: %r', exc, exc_info=True)
  41. def on_timer_tick(self, delay):
  42. logger.debug('Timer wake-up! Next ETA %s secs.', delay)
  43. class Hub(bootsteps.StartStopStep):
  44. requires = (Timer,)
  45. def __init__(self, w, **kwargs):
  46. w.hub = None
  47. def include_if(self, w):
  48. return w.use_eventloop
  49. def create(self, w):
  50. w.hub = get_event_loop()
  51. if w.hub is None:
  52. required_hub = getattr(w._conninfo, 'requires_hub', None)
  53. w.hub = set_event_loop((
  54. required_hub if required_hub else _Hub)(w.timer))
  55. self._patch_thread_primitives(w)
  56. return self
  57. def start(self, w):
  58. pass
  59. def stop(self, w):
  60. w.hub.close()
  61. def terminate(self, w):
  62. w.hub.close()
  63. def _patch_thread_primitives(self, w):
  64. # make clock use dummy lock
  65. w.app.clock.mutex = DummyLock()
  66. # multiprocessing's ApplyResult uses this lock.
  67. try:
  68. from billiard import pool
  69. except ImportError: # pragma: no cover
  70. pass
  71. else:
  72. pool.Lock = DummyLock
  73. class Pool(bootsteps.StartStopStep):
  74. """Bootstep managing the worker pool.
  75. Describes how to initialize the worker pool, and starts and stops
  76. the pool during worker start-up/shutdown.
  77. Adds attributes:
  78. * pool
  79. * max_concurrency
  80. * min_concurrency
  81. """
  82. requires = (Hub,)
  83. def __init__(self, w, optimization=None, **kwargs):
  84. w.pool = None
  85. w.max_concurrency = None
  86. w.min_concurrency = w.concurrency
  87. self.optimization = optimization
  88. def close(self, w):
  89. if w.pool:
  90. w.pool.close()
  91. def terminate(self, w):
  92. if w.pool:
  93. w.pool.terminate()
  94. def create(self, w, semaphore=None, max_restarts=None,
  95. green_pools={'eventlet', 'gevent'}):
  96. if w.app.conf.worker_pool in green_pools: # pragma: no cover
  97. warnings.warn(UserWarning(W_POOL_SETTING))
  98. threaded = not w.use_eventloop or IS_WINDOWS
  99. procs = w.min_concurrency
  100. w.process_task = w._process_task
  101. if not threaded:
  102. semaphore = w.semaphore = LaxBoundedSemaphore(procs)
  103. w._quick_acquire = w.semaphore.acquire
  104. w._quick_release = w.semaphore.release
  105. max_restarts = 100
  106. if w.pool_putlocks and w.pool_cls.uses_semaphore:
  107. w.process_task = w._process_task_sem
  108. allow_restart = w.pool_restarts
  109. pool = w.pool = self.instantiate(
  110. w.pool_cls, w.min_concurrency,
  111. initargs=(w.app, w.hostname),
  112. maxtasksperchild=w.max_tasks_per_child,
  113. max_memory_per_child=w.max_memory_per_child,
  114. timeout=w.task_time_limit,
  115. soft_timeout=w.task_soft_time_limit,
  116. putlocks=w.pool_putlocks and threaded,
  117. lost_worker_timeout=w.worker_lost_wait,
  118. threads=threaded,
  119. max_restarts=max_restarts,
  120. allow_restart=allow_restart,
  121. forking_enable=True,
  122. semaphore=semaphore,
  123. sched_strategy=self.optimization,
  124. app=w.app,
  125. )
  126. _set_task_join_will_block(pool.task_join_will_block)
  127. return pool
  128. def info(self, w):
  129. return {'pool': w.pool.info if w.pool else 'N/A'}
  130. def register_with_event_loop(self, w, hub):
  131. w.pool.register_with_event_loop(hub)
  132. class Beat(bootsteps.StartStopStep):
  133. """Step used to embed a beat process.
  134. This will only be enabled if the ``beat`` argument is set.
  135. """
  136. label = 'Beat'
  137. conditional = True
  138. def __init__(self, w, beat=False, **kwargs):
  139. self.enabled = w.beat = beat
  140. w.beat = None
  141. def create(self, w):
  142. from celery.beat import EmbeddedService
  143. if w.pool_cls.__module__.endswith(('gevent', 'eventlet')):
  144. raise ImproperlyConfigured(ERR_B_GREEN)
  145. b = w.beat = EmbeddedService(w.app,
  146. schedule_filename=w.schedule_filename,
  147. scheduler_cls=w.scheduler_cls)
  148. return b
  149. class StateDB(bootsteps.Step):
  150. """This bootstep sets up the workers state db if enabled."""
  151. def __init__(self, w, **kwargs):
  152. self.enabled = w.state_db
  153. w._persistence = None
  154. def create(self, w):
  155. w._persistence = w.state.Persistent(w.state, w.state_db, w.app.clock)
  156. atexit.register(w._persistence.save)
  157. class Consumer(bootsteps.StartStopStep):
  158. last = True
  159. def create(self, w):
  160. if w.max_concurrency:
  161. prefetch_count = max(w.min_concurrency, 1) * w.prefetch_multiplier
  162. else:
  163. prefetch_count = w.concurrency * w.prefetch_multiplier
  164. c = w.consumer = self.instantiate(
  165. w.consumer_cls, w.process_task,
  166. hostname=w.hostname,
  167. send_events=w.send_events,
  168. init_callback=w.ready_callback,
  169. initial_prefetch_count=prefetch_count,
  170. pool=w.pool,
  171. timer=w.timer,
  172. app=w.app,
  173. controller=w,
  174. hub=w.hub,
  175. worker_options=w.options,
  176. disable_rate_limits=w.disable_rate_limits,
  177. prefetch_multiplier=w.prefetch_multiplier,
  178. )
  179. return c