|
@@ -422,7 +422,7 @@ Also, a common pattern is to use callback tasks:
|
|
|
def add(x, y, callback=None):
|
|
|
result = x + y
|
|
|
if callback:
|
|
|
- callback.delay(result)
|
|
|
+ subtask(callback).delay(result)
|
|
|
return result
|
|
|
|
|
|
|
|
@@ -431,8 +431,11 @@ Also, a common pattern is to use callback tasks:
|
|
|
logger = log_result.get_logger(**kwargs)
|
|
|
logger.info("log_result got: %s" % (result, ))
|
|
|
|
|
|
+Invocation::
|
|
|
|
|
|
- >>> add.delay(2, 2, callback=log_result)
|
|
|
+ >>> add.delay(2, 2, callback=log_result.subtask())
|
|
|
+
|
|
|
+See :doc:`userguide/tasksets` for more information.
|
|
|
|
|
|
Can I cancel the execution of a task?
|
|
|
-------------------------------------
|
|
@@ -468,120 +471,7 @@ Can I send some tasks to only some servers?
|
|
|
**Answer:** Yes. You can route tasks to an arbitrary server using AMQP,
|
|
|
and a worker can bind to as many queues as it wants.
|
|
|
|
|
|
-Say you have two servers, ``x``, and ``y`` that handles regular tasks,
|
|
|
-and one server ``z``, that only handles feed related tasks, you can use this
|
|
|
-configuration:
|
|
|
-
|
|
|
-* Servers ``x`` and ``y``: settings.py:
|
|
|
-
|
|
|
-.. code-block:: python
|
|
|
-
|
|
|
- CELERY_DEFAULT_QUEUE = "regular_tasks"
|
|
|
- CELERY_QUEUES = {
|
|
|
- "regular_tasks": {
|
|
|
- "binding_key": "task.#",
|
|
|
- },
|
|
|
- }
|
|
|
- CELERY_DEFAULT_EXCHANGE = "tasks"
|
|
|
- CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
|
|
|
- CELERY_DEFAULT_ROUTING_KEY = "task.regular"
|
|
|
-
|
|
|
-* Server ``z``: settings.py:
|
|
|
-
|
|
|
-.. code-block:: python
|
|
|
-
|
|
|
- CELERY_DEFAULT_QUEUE = "feed_tasks"
|
|
|
- CELERY_QUEUES = {
|
|
|
- "feed_tasks": {
|
|
|
- "binding_key": "feed.#",
|
|
|
- },
|
|
|
- }
|
|
|
- CELERY_DEFAULT_EXCHANGE = "tasks"
|
|
|
- CELERY_DEFAULT_ROUTING_KEY = "task.regular"
|
|
|
- CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
|
|
|
-
|
|
|
-``CELERY_QUEUES`` is a map of queue names and their exchange/type/binding_key,
|
|
|
-if you don't set exchange or exchange type, they will be taken from the
|
|
|
-``CELERY_DEFAULT_EXCHANGE``/``CELERY_DEFAULT_EXCHANGE_TYPE`` settings.
|
|
|
-
|
|
|
-Now to make a Task run on the ``z`` server you need to set its
|
|
|
-``routing_key`` attribute so it starts with the words ``"task.feed."``:
|
|
|
-
|
|
|
-.. code-block:: python
|
|
|
-
|
|
|
- from feedaggregator.models import Feed
|
|
|
- from celery.decorators import task
|
|
|
-
|
|
|
- @task(routing_key="feed.importer")
|
|
|
- def import_feed(feed_url):
|
|
|
- Feed.objects.import_feed(feed_url)
|
|
|
-
|
|
|
-or if subclassing the ``Task`` class directly:
|
|
|
-
|
|
|
-.. code-block:: python
|
|
|
-
|
|
|
- class FeedImportTask(Task):
|
|
|
- routing_key = "feed.importer"
|
|
|
-
|
|
|
- def run(self, feed_url):
|
|
|
- Feed.objects.import_feed(feed_url)
|
|
|
-
|
|
|
-
|
|
|
-You can also override this using the ``routing_key`` argument to
|
|
|
-:func:`celery.task.apply_async`:
|
|
|
-
|
|
|
- >>> from myapp.tasks import RefreshFeedTask
|
|
|
- >>> RefreshFeedTask.apply_async(args=["http://cnn.com/rss"],
|
|
|
- ... routing_key="feed.importer")
|
|
|
-
|
|
|
-
|
|
|
- If you want, you can even have your feed processing worker handle regular
|
|
|
- tasks as well, maybe in times when there's a lot of work to do.
|
|
|
- Just add a new queue to server ``z``'s ``CELERY_QUEUES``:
|
|
|
-
|
|
|
- .. code-block:: python
|
|
|
-
|
|
|
- CELERY_QUEUES = {
|
|
|
- "feed_tasks": {
|
|
|
- "binding_key": "feed.#",
|
|
|
- },
|
|
|
- "regular_tasks": {
|
|
|
- "binding_key": "task.#",
|
|
|
- },
|
|
|
- }
|
|
|
-
|
|
|
-Since the default exchange is ``tasks``, they will both use the same
|
|
|
-exchange.
|
|
|
-
|
|
|
-If you have another queue but on another exchange you want to add,
|
|
|
-just specify a custom exchange and exchange type:
|
|
|
-
|
|
|
-.. code-block:: python
|
|
|
-
|
|
|
- CELERY_QUEUES = {
|
|
|
- "feed_tasks": {
|
|
|
- "binding_key": "feed.#",
|
|
|
- },
|
|
|
- "regular_tasks": {
|
|
|
- "binding_key": "task.#",
|
|
|
- }
|
|
|
- "image_tasks": {
|
|
|
- "binding_key": "image.compress",
|
|
|
- "exchange": "mediatasks",
|
|
|
- "exchange_type": "direct",
|
|
|
- },
|
|
|
- }
|
|
|
-
|
|
|
-If you're confused about these terms, you should read up on AMQP and RabbitMQ.
|
|
|
-`Rabbits and Warrens`_ is an excellent blog post describing queues and
|
|
|
-exchanges. There's also AMQP in 10 minutes*: `Flexible Routing Model`_,
|
|
|
-and `Standard Exchange Types`_. For users of RabbitMQ the `RabbitMQ FAQ`_
|
|
|
-could also be useful as a source of information.
|
|
|
-
|
|
|
-.. _`Rabbits and Warrens`: http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/
|
|
|
-.. _`Flexible Routing Model`: http://bit.ly/95XFO1
|
|
|
-.. _`Standard Exchange Types`: http://bit.ly/EEWca
|
|
|
-.. _`RabbitMQ FAQ`: http://www.rabbitmq.com/faq.html
|
|
|
+See :doc:`userguide/routing` for more information.
|
|
|
|
|
|
Can I change the interval of a periodic task at runtime?
|
|
|
--------------------------------------------------------
|