浏览代码

Spawn broadcast consumer in separate thread if running green

Ask Solem 14 年之前
父节点
当前提交
85c5318ecb
共有 4 个文件被更改,包括 25 次插入3 次删除
  1. 1 0
      celery/concurrency/base.py
  2. 3 0
      celery/concurrency/evg.py
  3. 4 2
      celery/concurrency/evlet.py
  4. 17 1
      celery/worker/consumer.py

+ 1 - 0
celery/concurrency/base.py

@@ -24,6 +24,7 @@ class BasePool(object):
     Timer = timer2.Timer
 
     signal_safe = True
+    is_green = False
 
     _state = None
     _pool = None

+ 3 - 0
celery/concurrency/evg.py

@@ -9,10 +9,13 @@ from celery.concurrency.base import apply_target, BasePool
 
 class TaskPool(BasePool):
     signal_safe = False
+    is_green = True
 
     def __init__(self, *args, **kwargs):
+        from gevent import spawn_raw
         from gevent.pool import Pool
         self.Pool = Pool
+        self.spawn_n = spawn_raw
 
     def on_start(self):
         self._pool = self.Pool(self.limit)

+ 4 - 2
celery/concurrency/evlet.py

@@ -93,12 +93,14 @@ class TaskPool(base.BasePool):
     Timer = Timer
 
     signal_safe = False
+    is_green = True
 
     def __init__(self, *args, **kwargs):
         from eventlet import greenthread
         from eventlet.greenpool import GreenPool
         self.Pool = GreenPool
-        self.greenthread = greenthread
+        self.getcurrent = greenthread.getcurrent
+        self.spawn_n = greenthread.spawn_n
 
         super(TaskPool, self).__init__(*args, **kwargs)
 
@@ -113,4 +115,4 @@ class TaskPool(base.BasePool):
             accept_callback=None, **_):
         self._pool.spawn_n(apply_target, target, args, kwargs,
                            callback, accept_callback,
-                           self.greenthread.getcurrent)
+                           self.getcurrent)

+ 17 - 1
celery/worker/consumer.py

@@ -270,7 +270,6 @@ class Consumer(object):
         """Consume messages forever (or until an exception is raised)."""
         self.logger.debug("Consumer: Starting message consumer...")
         self.task_consumer.consume()
-        self.broadcast_consumer.consume()
         self.logger.debug("Consumer: Ready to accept tasks!")
 
         while 1:
@@ -440,9 +439,26 @@ class Consumer(object):
             except self.connection_errors + self.channel_errors:
                 pass
 
+        if self.pool.is_green:
+            print("USING GREENLET NODE")
+            return self.pool.spawn_n(self._green_pidbox_node)
         self.pidbox_node.channel = self.connection.channel()
         self.broadcast_consumer = self.pidbox_node.listen(
                                         callback=self.on_control)
+        self.broadcast_consumer.consume()
+
+    def _green_pidbox_node(self):
+        conn = self._open_connection()
+        self.pidbox_node.channel = conn.channel()
+        self.broadcast_consumer = self.pidbox_node.listen(
+                                        callback=self.on_control)
+        self.broadcast_consumer.consume()
+
+        try:
+            while self.connection:  # main connection still open?
+                conn.drain_events()
+        finally:
+            conn.close()
 
     def reset_connection(self):
         """Re-establish connection and set up consumers."""