Bläddra i källkod

Fixes compat with billiard

Ask Solem 11 år sedan
förälder
incheckning
f0d194f5cb
1 ändrade filer med 8 tillägg och 5 borttagningar
  1. 8 5
      celery/concurrency/asynpool.py

+ 8 - 5
celery/concurrency/asynpool.py

@@ -113,6 +113,7 @@ def _select(readers=None, writers=None, err=None, timeout=0):
 
 class Worker(_pool.Worker):
     """Pool worker process."""
+    dead = False
 
     def on_loop_start(self, pid):
         # our version sends a WORKER_UP message when the process is ready
@@ -439,7 +440,8 @@ class AsynPool(_pool.Pool):
 
         def on_process_down(proc):
             """Called when a worker process exits."""
-            assert not proc.dead
+            if proc.dead:
+                return
             process_flush_queues(proc)
             _remove_from_index(proc.outq._reader, proc, fileno_to_outq)
             if proc.synq:
@@ -819,10 +821,11 @@ class AsynPool(_pool.Pool):
     def _process_cleanup_queues(self, proc):
         """Handler called to clean up a processes queues after process
         exit."""
-        try:
-            self._queues[self._find_worker_queues(proc)] = None
-        except (KeyError, ValueError):
-            pass
+        if not proc.dead:
+            try:
+                self._queues[self._find_worker_queues(proc)] = None
+            except (KeyError, ValueError):
+                pass
 
     @staticmethod
     def _stop_task_handler(task_handler):