Quellcode durchsuchen

Added experimental PublisherPool implementation

Ask Solem vor 14 Jahren
Ursprung
Commit
186127184e
1 geänderte Dateien mit 35 neuen und 0 gelöschten Zeilen
  1. 35 0
      celery/app/amqp.py

+ 35 - 0
celery/app/amqp.py

@@ -12,11 +12,13 @@ AMQ related functionality.
 from datetime import datetime, timedelta
 from datetime import datetime, timedelta
 
 
 from kombu import BrokerConnection
 from kombu import BrokerConnection
+from kombu.connection import Resource
 from kombu import compat as messaging
 from kombu import compat as messaging
 
 
 from celery import routes
 from celery import routes
 from celery import signals
 from celery import signals
 from celery.utils import gen_unique_id, textindent, cached_property
 from celery.utils import gen_unique_id, textindent, cached_property
+from celery.utils import promise, maybe_promise
 from celery.utils.compat import UserDict
 from celery.utils.compat import UserDict
 
 
 #: List of known options to a Kombu producers send method.
 #: List of known options to a Kombu producers send method.
@@ -211,6 +213,36 @@ class TaskPublisher(messaging.Publisher):
                                                expires=expires)
                                                expires=expires)
         return task_id
         return task_id
 
 
+    def __exit__(self, *exc_info):
+        try:
+            self.release()
+        except AttributeError:
+            self.close()
+
+
+class PublisherPool(Resource):
+
+    def __init__(self, limit=None, app=None):
+        self.app = app
+        self.connections = self.app.broker_connection().Pool(limit=limit)
+        super(PublisherPool, self).__init__(limit=limit)
+
+    def create_publisher(self):
+        return self.app.amqp.TaskPublisher(self.connections.acquire(),
+                                           auto_declare=False)
+
+    def new(self):
+        return promise(self.create_publisher)
+
+    def setup(self):
+        if self.limit:
+            for _ in xrange(self.limit):
+                self._resource.put_nowait(self.new())
+
+    def prepare(self, publisher):
+        return maybe_promise(publisher)
+
+
 
 
 class AMQP(object):
 class AMQP(object):
     BrokerConnection = BrokerConnection
     BrokerConnection = BrokerConnection
@@ -272,6 +304,9 @@ class AMQP(object):
 
 
         return publisher
         return publisher
 
 
+    def PublisherPool(self, limit=None):
+        return PublisherPool(limit=limit, app=self.app)
+
     def get_task_consumer(self, connection, queues=None, **kwargs):
     def get_task_consumer(self, connection, queues=None, **kwargs):
         """Return consumer configured to consume from all known task
         """Return consumer configured to consume from all known task
         queues."""
         queues."""