# -*- coding: utf-8 -*-
"""Prefork execution pool.

Pool implementation using :mod:`multiprocessing`.
"""
from __future__ import absolute_import, unicode_literals

import os

from billiard.common import REMAP_SIGTERM, TERM_SIGNAME
from billiard import forking_enable
from billiard.pool import RUN, CLOSE, Pool as BlockingPool

from celery import platforms
from celery import signals
from celery._state import set_default_app, _set_task_join_will_block
from celery.app import trace
from celery.concurrency.base import BasePool
from celery.five import items
from celery.utils.functional import noop
from celery.utils.log import get_logger

from .asynpool import AsynPool

__all__ = ['TaskPool', 'process_initializer', 'process_destructor']

#: List of signals to reset when a child process starts.
WORKER_SIGRESET = {
    'SIGTERM', 'SIGHUP', 'SIGTTIN', 'SIGTTOU', 'SIGUSR1',
}

#: List of signals to ignore when a child process starts.
if REMAP_SIGTERM:
    WORKER_SIGIGNORE = {'SIGINT', TERM_SIGNAME}
else:
    WORKER_SIGIGNORE = {'SIGINT'}
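# (billiard supports remapping SIGTERM to another signal through the
# REMAP_SIGTERM environment variable; TERM_SIGNAME then names the
# replacement signal, which child processes must also ignore.)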

logger = get_logger(__name__)
warning, debug = logger.warning, logger.debug

def process_initializer(app, hostname):
    """Pool child process initializer.

    Initialize the child pool process to ensure the correct
    app instance is used and things like logging work.
    """
    _set_task_join_will_block(True)
    platforms.signals.reset(*WORKER_SIGRESET)
    platforms.signals.ignore(*WORKER_SIGIGNORE)
    platforms.set_mp_process_title('celeryd', hostname=hostname)
    # This is for Windows and other platforms not supporting
    # fork().  Note that init_worker makes sure it's only
    # run once per process.
    app.loader.init_worker()
    app.loader.init_worker_process()
    logfile = os.environ.get('CELERY_LOG_FILE') or None
    if logfile and '%i' in logfile.lower():
        # the logfile path will differ for each child, so logging
        # has to be set up again from scratch.
        app.log.already_setup = False
    app.log.setup(int(os.environ.get('CELERY_LOG_LEVEL', 0) or 0),
                  logfile,
                  bool(os.environ.get('CELERY_LOG_REDIRECT', False)),
                  str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')),
                  hostname=hostname)
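    # Note: a '%i' in the logfile name expands to the pool process
    # index, so each child logs to its own file; e.g. passing
    # ``--logfile=/var/log/celery/worker%i.log`` to the worker yields
    # worker0.log, worker1.log, and so on.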
    if os.environ.get('FORKED_BY_MULTIPROCESSING'):
        # pool did execv after fork
        trace.setup_worker_optimizations(app, hostname)
    else:
        app.set_current()
        set_default_app(app)
        app.finalize()
        trace._tasks = app._tasks  # enables fast_trace_task optimization.
    # rebuild execution handler for all tasks.
    from celery.app.trace import build_tracer
    for name, task in items(app.tasks):
        task.__trace__ = build_tracer(name, task, app.loader, hostname,
                                      app=app)
    from celery.worker import state as worker_state
    worker_state.reset_state()
    signals.worker_process_init.send(sender=None)
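
# Application code can hook into the initializer above through the
# ``worker_process_init`` signal; a minimal sketch (``setup_resources``
# is a hypothetical handler name):
#
#     from celery.signals import worker_process_init
#
#     @worker_process_init.connect
#     def setup_resources(**kwargs):
#         ...  # create per-child resources (DB connections, sockets)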

def process_destructor(pid, exitcode):
    """Pool child process destructor.

    Dispatch the :signal:`worker_process_shutdown` signal.
    """
    signals.worker_process_shutdown.send(
        sender=None, pid=pid, exitcode=exitcode,
    )
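
# The shutdown side can be hooked symmetrically; handlers receive the
# ``pid`` and ``exitcode`` keyword arguments sent above (a sketch;
# ``on_child_exit`` is a hypothetical handler name):
#
#     from celery.signals import worker_process_shutdown
#
#     @worker_process_shutdown.connect
#     def on_child_exit(pid=None, exitcode=None, **kwargs):
#         ...  # e.g. flush logs or release per-child resources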

class TaskPool(BasePool):
    """Multiprocessing Pool implementation."""

    Pool = AsynPool
    BlockingPool = BlockingPool

    uses_semaphore = True
    write_stats = None

    def on_start(self):
        forking_enable(self.forking_enable)
        Pool = (self.BlockingPool if self.options.get('threads', True)
                else self.Pool)
        P = self._pool = Pool(processes=self.limit,
                              initializer=process_initializer,
                              on_process_exit=process_destructor,
                              enable_timeouts=True,
                              synack=False,
                              **self.options)

        # Create proxy methods
        self.on_apply = P.apply_async
        self.maintain_pool = P.maintain_pool
        self.terminate_job = P.terminate_job
        self.grow = P.grow
        self.shrink = P.shrink
        self.flush = getattr(P, 'flush', None)  # FIXME add to billiard
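        # Binding the pool's bound methods directly onto this wrapper
        # keeps the worker's hot path (``on_apply`` once per task) free
        # of an extra layer of delegation.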

    def restart(self):
        self._pool.restart()
        self._pool.apply_async(noop)

    def did_start_ok(self):
        return self._pool.did_start_ok()

    def register_with_event_loop(self, loop):
        try:
            reg = self._pool.register_with_event_loop
        except AttributeError:
            # the blocking pool has no event-loop integration.
            return
        return reg(loop)

    def on_stop(self):
        """Gracefully stop the pool."""
        if self._pool is not None and self._pool._state in (RUN, CLOSE):
            self._pool.close()
            self._pool.join()
            self._pool = None

    def on_terminate(self):
        """Force terminate the pool."""
        if self._pool is not None:
            self._pool.terminate()
            self._pool = None

    def on_close(self):
        if self._pool is not None and self._pool._state == RUN:
            self._pool.close()

    def _get_info(self):
        write_stats = getattr(self._pool, 'human_write_stats', None)
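        # Shape of the mapping returned below, with illustrative values:
        #     {'max-concurrency': 4, 'processes': [1234, 1235],
        #      'max-tasks-per-child': 'N/A',
        #      'put-guarded-by-semaphore': True,
        #      'timeouts': (0, 0), 'writes': 'N/A'}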
        return {
            'max-concurrency': self.limit,
            'processes': [p.pid for p in self._pool._pool],
            'max-tasks-per-child': self._pool._maxtasksperchild or 'N/A',
            'put-guarded-by-semaphore': self.putlocks,
            'timeouts': (self._pool.soft_timeout or 0,
                         self._pool.timeout or 0),
            'writes': write_stats() if write_stats is not None else 'N/A',
        }

    @property
    def num_processes(self):
        return self._pool._processes
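
# Rough lifecycle sketch of how a worker drives this pool; the argument
# values are illustrative, not the worker's real configuration:
#
#     pool = TaskPool(limit=4, forking_enable=True)
#     pool.start()                         # triggers on_start(), forks children
#     pool.register_with_event_loop(loop)  # hook pool writes into the hub
#     ...                                  # tasks dispatched via pool.on_apply
#     pool.stop()                          # triggers on_stop(): close() + join()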