Browse Source

Adds the CELERYD_MAX_MEMORY_PER_CHILD setting

This allows users to specify the maximum amount of resident
memory that may be consumed by a child process before it will be
replaced by a new child process. If a single task causes a child
process to exceed this limit, the task will be completed and the
child process will be replaced afterwards.

This commit depends on the corresponding commit in the
billiard project that enables this setting.
Dave Smith 10 years ago
parent
commit
5cae0e7541

+ 1 - 0
celery/app/defaults.py

@@ -179,6 +179,7 @@ NAMESPACES = {
         'LOG_FORMAT': Option(DEFAULT_PROCESS_LOG_FMT),
         'LOG_COLOR': Option(type='bool'),
         'MAX_TASKS_PER_CHILD': Option(type='int'),
+        'MAX_MEMORY_PER_CHILD': Option(type='int'),
         'POOL': Option(DEFAULT_POOL),
         'POOL_PUTLOCKS': Option(True, type='bool'),
         'POOL_RESTARTS': Option(False, type='bool'),

+ 10 - 0
celery/bin/worker.py

@@ -109,6 +109,14 @@ The :program:`celery worker` command (previously known as ``celeryd``)
     Maximum number of tasks a pool worker can execute before it's
     terminated and replaced by a new worker.
 
+.. cmdoption:: --maxmemperchild
+
+    Maximum amount of resident memory, in KiB, that may be consumed by a
+    child process before it will be replaced by a new one. If a single
+    task causes a child process to exceed this limit, the task will be
+    completed and the child process will be replaced afterwards.
+    Default: no limit.
+
 .. cmdoption:: --pidfile
 
     Optional file used to store the workers pid.
@@ -244,6 +252,8 @@ class worker(Command):
                    default=conf.CELERYD_MAX_TASKS_PER_CHILD, type='int'),
             Option('--prefetch-multiplier', dest='prefetch_multiplier',
                    default=conf.CELERYD_PREFETCH_MULTIPLIER, type='int'),
+            Option('--maxmemperchild', dest='max_memory_per_child',
+                   default=conf.CELERYD_MAX_MEMORY_PER_CHILD, type='int'),
             Option('--queues', '-Q', default=[]),
             Option('--exclude-queues', '-X', default=[]),
             Option('--include', '-I', default=[]),

+ 12 - 0
celery/tests/worker/test_components.py

@@ -34,3 +34,15 @@ class test_Pool(AppCase):
         w.pool = Mock()
         comp.create(w)
         self.assertIs(w.process_task, w._process_task_sem)
+
+    def test_create_calls_instantiate_with_max_memory(self):
+        w = Mock()
+        w.use_eventloop = w.pool_putlocks = w.pool_cls.uses_semaphore = True
+        comp = Pool(w)
+        comp.instantiate = Mock()
+        w.max_memory_per_child = 32
+
+        comp.create(w)
+
+        self.assertEqual(
+            comp.instantiate.call_args[1]['max_memory_per_child'], 32)

+ 5 - 1
celery/worker/__init__.py

@@ -351,7 +351,8 @@ class WorkController(object):
                        schedule_filename=None, scheduler_cls=None,
                        task_time_limit=None, task_soft_time_limit=None,
                        max_tasks_per_child=None, prefetch_multiplier=None,
-                       disable_rate_limits=None, worker_lost_wait=None, **_kw):
+                       disable_rate_limits=None, worker_lost_wait=None,
+                       max_memory_per_child=None, **_kw):
         self.loglevel = loglevel
         self.logfile = logfile
         self.concurrency = self._getopt('concurrency', concurrency)
@@ -381,6 +382,9 @@ class WorkController(object):
         self.max_tasks_per_child = self._getopt(
             'max_tasks_per_child', max_tasks_per_child,
         )
+        self.max_memory_per_child = self._getopt(
+            'max_memory_per_child', max_memory_per_child,
+        )
         self.prefetch_multiplier = int(self._getopt(
             'prefetch_multiplier', prefetch_multiplier,
         ))

+ 1 - 0
celery/worker/components.py

@@ -156,6 +156,7 @@ class Pool(bootsteps.StartStopStep):
             w.pool_cls, w.min_concurrency,
             initargs=(w.app, w.hostname),
             maxtasksperchild=w.max_tasks_per_child,
+            max_memory_per_child=w.max_memory_per_child,
             timeout=w.task_time_limit,
             soft_timeout=w.task_soft_time_limit,
             putlocks=w.pool_putlocks and threaded,

+ 11 - 0
docs/configuration.rst

@@ -1665,6 +1665,17 @@ CELERYD_MAX_TASKS_PER_CHILD
 Maximum number of tasks a pool worker process can execute before
 it's replaced with a new one.  Default is no limit.
 
+.. setting:: CELERYD_MAX_MEMORY_PER_CHILD
+
+CELERYD_MAX_MEMORY_PER_CHILD
+~~~~~~~~~~~~~~~~~~~~~
+
+Maximum amount of resident memory that may be consumed by a
+worker before it will be replaced by a new worker. If a single
+task causes a worker to exceed this limit, the task will be
+completed, and the worker will be replaced afterwards. Default:
+no limit.
+
 .. setting:: CELERYD_TASK_TIME_LIMIT
 
 CELERYD_TASK_TIME_LIMIT

+ 16 - 0
docs/userguide/workers.rst

@@ -528,6 +528,22 @@ for example from closed source C extensions.
 The option can be set using the workers `--maxtasksperchild` argument
 or using the :setting:`CELERYD_MAX_TASKS_PER_CHILD` setting.
 
+Max memory per child setting
+============================
+
+.. versionadded:: TODO
+
+pool support: *prefork*
+
+With this option you can configure the maximum amount of resident
+memory a worker can execute before it's replaced by a new process.
+
+This is useful if you have memory leaks you have no control over
+for example from closed source C extensions.
+
+The option can be set using the workers `--maxmemperchild` argument
+or using the :setting:`CELERYD_MAX_MEMORY_PER_CHILD` setting.
+
 .. _worker-autoscaling:
 
 Autoscaling