Browse Source

Use drain_events with amqplib/pika

Ask Solem 15 years ago
parent
commit
e7a17c61d0
2 changed files with 14 additions and 15 deletions
  1. 4 4
      celery/tests/test_worker.py
  2. 10 11
      celery/worker/listener.py

+ 4 - 4
celery/tests/test_worker.py

@@ -110,17 +110,17 @@ class TestCarrotListener(unittest.TestCase):
                            send_events=False)
 
         l.reset_connection()
-        self.assertTrue(isinstance(l.amqp_connection, BrokerConnection))
+        self.assertTrue(isinstance(l.connection, BrokerConnection))
 
         l.close_connection()
-        self.assertTrue(l.amqp_connection is None)
+        self.assertTrue(l.connection is None)
         self.assertTrue(l.task_consumer is None)
 
         l.reset_connection()
-        self.assertTrue(isinstance(l.amqp_connection, BrokerConnection))
+        self.assertTrue(isinstance(l.connection, BrokerConnection))
 
         l.stop()
-        self.assertTrue(l.amqp_connection is None)
+        self.assertTrue(l.connection is None)
         self.assertTrue(l.task_consumer is None)
 
     def test_receieve_message(self):

+ 10 - 11
celery/worker/listener.py

@@ -44,7 +44,7 @@ class CarrotListener(object):
 
     def __init__(self, ready_queue, eta_schedule, logger,
             send_events=False, initial_prefetch_count=2):
-        self.amqp_connection = None
+        self.connection = None
         self.task_consumer = None
         self.ready_queue = ready_queue
         self.eta_schedule = eta_schedule
@@ -161,35 +161,34 @@ class CarrotListener(object):
                                     self.event_dispatcher.close()
         self.logger.debug(
                 "CarrotListener: Closing connection to broker...")
-        self.amqp_connection = self.amqp_connection and \
-                                    self.amqp_connection.close()
+        self.connection = self.connection and self.connection.close()
 
     def reset_connection(self):
         self.logger.debug(
                 "CarrotListener: Re-establishing connection to the broker...")
         self.close_connection()
-        self.amqp_connection = self._open_connection()
+        self.connection = self._open_connection()
         self.logger.debug("CarrotListener: Connection Established.")
-        self.task_consumer = get_consumer_set(connection=self.amqp_connection)
-        self.broadcast_consumer = BroadcastConsumer(self.amqp_connection)
+        self.task_consumer = get_consumer_set(connection=self.connection)
+        self.broadcast_consumer = BroadcastConsumer(self.connection)
         self.task_consumer.register_callback(self.receive_message)
-        self.event_dispatcher = EventDispatcher(self.amqp_connection,
+        self.event_dispatcher = EventDispatcher(self.connection,
                                                 enabled=self.send_events)
         self.heart = Heart(self.event_dispatcher)
         self.heart.start()
 
         self._state = RUN
 
-    def _amqplib_iterconsume(self, **kwargs):
+    def _mainloop(self, **kwargs):
         while 1:
-            yield self.amqp_connection.connection.wait_any()
+            yield self.connection.connection.drain_events()
 
     def _detect_wait_method(self):
-        if hasattr(self.amqp_connection.connection, "wait_any"):
+        if hasattr(self.connection.connection, "drain_events"):
             self.broadcast_consumer.register_callback(self.receive_message)
             self.task_consumer.iterconsume()
             self.broadcast_consumer.iterconsume()
-            return self._amqplib_iterconsume
+            return self._mainloop
         else:
             self.task_consumer.add_consumer(self.broadcast_consumer)
             return self.task_consumer.iterconsume