__init__.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. import os
  4. import platform
  5. import signal as _signal
  6. from celery import platforms
  7. from celery import signals
  8. from celery.app import app_or_default
  9. from celery.concurrency.base import BasePool
  10. from billiard.pool import Pool, RUN, CLOSE
if platform.system() == "Windows":  # pragma: no cover
    # On Windows os.kill calls TerminateProcess which cannot be
    # handled by any process, so this is needed to terminate the task
    # *and its children* (if any).
    from ._win import kill_processtree as _kill  # noqa
else:
    # POSIX: plain os.kill is sufficient (signal is delivered, children
    # are managed by the pool itself).
    from os import kill as _kill  # noqa
  18. #: List of signals to reset when a child process starts.
  19. WORKER_SIGRESET = frozenset(["SIGTERM",
  20. "SIGHUP",
  21. "SIGTTIN",
  22. "SIGTTOU",
  23. "SIGUSR1"])
  24. #: List of signals to ignore when a child process starts.
  25. WORKER_SIGIGNORE = frozenset(["SIGINT"])
def process_initializer(app, hostname):
    """Initialize a pool child process so it can be used to process tasks.

    Runs inside each newly-started worker process: makes ``app`` the
    current app, resets/ignores the configured worker signals, sets the
    process title, re-initializes logging from the environment, runs the
    loader's worker initialization hooks, finalizes the app and fires the
    ``worker_process_init`` signal.

    :param app: app instance, or ``None`` to use the default app
        (resolved via :func:`app_or_default`).
    :param hostname: worker hostname, embedded in the process title.
    """
    app = app_or_default(app)
    app.set_current()
    # Reset inherited handlers for the usual worker signals and ignore
    # SIGINT in the child (see WORKER_SIGRESET / WORKER_SIGIGNORE).
    platforms.signals.reset(*WORKER_SIGRESET)
    platforms.signals.ignore(*WORKER_SIGIGNORE)
    platforms.set_mp_process_title("celeryd", hostname=hostname)
    # This is for Windows and other platforms not supporting
    # fork().  Note that init_worker makes sure it's only
    # run once per process.
    # NOTE(review): bool() of any non-empty string is True, so
    # CELERY_LOG_REDIRECT="0" still enables redirection — confirm intended.
    # NOTE(review): str(None) is "None" when CELERY_LOG_REDIRECT_LEVEL is
    # unset; presumably app.log.setup handles that — verify.
    app.log.setup(int(os.environ.get("CELERY_LOG_LEVEL", 0)),
                  os.environ.get("CELERY_LOG_FILE") or None,
                  bool(os.environ.get("CELERY_LOG_REDIRECT", False)),
                  str(os.environ.get("CELERY_LOG_REDIRECT_LEVEL")))
    app.loader.init_worker()
    app.loader.init_worker_process()
    app.finalize()
    signals.worker_process_init.send(sender=None)
  44. class TaskPool(BasePool):
  45. """Multiprocessing Pool implementation."""
  46. Pool = Pool
  47. requires_mediator = True
  48. uses_semaphore = True
  49. def on_start(self):
  50. """Run the task pool.
  51. Will pre-fork all workers so they're ready to accept tasks.
  52. """
  53. self._pool = self.Pool(processes=self.limit,
  54. initializer=process_initializer,
  55. **self.options)
  56. self.on_apply = self._pool.apply_async
  57. def did_start_ok(self):
  58. return self._pool.did_start_ok()
  59. def on_stop(self):
  60. """Gracefully stop the pool."""
  61. if self._pool is not None and self._pool._state in (RUN, CLOSE):
  62. self._pool.close()
  63. self._pool.join()
  64. self._pool = None
  65. def on_terminate(self):
  66. """Force terminate the pool."""
  67. if self._pool is not None:
  68. self._pool.terminate()
  69. self._pool = None
  70. def on_close(self):
  71. if self._pool is not None and self._pool._state == RUN:
  72. self._pool.close()
  73. def terminate_job(self, pid, signal=None):
  74. _kill(pid, signal or _signal.SIGTERM)
  75. def grow(self, n=1):
  76. return self._pool.grow(n)
  77. def shrink(self, n=1):
  78. return self._pool.shrink(n)
  79. def restart(self):
  80. self._pool.restart()
  81. def _get_info(self):
  82. return {"max-concurrency": self.limit,
  83. "processes": [p.pid for p in self._pool._pool],
  84. "max-tasks-per-child": self._pool._maxtasksperchild,
  85. "put-guarded-by-semaphore": self.putlocks,
  86. "timeouts": (self._pool.soft_timeout, self._pool.timeout)}
  87. def set_on_process_started(self, callback):
  88. self._pool.on_process_created
  89. def _get_on_process_started(self):
  90. return self._pool.on_process_started
  91. def _set_on_process_started(self, fun):
  92. self._pool.on_process_started = fun
  93. on_process_started = property(_get_on_process_started,
  94. _set_on_process_started)
  95. def _get_on_process_down(self):
  96. return self._pool.on_process_down
  97. def _set_on_process_down(self, fun):
  98. self._pool.on_process_down = fun
  99. on_process_down = property(_get_on_process_down,
  100. _set_on_process_down)
  101. @property
  102. def num_processes(self):
  103. return self._pool._processes
  104. @property
  105. def readers(self):
  106. return self._pool.readers
  107. @property
  108. def writers(self):
  109. return self._pool.writers
  110. @property
  111. def timers(self):
  112. return {self._pool.maintain_pool: 30}