Browse Source

Use kombu.pools for connection/producer pools

Ask Solem 9 years ago
parent
commit
f3ac3173ec
3 changed files with 19 additions and 41 deletions
  1. 3 6
      celery/app/amqp.py
  2. 12 18
      celery/app/base.py
  3. 4 17
      celery/tests/app/test_app.py

+ 3 - 6
celery/app/amqp.py

@@ -15,9 +15,9 @@ from collections import Mapping, namedtuple
 from datetime import timedelta
 from weakref import WeakValueDictionary
 
+from kombu import pools
 from kombu import Connection, Consumer, Exchange, Producer, Queue
 from kombu.common import Broadcast
-from kombu.pools import ProducerPool
 from kombu.utils import cached_property
 from kombu.utils.functional import maybe_list
 
@@ -567,11 +567,8 @@ class AMQP(object):
     @property
     def producer_pool(self):
         if self._producer_pool is None:
-            self._producer_pool = ProducerPool(
-                self.app.pool,
-                limit=self.app.pool.limit,
-                Producer=self.Producer,
-            )
+            self._producer_pool = pools.producers[self.app.connection()]
+            self._producer_pool.limit = self.app.pool.limit
         return self._producer_pool
     publisher_pool = producer_pool  # compat alias
 

+ 12 - 18
celery/app/base.py

@@ -17,10 +17,7 @@ from operator import attrgetter
 from functools import wraps
 
 from amqp import starpromise
-try:
-    from billiard.util import register_after_fork
-except ImportError:  # pragma: no cover
-    register_after_fork = None
+from kombu import pools
 from kombu.clocks import LamportClock
 from kombu.common import oid_from
 from kombu.utils import cached_property, uuid
@@ -56,6 +53,11 @@ from .utils import (
 # Load all builtin tasks
 from . import builtins  # noqa
 
+try:
+    from billiard.util import register_after_fork
+except ImportError:  # pragma: no cover
+    register_after_fork = None
+
 __all__ = ['Celery']
 
 _EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
@@ -276,8 +278,7 @@ class Celery(object):
         self.close()
 
     def close(self):
-        """Close any open pool connections and do any other steps necessary
-        to clean up after the application.
+        """Clean up after the application.
 
         Only necessary for dynamically created apps for which you can
         use the with statement instead::
@@ -286,7 +287,7 @@ class Celery(object):
                 with app.connection() as conn:
                     pass
         """
-        self._maybe_close_pool()
+        self._pool = None
         _deregister_app(self)
 
     def on_init(self):
@@ -828,16 +829,8 @@ class Celery(object):
         return self._conf
 
     def _after_fork(self, obj_):
-        self._maybe_close_pool()
-
-    def _maybe_close_pool(self):
-        if self._pool:
-            self._pool.force_close_all()
-            self._pool = None
-            amqp = self.__dict__.get('amqp')
-            if amqp is not None and amqp._producer_pool is not None:
-                amqp._producer_pool.force_close_all()
-                amqp._producer_pool = None
+        self._pool = None
+        pools.reset()
 
     def signature(self, *args, **kwargs):
         """Return a new :class:`~celery.canvas.Signature` bound to this app.
@@ -1016,7 +1009,8 @@ class Celery(object):
         if self._pool is None:
             _ensure_after_fork()
             limit = self.conf.broker_pool_limit
-            self._pool = self.connection().Pool(limit=limit)
+            pools.set_limit(limit)
+            self._pool = pools.connections[self.connection()]
         return self._pool
 
     @property

+ 4 - 17
celery/tests/app/test_app.py

@@ -196,20 +196,6 @@ class test_App(AppCase):
         with self.app.connection_or_acquire(pool=False):
             self.assertFalse(self.app.pool._dirty)
 
-    def test_maybe_close_pool(self):
-        cpool = self.app._pool = Mock()
-        amqp = self.app.__dict__['amqp'] = Mock()
-        ppool = amqp._producer_pool
-        self.app._maybe_close_pool()
-        cpool.force_close_all.assert_called_with()
-        ppool.force_close_all.assert_called_with()
-        self.assertIsNone(self.app._pool)
-        self.assertIsNone(self.app.__dict__['amqp']._producer_pool)
-
-        self.app._pool = Mock()
-        self.app._maybe_close_pool()
-        self.app._maybe_close_pool()
-
     def test_using_v1_reduce(self):
         self.app._using_v1_reduce = True
         self.assertTrue(loads(dumps(self.app)))
@@ -790,11 +776,12 @@ class test_App(AppCase):
             my_failover_strategy,
         )
 
-    def test_after_fork(self):
-        p = self.app._pool = Mock()
+    @patch('kombu.pools.reset')
+    def test_after_fork(self, reset):
+        self.app._pool = Mock()
         self.app._after_fork(self.app)
-        p.force_close_all.assert_called_with()
         self.assertIsNone(self.app._pool)
+        reset.assert_called_with()
         self.app._after_fork(self.app)
 
     def test_global_after_fork(self):