Kaynağa Gözat

AMQP Backend: Close connections after fork

Ask Solem 14 yıl önce
ebeveyn
işleme
73f31f295b
1 değiştirilmiş dosya ile 17 ekleme ve 2 silme
  1. 17 2
      celery/backends/amqp.py

+ 17 - 2
celery/backends/amqp.py

@@ -1,4 +1,5 @@
 # -*- coding: utf-8 -*-
+import os
 import socket
 import time
 
@@ -29,6 +30,7 @@ class AMQPBackend(BaseDictBackend):
     Producer = Producer
 
     _pool = None
+    _pool_owner_pid = None
 
     def __init__(self, connection=None, exchange=None, exchange_type=None,
             persistent=None, serializer=None, auto_delete=True,
@@ -253,6 +255,19 @@ class AMQPBackend(BaseDictBackend):
         raise NotImplementedError(
                 "restore_taskset is not supported by this backend.")
 
-    @cached_property
+    def _set_pool(self):
+        self._pool = self.app.broker_connection().Pool(self.connection_max)
+        self._pool_owner_pid = os.getpid()
+
+    def _reset_after_fork(self):
+        self._pool.force_close_all()
+
+    @property
     def pool(self):
-        return self.app.broker_connection().Pool(self.connection_max)
+        if self._pool is None:
+            self._set_pool()
+        elif os.getpid() != self._pool_owner_pid:
+            print("--- RESET POOL AFTER FORK --- ")
+            self._reset_after_fork()
+            self._set_pool()
+        return self._pool