|
@@ -23,7 +23,7 @@ class AMQPListener(object):
|
|
|
|
|
|
:param bucket_queue: See :attr:`bucket_queue`.
|
|
|
:param hold_queue: See :attr:`hold_queue`.
|
|
|
-
|
|
|
+
|
|
|
.. attribute:: bucket_queue
|
|
|
|
|
|
The queue that holds tasks ready for processing immediately.
|
|
@@ -50,7 +50,7 @@ class AMQPListener(object):
|
|
|
"""Start processing AMQP messages."""
|
|
|
task_consumer = self.reset_connection()
|
|
|
it = task_consumer.iterconsume(limit=None)
|
|
|
-
|
|
|
+
|
|
|
while True:
|
|
|
it.next()
|
|
|
|
|
@@ -61,16 +61,16 @@ class AMQPListener(object):
|
|
|
|
|
|
def receive_message(self, message_data, message):
|
|
|
"""The callback called when a new message is received.
|
|
|
-
|
|
|
+
|
|
|
If the message has an ``eta`` we move it to the hold queue,
|
|
|
otherwise we move it the bucket queue for immediate processing.
|
|
|
-
|
|
|
+
|
|
|
"""
|
|
|
task = TaskWrapper.from_message(message, message_data,
|
|
|
logger=self.logger)
|
|
|
eta = message_data.get("eta")
|
|
|
if eta:
|
|
|
- self.hold_queue.put((task, eta))
|
|
|
+ self.hold_queue.put((task, eta))
|
|
|
else:
|
|
|
self.bucket_queue.put(task)
|
|
|
|
|
@@ -180,10 +180,10 @@ class WorkController(object):
|
|
|
self.amqp_listener = AMQPListener(self.bucket_queue, self.hold_queue,
|
|
|
logger=self.logger)
|
|
|
self.mediator = Mediator(self.bucket_queue, self.safe_process_task)
|
|
|
-
|
|
|
+
|
|
|
# The order is important here;
|
|
|
# the first in the list is the first to start,
|
|
|
- # and they must be stopped in reverse order.
|
|
|
+ # and they must be stopped in reverse order.
|
|
|
self.components = [self.pool,
|
|
|
self.mediator,
|
|
|
self.periodic_work_controller,
|