Selaa lähdekoodia

AMQP Result backend: Use connection.ensure when retrying publishing results, and don't create a new channel for every publish

Ask Solem 14 vuotta sitten
vanhempi
commit
a5e15c7ecd
1 muutettua tiedostoa jossa 34 lisäystä ja 18 poistoa
  1. 34 18
      celery/backends/amqp.py

+ 34 - 18
celery/backends/amqp.py

@@ -96,8 +96,25 @@ class AMQPBackend(BaseDictBackend):
     def _create_consumer(self, bindings, channel):
         return self.Consumer(channel, bindings, no_ack=True)
 
-    def store_result(self, task_id, result, status, traceback=None,
-            max_retries=20, retry_delay=0.2):
+    def _publish_result(self, connection, task_id, meta):
+        if hasattr(connection, "_result_producer_chan") and \
+                connection._result_producer_chan is not None and \
+                connection._result_producer_chan.connection is not None:
+            channel = connection._result_producer_chan
+        else:
+            channel = connection_result_producer_chan = connection.channel()
+
+        try:
+            self._create_producer(task_id, channel).publish(meta)
+        finally:
+            channel.close()
+
+    def revive(self, channel):
+        pass
+
+    def _store_result(self, task_id, result, status, traceback=None,
+            max_retries=20, interval_start=0.2, interval_step=1,
+            interval_max=1):
         """Send task return value and status."""
         result = self.encode_result(result, status)
 
@@ -106,23 +123,22 @@ class AMQPBackend(BaseDictBackend):
                 "status": status,
                 "traceback": traceback}
 
-        for i in xrange((max_retries or 0) + 1):
-            conn = self.pool.acquire(block=True)
-            channel = conn.channel()
+        conn = self.pool.acquire(block=True)
+        try:
             try:
-                try:
-                    self._create_producer(task_id, channel).publish(meta)
-                except Exception, exc:
-                    if not max_retries:
-                        raise
-                    warnings.warn(AMQResultWarning(
-                        "Error sending result %s: %r" % (task_id, exc)))
-                    time.sleep(retry_delay)
-                else:
-                    break
-            finally:
-                channel.close()
-                conn.release()
+                conn.ensure(self, self._publish_result,
+                        max_retries=max_retries,
+                        interval_start=interval_start,
+                        interval_step=interval_step,
+                        interval_max=interval_max)(conn, task_id, meta)
+            except Exception, exc:
+                raise
+                if not max_retries:
+                    raise
+                warnings.warn(AMQResultWarning(
+                    "Error sending result %s: %r" % (task_id, exc)))
+        finally:
+            conn.release()
 
         return result