浏览代码

Fixes leak after broker restart. Closes #1382

Ask Solem 12 年之前
父节点
当前提交
cdd0d8bef8
共有 2 个文件被更改,包括 18 次插入4 次删除
  1. 16 2
      celery/concurrency/processes.py
  2. 2 2
      celery/worker/loops.py

+ 16 - 2
celery/concurrency/processes.py

@@ -69,6 +69,11 @@ warning, debug = logger.warning, logger.debug
 Ack = namedtuple('Ack', ('id', 'fd', 'payload'))
 
 
+def gen_not_started(gen):
+    # gi_frame is None when generator stopped.
+    return gen.gi_frame and gen.gi_frame.f_lasti == -1
+
+
 def process_initializer(app, hostname):
     """Pool child process initializer."""
     platforms.signals.reset(*WORKER_SIGRESET)
@@ -828,9 +833,18 @@ class TaskPool(BasePool):
                     writers = list(self._active_writers)
                     for gen in writers:
                         if (gen.__name__ == '_write_job' and
-                                gen.gi_frame and gen.gi_frame.f_lasti != -1):
+                                gen_not_started(gen)):
                             # has not started writing the job so can
-                            # safely discard
+                            # discard the task, but we must also remove
+                            # it from the Pool._cache.
+                            job_to_discard = None
+                            for job in values(self._pool._cache):
+                                if job._writer() is gen:  # _writer is saferef
+                                    # removes from Pool._cache
+                                    job_to_discard = job
+                                    break
+                            if job_to_discard:
+                                job_to_discard.discard()
                             self._active_writers.discard(gen)
                         else:
                             try:

+ 2 - 2
celery/worker/loops.py

@@ -87,7 +87,7 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
             if qos.prev != qos.value:
                 update_qos()
 
-            #print('REG: %s' % (hub.repr_active(), ))
+            #print('[[[HUB]]]: %s' % (hub.repr_active(), ))
 
             update_readers(conn_poll_start())
             pool_poll_start(hub)
@@ -96,7 +96,7 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
                 while connection.more_to_read:
                     try:
                         events = poll(poll_timeout)
-                        #print('EVENTS: %s' % (hub.repr_events(events), ))
+                        #print('[[[EV]]]: %s' % (hub.repr_events(events), ))
                     except ValueError:  # Issue 882
                         return
                     if not events: