Преглед на файлове

events.EventReceiver.capture now uses connection.drain_events, and supports the timeout argument.

Ask Solem преди 14 години
родител
ревизия
a2701d2e3a
променени са 1 файла, в които са добавени 11 реда и са изтрити 4 реда
  1. 11 4
      celery/events/__init__.py

+ 11 - 4
celery/events/__init__.py

@@ -2,6 +2,8 @@ import time
 import socket
 import threading
 
+from itertools import count
+
 from celery.messaging import EventPublisher, EventConsumer
 
 
@@ -106,7 +108,7 @@ class EventReceiver(object):
         consumer.register_callback(self._receive)
         return consumer
 
-    def capture(self, limit=None):
+    def capture(self, limit=None, timeout=None):
         """Open up a consumer capturing events.
 
         This has to run in the main process, and it will never
@@ -114,9 +116,14 @@ class EventReceiver(object):
 
         """
         consumer = self.consumer()
-        it = consumer.iterconsume(limit=limit)
-        while True:
-            it.next()
+        consumer.consume()
+        try:
+            for iteration in count(0):
+                if limit and iteration > limit:
+                    break
+                consumer.connection.drain_events(timeout=timeout)
+        finally:
+            consumer.close()
 
     def _receive(self, message_data, message):
         type = message_data.pop("type").lower()