AMQP Primer
+A message consists of headers and a body. Celery uses headers to store
+the content type of the message and its content encoding. In Celery the
+content type is usually the serialization format used to serialize the
+message, and the body contains the name of the task to execute, the
+task id (UUID), the arguments to execute it with and some additional
+metadata - like the number of retries and its ETA if any.
+This is an example task message represented as a Python dictionary:
+.. code-block:: python
+ {"task": "myapp.tasks.add",
+ "id":
+ "args": [4, 4],
+ "kwargs": {}}
+Producers, consumers and brokers
+The client sending messages is typically called a *publisher*, or
+a *producer*, while the entity receiving messages is called
+a *consumer*.
+The *broker* is the message server, routing messages from producers
+to consumers.
+You are likely to see these terms used a lot in AMQP related material.
Exchanges, queues and routing keys.
+1. Messages are sent to exchanges.
+2. An exchange routes messages to one or more queues. Several exchange types
+ exists, providing different ways to do routing.
+3. The message waits in the queue until someone consumes from it.
+4. The message is deleted from the queue when it has been acknowledged.
The steps required to send and receive messages are:
- 1. Create an exchange
- 2. Create a queue
- 3. Bind the queue to the exchange.
+1. Create an exchange
+2. Create a queue
+3. Bind the queue to the exchange.
-Exchange type
+Celery automatically creates the entities necessary for the queues in
+``CELERY_QUEUES`` to work (unless the queue's ``auto_declare`` setting
+is set)
+Here's an example queue configuration with three queues;
+One for video, one for images and one default queue for everything else:
+.. code-block:: python
+ "default": {
+ "exchange": "default",
+ "binding_key": "default"},
+ "videos": {
+ "exchange": "media",
+ "binding_key": "media.video",
+ },
+ "images": {
+ "exchange": "media",
+ "binding_key": "media.image",
+ }
+ }
+**NOTE**: In Celery the ``routing_key`` is the key used to send the message,
+while ``binding_key`` is the key the queue is bound with. In the AMQP API
+they are both referred to as a routing key.
+Exchange types
The exchange type defines how the messages are routed through the exchange.
The exchange types defined in the standard are ``direct``, ``topic``,
``fanout`` and ``headers``. Also non-standard exchange types are available
-as plugins to RabbitMQ, like the ``last-value-cache plug-in`` by Michael
+as plugins to RabbitMQ, like the `last-value-cache plug-in`_ by Michael
.. _`last-value-cache plug-in``:
+Direct exchanges
+Direct exchanges match by exact routing keys, so a queue bound with
+the routing key ``video`` only receives messages with the same routing key.
+Topic exchanges
+Topic exchanges matches routing keys using dot-separated words, and can
+include wildcard characters: ``*`` matches a single word, ``#`` matches
+zero or more words.
+With routing keys like ``usa.news``, ``usa.weather``, ``norway.news`` and
+``norway.weather``, bindings could be ``*.news`` (all news), ``usa.#`` (all
+items in the USA) or ``usa.weather`` (all USA weather items).
-Consumers and Producers
Related API commands
Related API commands
$ camqadm
-> connecting to amqp://guest@localhost:5672/.
-> connected.
-> connected.
- -->
+ 1>
-Here ``-->`` is the prompt. Type ``help`` for a list of commands, there's
-also autocomplete so you can start typing a command then hit ``tab`` to show a
-list of possible matches.
+Here ``1>`` is the prompt. The number is counting the number of commands you
+have executed. Type ``help`` for a list of commands. It also has
+autocompletion, so you can start typing a command and then hit the
+``tab`` key to show a list of possible matches.
Now let's create a queue we can send messages to::
- --> exchange.declare testexchange direct
+ 1> exchange.declare testexchange direct
- --> queue.declare testqueue
+ 2> queue.declare testqueue
ok. queue:testqueue messages:0 consumers:0.
- --> queue.bind testqueue testexchange testkey
+ 3> queue.bind testqueue testexchange testkey
This created the direct exchange ``testexchange``, and a queue
@@ -120,7 +202,7 @@ From now on all messages sent to the exchange ``testexchange`` with routing
key ``testkey`` will be moved to this queue. We can send a message by
using the ``basic.publish`` command::
- --> basic.publish "This is a message!" testexchange testkey
+ 4> basic.publish "This is a message!" testexchange testkey
@@ -131,7 +213,7 @@ real application would declare consumers instead.
Pop a message off the queue::
- --> basic.get testqueue
+ 5> basic.get testqueue
{'body': 'This is a message!',
'delivery_info': {'delivery_tag': 1,
'exchange': u'testexchange',
@@ -153,12 +235,12 @@ the delivery tag ``1`` might point to a different message than in this channel.
You can acknowledge the message we received using ``basic.ack``::
- --> basic.ack 1
+ 6> basic.ack 1
To clean up after our test session we should delete the entities we created::
- --> queue.delete testqueue
+ 7> queue.delete testqueue
ok. 0 messages deleted.
- --> exchange.delete testexchange
+ 8> exchange.delete testexchange