Explorar o código

Backport of leak fix for #1129

Ask Solem %!s(int64=11) %!d(string=hai) anos
pai
achega
072b7b6777
Modificáronse 1 ficheiros con 54 adicións e 15 borrados
  1. 54 15
      celery/worker/__init__.py

+ 54 - 15
celery/worker/__init__.py

@@ -12,6 +12,7 @@
 from __future__ import absolute_import
 
 import atexit
+import gc
 import logging
 import socket
 import sys
@@ -19,6 +20,7 @@ import time
 import traceback
 
 from functools import partial
+from weakref import WeakValueDictionary
 
 from billiard.exceptions import WorkerLostError
 from billiard.util import Finalize
@@ -106,6 +108,7 @@ class Pool(bootsteps.StartStopComponent):
         add_reader = hub.add_reader
         remove = hub.remove
         now = time.time
+        cache = pool._pool._cache
 
         # did_start_ok will verify that pool processes were able to start,
         # but this will only work the first time we start, as
@@ -121,25 +124,61 @@ class Pool(bootsteps.StartStopComponent):
         for handler, interval in pool.timers.iteritems():
             hub.timer.apply_interval(interval * 1000.0, handler)
 
-        def on_timeout_set(R, soft, hard):
+        trefs = pool._tref_for_id = WeakValueDictionary()
+
+        def _discard_tref(job):
+            try:
+                tref = trefs.pop(job)
+                tref.cancel()
+                del(tref)
+                # Will not be reclaimed quickly enough on some platforms,
+                # so the memory is growing and still not released back to the
+                # OS.
+                gc.collect()
+            except (KeyError, AttributeError):
+                pass  # out of scope
+
+        def _on_hard_timeout(job):
+            try:
+                result = cache[job]
+            except KeyError:
+                pass  # job ready
+            else:
+                on_hard_timeout(result)
+            finally:
+                # remove tref
+                _discard_tref(job)
+
+        def _on_soft_timeout(job, soft, hard, hub):
+            if hard:
+                trefs[job] = apply_at(
+                    now() + (hard - soft),
+                    _on_hard_timeout, (job, soft, hard, hub))
+            try:
+                result = cache[job]
+            except KeyError:
+                pass  # job ready
+            else:
+                on_soft_timeout(result)
+            finally:
+                if not hard:
+                    # remove tref
+                    _discard_tref(job)
 
-            def _on_soft_timeout():
-                if hard:
-                    R._tref = apply_at(now() + (hard - soft),
-                                       on_hard_timeout, (R, ))
-                on_soft_timeout(R)
+        def on_timeout_set(R, soft, hard):
             if soft:
-                R._tref = apply_after(soft * 1000.0, _on_soft_timeout)
+                trefs[R._job] = apply_after(
+                    soft * 1000.0,
+                    _on_soft_timeout, (R._job, soft, hard, hub),
+                )
             elif hard:
-                R._tref = apply_after(hard * 1000.0,
-                                      on_hard_timeout, (R, ))
+                trefs[R._job] = apply_after(
+                    hard * 1000.0,
+                    _on_hard_timeout, (R._job, )
+                )
 
-        def on_timeout_cancel(result):
-            try:
-                result._tref.cancel()
-                delattr(result, '_tref')
-            except AttributeError:
-                pass
+        def on_timeout_cancel(R):
+            _discard_tref(R._job)
 
         pool.init_callbacks(
             on_process_up=lambda w: add_reader(w.sentinel, maintain_pool),