
Added CELERYD_MAX_TASKS_PER_CHILD / --maxtasksperchild

Maximum number of tasks a pool worker can process before it's terminated
and replaced with a new one.
Ask Solem, 15 years ago
Commit 5f9e0cae21
5 changed files with 50 additions and 13 deletions
  1. celery/bin/celeryd.py            +8   -0
  2. celery/worker/__init__.py        +4   -1
  3. celery/worker/pool.py            +4   -12
  4. docs/configuration.rst           +29  -0
  5. docs/reference/celery.conf.rst   +5   -0
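
For context, a minimal sketch of how the new setting might appear in a project's celeryconfig.py; the module name and the value 100 are illustrative, not part of this commit:

    # celeryconfig.py (hypothetical project configuration)
    # Recycle each pool worker process after it has executed 100 tasks,
    # e.g. to keep memory usage of long-running workers in check.
    CELERYD_MAX_TASKS_PER_CHILD = 100

The same behaviour can be requested on the command line with celeryd --maxtasksperchild=100; leaving the setting unset means no limit, matching the previous behaviour.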

+ 8 - 0
celery/bin/celeryd.py

@@ -118,6 +118,11 @@ OPTION_LIST = (
             default=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
             action="store", type="int", dest="task_soft_time_limit",
             help="Enables a soft time limit for task run times."),
+    optparse.make_option('--maxtasksperchild',
+            default=conf.CELERYD_MAX_TASKS_PER_CHILD,
+            action="store", type="int", dest="max_tasks_per_child",
+            help="Maximum number of tasks a pool worker can execute"
+                 "before it's replaced with a new one."),
 )
 
 
@@ -129,6 +134,7 @@ class Worker(object):
             schedule=conf.CELERYBEAT_SCHEDULE_FILENAME,
             task_time_limit=conf.CELERYD_TASK_TIME_LIMIT,
             task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
+            max_tasks_per_child=conf.CELERYD_MAX_TASKS_PER_CHILD,
             events=False, **kwargs):
         self.concurrency = concurrency or multiprocessing.cpu_count()
         self.loglevel = loglevel
@@ -140,6 +146,7 @@ class Worker(object):
         self.events = events
         self.task_time_limit = task_time_limit
         self.task_soft_time_limit = task_soft_time_limit
+        self.max_tasks_per_child = max_tasks_per_child
         if not isinstance(self.loglevel, int):
             self.loglevel = conf.LOG_LEVELS[self.loglevel.upper()]
 
@@ -228,6 +235,7 @@ class Worker(object):
                                 embed_clockservice=self.run_clockservice,
                                 schedule_filename=self.schedule,
                                 send_events=self.events,
+                                max_tasks_per_child=self.max_tasks_per_child,
                                 task_time_limit=self.task_time_limit,
                                 task_soft_time_limit=self.task_soft_time_limit)
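
As a rough illustration of how the new flag is parsed, here is a self-contained optparse sketch reusing the make_option definition from above; building a standalone OptionParser like this is purely for demonstration and is not how celeryd wires it up:

    import optparse

    parser = optparse.OptionParser(option_list=[
        optparse.make_option('--maxtasksperchild',
                default=None, action="store", type="int",
                dest="max_tasks_per_child",
                help="Maximum number of tasks a pool worker can execute "
                     "before it's replaced with a new one."),
    ])

    options, args = parser.parse_args(["--maxtasksperchild", "50"])
    print(options.max_tasks_per_child)   # -> 50 (an int, thanks to type="int")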
 

+ 4 - 1
celery/worker/__init__.py

@@ -111,7 +111,8 @@ class WorkController(object):
             eta_scheduler_cls=conf.CELERYD_ETA_SCHEDULER,
             schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME,
             task_time_limit=conf.CELERYD_TASK_TIME_LIMIT,
-            task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT):
+            task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
+            max_tasks_per_child=conf.CELERYD_MAX_TASKS_PER_CHILD):
 
         # Options
         self.loglevel = loglevel or self.loglevel
@@ -124,6 +125,7 @@ class WorkController(object):
         self.send_events = send_events
         self.task_time_limit = task_time_limit
         self.task_soft_time_limit = task_soft_time_limit
+        self.max_tasks_per_child = max_tasks_per_child
         self._finalize = Finalize(self, self.stop, exitpriority=20)
 
         # Queues
@@ -139,6 +141,7 @@ class WorkController(object):
         self.pool = instantiate(pool_cls, self.concurrency,
                                 logger=self.logger,
                                 initializer=process_initializer,
+                                maxtasksperchild=self.max_tasks_per_child,
                                 timeout=self.task_time_limit,
                                 soft_timeout=self.task_soft_time_limit)
         self.mediator = instantiate(mediator_cls, self.ready_queue,
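
A hedged sketch of how a WorkController might be constructed with the new keyword; the keyword names below come from the diff, but concurrency and the concrete values are assumptions since the full constructor signature is not shown here:

    from celery.worker import WorkController

    # Illustrative only: the limit-related options are passed as keywords,
    # and max_tasks_per_child is forwarded to the pool as maxtasksperchild.
    controller = WorkController(concurrency=4,            # assumed parameter
                                task_time_limit=300,
                                task_soft_time_limit=280,
                                max_tasks_per_child=50)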

+ 4 - 12
celery/worker/pool.py

@@ -28,10 +28,11 @@ class TaskPool(object):
     """
 
     def __init__(self, limit, logger=None, initializer=None,
-            timeout=None, soft_timeout=None):
+            maxtasksperchild=None, timeout=None, soft_timeout=None):
         self.limit = limit
         self.logger = logger or log.get_default_logger()
         self.initializer = initializer
+        self.maxtasksperchild = maxtasksperchild
         self.timeout = timeout
         self.soft_timeout = soft_timeout
         self._pool = None
@@ -45,7 +46,8 @@ class TaskPool(object):
         self._pool = DynamicPool(processes=self.limit,
                                  initializer=self.initializer,
                                  timeout=self.timeout,
-                                 soft_timeout=self.soft_timeout)
+                                 soft_timeout=self.soft_timeout,
+                                 maxtasksperchild=self.maxtasksperchild)
 
     def stop(self):
         """Terminate the pool."""
@@ -53,14 +55,6 @@ class TaskPool(object):
         self._pool.join()
         self._pool = None
 
-    def replace_dead_workers(self):
-        self.logger.debug("TaskPool: Finding dead pool processes...")
-        dead_count = self._pool.replace_dead_workers()
-        if dead_count: # pragma: no cover
-            self.logger.info(
-                "TaskPool: Replaced %d dead pool workers..." % (
-                    dead_count))
-
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
             errbacks=None, accept_callback=None, timeout_callback=None,
             **compat):
@@ -80,8 +74,6 @@ class TaskPool(object):
         self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
             target, args, kwargs))
 
-        self.replace_dead_workers()
-
         return self._pool.apply_async(target, args, kwargs,
                                       callback=on_ready,
                                       accept_callback=accept_callback,

+ 29 - 0
docs/configuration.rst

@@ -456,6 +456,35 @@ Worker: celeryd
     useful to add tasks if you are not using django or cannot use task
     auto-discovery.
 
+* CELERYD_MAX_TASKS_PER_CHILD
+
+    Maximum number of tasks a pool worker process can execute before
+    it's replaced with a new one. Default is no limit.
+
+* CELERYD_TASK_TIME_LIMIT
+
+    Task hard time limit in seconds. The worker processing the task will
+    be killed and replaced with a new one when this is exceeded.
+
+* CELERYD_TASK_SOFT_TIME_LIMIT
+
+    Task soft time limit in seconds.
+    The :exc:`celery.exceptions.SoftTimeLimitExceeded` exception will be
+    raised when this is exceeded. The task can catch this to
+    e.g. clean up before the hard time limit comes.
+
+    .. code-block:: python
+
+        from celery.decorators import task
+        from celery.exceptions import SoftTimeLimitExceeded
+
+        @task()
+        def mytask():
+            try:
+                return do_work()
+            except SoftTimeLimitExceeded:
+                cleanup_in_a_hurry()
+
 * CELERY_SEND_TASK_ERROR_EMAILS
 
     If set to ``True``, errors in tasks will be sent to admins by e-mail.
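
Taken together, the two time-limit settings documented above might be combined like this in a configuration module; the concrete values are illustrative, the point being that the soft limit should fire before the hard one so the task gets a chance to clean up:

    # celeryconfig.py (illustrative values)
    CELERYD_TASK_TIME_LIMIT = 300        # hard limit: worker killed after 5 minutes
    CELERYD_TASK_SOFT_TIME_LIMIT = 280   # soft limit: SoftTimeLimitExceeded raised first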

+ 5 - 0
docs/reference/celery.conf.rst

@@ -86,6 +86,11 @@ Configuration - celery.conf
 
     Always execute tasks locally, don't send to the queue.
 
+.. data:: EAGER_PROPAGATES_EXCEPTIONS
+
+    If set to ``True``, :func:`celery.execute.apply` will re-raise task exceptions.
+    It's the same as always running apply with ``throw=True``.
+
 .. data:: TASK_RESULT_EXPIRES
 
     Task tombstone expire time in seconds.
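
A hedged sketch of what the new EAGER_PROPAGATES_EXCEPTIONS setting amounts to, per the description above; the task is hypothetical and the exact keyword signature of celery.execute.apply is assumed rather than shown in this commit:

    from celery.decorators import task
    from celery.execute import apply

    @task()
    def div(x, y):              # hypothetical example task
        return x / y

    # With EAGER_PROPAGATES_EXCEPTIONS = True the ZeroDivisionError is
    # re-raised here by default; otherwise the same effect is requested
    # per call by passing throw=True (assumed keyword, per the docs text).
    apply(div, args=[1, 0], throw=True)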