|
@@ -1,7 +1,81 @@
|
|
|
+"""
|
|
|
+
|
|
|
+This module contains the component responsible for consuming messages
|
|
|
+from the broker, processing the messages and keeping the broker connections
|
|
|
+up and running.
|
|
|
+
|
|
|
+
|
|
|
+* :meth:`~CarrotListener.start` is an infinite loop, which only iterates
|
|
|
+ again if the connection is lost. For each iteration (at start, or if the
|
|
|
+ connection is lost) it calls :meth:`~CarrotListener.reset_connection`,
|
|
|
+ and starts the consumer by calling :meth:`~CarrotListener.consume_messages`.
|
|
|
+
|
|
|
+* :meth:`~CarrotListener.reset_connection``, clears the internal queues,
|
|
|
+ establishes a new connection to the broker, sets up the task queues (+ QoS),
|
|
|
+ the broadcast remote control command consumer, the event dispatcher and the
|
|
|
+ heartbeat.
|
|
|
+
|
|
|
+* Finally it can consume messages. :meth:`~CarrotListener.consume_messages`
|
|
|
+ is simply an infinite loop waiting for events on the AMQP channels.
|
|
|
+
|
|
|
+ Both the task consumer and the broadcast consumer uses the same
|
|
|
+ callback: :meth:`~CarrotListener.receive_message`.
|
|
|
+ The reason for this is that not all carrot backends supports receiving
|
|
|
+ on different channels, so we use a little nasty trick
|
|
|
+ (:meth:`~CarrotListener._detect_wait_method`) to select the best
|
|
|
+ possible channel distribution depending on the functionality supported
|
|
|
+ by the carrot backend.
|
|
|
+
|
|
|
+* So for each message received the :meth:`~CarrotListener.receive_message`
|
|
|
+ method is called, this checks the payload of the message for either
|
|
|
+ a ``task`` key or a ``control`` key.
|
|
|
+
|
|
|
+ If the message is a task, it verifies the validity of the message
|
|
|
+ converts it to a :class:`celery.worker.job.TaskRequest`, and sends
|
|
|
+ it to :meth:`~CarrotListener.on_task`.
|
|
|
+
|
|
|
+ If the message is a control command the message is passed to
|
|
|
+ :meth:`~CarrotListener.on_control`, which in turn dispatches
|
|
|
+ the control command using the :attr:`~CarrotListener.control_dispatcher`.
|
|
|
+
|
|
|
+ It also tried to handle malformed or invalid messages properly,
|
|
|
+ so the worker doesn't choke on them and die. Any invalid messages
|
|
|
+ are acknowledged immediately and logged, so the message is not resent
|
|
|
+ again and again.
|
|
|
+
|
|
|
+* If the task has an ETA/countdown, the task is moved to the ``eta_schedule``
|
|
|
+ so the :class:`celery.worker.scheduler.Scheduler` can schedule it at its
|
|
|
+ deadline. Tasks without eta are moved immediately to the ``ready_queue``,
|
|
|
+ so it can be picked up by the :class:`celery.worker.controllers.Mediator`
|
|
|
+ and sent to the pool.
|
|
|
+
|
|
|
+* When a task with an ETA is received the QoS prefetch count is also
|
|
|
+ incremented so we can reserve another message. When the ETA is met
|
|
|
+ the prefetch count is decremented again, though this cannot happen
|
|
|
+ immediately because amqplib doesn't support doing broker requests
|
|
|
+ across threads. Instead the current prefetch count is kept as a
|
|
|
+ shared counter, so as soon as :meth:`~CarrotListener.consume_messages`
|
|
|
+ detects that the value has changed it will send out the actual
|
|
|
+ QoS event to the broker.
|
|
|
+
|
|
|
+* Notice that when the connection is lost all internal queues are cleared
|
|
|
+ because we can no longer ack the messages reserved in memory.
|
|
|
+ Hoever, this is not dangerous as the broker will resend them
|
|
|
+ to another worker when the channel is closed.
|
|
|
+
|
|
|
+* **WARNING**: :meth:`~CarrotListener.stop` does not close the connection!
|
|
|
+ This is because some pre-acked messages may be in processing,
|
|
|
+ and they need to be finished before the channel is closed.
|
|
|
+ For celeryd this means the pool must finish the tasks it has acked
|
|
|
+ early, *then* close the connection.
|
|
|
+
|
|
|
+"""
|
|
|
+
|
|
|
from __future__ import generators
|
|
|
|
|
|
import socket
|
|
|
import warnings
|
|
|
+
|
|
|
from datetime import datetime
|
|
|
|
|
|
from dateutil.parser import parse as parse_iso8601
|
|
@@ -23,6 +97,15 @@ CLOSE = 0x1
|
|
|
|
|
|
|
|
|
class QoS(object):
|
|
|
+ """Quality of Service for Channel.
|
|
|
+
|
|
|
+ For thread-safe increment/decrement of a channels prefetch count value.
|
|
|
+
|
|
|
+ :param consumer: A :class:`carrot.messaging.Consumer` instance.
|
|
|
+ :param initial_value: Initial prefetch count value.
|
|
|
+ :param logger: Logger used to log debug messages.
|
|
|
+
|
|
|
+ """
|
|
|
prev = None
|
|
|
|
|
|
def __init__(self, consumer, initial_value, logger):
|
|
@@ -30,24 +113,32 @@ class QoS(object):
|
|
|
self.logger = logger
|
|
|
self.value = SharedCounter(initial_value)
|
|
|
|
|
|
- self.set(int(self.value))
|
|
|
-
|
|
|
def increment(self):
|
|
|
+ """Increment the current prefetch count value by one."""
|
|
|
return self.set(self.value.increment())
|
|
|
|
|
|
def decrement(self):
|
|
|
+ """Decrement the current prefetch count value by one."""
|
|
|
return self.set(self.value.decrement())
|
|
|
|
|
|
def decrement_eventually(self):
|
|
|
+ """Decrement the value, but do not update the qos.
|
|
|
+
|
|
|
+ The MainThread will be responsible for calling :meth:`update`
|
|
|
+ when necessary.
|
|
|
+
|
|
|
+ """
|
|
|
self.value.decrement()
|
|
|
|
|
|
def set(self, pcount):
|
|
|
+ """Set channel prefetch_count setting."""
|
|
|
self.logger.debug("basic.qos: prefetch_count->%s" % pcount)
|
|
|
self.consumer.qos(prefetch_count=pcount)
|
|
|
self.prev = pcount
|
|
|
return pcount
|
|
|
|
|
|
def update(self):
|
|
|
+ """Update prefetch count with current value."""
|
|
|
return self.set(self.next)
|
|
|
|
|
|
@property
|
|
@@ -71,11 +162,42 @@ class CarrotListener(object):
|
|
|
Scheduler for paused tasks. Reasons for being paused include
|
|
|
a countdown/eta or that it's waiting for retry.
|
|
|
|
|
|
+ .. attribute:: send_events
|
|
|
+
|
|
|
+ Is events enabled?
|
|
|
+
|
|
|
+ .. attribute:: init_callback
|
|
|
+
|
|
|
+ Callback to be called the first time the connection is active.
|
|
|
+
|
|
|
+ .. attribute:: hostname
|
|
|
+
|
|
|
+ Current hostname. Defaults to the system hostname.
|
|
|
+
|
|
|
+ .. attribute:: initial_prefetch_count
|
|
|
+
|
|
|
+ Initial QoS prefetch count for the task channel.
|
|
|
+
|
|
|
+ .. attribute:: control_dispatch
|
|
|
+
|
|
|
+ Control command dispatcher.
|
|
|
+ See :class:`celery.worker.control.ControlDispatch`.
|
|
|
+
|
|
|
+ .. attribute:: event_dispatcher
|
|
|
+
|
|
|
+ See :class:`celery.events.EventDispatcher`.
|
|
|
+
|
|
|
+ .. attribute:: hart
|
|
|
+
|
|
|
+ :class:`~celery.worker.heartbeat.Heart` sending out heart beats
|
|
|
+ if events enabled.
|
|
|
+
|
|
|
.. attribute:: logger
|
|
|
|
|
|
The logger used.
|
|
|
|
|
|
"""
|
|
|
+ _state = None
|
|
|
|
|
|
def __init__(self, ready_queue, eta_schedule, logger,
|
|
|
init_callback=noop, send_events=False, hostname=None,
|
|
@@ -89,12 +211,11 @@ class CarrotListener(object):
|
|
|
self.logger = logger
|
|
|
self.hostname = hostname or socket.gethostname()
|
|
|
self.initial_prefetch_count = initial_prefetch_count
|
|
|
+ self.event_dispatcher = None
|
|
|
+ self.heart = None
|
|
|
self.control_dispatch = ControlDispatch(logger=logger,
|
|
|
hostname=self.hostname,
|
|
|
listener=self)
|
|
|
- self.event_dispatcher = None
|
|
|
- self.heart = None
|
|
|
- self._state = None
|
|
|
|
|
|
def start(self):
|
|
|
"""Start the consumer.
|
|
@@ -155,6 +276,10 @@ class CarrotListener(object):
|
|
|
task.task_name, task.task_id))
|
|
|
self.ready_queue.put(task)
|
|
|
|
|
|
+ def on_control(self, control):
|
|
|
+ """Handle received remote control command."""
|
|
|
+ return self.control_dispatch.dispatch_from_message(control)
|
|
|
+
|
|
|
def receive_message(self, message_data, message):
|
|
|
"""The callback called when a new message is received. """
|
|
|
|
|
@@ -179,8 +304,7 @@ class CarrotListener(object):
|
|
|
# Handle control command
|
|
|
control = message_data.get("control")
|
|
|
if control:
|
|
|
- self.control_dispatch.dispatch_from_message(control)
|
|
|
- return
|
|
|
+ return self.on_control(control)
|
|
|
|
|
|
warnings.warn(RuntimeWarning(
|
|
|
"Received and deleted unknown message. Wrong destination?!? \
|
|
@@ -196,6 +320,7 @@ class CarrotListener(object):
|
|
|
self.connection = self.connection and self.connection.close()
|
|
|
|
|
|
def stop_consumers(self, close=True):
|
|
|
+ """Stop consuming."""
|
|
|
if not self._state == RUN:
|
|
|
return
|
|
|
self._state = CLOSE
|
|
@@ -229,6 +354,7 @@ class CarrotListener(object):
|
|
|
message.ack()
|
|
|
|
|
|
def reset_connection(self):
|
|
|
+ """Re-establish connection and set up consumers."""
|
|
|
self.logger.debug(
|
|
|
"CarrotListener: Re-establishing connection to the broker...")
|
|
|
self.stop_consumers()
|
|
@@ -243,6 +369,7 @@ class CarrotListener(object):
|
|
|
# QoS: Reset prefetch window.
|
|
|
self.qos = QoS(self.task_consumer,
|
|
|
self.initial_prefetch_count, self.logger)
|
|
|
+ self.qos.update() # enable prefetch_count QoS.
|
|
|
|
|
|
self.task_consumer.on_decode_error = self.on_decode_error
|
|
|
self.broadcast_consumer = BroadcastConsumer(self.connection,
|
|
@@ -297,5 +424,10 @@ class CarrotListener(object):
|
|
|
return conn
|
|
|
|
|
|
def stop(self):
|
|
|
+ """Stop consuming.
|
|
|
+
|
|
|
+ Does not close connection.
|
|
|
+
|
|
|
+ """
|
|
|
self.logger.debug("CarrotListener: Stopping consumers...")
|
|
|
self.stop_consumers(close=False)
|