Browse Source

Backport of bugfix for detach

Ask Solem 15 years ago
parent
commit
405912f334
2 changed files with 14 additions and 3 deletions
  1. 4 2
      celery/pool.py
  2. 10 1
      celery/worker/__init__.py

+ 4 - 2
celery/pool.py

@@ -193,9 +193,10 @@ class TaskPool(object):
 
     """
 
-    def __init__(self, limit, logger=None):
+    def __init__(self, limit, logger=None, initializer=None):
         self.limit = limit
         self.logger = logger or multiprocessing.get_logger()
+        self.initializer = initializer
         self._pool = None
 
     def start(self):
@@ -204,7 +205,8 @@ class TaskPool(object):
         Will pre-fork all workers so they're ready to accept tasks.
 
         """
-        self._pool = DynamicPool(processes=self.limit)
+        self._pool = DynamicPool(processes=self.limit,
+                                 initializer=self.initializer)
 
     def stop(self):
         """Terminate the pool."""

+ 10 - 1
celery/worker/__init__.py

@@ -166,6 +166,13 @@ class AMQPListener(object):
         return conn
 
 
+def process_initializer():
+    from logging import Logger
+    from multiprocessing import util as mputil
+    Logger.manager.loggerDict.clear()
+    mputil._logger = None
+
+
 class WorkController(object):
     """Executes tasks waiting in the task queue.
 
@@ -249,7 +256,9 @@ class WorkController(object):
         self.periodic_work_controller = PeriodicWorkController(
                                                     self.bucket_queue,
                                                     self.hold_queue)
-        self.pool = TaskPool(self.concurrency, logger=self.logger)
+        self.pool = TaskPool(self.concurrency,
+                             logger=self.logger,
+                             initializer=process_initializer)
         self.amqp_listener = AMQPListener(self.bucket_queue, self.hold_queue,
                                           logger=self.logger,
                                           initial_prefetch_count=concurrency)