Quellcode durchsuchen

Fixes AMQP result backend leak. Depends on celery/kombu@e506b1f59afd4ecf7f8927b0d84ff41f4961b334

Ask Solem vor 11 Jahren
Ursprung
Commit
6f5b7dace9
1 geänderte Dateien mit 15 neuen und 12 gelöschten Zeilen
  1. 15 12
      celery/backends/amqp.py

+ 15 - 12
celery/backends/amqp.py

@@ -37,10 +37,14 @@ def repair_uuid(s):
     return '%s-%s-%s-%s-%s' % (s[:8], s[8:12], s[12:16], s[16:20], s[20:])
 
 
+class NoCacheQueue(Queue):
+    can_cache_declaration = False
+
+
 class AMQPBackend(BaseDictBackend):
     """Publishes results by sending messages."""
     Exchange = Exchange
-    Queue = Queue
+    Queue = NoCacheQueue
     Consumer = Consumer
     Producer = Producer
 
@@ -118,17 +122,16 @@ class AMQPBackend(BaseDictBackend):
 
     def _store_result(self, task_id, result, status, traceback=None):
         """Send task return value and status."""
-        with self.mutex:
-            with self.app.amqp.producer_pool.acquire(block=True) as pub:
-                pub.publish({'task_id': task_id, 'status': status,
-                             'result': self.encode_result(result, status),
-                             'traceback': traceback,
-                             'children': self.current_task_children()},
-                            exchange=self.exchange,
-                            routing_key=task_id.replace('-', ''),
-                            serializer=self.serializer,
-                            retry=True, retry_policy=self.retry_policy,
-                            declare=[self._create_binding(task_id)])
+        with self.app.amqp.producer_pool.acquire(block=True) as producer:
+            producer.publish({'task_id': task_id, 'status': status,
+                              'result': self.encode_result(result, status),
+                              'traceback': traceback,
+                              'children': self.current_task_children()},
+                             exchange=self.exchange,
+                             routing_key=task_id.replace('-', ''),
+                             serializer=self.serializer,
+                             retry=True, retry_policy=self.retry_policy,
+                             declare=[self._create_binding(task_id)])
         return result
 
     def wait_for(self, task_id, timeout=None, cache=True, propagate=True,