Browse Source

Fixes leak of _write_job objects. Closes #1372

Ask Solem 12 years ago
parent
commit
70f5fc0959
1 changed files with 7 additions and 11 deletions
  1. 7 11
      celery/concurrency/processes.py

+ 7 - 11
celery/concurrency/processes.py

@@ -20,6 +20,7 @@ import struct
 from collections import deque, namedtuple
 from pickle import HIGHEST_PROTOCOL
 from time import sleep, time
+from weakref import ref
 
 from billiard import forking_enable
 from billiard import pool as _pool
@@ -354,6 +355,7 @@ class AsynPool(_pool.Pool):
             if not job._accepted:
                 self._put_back(job)
             writer = getattr(job, '_writer')
+            writer = writer and writer() or None
             if writer:
                 self._active_writers.discard(writer)
 
@@ -598,15 +600,12 @@ class TaskPool(BasePool):
         hub_add, hub_remove = hub.add, hub.remove
         mark_write_fd_as_active = active_writes.add
         mark_write_gen_as_active = self._active_writers.add
-        _write_generator_done = self._active_writers.discard
+        write_generator_done = self._active_writers.discard
         get_job = pool._cache.__getitem__
         pool._put_back = put_message
         precalc = {ACK: pool._create_payload(ACK, (0, )),
                    NACK: pool._create_payload(NACK, (0, ))}
 
-        def write_generator_done(gen):
-            _write_generator_done(gen)
-
         def on_poll_start(hub):
             # called for every eventloop iteration, and if there
             # are messages pending this will schedule writing one message
@@ -645,7 +644,6 @@ class TaskPool(BasePool):
                     hub_remove(inqfd)
             else:
                 if not job._accepted:  # job not accepted by another worker
-                    callback = promise(write_generator_done)
                     try:
                         # keep track of what process the write operation
                         # was scheduled for.
@@ -653,11 +651,10 @@ class TaskPool(BasePool):
                     except KeyError:
                         # process gone since scheduled, put it back
                         return put_message(job)
-                    cor = _write_job(ready_fd, job, callback=callback)
-                    job._writer = cor
+                    cor = _write_job(ready_fd, job)
+                    job._writer = ref(cor)
                     mark_write_gen_as_active(cor)
                     mark_write_fd_as_active(ready_fd)
-                    callback.args = (cor, )  # tricky as we need to pass ref
 
                     # Try to write immediately, in case there's an error.
                     try:
@@ -686,7 +683,7 @@ class TaskPool(BasePool):
             raise Exception(
                 'Process writable but cannot write. Contact support!')
 
-        def _write_job(fd, job, callback=None):
+        def _write_job(fd, job):
             # writes job to the worker process.
             # Operation must complete if more than one byte of data
             # was written.  If the broker connection is lost
@@ -733,11 +730,10 @@ class TaskPool(BasePool):
                         yield
                     errors = 0
             finally:
-                if callback:
-                    callback()
                 write_stats[proc.index] += 1
                 # message written, so this fd is now available
                 active_writes.discard(fd)
+                write_generator_done(job._writer())  # is a weakref
 
         def send_ack(response, pid, job, fd, WRITE=WRITE, ERR=ERR):
             # Schedule writing ack response for when the fd is writeable.