ソースを参照

The restart_pool remote control command now requires the CELERYD_POOL_RESTARTS setting to be enabled. Disabling the sentinel event by default reduces the number of sempahores required to run the pool.

Ask Solem 12 年 前
コミット
db1efae951
4 ファイル変更19 行追加1 行削除
  1. 1 0
      celery/app/defaults.py
  2. 6 1
      celery/worker/__init__.py
  3. 10 0
      docs/configuration.rst
  4. 2 0
      docs/userguide/workers.rst

+ 1 - 0
celery/app/defaults.py

@@ -177,6 +177,7 @@ NAMESPACES = {
         'MAX_TASKS_PER_CHILD': Option(type='int'),
         'MAX_TASKS_PER_CHILD': Option(type='int'),
         'POOL': Option(DEFAULT_POOL),
         'POOL': Option(DEFAULT_POOL),
         'POOL_PUTLOCKS': Option(True, type='bool'),
         'POOL_PUTLOCKS': Option(True, type='bool'),
+        'POOL_RESTARTS': Option(False, type='bool'),
         'PREFETCH_MULTIPLIER': Option(4, type='int'),
         'PREFETCH_MULTIPLIER': Option(4, type='int'),
         'STATE_DB': Option(),
         'STATE_DB': Option(),
         'TASK_LOG_FORMAT': Option(DEFAULT_TASK_LOG_FMT),
         'TASK_LOG_FORMAT': Option(DEFAULT_TASK_LOG_FMT),

+ 6 - 1
celery/worker/__init__.py

@@ -84,7 +84,8 @@ class Pool(bootsteps.StartStopComponent):
     name = 'worker.pool'
     name = 'worker.pool'
     requires = ('queues', 'beat', )
     requires = ('queues', 'beat', )
 
 
-    def __init__(self, w, autoscale=None, no_execv=False, **kwargs):
+    def __init__(self, w, autoscale=None, autoreload=False,
+            no_execv=False, **kwargs):
         w.autoscale = autoscale
         w.autoscale = autoscale
         w.pool = None
         w.pool = None
         w.max_concurrency = None
         w.max_concurrency = None
@@ -92,6 +93,7 @@ class Pool(bootsteps.StartStopComponent):
         w.no_execv = no_execv
         w.no_execv = no_execv
         if w.autoscale:
         if w.autoscale:
             w.max_concurrency, w.min_concurrency = w.autoscale
             w.max_concurrency, w.min_concurrency = w.autoscale
+        self.autoreload_enabled = autoreload
 
 
     def on_poll_init(self, pool, hub):
     def on_poll_init(self, pool, hub):
         apply_after = hub.timer.apply_after
         apply_after = hub.timer.apply_after
@@ -148,6 +150,7 @@ class Pool(bootsteps.StartStopComponent):
         if not threaded:
         if not threaded:
             semaphore = w.semaphore = BoundedSemaphore(procs)
             semaphore = w.semaphore = BoundedSemaphore(procs)
             max_restarts = 100
             max_restarts = 100
+        allow_restart = self.autoreload_enabled or w.pool_restarts
         pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
         pool = w.pool = self.instantiate(w.pool_cls, w.min_concurrency,
                             initargs=(w.app, w.hostname),
                             initargs=(w.app, w.hostname),
                             maxtasksperchild=w.max_tasks_per_child,
                             maxtasksperchild=w.max_tasks_per_child,
@@ -157,6 +160,7 @@ class Pool(bootsteps.StartStopComponent):
                             lost_worker_timeout=w.worker_lost_wait,
                             lost_worker_timeout=w.worker_lost_wait,
                             threads=threaded,
                             threads=threaded,
                             max_restarts=max_restarts,
                             max_restarts=max_restarts,
+                            allow_restart=allow_restart,
                             semaphore=semaphore)
                             semaphore=semaphore)
         if w.hub:
         if w.hub:
             w.hub.on_init.append(partial(self.on_poll_init, pool))
             w.hub.on_init.append(partial(self.on_poll_init, pool))
@@ -287,6 +291,7 @@ class WorkController(configurated):
     task_soft_time_limit = from_config()
     task_soft_time_limit = from_config()
     max_tasks_per_child = from_config()
     max_tasks_per_child = from_config()
     pool_putlocks = from_config()
     pool_putlocks = from_config()
+    pool_restarts = from_config()
     force_execv = from_config()
     force_execv = from_config()
     prefetch_multiplier = from_config()
     prefetch_multiplier = from_config()
     state_db = from_config()
     state_db = from_config()

+ 10 - 0
docs/configuration.rst

@@ -1429,6 +1429,16 @@ the built-in aliases: ``processes``, ``eventlet``, ``gevent``.
 
 
 Default is ``processes``.
 Default is ``processes``.
 
 
+.. setting:: CELERYD_POOL_RESTARTS
+
+CELERYD_POOL_RESTARTS
+~~~~~~~~~~~~~~~~~~~~~
+
+If enabled the worker pool can be restarted using the
+:control:`pool_restart` remote control command.
+
+Disabled by default.
+
 .. setting:: CELERYD_AUTOSCALER
 .. setting:: CELERYD_AUTOSCALER
 
 
 CELERYD_AUTOSCALER
 CELERYD_AUTOSCALER

+ 2 - 0
docs/userguide/workers.rst

@@ -575,6 +575,8 @@ Pool Restart Command
 
 
 .. versionadded:: 2.5
 .. versionadded:: 2.5
 
 
+Requires the :setting:`CELERYD_POOL_RESTARTS` setting to be enabled.
+
 The remote control command :control:`pool_restart` sends restart requests to
 The remote control command :control:`pool_restart` sends restart requests to
 the workers child processes.  It is particularly useful for forcing
 the workers child processes.  It is particularly useful for forcing
 the worker to import new modules, or for reloading already imported
 the worker to import new modules, or for reloading already imported