Sfoglia il codice sorgente

Use multiprocessing.register_after_fork to deal with fork cleanup

Ask Solem 14 anni fa
parent
commit
711d349996
1 ha cambiato i file con 10 aggiunte e 11 eliminazioni
  1. 10 11
      celery/backends/amqp.py

+ 10 - 11
celery/backends/amqp.py

@@ -275,19 +275,18 @@ class AMQPBackend(BaseDictBackend):
         raise NotImplementedError(
                 "restore_taskset is not supported by this backend.")
 
-    def _set_pool(self):
-        self._pool = self.app.broker_connection().Pool(self.connection_max)
-        self._pool_owner_pid = os.getpid()
-
-    def _reset_after_fork(self):
-        self._pool.force_close_all()
+    def _reset_after_fork(self, *args):
+        if self._pool:
+            self._pool.force_close_all()
+            self._pool = None
 
     @property
     def pool(self):
         if self._pool is None:
-            self._set_pool()
-        elif os.getpid() != self._pool_owner_pid:
-            print("--- RESET POOL AFTER FORK --- ")
-            self._reset_after_fork()
-            self._set_pool()
+            self._pool = self.app.broker_connection().Pool(self.connection_max)
+            try:
+                from multiprocessing.util import register_after_fork
+                register_after_fork(self, self._reset_after_fork)
+            except ImportError:
+                pass
         return self._pool