Explorar o código

AMQP result backend: Reset cached chan on producer when connection lost

Ask Solem %!s(int64=14) %!d(string=hai) anos
pai
achega
ab301c2004
Modificáronse 1 ficheiros con 23 adicións e 10 borrados
  1. 23 10
      celery/backends/amqp.py

+ 23 - 10
celery/backends/amqp.py

@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 import os
 import socket
+import threading
 import time
 
 from datetime import timedelta
@@ -69,6 +70,7 @@ class AMQPBackend(BaseDictBackend):
             self.queue_arguments["x-expires"] = int(self.expires * 1000.0)
         self.connection_max = (connection_max or
                                conf.CELERY_AMQP_TASK_RESULT_CONNECTION_MAX)
+        self.mutex = threading.Lock()
 
     def _create_binding(self, task_id):
         name = task_id.replace("-", "")
@@ -106,18 +108,29 @@ class AMQPBackend(BaseDictBackend):
             max_retries=20, interval_start=0, interval_step=1,
             interval_max=1):
         """Send task return value and status."""
-        conn = self.pool.acquire(block=True)
+        self.mutex.acquire()
         try:
-            send = conn.ensure(self, self._publish_result,
-                        max_retries=max_retries,
-                        interval_start=interval_start,
-                        interval_step=interval_step,
-                        interval_max=interval_max)
-            send(conn, task_id, {"task_id": task_id, "status": status,
-                                 "result": self.encode_result(result, status),
-                                 "traceback": traceback})
+            conn = self.pool.acquire(block=True)
+            try:
+
+                def errback(error, delay):
+                    conn._result_producer_chan = None
+                    print("Couldn't send result for %r: %r. Retry in %rs." % (
+                            task_id, error, delay))
+
+                send = conn.ensure(self, self._publish_result,
+                            max_retries=max_retries,
+                            errback=errback,
+                            interval_start=interval_start,
+                            interval_step=interval_step,
+                            interval_max=interval_max)
+                send(conn, task_id, {"task_id": task_id, "status": status,
+                                "result": self.encode_result(result, status),
+                                "traceback": traceback})
+            finally:
+                conn.release()
         finally:
-            conn.release()
+            self.mutex.release()
 
         return result