Browse Source

Now requires kombu 1.2.1

Ask Solem 13 years ago
parent
commit
fec8cc48c9
5 changed files with 13 additions and 34 deletions
  1. 9 30
      celery/app/amqp.py
  2. 1 1
      celery/tests/test_app/test_app_amqp.py
  3. 1 1
      requirements/default.txt
  4. 1 1
      setup.cfg
  5. 1 1
      setup.py

+ 9 - 30
celery/app/amqp.py

@@ -14,12 +14,12 @@ from datetime import datetime, timedelta
 from kombu import BrokerConnection, Exchange
 from kombu.connection import Resource
 from kombu import compat as messaging
+from kombu.pools import ProducerPool
 from kombu.utils import cached_property
 
 from celery import routes as _routes
 from celery import signals
 from celery.utils import gen_unique_id, textindent
-from celery.utils import promise, maybe_promise
 
 #: List of known options to a Kombu producers send method.
 #: Used to extract the message related options out of any `dict`.
@@ -252,40 +252,19 @@ class TaskPublisher(messaging.Publisher):
             self.close()
 
 
-class PublisherPool(Resource):
+class PublisherPool(ProducerPool):
 
-    def __init__(self, app=None):
+    def __init__(self, app):
         self.app = app
-        super(PublisherPool, self).__init__(limit=self.app.pool.limit)
+        super(PublisherPool, self).__init__(self.app.pool,
+                                            limit=self.app.pool.limit)
 
-    def create_publisher(self):
-        conn = self.app.pool.acquire(block=True)
+    def create_producer(self):
+        conn = self.connections.acquire(block=True)
         pub = self.app.amqp.TaskPublisher(conn, auto_declare=False)
-        conn._publisher_chan = pub.channel
-        return pub
-
-    def new(self):
-        return promise(self.create_publisher)
-
-    def setup(self):
-        if self.limit:
-            for _ in xrange(self.limit):
-                self._resource.put_nowait(self.new())
-
-    def prepare(self, publisher):
-        pub = maybe_promise(publisher)
-        if not pub.connection:
-            pub.connection = self.app.pool.acquire(block=True)
-            if not getattr(pub.connection, "_publisher_chan", None):
-                pub.connection._publisher_chan = pub.connection.channel()
-            pub.revive(pub.connection._publisher_chan)
+        conn._producer_chan = pub.channel
         return pub
 
-    def release(self, resource):
-        resource.connection.release()
-        resource.connection = None
-        super(PublisherPool, self).release(resource)
-
 
 class AMQP(object):
     BrokerConnection = BrokerConnection
@@ -371,4 +350,4 @@ class AMQP(object):
 
     @cached_property
     def publisher_pool(self):
-        return PublisherPool(app=self.app)
+        return PublisherPool(self.app)

+ 1 - 1
celery/tests/test_app/test_app_amqp.py

@@ -90,7 +90,7 @@ class test_PublisherPool(AppCase):
 
             p1 = r1 = pool.acquire()
             p2 = r2 = pool.acquire()
-            delattr(r1.connection, "_publisher_chan")
+            delattr(r1.connection, "_producer_chan")
             r1.release()
             r2.release()
             r1 = pool.acquire()

+ 1 - 1
requirements/default.txt

@@ -1,4 +1,4 @@
 python-dateutil>=1.5.0,<2.0.0
 anyjson>=0.3.1
-kombu>=1.2.0,<2.0.0
+kombu>=1.2.1,<2.0.0
 pyparsing>=1.5.0,<2.0.0

+ 1 - 1
setup.cfg

@@ -44,5 +44,5 @@ requires = uuid
            multiprocessing == 2.6.2.1
            python-dateutil <= 1.5.0
            anyjson >= 0.3.1
-           kombu >= 1.2.0
+           kombu >= 1.2.1
            pyparsing >= 1.5.0

+ 1 - 1
setup.py

@@ -49,7 +49,7 @@ except ImportError:
 install_requires.extend([
     "python-dateutil>=1.5.0,<2.0.0",
     "anyjson>=0.3.1",
-    "kombu>=1.2.0,<2.0.0",
+    "kombu>=1.2.1,<2.0.0",
     "pyparsing>=1.5.0,<2.0.0",
 ])
 py_version = sys.version_info