|
@@ -18,7 +18,7 @@ up and running.
|
|
consumer (+ QoS), and the broadcast remote control command consumer.
|
|
consumer (+ QoS), and the broadcast remote control command consumer.
|
|
|
|
|
|
Also if events are enabled it configures the event dispatcher and starts
|
|
Also if events are enabled it configures the event dispatcher and starts
|
|
- up the hartbeat thread.
|
|
|
|
|
|
+ up the heartbeat thread.
|
|
|
|
|
|
* Finally it can consume messages. :meth:`~Consumer.consume_messages`
|
|
* Finally it can consume messages. :meth:`~Consumer.consume_messages`
|
|
is simply an infinite loop waiting for events on the AMQP channels.
|
|
is simply an infinite loop waiting for events on the AMQP channels.
|
|
@@ -60,7 +60,7 @@ up and running.
|
|
|
|
|
|
* Notice that when the connection is lost all internal queues are cleared
|
|
* Notice that when the connection is lost all internal queues are cleared
|
|
because we can no longer ack the messages reserved in memory.
|
|
because we can no longer ack the messages reserved in memory.
|
|
- Hoever, this is not dangerous as the broker will resend them
|
|
|
|
|
|
+ However, this is not dangerous as the broker will resend them
|
|
to another worker when the channel is closed.
|
|
to another worker when the channel is closed.
|
|
|
|
|
|
* **WARNING**: :meth:`~Consumer.stop` does not close the connection!
|
|
* **WARNING**: :meth:`~Consumer.stop` does not close the connection!
|
|
@@ -194,7 +194,7 @@ class QoS(object):
|
|
|
|
|
|
class Consumer(object):
|
|
class Consumer(object):
|
|
"""Listen for messages received from the broker and
|
|
"""Listen for messages received from the broker and
|
|
- move them the the ready queue for task processing.
|
|
|
|
|
|
+ move them to the ready queue for task processing.
|
|
|
|
|
|
:param ready_queue: See :attr:`ready_queue`.
|
|
:param ready_queue: See :attr:`ready_queue`.
|
|
:param eta_schedule: See :attr:`eta_schedule`.
|
|
:param eta_schedule: See :attr:`eta_schedule`.
|
|
@@ -226,7 +226,7 @@ class Consumer(object):
|
|
|
|
|
|
#: The thread that sends event heartbeats at regular intervals.
|
|
#: The thread that sends event heartbeats at regular intervals.
|
|
#: The heartbeats are used by monitors to detect that a worker
|
|
#: The heartbeats are used by monitors to detect that a worker
|
|
- #: went offline/disappeared.
|
|
|
|
|
|
+ #: went off-line/disappeared.
|
|
heart = None
|
|
heart = None
|
|
|
|
|
|
#: The logger instance to use. Defaults to the default Celery logger.
|
|
#: The logger instance to use. Defaults to the default Celery logger.
|
|
@@ -289,7 +289,7 @@ class Consumer(object):
|
|
def start(self):
|
|
def start(self):
|
|
"""Start the consumer.
|
|
"""Start the consumer.
|
|
|
|
|
|
- Automatically surivives intermittent connection failure,
|
|
|
|
|
|
+ Automatically survives intermittent connection failure,
|
|
and will retry establishing the connection and restart
|
|
and will retry establishing the connection and restart
|
|
consuming messages.
|
|
consuming messages.
|
|
|
|
|
|
@@ -348,7 +348,7 @@ class Consumer(object):
|
|
eta = timer2.to_timestamp(task.eta)
|
|
eta = timer2.to_timestamp(task.eta)
|
|
except OverflowError, exc:
|
|
except OverflowError, exc:
|
|
self.logger.error(
|
|
self.logger.error(
|
|
- "Couldn't convert eta %s to timestamp: %r. Task: %r" % (
|
|
|
|
|
|
+ "Couldn't convert eta %s to time stamp: %r. Task: %r" % (
|
|
task.eta, exc, task.info(safe=True)),
|
|
task.eta, exc, task.info(safe=True)),
|
|
exc_info=sys.exc_info())
|
|
exc_info=sys.exc_info())
|
|
task.acknowledge()
|
|
task.acknowledge()
|
|
@@ -392,7 +392,7 @@ class Consumer(object):
|
|
:param message: The kombu message object.
|
|
:param message: The kombu message object.
|
|
|
|
|
|
"""
|
|
"""
|
|
- # need to guard against errors occuring while acking the message.
|
|
|
|
|
|
+ # need to guard against errors occurring while acking the message.
|
|
def ack():
|
|
def ack():
|
|
try:
|
|
try:
|
|
message.ack()
|
|
message.ack()
|
|
@@ -558,7 +558,7 @@ class Consumer(object):
|
|
self.initial_prefetch_count, self.logger)
|
|
self.initial_prefetch_count, self.logger)
|
|
self.qos.update()
|
|
self.qos.update()
|
|
|
|
|
|
- # receive_message handles incomsing messages.
|
|
|
|
|
|
+ # receive_message handles incoming messages.
|
|
self.task_consumer.register_callback(self.receive_message)
|
|
self.task_consumer.register_callback(self.receive_message)
|
|
|
|
|
|
# Setup the process mailbox.
|
|
# Setup the process mailbox.
|
|
@@ -583,7 +583,7 @@ class Consumer(object):
|
|
"""Restart the heartbeat thread.
|
|
"""Restart the heartbeat thread.
|
|
|
|
|
|
This thread sends heartbeat events at intervals so monitors
|
|
This thread sends heartbeat events at intervals so monitors
|
|
- can tell if the worker is offline/missing.
|
|
|
|
|
|
+ can tell if the worker is off-line/missing.
|
|
|
|
|
|
"""
|
|
"""
|
|
self.heart = Heart(self.priority_timer, self.event_dispatcher)
|
|
self.heart = Heart(self.priority_timer, self.event_dispatcher)
|