|
@@ -58,6 +58,8 @@ and as such the configuration could not be loaded.
|
|
|
Please set this variable and make it point to
|
|
|
a configuration module."""
|
|
|
|
|
|
+_after_fork_registered = False
|
|
|
+
|
|
|
|
|
|
def app_has_custom(app, attr):
|
|
|
return mro_lookup(app.__class__, attr, stop=(Celery, object),
|
|
@@ -70,6 +72,29 @@ def _unpickle_appattr(reverse_name, args):
|
|
|
return get_current_app()._rgetattr(reverse_name)(*args)
|
|
|
|
|
|
|
|
|
+def _global_after_fork():
|
|
|
+ # Previously every app would call:
|
|
|
+ # `register_after_fork(app, app._after_fork)`
|
|
|
+ # but this created a leak as `register_after_fork` stores concrete object
|
|
|
+ # references and once registered an object cannot be removed without
|
|
|
+ # touching and iterating over the private afterfork registry list.
|
|
|
+ #
|
|
|
+ # See Issue #1949
|
|
|
+ from celery import _state
|
|
|
+ from multiprocessing.util import info
|
|
|
+ for app in _state.apps:
|
|
|
+ try:
|
|
|
+ app._after_fork()
|
|
|
+ except Exception as exc:
|
|
|
+ info('after forker raised exception: %r' % (exc, ), exc_info=1)
|
|
|
+
|
|
|
+
|
|
|
+def _ensure_after_fork():
|
|
|
+ global _after_fork_registered
|
|
|
+ _after_fork_registered = True
|
|
|
+ register_after_fork(_global_after_fork, _global_after_fork)
|
|
|
+
|
|
|
+
|
|
|
class Celery(object):
|
|
|
#: This is deprecated, use :meth:`reduce_keys` instead
|
|
|
Pickler = AppPickler
|
|
@@ -590,7 +615,7 @@ class Celery(object):
|
|
|
@property
|
|
|
def pool(self):
|
|
|
if self._pool is None:
|
|
|
- register_after_fork(self, self._after_fork)
|
|
|
+ _ensure_after_fork()
|
|
|
limit = self.conf.BROKER_POOL_LIMIT
|
|
|
self._pool = self.connection().Pool(limit=limit)
|
|
|
return self._pool
|