Forráskód Böngészése

Fixes(?) memory leak with time-limits. Issue #1129, Issue #1427

Ask Solem 11 éve
szülő
commit
36177eff72
1 módosított fájl, 42 hozzáadás és 17 törlés
  1. 42 17
      celery/concurrency/processes.py

+ 42 - 17
celery/concurrency/processes.py

@@ -12,6 +12,7 @@
 from __future__ import absolute_import
 
 import errno
+import gc
 import os
 import select
 import socket
@@ -20,7 +21,7 @@ import struct
 from collections import deque, namedtuple
 from pickle import HIGHEST_PROTOCOL
 from time import sleep, time
-from weakref import ref
+from weakref import WeakValueDictionary, ref
 
 from amqp.utils import promise
 from billiard import forking_enable
@@ -519,32 +520,56 @@ class TaskPool(BasePool):
             'all': ', '.join(per(v, total) for v in vals)
         }
 
+    def _on_soft_timeout(self, job, soft, hard, hub, now=time):
+        if hard:
+            self._tref_for_id[job] = hub.timer.apply_at(
+                now() + (hard - soft),
+                self._on_hard_timeout, (job, soft, hard, hub))
+        try:
+            result = self._pool.cache[job]
+        except KeyError:
+            pass  # job ready
+        else:
+            self.on_soft_timeout(result)
+
+    def _on_hard_timeout(self, job):
+        try:
+            result = self._pool.cache[job]
+        except KeyError:
+            pass  # job ready
+        else:
+            self.on_hard_timeout(result)
+
     def _create_timelimit_handlers(self, hub, now=time):
         apply_after = hub.timer.apply_after
-        apply_at = hub.timer.apply_at
         on_soft_timeout = self.on_soft_timeout
         on_hard_timeout = self.on_hard_timeout
+        trefs = self._tref_for_id = WeakValueDictionary()
 
-        def on_timeout_set(R, soft, hard):
-
-            def _on_soft_timeout():
-                if hard:
-                    R._tref = apply_at(now() + (hard - soft),
-                                       on_hard_timeout, (R, ))
-                on_soft_timeout(R)
+        def on_timeout_set(job, soft, hard):
             if soft:
-                R._tref = apply_after(soft * 1000.0, _on_soft_timeout)
+                trefs[job] = apply_after(
+                    soft * 1000.0,
+                    self._on_soft_timeout, (job, soft, hard, hub),
+                )
             elif hard:
-                R._tref = apply_after(hard * 1000.0,
-                                      on_hard_timeout, (R, ))
+                trefs[job] = apply_after(
+                    hard * 1000.0,
+                    self._on_hard_timeout, (job, )
+                )
         self._pool.on_timeout_set = on_timeout_set
 
-        def on_timeout_cancel(result):
+        def on_timeout_cancel(job):
             try:
-                result._tref.cancel()
-                delattr(result, '_tref')
-            except AttributeError:
-                pass
+                tref = trefs.pop(job)
+                tref.cancel()
+                del(tref)
+                # Will not be reclaimed quickly enough on some platforms,
+                # so the memory is growing and still not released back to the
+                # OS.
+                gc.collect()
+            except (KeyError, AttributeError):
+                pass  # out of scope
         self._pool.on_timeout_cancel = on_timeout_cancel
 
     def _create_process_handlers(self, hub, READ=READ, ERR=ERR):