
celery.worker.listener renamed to celery.worker.consumer, and .CarrotListener to .Consumer

Ask Solem 14 years ago
parent
commit
88a69b51e2

+ 3 - 3
Changelog

@@ -1747,7 +1747,7 @@ Remote control commands
 
         @Panel.register
         def reset_broker_connection(panel, **kwargs):
-            panel.listener.reset_connection()
+            panel.consumer.reset_connection()
             return {"ok": "connection re-established"}
 
     With this module imported in the worker, you can launch the command
@@ -1861,7 +1861,7 @@ Fixes
     See: http://bit.ly/94fwdd
 
 * celeryd: The worker components are now configurable: :setting:`CELERYD_POOL`,
-  :setting:`CELERYD_LISTENER`, :setting:`CELERYD_MEDIATOR`, and
+  :setting:`CELERYD_CONSUMER`, :setting:`CELERYD_MEDIATOR`, and
   :setting:`CELERYD_ETA_SCHEDULER`.
 
     The default configuration is as follows:
@@ -1871,7 +1871,7 @@ Fixes
         CELERYD_POOL = "celery.concurrency.processes.TaskPool"
         CELERYD_MEDIATOR = "celery.worker.controllers.Mediator"
         CELERYD_ETA_SCHEDULER = "celery.worker.controllers.ScheduleController"
-        CELERYD_LISTENER = "celery.worker.listener.CarrotListener"
+        CELERYD_CONSUMER = "celery.worker.consumer.Consumer"
 
     The :setting:`CELERYD_POOL` setting makes it easy to swap out the
     multiprocessing pool with a threaded pool, or how about a
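
A project that swaps in its own consumer would now point the renamed setting at a
custom subclass. A minimal sketch, using a hypothetical ``myapp.worker`` module that
is not part of this commit:

    # myapp/worker.py -- illustrative subclass; module and class names are assumptions.
    from celery.worker.consumer import Consumer

    class LoggingConsumer(Consumer):
        """Logs every incoming task request before handling it."""

        def on_task(self, task):
            self.logger.info("Got task request: %r" % (task, ))
            return super(LoggingConsumer, self).on_task(task)

    # celeryconfig.py -- the setting name introduced by this commit.
    CELERYD_CONSUMER = "myapp.worker.LoggingConsumer"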

+ 1 - 1
celery/app/defaults.py

@@ -100,7 +100,7 @@ NAMESPACES = {
         "CONCURRENCY": Option(0, type="int"),
         "ETA_SCHEDULER": Option("celery.utils.timer2.Timer"),
         "ETA_SCHEDULER_PRECISION": Option(1.0, type="float"),
-        "LISTENER": Option("celery.worker.listener.CarrotListener"),
+        "CONSUMER": Option("celery.worker.consumer.Consumer"),
         "LOG_FORMAT": Option(DEFAULT_PROCESS_LOG_FMT),
         "LOG_COLOR": Option(type="bool"),
         "LOG_LEVEL": Option("WARN"),

+ 3 - 3
celery/apps/worker.py

@@ -106,8 +106,8 @@ class Worker(object):
 
         self.run_worker()
 
-    def on_listener_ready(self, listener):
-        signals.worker_ready.send(sender=listener)
+    def on_consumer_ready(self, consumer):
+        signals.worker_ready.send(sender=consumer)
         print("celery@%s has started." % self.hostname)
 
     def init_queues(self):
@@ -185,7 +185,7 @@ class Worker(object):
                                 loglevel=self.loglevel,
                                 logfile=self.logfile,
                                 hostname=self.hostname,
-                                ready_callback=self.on_listener_ready,
+                                ready_callback=self.on_consumer_ready,
                                 embed_clockservice=self.run_clockservice,
                                 schedule_filename=self.schedule,
                                 send_events=self.events,

+ 1 - 1
celery/conf.py

@@ -58,7 +58,7 @@ CELERYD_PREFETCH_MULTIPLIER = conf.CELERYD_PREFETCH_MULTIPLIER
 CELERYD_POOL_PUTLOCKS = conf.CELERYD_POOL_PUTLOCKS
 
 CELERYD_POOL = conf.CELERYD_POOL
-CELERYD_LISTENER = conf.CELERYD_LISTENER
+CELERYD_LISTENER = conf.CELERYD_CONSUMER
 CELERYD_MEDIATOR = conf.CELERYD_MEDIATOR
 CELERYD_ETA_SCHEDULER = conf.CELERYD_ETA_SCHEDULER
 CELERYD_ETA_SCHEDULER_PRECISION = conf.CELERYD_ETA_SCHEDULER_PRECISION

+ 1 - 1
celery/schedules.py

@@ -134,7 +134,7 @@ class crontab_parser(object):
 
     @staticmethod
     def _ignore_comma(toks):
-        return filter(lambda x: x != ',', toks)
+        return [x for x in toks if x != ',']
 
     @staticmethod
     def _join_to_set(toks):

+ 2 - 2
celery/tests/test_bin/test_celeryd.py

@@ -153,7 +153,7 @@ class test_Worker(unittest.TestCase):
             app.amqp.queues = p
 
     @disable_stdouts
-    def test_on_listener_ready(self):
+    def test_on_consumer_ready(self):
         worker_ready_sent = [False]
 
         def on_worker_ready(**kwargs):
@@ -161,7 +161,7 @@ class test_Worker(unittest.TestCase):
 
         signals.worker_ready.connect(on_worker_ready)
 
-        self.Worker().on_listener_ready(object())
+        self.Worker().on_consumer_ready(object())
         self.assertTrue(worker_ready_sent[0])
 
 

+ 25 - 24
celery/tests/test_worker.py

@@ -16,8 +16,9 @@ from celery.utils import gen_unique_id
 from celery.worker import WorkController
 from celery.worker.buckets import FastQueue
 from celery.worker.job import TaskRequest
-from celery.worker import listener
-from celery.worker.listener import CarrotListener, QoS, RUN
+from celery.worker import consumer
+from celery.worker.consumer import Consumer as MainConsumer
+from celery.worker.consumer import QoS, RUN
 
 from celery.tests.compat import catch_warnings
 from celery.tests.utils import execute_context
@@ -45,7 +46,7 @@ class PlaceHolder(object):
         pass
 
 
-class MyCarrotListener(CarrotListener):
+class MyKombuConsumer(MainConsumer):
     broadcast_consumer = MockConsumer()
     task_consumer = MockConsumer()
 
@@ -186,7 +187,7 @@ class test_QoS(unittest.TestCase):
         self.assertEqual(consumer.prefetch_count, 9)
 
 
-class test_CarrotListener(unittest.TestCase):
+class test_Consumer(unittest.TestCase):
 
     def setUp(self):
         self.ready_queue = FastQueue()
@@ -198,7 +199,7 @@ class test_CarrotListener(unittest.TestCase):
         self.eta_schedule.stop()
 
     def test_connection(self):
-        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
 
         l.reset_connection()
@@ -218,7 +219,7 @@ class test_CarrotListener(unittest.TestCase):
         self.assertIsNone(l.task_consumer)
 
     def test_receive_message_control_command(self):
-        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         backend = MockBackend()
         m = create_message(backend, control={"command": "shutdown"})
@@ -228,12 +229,12 @@ class test_CarrotListener(unittest.TestCase):
         self.assertIn("shutdown", l.control_dispatch.commands)
 
     def test_close_connection(self):
-        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         l._state = RUN
         l.close_connection()
 
-        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         eventer = l.event_dispatcher = MockEventDispatcher()
         heart = l.heart = MockHeart()
@@ -243,7 +244,7 @@ class test_CarrotListener(unittest.TestCase):
         self.assertTrue(heart.closed)
 
     def test_receive_message_unknown(self):
-        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         backend = MockBackend()
         m = create_message(backend, unknown={"baz": "!!!"})
@@ -259,7 +260,7 @@ class test_CarrotListener(unittest.TestCase):
         execute_context(context, with_catch_warnings)
 
     def test_receive_message_eta_OverflowError(self):
-        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
                              send_events=False)
         backend = MockBackend()
         called = [False]
@@ -275,17 +276,17 @@ class test_CarrotListener(unittest.TestCase):
         l.event_dispatcher = MockEventDispatcher()
         l.control_dispatch = MockControlDispatch()
 
-        prev, listener.to_timestamp = listener.to_timestamp, to_timestamp
+        prev, consumer.to_timestamp = consumer.to_timestamp, to_timestamp
         try:
             l.receive_message(m.decode(), m)
             self.assertTrue(m.acknowledged)
             self.assertTrue(called[0])
         finally:
-            listener.to_timestamp = prev
+            consumer.to_timestamp = prev
 
     def test_receive_message_InvalidTaskError(self):
         logger = MockLogger()
-        l = MyCarrotListener(self.ready_queue, self.eta_schedule, logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, logger,
                            send_events=False)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name,
@@ -298,7 +299,7 @@ class test_CarrotListener(unittest.TestCase):
 
     def test_on_decode_error(self):
         logger = MockLogger()
-        l = MyCarrotListener(self.ready_queue, self.eta_schedule, logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, logger,
                            send_events=False)
 
         class MockMessage(object):
@@ -316,7 +317,7 @@ class test_CarrotListener(unittest.TestCase):
         self.assertIn("Message decoding error", logger.logged[0])
 
     def test_receieve_message(self):
-        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name,
@@ -339,7 +340,7 @@ class test_CarrotListener(unittest.TestCase):
             def qos(self, **kwargs):
                 self.prefetch_count_incremented = True
 
-        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
                              send_events=False)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name,
@@ -363,7 +364,7 @@ class test_CarrotListener(unittest.TestCase):
 
     def test_revoke(self):
         ready_queue = FastQueue()
-        l = MyCarrotListener(ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         backend = MockBackend()
         id = gen_unique_id()
@@ -380,7 +381,7 @@ class test_CarrotListener(unittest.TestCase):
         self.assertTrue(ready_queue.empty())
 
     def test_receieve_message_not_registered(self):
-        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
                           send_events=False)
         backend = MockBackend()
         m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
@@ -391,7 +392,7 @@ class test_CarrotListener(unittest.TestCase):
         self.assertTrue(self.eta_schedule.empty())
 
     def test_receieve_message_eta(self):
-        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
                           send_events=False)
         dispatcher = l.event_dispatcher = MockEventDispatcher()
         backend = MockBackend()
@@ -430,7 +431,7 @@ class test_CarrotListener(unittest.TestCase):
             def update(self):
                 self.prev = self.next
 
-        class _Listener(MyCarrotListener):
+        class _Consumer(MyKombuConsumer):
             iterations = 0
             wait_method = None
 
@@ -440,10 +441,10 @@ class test_CarrotListener(unittest.TestCase):
 
         called_back = [False]
 
-        def init_callback(listener):
+        def init_callback(consumer):
             called_back[0] = True
 
-        l = _Listener(self.ready_queue, self.eta_schedule, self.logger,
+        l = _Consumer(self.ready_queue, self.eta_schedule, self.logger,
                       send_events=False, init_callback=init_callback)
         l.task_consumer = MockConsumer()
         l.qos = _QoS()
@@ -460,7 +461,7 @@ class test_CarrotListener(unittest.TestCase):
         self.assertEqual(l.iterations, 1)
         self.assertEqual(l.qos.prev, l.qos.next)
 
-        l = _Listener(self.ready_queue, self.eta_schedule, self.logger,
+        l = _Consumer(self.ready_queue, self.eta_schedule, self.logger,
                       send_events=False, init_callback=init_callback)
         l.qos = _QoS()
         l.task_consumer = MockConsumer()
@@ -493,7 +494,7 @@ class test_WorkController(unittest.TestCase):
         self.assertIsInstance(worker.scheduler, Timer)
         self.assertTrue(worker.scheduler)
         self.assertTrue(worker.pool)
-        self.assertTrue(worker.listener)
+        self.assertTrue(worker.consumer)
         self.assertTrue(worker.mediator)
         self.assertTrue(worker.components)
 

+ 25 - 25
celery/tests/test_worker_control.py

@@ -37,7 +37,7 @@ class Dispatcher(object):
         self.sent.append(event)
 
 
-class Listener(object):
+class Consumer(object):
 
     def __init__(self):
         self.ready_queue = FastQueue()
@@ -53,26 +53,26 @@ class Listener(object):
 class test_ControlPanel(unittest.TestCase):
 
     def setUp(self):
-        self.panel = self.create_panel(listener=Listener())
+        self.panel = self.create_panel(consumer=Consumer())
 
     def create_panel(self, **kwargs):
         return control.ControlDispatch(hostname=hostname, **kwargs)
 
     def test_disable_events(self):
-        listener = Listener()
-        panel = self.create_panel(listener=listener)
-        listener.event_dispatcher.enabled = True
+        consumer = Consumer()
+        panel = self.create_panel(consumer=consumer)
+        consumer.event_dispatcher.enabled = True
         panel.execute("disable_events")
-        self.assertEqual(listener.event_dispatcher.enabled, False)
-        self.assertIn("worker-offline", listener.event_dispatcher.sent)
+        self.assertEqual(consumer.event_dispatcher.enabled, False)
+        self.assertIn("worker-offline", consumer.event_dispatcher.sent)
 
     def test_enable_events(self):
-        listener = Listener()
-        panel = self.create_panel(listener=listener)
-        listener.event_dispatcher.enabled = False
+        consumer = Consumer()
+        panel = self.create_panel(consumer=consumer)
+        consumer.event_dispatcher.enabled = False
         panel.execute("enable_events")
-        self.assertEqual(listener.event_dispatcher.enabled, True)
-        self.assertIn("worker-online", listener.event_dispatcher.sent)
+        self.assertEqual(consumer.event_dispatcher.enabled, True)
+        self.assertIn("worker-online", consumer.event_dispatcher.sent)
 
     def test_dump_tasks(self):
         info = "\n".join(self.panel.execute("dump_tasks"))
@@ -80,23 +80,23 @@ class test_ControlPanel(unittest.TestCase):
         self.assertIn("rate_limit=200", info)
 
     def test_dump_schedule(self):
-        listener = Listener()
-        panel = self.create_panel(listener=listener)
+        consumer = Consumer()
+        panel = self.create_panel(consumer=consumer)
         self.assertFalse(panel.execute("dump_schedule"))
         import operator
-        listener.eta_schedule.schedule.enter(100, operator.add, (2, 2))
+        consumer.eta_schedule.schedule.enter(100, operator.add, (2, 2))
         self.assertTrue(panel.execute("dump_schedule"))
 
     def test_dump_reserved(self):
-        listener = Listener()
-        panel = self.create_panel(listener=listener)
+        consumer = Consumer()
+        panel = self.create_panel(consumer=consumer)
         response = panel.execute("dump_reserved", {"safe": True})
         self.assertDictContainsSubset({"name": mytask.name,
                                        "args": (2, 2),
                                        "kwargs": {},
                                        "hostname": socket.gethostname()},
                                        response[0])
-        listener.ready_queue = FastQueue()
+        consumer.ready_queue = FastQueue()
         self.assertFalse(panel.execute("dump_reserved"))
 
     def test_rate_limit_when_disabled(self):
@@ -116,7 +116,7 @@ class test_ControlPanel(unittest.TestCase):
 
     def test_rate_limit(self):
 
-        class Listener(object):
+        class Consumer(object):
 
             class ReadyQueue(object):
                 fresh = False
@@ -127,8 +127,8 @@ class test_ControlPanel(unittest.TestCase):
             def __init__(self):
                 self.ready_queue = self.ReadyQueue()
 
-        listener = Listener()
-        panel = self.create_panel(listener=listener)
+        consumer = Consumer()
+        panel = self.create_panel(consumer=consumer)
 
         task = tasks[PingTask.name]
         old_rate_limit = task.rate_limit
@@ -136,12 +136,12 @@ class test_ControlPanel(unittest.TestCase):
             panel.execute("rate_limit", kwargs=dict(task_name=task.name,
                                                     rate_limit="100/m"))
             self.assertEqual(task.rate_limit, "100/m")
-            self.assertTrue(listener.ready_queue.fresh)
-            listener.ready_queue.fresh = False
+            self.assertTrue(consumer.ready_queue.fresh)
+            consumer.ready_queue.fresh = False
             panel.execute("rate_limit", kwargs=dict(task_name=task.name,
                                                     rate_limit=0))
             self.assertEqual(task.rate_limit, 0)
-            self.assertTrue(listener.ready_queue.fresh)
+            self.assertTrue(consumer.ready_queue.fresh)
         finally:
             task.rate_limit = old_rate_limit
 
@@ -205,7 +205,7 @@ class test_ControlPanel(unittest.TestCase):
             def reply(self, data, exchange, routing_key, **kwargs):
                 replies.append(data)
 
-        panel = _Dispatch(hostname, listener=Listener())
+        panel = _Dispatch(hostname, consumer=Consumer())
 
         r = panel.execute("ping", reply_to={"exchange": "x",
                                             "routing_key": "x"})

+ 7 - 7
celery/worker/__init__.py

@@ -103,9 +103,9 @@ class WorkController(object):
 
         Instance of :class:`celery.worker.controllers.Mediator`.
 
-    .. attribute:: listener
+    .. attribute:: consumer
 
-        Instance of :class:`CarrotListener`.
+        Instance of :class:`celery.worker.consumer.Consumer`.
 
     """
     loglevel = logging.ERROR
@@ -114,7 +114,7 @@ class WorkController(object):
 
     def __init__(self, concurrency=None, logfile=None, loglevel=None,
             send_events=None, hostname=None, ready_callback=noop,
-            embed_clockservice=False, pool_cls=None, listener_cls=None,
+            embed_clockservice=False, pool_cls=None, consumer_cls=None,
             mediator_cls=None, eta_scheduler_cls=None,
             schedule_filename=None, task_time_limit=None,
             task_soft_time_limit=None, max_tasks_per_child=None,
@@ -134,7 +134,7 @@ class WorkController(object):
             send_events = conf.CELERY_SEND_EVENTS
         self.send_events = send_events
         self.pool_cls = pool_cls or conf.CELERYD_POOL
-        self.listener_cls = listener_cls or conf.CELERYD_LISTENER
+        self.consumer_cls = consumer_cls or conf.CELERYD_CONSUMER
         self.mediator_cls = mediator_cls or conf.CELERYD_MEDIATOR
         self.eta_scheduler_cls = eta_scheduler_cls or \
                                     conf.CELERYD_ETA_SCHEDULER
@@ -205,7 +205,7 @@ class WorkController(object):
                                 schedule_filename=self.schedule_filename)
 
         prefetch_count = self.concurrency * self.prefetch_multiplier
-        self.listener = instantiate(self.listener_cls,
+        self.consumer = instantiate(self.consumer_cls,
                                     self.ready_queue,
                                     self.scheduler,
                                     logger=self.logger,
@@ -224,7 +224,7 @@ class WorkController(object):
                                         self.mediator,
                                         self.scheduler,
                                         self.beat,
-                                        self.listener))
+                                        self.consumer))
 
     def start(self):
         """Starts the workers main loop."""
@@ -275,7 +275,7 @@ class WorkController(object):
                 stop = getattr(component, "terminate", stop)
             stop()
 
-        self.listener.close_connection()
+        self.consumer.close_connection()
         self._state = TERMINATE
 
     def on_timer_error(self, exc_info):
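
Code that constructs the WorkController directly can pass the renamed keyword instead
of relying on the setting. A rough sketch, reusing the hypothetical LoggingConsumer
from above; in a normal deployment celeryd builds this object itself:

    from celery.worker import WorkController

    # consumer_cls replaces the old listener_cls keyword; it accepts a class
    # or a dotted path that instantiate() can resolve.
    worker = WorkController(concurrency=4,
                            consumer_cls="myapp.worker.LoggingConsumer")
    worker.start()    # blocks, running the worker's main loop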

+ 22 - 22
celery/worker/listener.py → celery/worker/consumer.py

@@ -5,34 +5,34 @@ from the broker, processing the messages and keeping the broker connections
 up and running.
 
 
-* :meth:`~CarrotListener.start` is an infinite loop, which only iterates
+* :meth:`~Consumer.start` is an infinite loop, which only iterates
   again if the connection is lost. For each iteration (at start, or if the
-  connection is lost) it calls :meth:`~CarrotListener.reset_connection`,
-  and starts the consumer by calling :meth:`~CarrotListener.consume_messages`.
+  connection is lost) it calls :meth:`~Consumer.reset_connection`,
+  and starts the consumer by calling :meth:`~Consumer.consume_messages`.
 
-* :meth:`~CarrotListener.reset_connection`, clears the internal queues,
+* :meth:`~Consumer.reset_connection` clears the internal queues,
   establishes a new connection to the broker, sets up the task
   consumer (+ QoS), and the broadcast remote control command consumer.
 
   Also if events are enabled it configures the event dispatcher and starts
  up the heartbeat thread.
 
-* Finally it can consume messages. :meth:`~CarrotListener.consume_messages`
+* Finally it can consume messages. :meth:`~Consumer.consume_messages`
   is simply an infinite loop waiting for events on the AMQP channels.
 
  Both the task consumer and the broadcast consumer use the same
-  callback: :meth:`~CarrotListener.receive_message`.
+  callback: :meth:`~Consumer.receive_message`.
 
-* So for each message received the :meth:`~CarrotListener.receive_message`
+* So for each message received the :meth:`~Consumer.receive_message`
  method is called; it checks the payload of the message for either
   a ``task`` key or a ``control`` key.
 
  If the message is a task, it verifies the validity of the message,
   converts it to a :class:`celery.worker.job.TaskRequest`, and sends
-  it to :meth:`~CarrotListener.on_task`.
+  it to :meth:`~Consumer.on_task`.
 
   If the message is a control command the message is passed to
-  :meth:`~CarrotListener.on_control`, which in turn dispatches
+  :meth:`~Consumer.on_control`, which in turn dispatches
   the control command using the control dispatcher.
 
   It also tries to handle malformed or invalid messages properly,
@@ -51,7 +51,7 @@ up and running.
   the prefetch count is decremented again, though this cannot happen
   immediately because amqplib doesn't support doing broker requests
   across threads. Instead the current prefetch count is kept as a
-  shared counter, so as soon as  :meth:`~CarrotListener.consume_messages`
+  shared counter, so as soon as  :meth:`~Consumer.consume_messages`
   detects that the value has changed it will send out the actual
   QoS event to the broker.
 
@@ -60,7 +60,7 @@ up and running.
  However, this is not dangerous as the broker will resend them
   to another worker when the channel is closed.
 
-* **WARNING**: :meth:`~CarrotListener.stop` does not close the connection!
+* **WARNING**: :meth:`~Consumer.stop` does not close the connection!
   This is because some pre-acked messages may be in processing,
   and they need to be finished before the channel is closed.
   For celeryd this means the pool must finish the tasks it has acked
@@ -140,7 +140,7 @@ class QoS(object):
         return int(self.value)
 
 
-class CarrotListener(object):
+class Consumer(object):
     """Listen for messages received from the broker and
    move them to the ready queue for task processing.
 
@@ -213,7 +213,7 @@ class CarrotListener(object):
         self.control_dispatch = ControlDispatch(app=self.app,
                                                 logger=logger,
                                                 hostname=self.hostname,
-                                                listener=self)
+                                                consumer=self)
         self.connection_errors = \
                 self.app.broker_connection().connection_errors
         self.queues = queues
@@ -233,16 +233,16 @@ class CarrotListener(object):
             try:
                 self.consume_messages()
             except self.connection_errors:
-                self.logger.error("CarrotListener: Connection to broker lost."
+                self.logger.error("Consumer: Connection to broker lost."
                                 + " Trying to re-establish connection...")
 
     def consume_messages(self):
         """Consume messages forever (or until an exception is raised)."""
-        self.logger.debug("CarrotListener: Starting message consumer...")
+        self.logger.debug("Consumer: Starting message consumer...")
         self.task_consumer.consume()
         self.broadcast_consumer.consume()
         wait_for_message = self._mainloop().next
-        self.logger.debug("CarrotListener: Ready to accept tasks!")
+        self.logger.debug("Consumer: Ready to accept tasks!")
 
         while 1:
             if self.qos.prev != self.qos.next:
@@ -331,12 +331,12 @@ class CarrotListener(object):
             pass
 
     def close_connection(self):
-        self.logger.debug("CarrotListener: "
+        self.logger.debug("Consumer: "
                           "Closing consumer channel...")
         if self.task_consumer:
             self.task_consumer = \
                     self.maybe_conn_error(self.task_consumer.close)
-        self.logger.debug("CarrotListener: "
+        self.logger.debug("Consumer: "
                           "Closing connection to broker...")
         if self.connection:
             self.connection = self.maybe_conn_error(self.connection.close)
@@ -382,7 +382,7 @@ class CarrotListener(object):
     def reset_connection(self):
         """Re-establish connection and set up consumers."""
         self.logger.debug(
-                "CarrotListener: Re-establishing connection to the broker...")
+                "Consumer: Re-establishing connection to the broker...")
         self.stop_consumers()
 
         # Clear internal queues.
@@ -390,7 +390,7 @@ class CarrotListener(object):
         self.eta_schedule.clear()
 
         self.connection = self._open_connection()
-        self.logger.debug("CarrotListener: Connection Established.")
+        self.logger.debug("Consumer: Connection Established.")
         self.task_consumer = self.app.amqp.get_task_consumer(self.connection,
                                                           queues=self.queues)
         # QoS: Reset prefetch window.
@@ -429,7 +429,7 @@ class CarrotListener(object):
 
         def _connection_error_handler(exc, interval):
             """Callback handler for connection errors."""
-            self.logger.error("CarrotListener: Connection Error: %s. " % exc
+            self.logger.error("Consumer: Connection Error: %s. " % exc
                      + "Trying again in %d seconds..." % interval)
 
         conn = self.app.broker_connection()
@@ -446,7 +446,7 @@ class CarrotListener(object):
         Does not close connection.
 
         """
-        self.logger.debug("CarrotListener: Stopping consumers...")
+        self.logger.debug("Consumer: Stopping consumers...")
         self.stop_consumers(close=False)
 
     @property
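
The prefetch bookkeeping described in the module docstring -- increments and
decrements recorded on a shared counter, and only flushed to the broker from the
consume_messages loop once prev and next differ -- can be pictured with a small
stand-in that is not the real QoS class:

    import threading

    class SharedPrefetch(object):
        """Toy illustration of the shared-counter idea; not the actual QoS."""

        def __init__(self, initial, set_callback):
            self.prev = self.next = initial
            self.set_callback = set_callback        # e.g. sends basic.qos to the broker
            self._lock = threading.Lock()

        def increment(self):                        # a task with an ETA was received
            with self._lock:
                self.next += 1

        def decrement(self):                        # the ETA task was finally applied
            with self._lock:
                self.next -= 1

        def update_if_changed(self):                # called from the consumer's main loop
            with self._lock:
                if self.prev != self.next:
                    self.set_callback(self.next)
                    self.prev = self.next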

+ 3 - 3
celery/worker/control/__init__.py

@@ -12,12 +12,12 @@ class ControlDispatch(object):
     """Execute worker control panel commands."""
     Panel = Panel
 
-    def __init__(self, logger=None, hostname=None, listener=None, app=None):
+    def __init__(self, logger=None, hostname=None, consumer=None, app=None):
         self.app = app_or_default(app)
         self.logger = logger or self.app.log.get_default_logger()
         self.hostname = hostname or socket.gethostname()
-        self.listener = listener
-        self.panel = self.Panel(self.logger, self.listener, self.hostname,
+        self.consumer = consumer
+        self.panel = self.Panel(self.logger, self.consumer, self.hostname,
                                 app=self.app)
 
     def reply(self, data, exchange, routing_key, **kwargs):

+ 10 - 10
celery/worker/control/builtins.py

@@ -19,7 +19,7 @@ def revoke(panel, task_id, **kwargs):
 
 @Panel.register
 def enable_events(panel):
-    dispatcher = panel.listener.event_dispatcher
+    dispatcher = panel.consumer.event_dispatcher
     if not dispatcher.enabled:
         dispatcher.enable()
         dispatcher.send("worker-online")
@@ -30,7 +30,7 @@ def enable_events(panel):
 
 @Panel.register
 def disable_events(panel):
-    dispatcher = panel.listener.event_dispatcher
+    dispatcher = panel.consumer.event_dispatcher
     if dispatcher.enabled:
         dispatcher.send("worker-offline")
         dispatcher.disable()
@@ -71,11 +71,11 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
             task_name, ))
         return {"error": "unknown task"}
 
-    if not hasattr(panel.listener.ready_queue, "refresh"):
+    if not hasattr(panel.consumer.ready_queue, "refresh"):
         panel.logger.error("Rate limit attempt, but rate limits disabled.")
         return {"error": "rate limits disabled"}
 
-    panel.listener.ready_queue.refresh()
+    panel.consumer.ready_queue.refresh()
 
     if not rate_limit:
         panel.logger.warn("Disabled rate limits for tasks of type %s" % (
@@ -89,7 +89,7 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
 
 @Panel.register
 def dump_schedule(panel, safe=False, **kwargs):
-    schedule = panel.listener.eta_schedule.schedule
+    schedule = panel.consumer.eta_schedule.schedule
     if not schedule.queue:
         panel.logger.info("--Empty schedule--")
         return []
@@ -112,7 +112,7 @@ def dump_schedule(panel, safe=False, **kwargs):
 
 @Panel.register
 def dump_reserved(panel, safe=False, **kwargs):
-    ready_queue = panel.listener.ready_queue
+    ready_queue = panel.consumer.ready_queue
     reserved = ready_queue.items
     if not reserved:
         panel.logger.info("--Empty queue--")
@@ -132,8 +132,8 @@ def dump_active(panel, safe=False, **kwargs):
 @Panel.register
 def stats(panel, **kwargs):
     return {"total": state.total_count,
-            "listener": panel.listener.info,
-            "pool": panel.listener.pool.info}
+            "consumer": panel.consumer.info,
+            "pool": panel.consumer.pool.info}
 
 
 @Panel.register
@@ -175,7 +175,7 @@ def shutdown(panel, **kwargs):
 @Panel.register
 def add_consumer(panel, queue=None, exchange=None, exchange_type="direct",
         routing_key=None, **options):
-    cset = panel.listener.task_consumer
+    cset = panel.consumer.task_consumer
     declaration = dict(queue=queue,
                        exchange=exchange,
                        exchange_type=exchange_type,
@@ -189,6 +189,6 @@ def add_consumer(panel, queue=None, exchange=None, exchange_type="direct",
 
 @Panel.register
 def cancel_consumer(panel, queue=None, **_):
-    cset = panel.listener.task_consumer
+    cset = panel.consumer.task_consumer
     cset.cancel_by_queue(queue)
     return {"ok": "no longer consuming from %s" % (queue, )}

+ 4 - 2
celery/worker/control/registry.py

@@ -6,11 +6,13 @@ from celery.app import app_or_default
 class Panel(UserDict):
     data = dict()                               # Global registry.
 
-    def __init__(self, logger, listener, hostname=None, app=None):
+    def __init__(self, logger, consumer, hostname=None, app=None):
         self.app = app_or_default(app)
         self.logger = logger
         self.hostname = hostname
-        self.listener = listener
+        self.consumer = consumer
+        # Compat (pre 2.2)
+        self.listener = consumer
 
     @classmethod
     def register(cls, method, name=None):
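
Because of the compat alias above, remote control commands written against the old
attribute name keep working unchanged. A short sketch (the import path is an
assumption based on where Panel is defined):

    from celery.worker.control.registry import Panel

    @Panel.register
    def consumer_info(panel, **kwargs):
        # Pre-2.2 plugins may still say panel.listener; both names point at
        # the same Consumer instance thanks to the alias.
        assert panel.listener is panel.consumer
        return {"consumer": panel.consumer.info}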

+ 4 - 4
docs/configuration.rst

@@ -1069,13 +1069,13 @@ CELERYD_POOL
 Name of the task pool class used by the worker.
 Default is :class:`celery.concurrency.processes.TaskPool`.
 
-.. setting:: CELERYD_LISTENER
+.. setting:: CELERYD_CONSUMER
 
-CELERYD_LISTENER
+CELERYD_CONSUMER
 ~~~~~~~~~~~~~~~~
 
-Name of the listener class used by the worker.
-Default is :class:`celery.worker.listener.CarrotListener`.
+Name of the consumer class used by the worker.
+Default is :class:`celery.worker.consumer.Consumer`.
 
 .. setting:: CELERYD_MEDIATOR
 

+ 1 - 1
docs/internals/app-overview.rst

@@ -225,7 +225,7 @@ App Dependency Tree
 * celery.bin.celeryd.WorkerCommand
     * celery.apps.worker.Worker
         * celery.worker.WorkerController
-            * celery.worker.listener.CarrotListener
+            * celery.worker.consumer.Consumer
                 * celery.worker.job.TaskRequest
                 * celery.events.EventDispatcher
                 * celery.worker.control.ControlDispatch

+ 2 - 2
docs/internals/moduleindex.rst

@@ -16,7 +16,7 @@ celery.worker
 * :class:`~celery.worker.WorkController`
 
 This is the worker's main process. It starts and stops all the components
-required by the worker: Pool, Mediator, Scheduler, ClockService, and Listener.
+required by the worker: Pool, Mediator, Scheduler, ClockService, and Consumer.
 
 * :func:`~celery.worker.process_initializer`
 
@@ -35,7 +35,7 @@ Handles acknowledgement, execution, writing results to backends and error handli
 celery.worker.pool
 ------------------
 
-celery.worker.listener
+celery.worker.consumer
 ----------------------
 
 celery.worker.controllers

+ 3 - 3
docs/internals/reference/celery.worker.listener.rst → docs/internals/reference/celery.worker.consumer.rst

@@ -1,11 +1,11 @@
 ==================================================
- Worker Message Listener - celery.worker.listener
+ Worker Message Consumer - celery.worker.consumer
 ==================================================
 
 .. contents::
     :local:
-.. currentmodule:: celery.worker.listener
+.. currentmodule:: celery.worker.consumer
 
-.. automodule:: celery.worker.listener
+.. automodule:: celery.worker.consumer
     :members:
     :undoc-members:

+ 1 - 1
docs/internals/reference/index.rst

@@ -9,7 +9,7 @@
     :maxdepth: 2
 
     celery.worker
-    celery.worker.listener
+    celery.worker.consumer
     celery.worker.job
     celery.worker.controllers
     celery.worker.buckets

+ 3 - 3
docs/internals/worker.rst

@@ -10,7 +10,7 @@
 Introduction
 ============
 
-The worker consists of 4 main components: the broker listener, the scheduler,
+The worker consists of 4 main components: the consumer, the scheduler,
the mediator and the task pool. All these components run in parallel, working
 with two data structures: the ready queue and the ETA schedule.
 
@@ -34,8 +34,8 @@ The ETA schedule is a heap queue sorted by time.
 Components
 ==========
 
-Listener
---------------
+Consumer
+--------
 
 Receives messages from the broker using `Kombu`_.
 

+ 3 - 3
docs/reference/celery.conf.rst

@@ -289,10 +289,10 @@ Celeryd
     Name of the task pool class used by the worker.
     Default is ``"celery.concurrency.processes.TaskPool"``.
 
-.. data:: CELERYD_LISTENER
+.. data:: CELERYD_CONSUMER
 
-    Name of the listener class used by the worker.
-    Default is ``"celery.worker.listener.CarrotListener"``.
+    Name of the consumer class used by the worker.
+    Default is ``"celery.worker.consumer.Consumer"``.
 
 .. data:: CELERYD_MEDIATOR
 

+ 2 - 2
docs/userguide/workers.rst

@@ -307,7 +307,7 @@ Remote control commands are registered in the control panel and
 they take a single argument: the current
 :class:`~celery.worker.control.ControlDispatch` instance.
 From there you have access to the active
-:class:`celery.worker.listener.CarrotListener` if needed.
+:class:`~celery.worker.consumer.Consumer` if needed.
 
 Here's an example control command that restarts the broker connection:
 
@@ -318,7 +318,7 @@ Here's an example control command that restarts the broker connection:
     @Panel.register
     def reset_connection(panel):
         panel.logger.critical("Connection reset by remote control.")
-        panel.listener.reset_connection()
+        panel.consumer.reset_connection()
         return {"ok": "connection reset"}