The worker gets deadlocked waiting for zombie processes; this is an experimental patch that changes the way we join processes.
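
For context, here is a minimal sketch (not part of the patch) of the failure mode and of the polling strategy adopted here; the worker function and the timeout values are illustrative only:

    import multiprocessing
    import time

    def double(x):
        return x * 2

    if __name__ == "__main__":
        pool = multiprocessing.Pool(2)
        result = pool.apply_async(double, (21,))

        # A bare result.get() blocks forever if the child has hung or
        # died, which is how the worker deadlocks on zombie processes.
        # Polling ready() and bounding get() with a timeout avoids that.
        while not result.ready():
            time.sleep(0.1)
        try:
            print(result.get(timeout=5))  # -> 42
        except multiprocessing.TimeoutError:
            pass  # give up on this result for now; retry later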

Ask Solem 16 years ago
parent
commit
903b2e7d6e
1 changed file with 32 additions and 13 deletions

+ 32 - 13
celery/datastructures.py

@@ -3,7 +3,7 @@
 Custom Datastructures
 
 """
-
+import multiprocessing
 from UserList import UserList
 
 
@@ -73,18 +73,20 @@ class TaskProcessQueue(UserList):
 
     """
 
-    def __init__(self, limit, logger=None, done_msg=None):
+    def __init__(self, limit, process_timeout=None, logger=None,
+            done_msg=None):
         self.limit = limit
         self.logger = logger
         self.done_msg = done_msg
+        self.process_timeout = process_timeout
         self.data = []
 
     def add(self, result, task_name, task_id):
         """Add a process to the queue.
 
-        If the queue is full, it will start to collect return values from
-        the tasks executed. When all return values has been collected,
-        it deletes the current queue and is ready to accept new processes.
+        If the queue is full, it will wait for the first task to finish,
+        collect its result and remove it from the queue, so it's ready
+        to accept new processes.
 
         :param result: A :class:`multiprocessing.AsyncResult` instance, as
             returned by :meth:`multiprocessing.Pool.apply_async`.
@@ -94,14 +96,34 @@ class TaskProcessQueue(UserList):
         :param task_id: Id of the task executed.
 
         """
+
         self.data.append([result, task_name, task_id])
 
         if self.data and len(self.data) >= self.limit:
-            for result, task_name, task_id in self.data:
-                ret_value = result.get()
-                if self.done_msg and self.logger:
-                    self.logger.info(self.done_msg % {
-                        "name": task_name,
-                        "id": task_id,
-                        "return_value": ret_value})
-            self.data = []
+            self.collect()
+
+    def collect(self):
+        """Collect results from processes that are ready."""
+        processes_joined = 0
+        # Busy-wait until at least one finished process has been joined.
+        while not processes_joined:
+            for process_no, process_info in enumerate(self.data):
+                result, task_name, task_id = process_info
+                if result.ready():
+                    try:
+                        self.on_ready(result, task_name, task_id)
+                    except multiprocessing.TimeoutError:
+                        pass
+                    else:
+                        # NOTE: del during iteration skips the next entry.
+                        del self.data[process_no]
+                        processes_joined += 1
+
+    def on_ready(self, result, task_name, task_id):
+        """Fetch the return value of a process, logging it if configured."""
+        ret_value = result.get(timeout=self.process_timeout)
+        if self.done_msg and self.logger:
+            self.logger.info(self.done_msg % {
+                "name": task_name,
+                "id": task_id,
+                "return_value": ret_value})