Browse Source

Patches leak of _write_job generators

Ask Solem 12 years ago
parent
commit
db72c551c3
3 changed files with 13 additions and 2 deletions
  1. 9 1
      celery/concurrency/processes.py
  2. 4 0
      celery/worker/hub.py
  3. 0 1
      celery/worker/loops.py

+ 9 - 1
celery/concurrency/processes.py

@@ -353,6 +353,9 @@ class AsynPool(_pool.Pool):
             # job was not acked, so find another worker to send it to.
             if not job._accepted:
                 self._put_back(job)
+            writer = getattr(job, '_writer')
+            if writer:
+                self._active_writers.discard(writer)
 
             # Replace queues to avoid reuse
             before = len(self._queues)
@@ -595,12 +598,15 @@ class TaskPool(BasePool):
         hub_add, hub_remove = hub.add, hub.remove
         mark_write_fd_as_active = active_writes.add
         mark_write_gen_as_active = self._active_writers.add
-        write_generator_done = self._active_writers.discard
+        _write_generator_done = self._active_writers.discard
         get_job = pool._cache.__getitem__
         pool._put_back = put_message
         precalc = {ACK: pool._create_payload(ACK, (0, )),
                    NACK: pool._create_payload(NACK, (0, ))}
 
+        def write_generator_done(gen):
+            _write_generator_done(gen)
+
         def on_poll_start(hub):
             # called for every eventloop iteration, and if there
             # are messages pending this will schedule writing one message
@@ -648,6 +654,7 @@ class TaskPool(BasePool):
                         # process gone since scheduled, put it back
                         return put_message(job)
                     cor = _write_job(ready_fd, job, callback=callback)
+                    job._writer = cor
                     mark_write_gen_as_active(cor)
                     mark_write_fd_as_active(ready_fd)
                     callback.args = (cor, )  # tricky as we need to pass ref
@@ -781,6 +788,7 @@ class TaskPool(BasePool):
 
     def on_poll_init(self, w, hub):
         pool = self._pool
+        pool._active_writers = self._active_writers
 
         self._create_timelimit_handlers(hub)
         self._create_process_handlers(hub)

+ 4 - 0
celery/worker/hub.py

@@ -190,6 +190,10 @@ class Hub(object):
         return min(max(delay or 0, min_delay), max_delay)
 
     def _add(self, fd, cb, flags):
+        #if flags & WRITE:
+        #    ex = self.writers.get(fd)
+        #    if ex and ex.__name__ == '_write_job':
+        #        assert not ex.gi_frame or ex.gi_frame == -1
         self.poller.register(fd, flags)
         (self.readers if flags & READ else self.writers)[fileno(fd)] = cb
 

+ 0 - 1
celery/worker/loops.py

@@ -119,7 +119,6 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
                             if isinstance(cb, generator):
                                 try:
                                     next(cb)
-                                    hub_add(fileno, cb, WRITE | ERR)
                                 except StopIteration:
                                     hub_remove(fileno)
                                 except Exception: