processes.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.concurrency.processes
  4. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  5. Pool implementation using :mod:`multiprocessing`.
  6. We use the billiard fork of multiprocessing which contains
  7. numerous improvements.
  8. """
  9. from __future__ import absolute_import
  10. import os
  11. from celery import platforms
  12. from celery import signals
  13. from celery._state import set_default_app
  14. from celery.concurrency.base import BasePool
  15. from celery.task import trace
  16. from billiard.pool import Pool, RUN, CLOSE
  17. #: List of signals to reset when a child process starts.
  18. WORKER_SIGRESET = frozenset(['SIGTERM',
  19. 'SIGHUP',
  20. 'SIGTTIN',
  21. 'SIGTTOU',
  22. 'SIGUSR1'])
  23. #: List of signals to ignore when a child process starts.
  24. WORKER_SIGIGNORE = frozenset(['SIGINT'])
  25. def process_initializer(app, hostname):
  26. """Initializes the process so it can be used to process tasks."""
  27. app.set_current()
  28. set_default_app(app)
  29. trace._tasks = app._tasks # make sure this optimization is set.
  30. platforms.signals.reset(*WORKER_SIGRESET)
  31. platforms.signals.ignore(*WORKER_SIGIGNORE)
  32. platforms.set_mp_process_title('celeryd', hostname=hostname)
  33. # This is for Windows and other platforms not supporting
  34. # fork(). Note that init_worker makes sure it's only
  35. # run once per process.
  36. app.log.setup(int(os.environ.get('CELERY_LOG_LEVEL', 0)),
  37. os.environ.get('CELERY_LOG_FILE') or None,
  38. bool(os.environ.get('CELERY_LOG_REDIRECT', False)),
  39. str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')))
  40. app.loader.init_worker()
  41. app.loader.init_worker_process()
  42. app.finalize()
  43. from celery.task.trace import build_tracer
  44. for name, task in app.tasks.iteritems():
  45. task.__trace__ = build_tracer(name, task, app.loader, hostname)
  46. signals.worker_process_init.send(sender=None)
  47. class TaskPool(BasePool):
  48. """Multiprocessing Pool implementation."""
  49. Pool = Pool
  50. requires_mediator = True
  51. uses_semaphore = True
  52. def on_start(self):
  53. """Run the task pool.
  54. Will pre-fork all workers so they're ready to accept tasks.
  55. """
  56. P = self._pool = self.Pool(processes=self.limit,
  57. initializer=process_initializer,
  58. **self.options)
  59. self.on_apply = P.apply_async
  60. self.on_soft_timeout = P._timeout_handler.on_soft_timeout
  61. self.on_hard_timeout = P._timeout_handler.on_hard_timeout
  62. self.maintain_pool = P.maintain_pool
  63. self.maybe_handle_result = P._result_handler.handle_event
  64. def did_start_ok(self):
  65. return self._pool.did_start_ok()
  66. def on_stop(self):
  67. """Gracefully stop the pool."""
  68. if self._pool is not None and self._pool._state in (RUN, CLOSE):
  69. self._pool.close()
  70. self._pool.join()
  71. self._pool = None
  72. def on_terminate(self):
  73. """Force terminate the pool."""
  74. if self._pool is not None:
  75. self._pool.terminate()
  76. self._pool = None
  77. def on_close(self):
  78. if self._pool is not None and self._pool._state == RUN:
  79. self._pool.close()
  80. def terminate_job(self, pid, signal=None):
  81. return self._pool.terminate_job(pid, signal)
  82. def grow(self, n=1):
  83. return self._pool.grow(n)
  84. def shrink(self, n=1):
  85. return self._pool.shrink(n)
  86. def restart(self):
  87. self._pool.restart()
  88. def _get_info(self):
  89. return {'max-concurrency': self.limit,
  90. 'processes': [p.pid for p in self._pool._pool],
  91. 'max-tasks-per-child': self._pool._maxtasksperchild,
  92. 'put-guarded-by-semaphore': self.putlocks,
  93. 'timeouts': (self._pool.soft_timeout, self._pool.timeout)}
  94. def init_callbacks(self, **kwargs):
  95. for k, v in kwargs.iteritems():
  96. setattr(self._pool, k, v)
  97. def handle_timeouts(self):
  98. if self._pool._timeout_handler:
  99. self._pool._timeout_handler.handle_event()
  100. @property
  101. def num_processes(self):
  102. return self._pool._processes
  103. @property
  104. def readers(self):
  105. return self._pool.readers
  106. @property
  107. def writers(self):
  108. return self._pool.writers
  109. @property
  110. def timers(self):
  111. return {self.maintain_pool: 30.0}