components.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.components
  4. ~~~~~~~~~~~~~~~~~~~~~~~~
  5. Default worker bootsteps.
  6. """
  7. from __future__ import absolute_import
  8. import atexit
  9. from functools import partial
  10. from celery import bootsteps
  11. from celery.exceptions import ImproperlyConfigured
  12. from celery.five import string_t
  13. from celery.utils.log import worker_logger as logger
  14. from celery.utils.timer2 import Schedule
  15. from . import hub
  16. __all__ = ['Timer', 'Hub', 'Queues', 'Pool', 'Beat', 'StateDB', 'Consumer']
  17. ERR_B_GREEN = """\
  18. -B option doesn't work with eventlet/gevent pools: \
  19. use standalone beat instead.\
  20. """
  21. class Timer(bootsteps.Step):
  22. """This step initializes the internal timer used by the worker."""
  23. def create(self, w):
  24. if w.use_eventloop:
  25. # does not use dedicated timer thread.
  26. w.timer = Schedule(max_interval=10.0)
  27. else:
  28. if not w.timer_cls:
  29. # Default Timer is set by the pool, as e.g. eventlet
  30. # needs a custom implementation.
  31. w.timer_cls = w.pool_cls.Timer
  32. w.timer = self.instantiate(w.timer_cls,
  33. max_interval=w.timer_precision,
  34. on_timer_error=self.on_timer_error,
  35. on_timer_tick=self.on_timer_tick)
  36. def on_timer_error(self, exc):
  37. logger.error('Timer error: %r', exc, exc_info=True)
  38. def on_timer_tick(self, delay):
  39. logger.debug('Timer wake-up! Next eta %s secs.', delay)
  40. class Hub(bootsteps.StartStopStep):
  41. requires = (Timer, )
  42. def __init__(self, w, **kwargs):
  43. w.hub = None
  44. def include_if(self, w):
  45. return w.use_eventloop
  46. def create(self, w):
  47. w.hub = hub.Hub(w.timer)
  48. # make clock use dummy lock
  49. w.app.clock.lock = hub.DummyLock()
  50. return w.hub
  51. class Queues(bootsteps.Step):
  52. """This bootstep initializes the internal queues
  53. used by the worker."""
  54. label = 'Queues (intra)'
  55. requires = (Hub, )
  56. def create(self, w):
  57. w.process_task = w._process_task
  58. if w.use_eventloop:
  59. if w.pool_putlocks and w.pool_cls.uses_semaphore:
  60. w.process_task = w._process_task_sem
  61. class Pool(bootsteps.StartStopStep):
  62. """Bootstep managing the worker pool.
  63. Describes how to initialize the worker pool, and starts and stops
  64. the pool during worker startup/shutdown.
  65. Adds attributes:
  66. * autoscale
  67. * pool
  68. * max_concurrency
  69. * min_concurrency
  70. """
  71. requires = (Queues, )
  72. def __init__(self, w, autoscale=None, autoreload=None,
  73. no_execv=False, **kwargs):
  74. if isinstance(autoscale, string_t):
  75. max_c, _, min_c = autoscale.partition(',')
  76. autoscale = [int(max_c), min_c and int(min_c) or 0]
  77. w.autoscale = autoscale
  78. w.pool = None
  79. w.max_concurrency = None
  80. w.min_concurrency = w.concurrency
  81. w.no_execv = no_execv
  82. if w.autoscale:
  83. w.max_concurrency, w.min_concurrency = w.autoscale
  84. self.autoreload_enabled = autoreload
  85. def close(self, w):
  86. if w.pool:
  87. w.pool.close()
  88. def terminate(self, w):
  89. if w.pool:
  90. w.pool.terminate()
  91. def create(self, w, semaphore=None, max_restarts=None):
  92. threaded = not w.use_eventloop
  93. procs = w.min_concurrency
  94. forking_enable = not threaded or (w.no_execv or not w.force_execv)
  95. if not threaded:
  96. semaphore = w.semaphore = hub.BoundedSemaphore(procs)
  97. w._quick_acquire = w.semaphore.acquire
  98. w._quick_release = w.semaphore.release
  99. max_restarts = 100
  100. allow_restart = self.autoreload_enabled or w.pool_restarts
  101. pool = w.pool = self.instantiate(
  102. w.pool_cls, w.min_concurrency,
  103. initargs=(w.app, w.hostname),
  104. maxtasksperchild=w.max_tasks_per_child,
  105. timeout=w.task_time_limit,
  106. soft_timeout=w.task_soft_time_limit,
  107. putlocks=w.pool_putlocks and threaded,
  108. lost_worker_timeout=w.worker_lost_wait,
  109. threads=threaded,
  110. max_restarts=max_restarts,
  111. allow_restart=allow_restart,
  112. forking_enable=forking_enable,
  113. semaphore=semaphore,
  114. )
  115. if w.hub:
  116. w.hub.on_init.append(partial(pool.on_poll_init, w))
  117. return pool
  118. def info(self, w):
  119. return {'pool': w.pool.info}
  120. class Beat(bootsteps.StartStopStep):
  121. """Step used to embed a beat process.
  122. This will only be enabled if the ``beat``
  123. argument is set.
  124. """
  125. label = 'Beat'
  126. conditional = True
  127. def __init__(self, w, beat=False, **kwargs):
  128. self.enabled = w.beat = beat
  129. w.beat = None
  130. def create(self, w):
  131. from celery.beat import EmbeddedService
  132. if w.pool_cls.__module__.endswith(('gevent', 'eventlet')):
  133. raise ImproperlyConfigured(ERR_B_GREEN)
  134. b = w.beat = EmbeddedService(app=w.app,
  135. schedule_filename=w.schedule_filename,
  136. scheduler_cls=w.scheduler_cls)
  137. return b
  138. class StateDB(bootsteps.Step):
  139. """This bootstep sets up the workers state db if enabled."""
  140. def __init__(self, w, **kwargs):
  141. self.enabled = w.state_db
  142. w._persistence = None
  143. def create(self, w):
  144. w._persistence = w.state.Persistent(w.state, w.state_db, w.app.clock)
  145. atexit.register(w._persistence.save)
  146. class Consumer(bootsteps.StartStopStep):
  147. last = True
  148. def create(self, w):
  149. prefetch_count = w.concurrency * w.prefetch_multiplier
  150. c = w.consumer = self.instantiate(
  151. w.consumer_cls, w.process_task,
  152. hostname=w.hostname,
  153. send_events=w.send_events,
  154. init_callback=w.ready_callback,
  155. initial_prefetch_count=prefetch_count,
  156. pool=w.pool,
  157. timer=w.timer,
  158. app=w.app,
  159. controller=w,
  160. hub=w.hub,
  161. worker_options=w.options,
  162. disable_rate_limits=w.disable_rate_limits,
  163. )
  164. return c