Bläddra i källkod

Result: Async: pending_results is now a tuple of (concrete, weak)

Ask Solem 8 år sedan
förälder
incheckning
ebfe0a0b2e
4 ändrade filer med 42 tillägg och 34 borttagningar
  1. 1 3
      celery/backends/amqp.py
  2. 34 27
      celery/backends/async.py
  3. 6 2
      celery/backends/base.py
  4. 1 2
      celery/backends/redis.py

+ 1 - 3
celery/backends/amqp.py

@@ -135,15 +135,13 @@ class AMQPBackend(base.Backend, AsyncBackendMixin):
         })
         self.result_consumer = self.ResultConsumer(
             self, self.app, self.accept,
-            self._pending_results, self._weak_pending_results,
-            self._pending_messages,
+            self._pending_results, self._pending_messages,
         )
         if register_after_fork is not None:
             register_after_fork(self, _on_after_fork_cleanup_backend)
 
     def _after_fork(self):
         self._pending_results.clear()
-        self._weak_pending_results.clear()
         self.result_consumer._after_fork()
 
     def _create_exchange(self, name, type='direct', delivery_mode=2):

+ 34 - 27
celery/backends/async.py

@@ -132,31 +132,33 @@ class AsyncBackendMixin(object):
 
     def add_pending_result(self, result, weak=False):
         try:
-            meta = self._pending_messages.pop(result.id)
+            self._maybe_resolve_from_buffer(result)
         except Empty:
-            pass
-        else:
-            result._maybe_set_cache(meta)
-            return result
-        if weak:
-            dest, alt = self._weak_pending_results, self._pending_results
-        else:
-            dest, alt = self._pending_results, self._weak_pending_results
-        if result.id not in dest and result.id not in alt:
-            dest[result.id] = result
-            self.result_consumer.consume_from(result.id)
+            self._add_pending_result(result.id, result, weak=weak)
         return result
 
+    def _maybe_resolve_from_buffer(self, result):
+        result._maybe_set_cache(self._pending_messages.pop(result.id))
+
+    def _add_pending_result(self, task_id, result, weak=False):
+        weak, concrete = self._pending_results
+        if task_id not in weak and result.id not in concrete:
+            (weak if weak else concrete)[task_id] = result
+            self.result_consumer.consume_from(task_id)
+
     def add_pending_results(self, results, weak=False):
         return [self.add_pending_result(result, weak=weak)
                 for result in results]
 
     def remove_pending_result(self, result):
-        self._pending_results.pop(result.id, None)
-        self._weak_pending_results.pop(result.id, None)
+        self._remove_pending_result(result.id)
         self.on_result_fulfilled(result)
         return result
 
+    def _remove_pending_result(self, task_id):
+        for map in self._pending_results:
+            map.pop(task_id, None)
+
     def on_result_fulfilled(self, result):
         self.result_consumer.cancel_for(result.id)
 
@@ -183,13 +185,11 @@ class AsyncBackendMixin(object):
 class BaseResultConsumer(object):
 
     def __init__(self, backend, app, accept,
-                 pending_results, weak_pending_results,
-                 pending_messages):
+                 pending_results, pending_messages):
         self.backend = backend
         self.app = app
         self.accept = accept
         self._pending_results = pending_results
-        self._weak_pending_results = weak_pending_results
         self._pending_messages = pending_messages
         self.on_message = None
         self.buckets = WeakKeyDictionary()
@@ -245,23 +245,30 @@ class BaseResultConsumer(object):
     def on_out_of_band_result(self, message):
         self.on_state_change(message.payload, message)
 
+    def _get_pending_result(self, task_id):
+        for mapping in self._pending_results:
+            try:
+                return mapping[task_id]
+            except KeyError:
+                pass
+        raise KeyError(task_id)
+
     def on_state_change(self, meta, message):
         if self.on_message:
             self.on_message(meta)
         if meta['status'] in states.READY_STATES:
             task_id = meta['task_id']
             try:
-                result = self._pending_results[task_id]
+                result = self._get_pending_result(task_id)
             except KeyError:
+                # send to buffer in case we received this result
+                # before it was added to _pending_results.
+                self._pending_messages.append(task_id, meta)
+            else:
+                result._maybe_set_cache(meta)
+                buckets = self.buckets
                 try:
-                    result = self._weak_pending_results[task_id]
+                    buckets.pop(result)
                 except KeyError:
-                    # send to BufferMapping
-                    self._pending_messages.append(task_id, meta)
-            result._maybe_set_cache(meta)
-            buckets = self.buckets
-            try:
-                buckets.pop(result)
-            except KeyError:
-                pass
+                    pass
         sleep(0)

+ 6 - 2
celery/backends/base.py

@@ -16,6 +16,7 @@ from __future__ import absolute_import, unicode_literals
 import sys
 import time
 
+from collections import namedtuple
 from datetime import timedelta
 from weakref import WeakValueDictionary
 
@@ -53,6 +54,10 @@ logger = get_logger(__name__)
 
 MESSAGE_BUFFER_MAX = 8192
 
+pending_results_t = namedtuple('pending_results_t', (
+    'concrete', 'weak',
+))
+
 
 def unpickle_backend(cls, args, kwargs):
     """Return an unpickled backend."""
@@ -112,8 +117,7 @@ class Backend(object):
         self.accept = prepare_accept_content(
             conf.accept_content if accept is None else accept,
         )
-        self._pending_results = {}
-        self._weak_pending_results = WeakValueDictionary()
+        self._pending_results = pending_results_t({}, WeakValueDictionary())
         self._pending_messages = BufferMap(MESSAGE_BUFFER_MAX)
         self.url = url
 

+ 1 - 2
celery/backends/redis.py

@@ -147,8 +147,7 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
             else ((), ()))
         self.result_consumer = self.ResultConsumer(
             self, self.app, self.accept,
-            self._pending_results, self._weak_pending_results,
-            self._pending_messages,
+            self._pending_results, self._pending_messages,
         )
 
     def _params_from_url(self, url, defaults):