|
@@ -38,7 +38,7 @@ Say you have two servers, `x`, and `y` that handles regular tasks,
|
|
|
and one server `z`, that only handles feed related tasks. You can use this
|
|
|
configuration::
|
|
|
|
|
|
- CELERY_ROUTES = {"feed.tasks.import_feed": {"queue": "feeds"}}
|
|
|
+ CELERY_ROUTES = {'feed.tasks.import_feed': {'queue': 'feeds'}}
|
|
|
|
|
|
With this route enabled import feed tasks will be routed to the
|
|
|
`"feeds"` queue, while all other tasks will be routed to the default queue
|
|
@@ -65,9 +65,9 @@ configuration:
|
|
|
|
|
|
from kombu import Exchange, Queue
|
|
|
|
|
|
- CELERY_DEFAULT_QUEUE = "default"
|
|
|
+ CELERY_DEFAULT_QUEUE = 'default'
|
|
|
CELERY_QUEUES = (
|
|
|
- Queue("default", Exchange("default"), routing_key="default"),
|
|
|
+ Queue('default', Exchange('default'), routing_key='default'),
|
|
|
)
|
|
|
|
|
|
.. _routing-autoqueue-details:
|
|
@@ -83,9 +83,9 @@ A queue named `"video"` will be created with the following settings:
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
- {"exchange": "video",
|
|
|
- "exchange_type": "direct",
|
|
|
- "routing_key": "video"}
|
|
|
+ {'exchange': 'video',
|
|
|
+ 'exchange_type': 'direct',
|
|
|
+ 'routing_key': 'video'}
|
|
|
|
|
|
The non-AMQP backends like `ghettoq` does not support exchanges, so they
|
|
|
require the exchange to have the same name as the queue. Using this design
|
|
@@ -104,14 +104,14 @@ configuration:
|
|
|
|
|
|
from kombu import Queue
|
|
|
|
|
|
- CELERY_DEFAULT_QUEUE = "default"
|
|
|
+ CELERY_DEFAULT_QUEUE = 'default'
|
|
|
CELERY_QUEUES = (
|
|
|
- Queue("default", routing_key="task.#"),
|
|
|
- Queue("feed_tasks", routing_key="feed.#"),
|
|
|
+ Queue('default', routing_key='task.#'),
|
|
|
+ Queue('feed_tasks', routing_key='feed.#'),
|
|
|
)
|
|
|
- CELERY_DEFAULT_EXCHANGE = "tasks"
|
|
|
- CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
|
|
|
- CELERY_DEFAULT_ROUTING_KEY = "task.default"
|
|
|
+ CELERY_DEFAULT_EXCHANGE = 'tasks'
|
|
|
+ CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
|
|
|
+ CELERY_DEFAULT_ROUTING_KEY = 'task.default'
|
|
|
|
|
|
:setting:`CELERY_QUEUES` is a list of :class:`~kombu.entitity.Queue`
|
|
|
instances.
|
|
@@ -125,9 +125,9 @@ To route a task to the `feed_tasks` queue, you can add an entry in the
|
|
|
.. code-block:: python
|
|
|
|
|
|
CELERY_ROUTES = {
|
|
|
- "feeds.tasks.import_feed": {
|
|
|
- "queue": "feed_tasks",
|
|
|
- "routing_key": "feed.import",
|
|
|
+ 'feeds.tasks.import_feed': {
|
|
|
+ 'queue': 'feed_tasks',
|
|
|
+ 'routing_key': 'feed.import',
|
|
|
},
|
|
|
}
|
|
|
|
|
@@ -136,9 +136,9 @@ You can also override this using the `routing_key` argument to
|
|
|
:meth:`Task.apply_async`, or :func:`~celery.execute.send_task`:
|
|
|
|
|
|
>>> from feeds.tasks import import_feed
|
|
|
- >>> import_feed.apply_async(args=["http://cnn.com/rss"],
|
|
|
- ... queue="feed_tasks",
|
|
|
- ... routing_key="feed.import")
|
|
|
+ >>> import_feed.apply_async(args=['http://cnn.com/rss'],
|
|
|
+ ... queue='feed_tasks',
|
|
|
+ ... routing_key='feed.import')
|
|
|
|
|
|
|
|
|
To make server `z` consume from the feed queue exclusively you can
|
|
@@ -164,10 +164,10 @@ just specify a custom exchange and exchange type:
|
|
|
from kombu import Exchange, Queue
|
|
|
|
|
|
CELERY_QUEUES = (
|
|
|
- Queue("feed_tasks", routing_key="feed.#"),
|
|
|
- Queue("regular_tasks", routing_key="task.#"),
|
|
|
- Queue("image_tasks", exchange=Exchange("mediatasks", type="direct"),
|
|
|
- routing_key="image.compress"),
|
|
|
+ Queue('feed_tasks', routing_key='feed.#'),
|
|
|
+ Queue('regular_tasks', routing_key='task.#'),
|
|
|
+ Queue('image_tasks', exchange=Exchange('mediatasks', type='direct'),
|
|
|
+ routing_key='image.compress'),
|
|
|
)
|
|
|
|
|
|
If you're confused about these terms, you should read up on AMQP.
|
|
@@ -204,10 +204,10 @@ This is an example task message represented as a Python dictionary:
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
- {"task": "myapp.tasks.add",
|
|
|
- "id": "54086c5e-6193-4575-8308-dbab76798756",
|
|
|
- "args": [4, 4],
|
|
|
- "kwargs": {}}
|
|
|
+ {'task': 'myapp.tasks.add',
|
|
|
+ 'id': '54086c5e-6193-4575-8308-dbab76798756',
|
|
|
+ 'args': [4, 4],
|
|
|
+ 'kwargs': {}}
|
|
|
|
|
|
.. _amqp-producers-consumers-brokers:
|
|
|
|
|
@@ -253,13 +253,13 @@ One for video, one for images and one default queue for everything else:
|
|
|
from kombu import Exchange, Queue
|
|
|
|
|
|
CELERY_QUEUES = (
|
|
|
- Queue("default", Exchange("default"), routing_key="default"),
|
|
|
- Queue("videos", Exchange("media"), routing_key="media.video"),
|
|
|
- Queue("images", Exchange("media"), routing_key="media.image"),
|
|
|
+ Queue('default', Exchange('default'), routing_key='default'),
|
|
|
+ Queue('videos', Exchange('media'), routing_key='media.video'),
|
|
|
+ Queue('images', Exchange('media'), routing_key='media.image'),
|
|
|
)
|
|
|
- CELERY_DEFAULT_QUEUE = "default"
|
|
|
- CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
|
|
|
- CELERY_DEFAULT_ROUTING_KEY = "default"
|
|
|
+ CELERY_DEFAULT_QUEUE = 'default'
|
|
|
+ CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
|
|
|
+ CELERY_DEFAULT_ROUTING_KEY = 'default'
|
|
|
|
|
|
.. _amqp-exchange-types:
|
|
|
|
|
@@ -385,7 +385,7 @@ From now on all messages sent to the exchange ``testexchange`` with routing
|
|
|
key ``testkey`` will be moved to this queue. We can send a message by
|
|
|
using the ``basic.publish`` command::
|
|
|
|
|
|
- 4> basic.publish "This is a message!" testexchange testkey
|
|
|
+ 4> basic.publish 'This is a message!' testexchange testkey
|
|
|
ok.
|
|
|
|
|
|
Now that the message is sent we can retrieve it again. We use the
|
|
@@ -444,25 +444,17 @@ One for video, one for images and one default queue for everything else:
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
- CELERY_QUEUES = {
|
|
|
- "default": {
|
|
|
- "exchange": "default",
|
|
|
- "routing_key": "default"},
|
|
|
- "videos": {
|
|
|
- "exchange": "media",
|
|
|
- "exchange_type": "topic",
|
|
|
- "routing_key": "media.video",
|
|
|
- },
|
|
|
- "images": {
|
|
|
- "exchange": "media",
|
|
|
- "exchange_type": "topic",
|
|
|
- "routing_key": "media.image",
|
|
|
- }
|
|
|
- }
|
|
|
- CELERY_DEFAULT_QUEUE = "default"
|
|
|
- CELERY_DEFAULT_EXCHANGE = "default"
|
|
|
- CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
|
|
|
- CELERY_DEFAULT_ROUTING_KEY = "default"
|
|
|
+ default_exchange = Exchange('default', type='direct')
|
|
|
+ media_exchange = Exchange('media', type='direct')
|
|
|
+
|
|
|
+ CELERY_QUEUES = (
|
|
|
+ Queue('default', default_exchange, routing_key='default'),
|
|
|
+ Queue('videos', media_exchange', routing_key='media.video')
|
|
|
+ Queue('images', media_exchange', routing_key='media.image')
|
|
|
+ )
|
|
|
+ CELERY_DEFAULT_QUEUE = 'default'
|
|
|
+ CELERY_DEFAULT_EXCHANGE = 'default'
|
|
|
+ CELERY_DEFAULT_ROUTING_KEY = 'default'
|
|
|
|
|
|
Here, the :setting:`CELERY_DEFAULT_QUEUE` will be used to route tasks that
|
|
|
doesn't have an explicit route.
|
|
@@ -503,23 +495,23 @@ All you need to define a new router is to create a class with a
|
|
|
class MyRouter(object):
|
|
|
|
|
|
def route_for_task(self, task, args=None, kwargs=None):
|
|
|
- if task == "myapp.tasks.compress_video":
|
|
|
- return {"exchange": "video",
|
|
|
- "exchange_type": "topic",
|
|
|
- "routing_key": "video.compress"}
|
|
|
+ if task == 'myapp.tasks.compress_video':
|
|
|
+ return {'exchange': 'video',
|
|
|
+ 'exchange_type': 'topic',
|
|
|
+ 'routing_key': 'video.compress'}
|
|
|
return None
|
|
|
|
|
|
If you return the ``queue`` key, it will expand with the defined settings of
|
|
|
that queue in :setting:`CELERY_QUEUES`::
|
|
|
|
|
|
- {"queue": "video", "routing_key": "video.compress"}
|
|
|
+ {'queue': 'video', 'routing_key': 'video.compress'}
|
|
|
|
|
|
becomes -->
|
|
|
|
|
|
- {"queue": "video",
|
|
|
- "exchange": "video",
|
|
|
- "exchange_type": "topic",
|
|
|
- "routing_key": "video.compress"}
|
|
|
+ {'queue': 'video',
|
|
|
+ 'exchange': 'video',
|
|
|
+ 'exchange_type': 'topic',
|
|
|
+ 'routing_key': 'video.compress'}
|
|
|
|
|
|
|
|
|
You install router classes by adding them to the :setting:`CELERY_ROUTES`
|
|
@@ -529,7 +521,7 @@ setting::
|
|
|
|
|
|
Router classes can also be added by name::
|
|
|
|
|
|
- CELERY_ROUTES = ("myapp.routers.MyRouter", )
|
|
|
+ CELERY_ROUTES = ('myapp.routers.MyRouter', )
|
|
|
|
|
|
|
|
|
For simple task name -> route mappings like the router example above,
|
|
@@ -538,9 +530,9 @@ same behavior:
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
- CELERY_ROUTES = ({"myapp.tasks.compress_video": {
|
|
|
- "queue": "video",
|
|
|
- "routing_key": "video.compress"
|
|
|
+ CELERY_ROUTES = ({'myapp.tasks.compress_video': {
|
|
|
+ 'queue': 'video',
|
|
|
+ 'routing_key': 'video.compress'
|
|
|
}}, )
|
|
|
|
|
|
The routers will then be traversed in order, it will stop at the first router
|
|
@@ -556,9 +548,9 @@ Here is an example exchange ``bcast`` that uses this:
|
|
|
|
|
|
from kombu.common import Broadcast
|
|
|
|
|
|
- CELERY_QUEUES = (Broadcast("broadcast_tasks"), )
|
|
|
+ CELERY_QUEUES = (Broadcast('broadcast_tasks'), )
|
|
|
|
|
|
- CELERY_ROUTES = {"tasks.reload_cache": "broadcast_tasks"}
|
|
|
+ CELERY_ROUTES = {'tasks.reload_cache': 'broadcast_tasks'}
|
|
|
|
|
|
|
|
|
Now the ``tasks.reload_tasks`` task will be sent to every
|