ソースを参照

Backend: Async: Start eventlet/gevent drainer when .then() callback is registered

Ask Solem 8 年 前
コミット
7b14ad3969
2 ファイル変更27 行追加9 行削除
  1. 21 8
      celery/backends/async.py
  2. 6 1
      celery/backends/rpc.py

+ 21 - 8
celery/backends/async.py

@@ -2,6 +2,7 @@
 from __future__ import absolute_import, unicode_literals
 
 import socket
+import threading
 
 from collections import deque
 from time import sleep
@@ -13,6 +14,7 @@ from kombu.utils.objects import cached_property
 from celery import states
 from celery.exceptions import TimeoutError
 from celery.five import Empty, monotonic
+from celery.utils.threads import THREAD_TIMEOUT_MAX
 
 drainers = {}
 
@@ -56,25 +58,33 @@ class Drainer(object):
 class greenletDrainer(Drainer):
     spawn = None
     _g = None
-    _stopped = False
+
+    def __init__(self, *args, **kwargs):
+        super(greenletDrainer, self).__init__(*args, **kwargs)
+        self._started = threading.Event()
+        self._stopped = threading.Event()
+        self._shutdown = threading.Event()
 
     def run(self):
-        while not self._stopped:
+        self._started.set()
+        while not self._stopped.is_set():
             try:
                 self.result_consumer.drain_events(timeout=1)
             except socket.timeout:
                 pass
+        self._shutdown.set()
 
     def start(self):
-        if self._g is None:
+        if not self._started.is_set():
             self._g = self.spawn(self.run)
+            self._started.wait()
 
     def stop(self):
-        self._stopped = True
+        self._stopped.set()
+        self._shutdown.wait(THREAD_TIMEOUT_MAX)
 
     def wait_for(self, p, wait, timeout=None):
-        if self._g is None:
-            self.start()
+        self.start()
         if not p.ready:
             sleep(0)
 
@@ -124,7 +134,9 @@ class AsyncBackendMixin(object):
             node = bucket.popleft()
             yield node.id, node._cache
 
-    def add_pending_result(self, result, weak=False):
+    def add_pending_result(self, result, weak=False, start_drainer=True):
+        if start_drainer:
+            self.result_consumer.drainer.start()
         try:
             self._maybe_resolve_from_buffer(result)
         except Empty:
@@ -141,7 +153,8 @@ class AsyncBackendMixin(object):
             self.result_consumer.consume_from(task_id)
 
     def add_pending_results(self, results, weak=False):
-        return [self.add_pending_result(result, weak=weak)
+        self.result_consumer.drainer.start()
+        return [self.add_pending_result(result, weak=weak, start_drainer=False)
                 for result in results]
 
     def remove_pending_result(self, result):

+ 6 - 1
celery/backends/rpc.py

@@ -5,6 +5,8 @@ RPC-style result backend, using reply-to and one queue per client.
 """
 from __future__ import absolute_import, unicode_literals
 
+import time
+
 from kombu import Consumer, Exchange, Producer, Queue
 from kombu.common import maybe_declare
 from kombu.utils.compat import register_after_fork
@@ -55,7 +57,10 @@ class ResultConsumer(BaseResultConsumer):
         self._consumer.consume()
 
     def drain_events(self, timeout=None):
-        return self._connection.drain_events(timeout=timeout)
+        if self._connection:
+            return self._connection.drain_events(timeout=timeout)
+        elif timeout:
+            time.sleep(timeout)
 
     def stop(self):
         try: