فهرست منبع

Reusing queues with maxtasksperchild too error-prone

Ask Solem 11 سال پیش
والد
کامیت
d0217776cc
1فایلهای تغییر یافته به همراه22 افزوده شده و 23 حذف شده
  1. 22 23
      celery/concurrency/asynpool.py

+ 22 - 23
celery/concurrency/asynpool.py

@@ -33,7 +33,7 @@ from time import sleep
 from weakref import WeakValueDictionary, ref
 
 from amqp.utils import promise
-from billiard.pool import RUN, TERMINATE, ACK, NACK, EX_RECYCLE, WorkersJoined
+from billiard.pool import RUN, TERMINATE, ACK, NACK, WorkersJoined
 from billiard import pool as _pool
 from billiard.einfo import ExceptionInfo
 from billiard.queues import _SimpleQueue
@@ -999,28 +999,27 @@ class AsynPool(_pool.Pool):
         # we cannot reuse the sockets again, because we don't know if
         # the process wrote/read anything frmo them, and if so we cannot
         # restore the message boundaries.
-        if proc.exitcode != EX_RECYCLE:
-            if not job._accepted:
-                # job was not acked, so find another worker to send it to.
-                self._put_back(job)
-            writer = _get_job_writer(job)
-            if writer:
-                self._active_writers.discard(writer)
-                del(writer)
-
-            if not proc.dead:
-                proc.dead = True
-                # Replace queues to avoid reuse
-                before = len(self._queues)
-                try:
-                    queues = self._find_worker_queues(proc)
-                    if self.destroy_queues(queues, proc):
-                        self._queues[self.create_process_queues()] = None
-                except ValueError:
-                    pass
-                    # Not in queue map, make sure sockets are closed.
-                    #self.destroy_queues((proc.inq, proc.outq, proc.synq))
-                assert len(self._queues) == before
+        if not job._accepted:
+            # job was not acked, so find another worker to send it to.
+            self._put_back(job)
+        writer = _get_job_writer(job)
+        if writer:
+            self._active_writers.discard(writer)
+            del(writer)
+
+        if not proc.dead:
+            proc.dead = True
+            # Replace queues to avoid reuse
+            before = len(self._queues)
+            try:
+                queues = self._find_worker_queues(proc)
+                if self.destroy_queues(queues, proc):
+                    self._queues[self.create_process_queues()] = None
+            except ValueError:
+                pass
+                # Not in queue map, make sure sockets are closed.
+                #self.destroy_queues((proc.inq, proc.outq, proc.synq))
+            assert len(self._queues) == before
 
     def destroy_queues(self, queues, proc):
         """Destroy queues that can no longer be used, so that they