@@ -10,18 +10,20 @@ up and running.
connection is lost) it calls :meth:`~CarrotListener.reset_connection`,
and starts the consumer by calling :meth:`~CarrotListener.consume_messages`.

-* :meth:`~CarrotListener.reset_connection``, clears the internal queues,
- establishes a new connection to the broker, sets up the task queues (+ QoS),
- the broadcast remote control command consumer, the event dispatcher and the
- heartbeat.
+* :meth:`~CarrotListener.reset_connection` clears the internal queues,
+ establishes a new connection to the broker, sets up the task
+ consumer (+ QoS), and the broadcast remote control command consumer.
+
+ Also, if events are enabled, it configures the event dispatcher and starts
+ up the heartbeat thread.
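To make the reset sequence above concrete, here is a minimal, hypothetical
sketch of the steps it describes; the class, attribute and helper names are
assumptions for illustration, not CarrotListener's actual API::

    from collections import deque

    class ListenerSketch:
        """Illustrative stand-in for the listener; not the real API."""

        def __init__(self, send_events=False):
            self.send_events = send_events
            self.ready_queue = deque()   # tasks ready for the pool
            self.eta_schedule = deque()  # tasks waiting for their ETA

        def open_connection(self):
            return object()  # placeholder for a broker connection

        def reset_connection(self):
            # Clear the internal queues of half-processed messages.
            self.ready_queue.clear()
            self.eta_schedule.clear()
            # Establish a fresh broker connection, then set up the task
            # consumer (+ QoS) and the broadcast command consumer.
            self.connection = self.open_connection()
            self.task_consumer = ("tasks", self.connection)
            self.broadcast_consumer = ("broadcast", self.connection)
            # Event dispatcher and heartbeat only if events are enabled.
            if self.send_events:
                self.event_dispatcher = ("events", self.connection)
                self.heartbeat_running = True

    ListenerSketch(send_events=True).reset_connection()
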
* Finally it can consume messages. :meth:`~CarrotListener.consume_messages`
is simply an infinite loop waiting for events on the AMQP channels.

Both the task consumer and the broadcast consumer use the same
callback: :meth:`~CarrotListener.receive_message`.
- The reason for this is that not all carrot backends supports receiving
- on different channels, so we use a little nasty trick
+ The reason is that some carrot backends don't support consuming
+ from several channels simultaneously, so we use a little nasty trick
(:meth:`~CarrotListener._detect_wait_method`) to select the best
possible channel distribution depending on the functionality supported
by the carrot backend.
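The channel-selection trick can be pictured with a short, hypothetical
sketch; ``backend``, its attributes and the wait functions are invented here
purely to illustrate the decision, not carrot's real interface::

    def detect_wait_method(backend):
        # If the backend can consume from several channels at once,
        # wait on the task channel and the broadcast channel together;
        # otherwise fall back to funnelling both consumers through one
        # channel. (Attribute names are illustrative.)
        if getattr(backend, "supports_multiple_channels", False):
            return backend.wait_many
        return backend.wait_one

    def consume_messages(wait_for_message, receive_message):
        # The infinite loop: block until the broker delivers a message
        # on one of the AMQP channels, then hand it to the single
        # shared callback used by both consumers.
        while True:
            message = wait_for_message()
            receive_message(message)
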
@@ -36,21 +38,21 @@ up and running.

If the message is a control command, the message is passed to
:meth:`~CarrotListener.on_control`, which in turn dispatches
- the control command using the :attr:`~CarrotListener.control_dispatcher`.
+ the control command using the control dispatcher.

- It also tried to handle malformed or invalid messages properly,
+ It also tries to handle malformed or invalid messages properly,
so the worker doesn't choke on them and die. Any invalid messages
are acknowledged immediately and logged, so the message is not resent
- again and again.
+ again, and again.
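A compressed, hypothetical sketch of that callback logic; the payload layout
and the helper names are assumptions, and only the ack-and-log behaviour for
bad messages follows the text above::

    import logging

    logger = logging.getLogger("worker.listener")

    def receive_message(message, body, dispatch_control, handle_task):
        # Remote control commands go to the control dispatcher.
        if body.get("control"):
            dispatch_control(body["control"])
            message.ack()
            return
        try:
            task = body["task"], body["args"]  # pretend-decode the task
        except (KeyError, TypeError) as exc:
            # Malformed/invalid message: ack it right away and log it,
            # so the broker never redelivers it again, and again.
            logger.error("Discarding invalid message: %r", exc)
            message.ack()
            return
        handle_task(task)
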
* If the task has an ETA/countdown, the task is moved to the ``eta_schedule``
- so the :class:`celery.worker.scheduler.Scheduler` can schedule it at its
- deadline. Tasks without eta are moved immediately to the ``ready_queue``,
- so it can be picked up by the :class:`celery.worker.controllers.Mediator`
- and sent to the pool.
+ so the :class:`~celery.worker.scheduler.Scheduler` can schedule it at its
+ deadline. Tasks without an eta are moved immediately to the ``ready_queue``,
+ so they can be picked up by the :class:`~celery.worker.controllers.Mediator`
+ to be sent to the pool.
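The routing decision in the previous bullet can be sketched like so; using a
heap for ``eta_schedule`` and plain strings for tasks are illustrative
simplifications, not the worker's real data structures::

    import heapq
    import time
    from collections import deque

    ready_queue = deque()  # drained by the Mediator, handed to the pool
    eta_schedule = []      # (eta, task) heap, drained by the Scheduler

    def place_task(task, eta=None):
        if eta is not None:
            # Hold the task back until its deadline is met.
            heapq.heappush(eta_schedule, (eta, task))
        else:
            # No ETA: ready for the pool immediately.
            ready_queue.append(task)

    place_task("add(2, 2)")
    place_task("add(4, 4)", eta=time.time() + 60)
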
* When a task with an ETA is received, the QoS prefetch count is also
- incremented so we can reserve another message. When the ETA is met
+ incremented, so another message can be reserved. When the ETA is met
the prefetch count is decremented again, though this cannot happen
immediately because amqplib doesn't support doing broker requests
across threads. Instead the current prefetch count is kept as a
|