瀏覽代碼

[backend][async] Implements gevent result drainer

Ask Solem 9 年之前
父節點
當前提交
7604070c95
共有 1 個文件被更改,包括 23 次插入5 次删除
  1. 23 5
      celery/backends/async.py

+ 23 - 5
celery/backends/async.py

@@ -14,6 +14,7 @@ from collections import deque
 from weakref import WeakKeyDictionary
 
 from kombu.syn import detect_environment
+from kombu.utils import cached_property
 
 from celery import states
 from celery.exceptions import TimeoutError
@@ -58,22 +59,21 @@ class Drainer(object):
         wait(timeout=timeout)
 
 
-@register_drainer('eventlet')
-class EventletDrainer(Drainer):
+class greenletDrainer(Drainer):
+    spawn = None
     _g = None
     _stopped = False
 
     def run(self):
         while not self._stopped:
             try:
-                self.result_consumer.drain_events(timeout=10)
+                self.result_consumer.drain_events(timeout=1)
             except socket.timeout:
                 pass
 
     def start(self):
-        from eventlet import spawn
         if self._g is None:
-            self._g = spawn(self.run)
+            self._g = self.spawn(self.run)
 
     def stop(self):
         self._stopped = True
@@ -85,6 +85,24 @@ class EventletDrainer(Drainer):
             time.sleep(0)
 
 
+@register_drainer('eventlet')
+class eventletDrainer(greenletDrainer):
+
+    @cached_property
+    def spawn(self):
+        from eventlet import spawn
+        return spawn
+
+
+@register_drainer('gevent')
+class geventDrainer(greenletDrainer):
+
+    @cached_property
+    def spawn(self):
+        from gevent import spawn
+        return spawn
+
+
 class AsyncBackendMixin(object):
 
     def _collect_into(self, result, bucket):