Browse Source

Process is also dead if .popen is None. Depends on celery/billiard@911f7c2c8f04ed6cfb078637325c436a3561fe75

Ask Solem 11 years ago
parent
commit
00ca6b9006
1 changed files with 7 additions and 7 deletions
  1. 7 7
      celery/concurrency/asynpool.py

+ 7 - 7
celery/concurrency/asynpool.py

@@ -493,7 +493,7 @@ class AsynPool(_pool.Pool):
         waiting_to_start = self._waiting_to_start
         waiting_to_start = self._waiting_to_start
 
 
         def verify_process_alive(proc):
         def verify_process_alive(proc):
-            if proc.exitcode is None and proc in waiting_to_start:
+            if proc._is_alive() and proc in waiting_to_start:
                 assert proc.outqR_fd in fileno_to_outq
                 assert proc.outqR_fd in fileno_to_outq
                 assert fileno_to_outq[proc.outqR_fd] is proc
                 assert fileno_to_outq[proc.outqR_fd] is proc
                 assert proc.outqR_fd in hub.readers
                 assert proc.outqR_fd in hub.readers
@@ -728,7 +728,7 @@ class AsynPool(_pool.Pool):
 
 
         def on_not_recovering(proc, fd, job):
         def on_not_recovering(proc, fd, job):
             error('Process inqueue damaged: %r %r' % (proc, proc.exitcode))
             error('Process inqueue damaged: %r %r' % (proc, proc.exitcode))
-            if proc.exitcode is not None:
+            if proc._is_alive():
                 proc.terminate()
                 proc.terminate()
             hub.remove(fd)
             hub.remove(fd)
             self._put_back(job)
             self._put_back(job)
@@ -886,7 +886,7 @@ class AsynPool(_pool.Pool):
                                 pass
                                 pass
                             else:
                             else:
                                 job_proc = job._write_to
                                 job_proc = job._write_to
-                                if job_proc.exitcode is None:
+                                if job_proc._is_alive():
                                     self._flush_writer(job_proc, gen)
                                     self._flush_writer(job_proc, gen)
                     # workers may have exited in the meantime.
                     # workers may have exited in the meantime.
                     self.maintain_pool()
                     self.maintain_pool()
@@ -901,7 +901,7 @@ class AsynPool(_pool.Pool):
         fds = set([proc.inq._writer])
         fds = set([proc.inq._writer])
         try:
         try:
             while fds:
             while fds:
-                if proc.exitcode:
+                if not proc._is_alive():
                     break  # process exited
                     break  # process exited
                 readable, writable, again = _select(
                 readable, writable, again = _select(
                     writers=fds, err=fds, timeout=0.5,
                     writers=fds, err=fds, timeout=0.5,
@@ -973,10 +973,10 @@ class AsynPool(_pool.Pool):
     def on_job_process_down(self, job, pid_gone):
     def on_job_process_down(self, job, pid_gone):
         """Handler called for each job when the process it was assigned to
         """Handler called for each job when the process it was assigned to
         exits."""
         exits."""
-        if job._write_to and job._write_to.exitcode:
+        if job._write_to and not job._write_to._is_alive():
             # job was partially written
             # job was partially written
             self.on_partial_read(job, job._write_to)
             self.on_partial_read(job, job._write_to)
-        elif job._scheduled_for and job._scheduled_for.exitcode:
+        elif job._scheduled_for and not job._scheduled_for._is_alive():
             # job was only scheduled to be written to this process,
             # job was only scheduled to be written to this process,
             # but no data was sent so put it back on the outbound_buffer.
             # but no data was sent so put it back on the outbound_buffer.
             self._put_back(job)
             self._put_back(job)
@@ -1131,7 +1131,7 @@ class AsynPool(_pool.Pool):
     def destroy_queues(self, queues, proc):
     def destroy_queues(self, queues, proc):
         """Destroy queues that can no longer be used, so that they
         """Destroy queues that can no longer be used, so that they
         be replaced by new sockets."""
         be replaced by new sockets."""
-        assert proc.exitcode is not None
+        assert not proc._is_alive()
         self._waiting_to_start.discard(proc)
         self._waiting_to_start.discard(proc)
         removed = 1
         removed = 1
         try:
         try: