瀏覽代碼

Fixes deadlock when producer_pool inherits connection pool from parent.

Ask Solem 12 年之前
父節點
當前提交
970b113b27
共有 2 個文件被更改,包括 15 次插入3 次删除
  1. 11 3
      celery/app/amqp.py
  2. 4 0
      celery/app/base.py

+ 11 - 3
celery/app/amqp.py

@@ -282,6 +282,10 @@ class AMQP(object):
     #: Cached and prepared routing table.
     _rtable = None
 
+    #: Underlying producer pool instance automatically
+    #: set by the :attr:`producer_pool`.
+    _producer_pool = None
+
     def __init__(self, app):
         self.app = app
 
@@ -358,10 +362,14 @@ class AMQP(object):
     def router(self):
         return self.Router()
 
-    @cached_property
+    @property
     def producer_pool(self):
-        return ProducerPool(self.app.pool, limit=self.app.pool.limit,
-                            Producer=self.TaskProducer)
+        if self._producer_pool is None:
+            self._producer_pool = ProducerPool(self.app.pool,
+                limit=self.app.pool.limit,
+                Producer=self.TaskProducer,
+            )
+        return self._producer_pool
     publisher_pool = producer_pool  # compat alias
 
     @cached_property

+ 4 - 0
celery/app/base.py

@@ -336,6 +336,10 @@ class Celery(object):
         if self._pool:
             self._pool.force_close_all()
             self._pool = None
+            amqp = self.amqp
+            if amqp._producer_pool:
+                amqp._producer_pool.force_close_all()
+                amqp._producer_pool = None
 
     def create_task_cls(self):
         """Creates a base task class using default configuration