浏览代码

Better fix for #496.

   *  The previous fix (14c23c47b93f5d205703d4d6de2ed558c382df45)  did
      not solve the problem.
   1  now really call the re-init of the logging system _every_ fork.
      (moved init code to worker function)
   2  Add initialisation of 2 more locks.  (root logger and logging
      global lock)

Closes #502
Ask Solem 13 年之前
父节点
当前提交
be69f899df
共有 2 个文件被更改,包括 12 次插入10 次删除
  1. 0 10
      celery/apps/worker.py
  2. 12 0
      celery/concurrency/processes/pool.py

+ 0 - 10
celery/apps/worker.py

@@ -176,16 +176,6 @@ class Worker(object):
         print("discard: Erased %d %s from the queue.\n" % (count, what))
 
     def worker_init(self):
-        # Re-init the logging system.
-        # Workaround for http://bugs.python.org/issue6721#msg140215
-        # In short: Pythons logging module uses RLock() objects.
-        # These are broken after fork.
-        # and can cause a deadlock (Issue #496).
-        logger_names = logging.Logger.manager.loggerDict.keys()
-        for logger_name in logger_names:
-            for handler in logging.getLogger(logger_name).handlers:
-                handler.createLock()
-
         # Run the worker init handler.
         # (Usually imports task modules and such.)
         self.loader.init_worker()

+ 12 - 0
celery/concurrency/processes/pool.py

@@ -23,6 +23,7 @@ import collections
 import time
 import signal
 import warnings
+import logging
 
 from multiprocessing import Process, cpu_count, TimeoutError
 from multiprocessing import util
@@ -135,6 +136,17 @@ def soft_timeout_sighandler(signum, frame):
 
 
 def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
+    # Re-init logging system.
+    # Workaround for http://bugs.python.org/issue6721#msg140215
+    # Python logging module uses RLock() objects which are broken after
+    # fork. This can result in a deadlock (Issue #496).
+    logger_names = logging.Logger.manager.loggerDict.keys()
+    logger_names.append(None)  # for root logger
+    for name in logger_names:
+        for handler in logging.getLogger(name).handlers:
+            handler.createLock()
+    logging._lock = threading.RLock()
+
     pid = os.getpid()
     assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
     put = outqueue.put