Browse Source

Need to reset _write_to and _scheduled_for when reusing a socket

Ask Solem 12 years ago
parent
commit
d3a053150d
1 changed files with 13 additions and 0 deletions
  1. 13 0
      celery/concurrency/processes.py

+ 13 - 0
celery/concurrency/processes.py

@@ -544,6 +544,7 @@ class TaskPool(BasePool):
         self._pool.on_timeout_cancel = on_timeout_cancel
 
     def _create_process_handlers(self, hub, READ=READ, ERR=ERR):
+        pool = self._pool
         hub_add, hub_remove = hub.add, hub.remove
         all_inqueues = self._pool._all_inqueues
         fileno_to_inq = self._pool._fileno_to_inq
@@ -554,6 +555,18 @@ class TaskPool(BasePool):
         process_flush_queues = self._pool.process_flush_queues
 
         def on_process_up(proc):
+            # If we got the same fd as a previous process then we will also
+            # receive jobs in the old buffer, so we need to reset the
+            # _write_to and _scheduled_for tracking values used to recover
+            # message boundaries when processes exit.
+            infd = proc.inqW_fd
+            for job in values(pool._cache):
+                if job._write_to and job._write_to.inqW_fd == infd:
+                    assert job._write_to.exitcode is not None
+                    job._write_to = proc
+                elif job._scheduled_for and job._scheduled_for.inqW_fd == infd:
+                    assert job._scheduled_for.exitcode is not None
+                    job._scheduled_for = proc
             fileno_to_outq[proc.outqR_fd] = proc
             hub_add(proc.sentinel, maintain_pool, READ | ERR)
             hub_add(proc.outqR_fd, handle_result_event, READ | ERR)