Procházet zdrojové kódy

Result: Fixing memory leak in master for group(). Closes #3205

Ask Solem před 8 roky
rodič
revize
7be35b96c1

+ 3 - 1
celery/backends/amqp.py

@@ -134,12 +134,14 @@ class AMQPBackend(base.Backend, AsyncBackendMixin):
             'x-expires': maybe_s_to_ms(self.expires),
         })
         self.result_consumer = self.ResultConsumer(
-            self, self.app, self.accept, self._pending_results)
+            self, self.app, self.accept,
+            self._pending_results, self._weak_pending_results)
         if register_after_fork is not None:
             register_after_fork(self, _on_after_fork_cleanup_backend)
 
     def _after_fork(self):
         self._pending_results.clear()
+        self._weak_pending_results.clear()
         self.result_consumer._after_fork()
 
     def _create_exchange(self, name, type='direct', delivery_mode=2):

+ 19 - 5
celery/backends/async.py

@@ -130,14 +130,23 @@ class AsyncBackendMixin(object):
             node = bucket.popleft()
             yield node.id, node._cache
 
-    def add_pending_result(self, result):
-        if result.id not in self._pending_results:
-            self._pending_results[result.id] = result
+    def add_pending_result(self, result, weak=False):
+        if weak:
+            dest, alt = self._weak_pending_results, self._pending_results
+        else:
+            dest, alt = self._pending_results, self._weak_pending_results
+        if result.id not in dest and result.id not in alt:
+            dest[result.id] = result
             self.result_consumer.consume_from(result.id)
         return result
 
+    def add_pending_results(self, results, weak=False):
+        return [self.add_pending_result(result, weak=weak)
+                for result in results]
+
     def remove_pending_result(self, result):
         self._pending_results.pop(result.id, None)
+        self._weak_pending_results.pop(result.id, None)
         self.on_result_fulfilled(result)
         return result
 
@@ -166,11 +175,13 @@ class AsyncBackendMixin(object):
 
 class BaseResultConsumer(object):
 
-    def __init__(self, backend, app, accept, pending_results):
+    def __init__(self, backend, app, accept, pending_results,
+                 weak_pending_results):
         self.backend = backend
         self.app = app
         self.accept = accept
         self._pending_results = pending_results
+        self._weak_pending_results = weak_pending_results
         self.on_message = None
         self.buckets = WeakKeyDictionary()
         self.drainer = drainers[detect_environment()](self)
@@ -232,7 +243,10 @@ class BaseResultConsumer(object):
             try:
                 result = self._pending_results[meta['task_id']]
             except KeyError:
-                return
+                try:
+                    result = self._weak_pending_results[meta['task_id']]
+                except KeyError:
+                    return
             result._maybe_set_cache(meta)
             buckets = self.buckets
             try:

+ 2 - 0
celery/backends/base.py

@@ -17,6 +17,7 @@ import sys
 import time
 
 from datetime import timedelta
+from weakref import WeakValueDictionary
 
 from billiard.einfo import ExceptionInfo
 from kombu.serialization import (
@@ -109,6 +110,7 @@ class Backend(object):
             conf.accept_content if accept is None else accept,
         )
         self._pending_results = {}
+        self._weak_pending_results = WeakValueDictionary()
         self.url = url
 
     def as_uri(self, include_password=False):

+ 10 - 3
celery/canvas.py

@@ -753,9 +753,16 @@ class group(Signature):
                 sig.apply_async(producer=producer, add_to_parent=False,
                                 chord=sig.options.get('chord') or chord,
                                 **options)
-                if p:
-                    p.add(res)
-                    res.backend.add_pending_result(res)
+
+                # Add a callback to the result so that it gradually
+                # fulfills the barrier.
+                #
+                # Using barrier.add would use result.then, but we need
+                # to add the weak argument here to only create a weak
+                # reference to the object.
+                if p and not p.cancelled and not p.ready:
+                    p.size += 1
+                    res.then(p, weak=True)
                 yield res  # <-- r.parent, etc set in the frozen result.
 
     def _freeze_gid(self, options):

+ 2 - 8
celery/result.py

@@ -100,8 +100,8 @@ class AsyncResult(ResultBase):
         self.on_ready = promise(self._on_fulfilled)
         self._cache = None
 
-    def then(self, callback, on_error=None):
-        self.backend.add_pending_result(self)
+    def then(self, callback, on_error=None, weak=False):
+        self.backend.add_pending_result(self, weak=weak)
         return self.on_ready.then(callback, on_error)
 
     def _on_fulfilled(self, result):
@@ -343,9 +343,6 @@ class AsyncResult(ResultBase):
     def __reduce_args__(self):
         return self.id, self.backend, None, None, self.parent
 
-    def __del__(self):
-        self._cache = None
-
     @cached_property
     def graph(self):
         return self.build_graph()
@@ -904,9 +901,6 @@ class EagerResult(AsyncResult):
     def _get_task_meta(self):
         return self._cache
 
-    def __del__(self):
-        pass
-
     def __reduce__(self):
         return self.__class__, self.__reduce_args__()