Browse Source

Renamed AMQPListener -> CarrotListener (amqp_listener -> broker_listener)

Ask Solem 15 years ago
parent
commit
3d0ce03091
2 changed files with 22 additions and 22 deletions
  1. 7 7
      celery/tests/test_worker.py
  2. 15 15
      celery/worker/__init__.py

+ 7 - 7
celery/tests/test_worker.py

@@ -3,7 +3,7 @@ from Queue import Queue, Empty
 from carrot.connection import BrokerConnection
 from celery.messaging import TaskConsumer
 from celery.worker.job import TaskWrapper
-from celery.worker import AMQPListener, WorkController
+from celery.worker import CarrotListener, WorkController
 from multiprocessing import get_logger
 from carrot.backends.base import BaseMessage
 from celery import registry
@@ -81,7 +81,7 @@ def create_message(backend, **data):
                        content_encoding="binary")
 
 
-class TestAMQPListener(unittest.TestCase):
+class TestCarrotListener(unittest.TestCase):
 
     def setUp(self):
         self.bucket_queue = Queue()
@@ -90,7 +90,7 @@ class TestAMQPListener(unittest.TestCase):
         self.logger.setLevel(0)
 
     def test_connection(self):
-        l = AMQPListener(self.bucket_queue, self.eta_scheduler, self.logger)
+        l = CarrotListener(self.bucket_queue, self.eta_scheduler, self.logger)
 
         c = l.reset_connection()
         self.assertTrue(isinstance(l.amqp_connection, BrokerConnection))
@@ -107,7 +107,7 @@ class TestAMQPListener(unittest.TestCase):
         self.assertTrue(l.task_consumer is None)
 
     def test_receieve_message(self):
-        l = AMQPListener(self.bucket_queue, self.eta_scheduler, self.logger)
+        l = CarrotListener(self.bucket_queue, self.eta_scheduler, self.logger)
         backend = MockBackend()
         m = create_message(backend, task="c.u.foo", args=[2, 4, 8], kwargs={})
 
@@ -120,7 +120,7 @@ class TestAMQPListener(unittest.TestCase):
         self.assertTrue(self.eta_scheduler.empty())
 
     def test_receieve_message_not_registered(self):
-        l = AMQPListener(self.bucket_queue, self.eta_scheduler, self.logger)
+        l = CarrotListener(self.bucket_queue, self.eta_scheduler, self.logger)
         backend = MockBackend()
         m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
 
@@ -129,7 +129,7 @@ class TestAMQPListener(unittest.TestCase):
         self.assertTrue(self.eta_scheduler.empty())
 
     def test_receieve_message_eta(self):
-        l = AMQPListener(self.bucket_queue, self.eta_scheduler, self.logger)
+        l = CarrotListener(self.bucket_queue, self.eta_scheduler, self.logger)
         backend = MockBackend()
         m = create_message(backend, task="c.u.foo", args=[2, 4, 8], kwargs={},
                            eta=datetime.now() + timedelta(days=1))
@@ -159,7 +159,7 @@ class TestWorkController(unittest.TestCase):
         self.assertTrue(isinstance(worker.eta_scheduler, Scheduler))
         self.assertTrue(worker.schedule_controller)
         self.assertTrue(worker.pool)
-        self.assertTrue(worker.amqp_listener)
+        self.assertTrue(worker.broker_listener)
         self.assertTrue(worker.mediator)
         self.assertTrue(worker.components)
 

+ 15 - 15
celery/worker/__init__.py

@@ -22,7 +22,7 @@ import logging
 import socket
 
 
-class AMQPListener(object):
+class CarrotListener(object):
     """Listen for messages received from the AMQP broker and
     move them the the bucket queue for task processing.
 
@@ -66,17 +66,17 @@ class AMQPListener(object):
             try:
                 self.consume_messages()
             except (socket.error, AMQPConnectionException, IOError):
-                self.logger.error("AMQPListener: Connection to broker lost. "
-                                + "Trying to re-establish connection...")
+                self.logger.error("CarrotListener: Connection to broker lost."
+                                + " Trying to re-establish connection...")
 
     def consume_messages(self):
         """Consume messages forever (or until an exception is raised)."""
         task_consumer = self.task_consumer
 
-        self.logger.debug("AMQPListener: Starting message consumer...")
+        self.logger.debug("CarrotListener: Starting message consumer...")
         it = task_consumer.iterconsume(limit=None)
 
-        self.logger.debug("AMQPListener: Ready to accept tasks!")
+        self.logger.debug("CarrotListener: Ready to accept tasks!")
 
         while True:
             self.task_consumer.qos(prefetch_count=int(self.prefetch_count))
@@ -121,7 +121,7 @@ class AMQPListener(object):
             self.task_consumer = None
         if self.amqp_connection:
             self.logger.debug(
-                    "AMQPListener: Closing connection to the broker...")
+                    "CarrotListener: Closing connection to the broker...")
             self.amqp_connection.close()
             self.amqp_connection = None
 
@@ -133,7 +133,7 @@ class AMQPListener(object):
 
         """
         self.logger.debug(
-                "AMQPListener: Re-establishing connection to the broker...")
+                "CarrotListener: Re-establishing connection to the broker...")
         self.close_connection()
         self.amqp_connection = self._open_connection()
         self.task_consumer = get_consumer_set(connection=self.amqp_connection)
@@ -163,7 +163,7 @@ class AMQPListener(object):
         conn = retry_over_time(_establish_connection, (socket.error, IOError),
                                errback=_connection_error_handler,
                                max_retries=AMQP_CONNECTION_MAX_RETRIES)
-        self.logger.debug("AMQPListener: Connection Established.")
+        self.logger.debug("CarrotListener: Connection Established.")
         return conn
 
 
@@ -220,9 +220,9 @@ class WorkController(object):
 
         Instance of :class:`celery.worker.controllers.Mediator`.
 
-    .. attribute:: amqp_listener
+    .. attribute:: broker_listener
 
-        Instance of :class:`AMQPListener`.
+        Instance of :class:`CarrotListener`.
 
     """
     loglevel = logging.ERROR
@@ -251,10 +251,10 @@ class WorkController(object):
         # Threads+Pool
         self.schedule_controller = ScheduleController(self.eta_scheduler)
         self.pool = TaskPool(self.concurrency, logger=self.logger)
-        self.amqp_listener = AMQPListener(self.bucket_queue,
-                                          self.eta_scheduler,
-                                          logger=self.logger,
-                                          initial_prefetch_count=concurrency)
+        self.broker_listener = CarrotListener(self.bucket_queue,
+                                        self.eta_scheduler,
+                                        logger=self.logger,
+                                        initial_prefetch_count=concurrency)
         self.mediator = Mediator(self.bucket_queue, self.safe_process_task)
 
         self.clockservice = None
@@ -269,7 +269,7 @@ class WorkController(object):
                                         self.mediator,
                                         self.schedule_controller,
                                         self.clockservice,
-                                        self.amqp_listener))
+                                        self.broker_listener))
 
     def start(self):
         """Starts the workers main loop."""