@@ -14,7 +14,7 @@ Basics
======

Executing tasks is done with :meth:`~celery.task.Base.Task.apply_async`,
-and its shortcut: :meth:`~celery.task.Base.Task.delay`.
+and the shortcut: :meth:`~celery.task.Base.Task.delay`.

``delay`` is simple and convenient, as it looks like calling a regular
function:
@@ -23,7 +23,7 @@ function:

    Task.delay(arg1, arg2, kwarg1="x", kwarg2="y")

-The same thing using ``apply_async`` is written like this:
+The same using ``apply_async`` is written like this:

.. code-block:: python
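The relationship between the two calls can be sketched in plain Python (a simplified illustration of the documented behaviour, not Celery's actual implementation):

```python
# Sketch of how ``delay`` forwards to ``apply_async``.
# The real Task class does much more (routing, publishing,
# result tracking); this only illustrates the argument mapping.
class Task(object):

    @classmethod
    def apply_async(cls, args=None, kwargs=None, **options):
        # In real Celery this publishes a message to the broker;
        # here we just return what would be sent.
        return {"args": args or (), "kwargs": kwargs or {},
                "options": options}

    @classmethod
    def delay(cls, *args, **kwargs):
        # delay(*args, **kwargs) is shorthand for
        # apply_async(args=args, kwargs=kwargs)
        return cls.apply_async(args=args, kwargs=kwargs)
```

So ``Task.delay(1, 2, kwarg1="x")`` produces the same call as ``Task.apply_async(args=(1, 2), kwargs={"kwarg1": "x"})``.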
@@ -32,12 +32,12 @@ The same thing using ``apply_async`` is written like this:

While ``delay`` is convenient, it doesn't give you as much control as using
``apply_async``. With ``apply_async`` you can override the execution options
-available as attributes on the ``Task`` class: ``routing_key``, ``exchange``,
-``immediate``, ``mandatory``, ``priority``, and ``serializer``.
-In addition you can set a countdown/eta, or provide a custom broker connection.
+available as attributes on the ``Task`` class (see :ref:`task-options`).
+In addition you can set countdown/eta, task expiry, provide a custom broker
+connection, and more.

-Let's go over these in more detail. The following examples use this simple
-task, which adds together two numbers:
+Let's go over these in more detail. All the examples use a simple task
+called ``add``, taking two positional arguments and returning the sum:

.. code-block:: python
@@ -45,7 +45,6 @@ task, which adds together two numbers:

    def add(x, y):
        return x + y

-
.. note::

    You can also execute a task by name using
@@ -63,8 +62,8 @@ ETA and countdown
=================

The ETA (estimated time of arrival) lets you set a specific date and time that
-is the earliest time at which your task will execute. ``countdown`` is
-a shortcut to set this by seconds in the future.
+is the earliest time at which your task will be executed. ``countdown`` is
+a shortcut to set eta by seconds into the future.

.. code-block:: python
@@ -72,22 +71,24 @@ a shortcut to set this by seconds in the future.

    >>> result.get() # this takes at least 3 seconds to return
    20

-Note that your task is guaranteed to be executed at some time *after* the
-specified date and time has passed, but not necessarily at that exact time.
+The task is guaranteed to be executed at some time *after* the
+specified date and time, but not necessarily at that exact time.
+Possible reasons for broken deadlines may include many items waiting
+in the queue, or heavy network latency. To make sure your tasks
+are executed in a timely manner you should monitor queue lengths. Use
+Munin, or similar tools, to receive alerts, so appropriate action can be
+taken to ease the workload. See :ref:`monitoring-munin`.

-While ``countdown`` is an integer, ``eta`` must be a :class:`~datetime.datetime` object,
-specifying an exact date and time in the future. This is good if you already
-have a :class:`~datetime.datetime` object and need to modify it with a
-:class:`~datetime.timedelta`, or when using time in seconds is not very readable.
+While ``countdown`` is an integer, ``eta`` must be a :class:`~datetime.datetime`
+object, specifying an exact date and time (including millisecond precision,
+and timezone information):

.. code-block:: python

-    from datetime import datetime, timedelta
+    >>> from datetime import datetime, timedelta

-    def add_tomorrow(username):
-        """Add this tomorrow."""
-        tomorrow = datetime.now() + timedelta(days=1)
-        add.apply_async(args=[10, 10], eta=tomorrow)
+    >>> tomorrow = datetime.now() + timedelta(days=1)
+    >>> add.apply_async(args=[10, 10], eta=tomorrow)
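In effect, ``countdown`` is a convenience for computing the eta yourself. A plain-Python sketch of the equivalence (``add`` is the example task from above and is only shown in comments, not invoked):

```python
from datetime import datetime, timedelta

# countdown=3 is roughly equivalent to computing an eta 3 seconds
# into the future and passing that instead:
countdown = 3
eta = datetime.now() + timedelta(seconds=countdown)

# add.apply_async(args=[10, 10], countdown=3)  # shortcut form
# add.apply_async(args=[10, 10], eta=eta)      # explicit form
```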
.. _executing-expiration:

@@ -96,7 +97,9 @@ Expiration

The ``expires`` argument defines an optional expiry time,
either as seconds after task publish, or a specific date and time using
-:class:~datetime.datetime`.
+:class:`~datetime.datetime`:
+
+.. code-block:: python

    >>> # Task expires after one minute from now.
    >>> add.apply_async(args=[10, 10], expires=60)

@@ -107,7 +110,7 @@ either as seconds after task publish, or a specific date and time using

    ...    expires=datetime.now() + timedelta(days=1)

-When a worker receives a task that has been expired it will mark
+When a worker receives an expired task it will mark
the task as :state:`REVOKED` (:exc:`~celery.exceptions.TaskRevokedError`).
.. _executing-serializers:

@@ -115,27 +118,76 @@ the task as :state:`REVOKED` (:exc:`~celery.exceptions.TaskRevokedError`).

Serializers
===========

-Data passed between celery and workers has to be serialized to be
-transferred. The default serializer is :mod:`pickle`, but you can
-change this for each
-task. There is built-in support for using :mod:`pickle`, ``JSON``, ``YAML``
-and ``msgpack``. You can also add your own custom serializers by registering
-them into the Carrot serializer registry.
+Data transferred between clients and workers needs to be serialized.
+The default serializer is :mod:`pickle`, but you can
+change this globally or for each individual task.
+There is built-in support for :mod:`pickle`, ``JSON``, ``YAML``
+and ``msgpack``, and you can also add your own custom serializers by registering
+them into the Carrot serializer registry (see
+`Carrot: Serialization of Data`_).
+
+.. _`Carrot: Serialization of Data`:
+    http://packages.python.org/carrot/introduction.html#serialization-of-data
+
+Each option has its advantages and disadvantages.
+
+json -- JSON is supported in many programming languages, is now
+    a standard part of Python (since 2.6), and is fairly fast to decode
+    using the modern Python libraries such as :mod:`cjson` or :mod:`simplejson`.
+
+    The primary disadvantage to JSON is that it limits you to the following
+    data types: strings, unicode, floats, boolean, dictionaries, and lists.
+    Decimals and dates are notably missing.
+
+    Also, binary data will be transferred using base64 encoding, which will
+    cause the transferred data to be around 34% larger than an encoding which
+    supports native binary types.
+
+    However, if your data fits inside the above constraints and you need
+    cross-language support, the default setting of JSON is probably your
+    best choice.
+
+    See http://json.org for more information.
+
+pickle -- If you have no desire to support any language other than
+    Python, then using the pickle encoding will gain you the support of
+    all built-in Python data types (except class instances), smaller
+    messages when sending binary files, and a slight speedup over JSON
+    processing.
+
+    See http://docs.python.org/library/pickle.html for more information.
+
+yaml -- YAML has many of the same characteristics as json,
+    except that it natively supports more data types (including dates,
+    recursive references, etc.)
+
+    However, the Python libraries for YAML are a good bit slower than the
+    libraries for JSON.

-The default serializer (pickle) supports Python objects, like ``datetime`` and
-any custom datatypes you define yourself. But since pickle has poor support
-outside of the Python language, you need to choose another serializer if you
-need to communicate with other languages. In that case, ``JSON`` is a very
-popular choice.
+    If you need a more expressive set of data types and need to maintain
+    cross-language compatibility, then YAML may be a better fit than the above.

-The serialization method is sent with the message, so the worker knows how to
-deserialize any task. Of course, if you use a custom serializer, this must
-also be registered in the worker.
+    See http://yaml.org/ for more information.

-When sending a task the serialization method is taken from the following
-places in order: The ``serializer`` argument to ``apply_async``, the
-Task's ``serializer`` attribute, and finally the global default
-:setting:`CELERY_TASK_SERIALIZER` configuration directive.
+msgpack -- msgpack is a binary serialization format that is closer to JSON
+    in features. It is very young however, and support should be considered
+    experimental at this point.
+
+    See http://msgpack.org/ for more information.
+
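The base64 overhead mentioned for JSON above is easy to verify with plain Python, independent of any serializer:

```python
import base64

# base64 encodes every 3 bytes of input as 4 ASCII characters,
# so binary payloads grow by roughly a third.
payload = b"\x00\x01\x02" * 100      # 300 bytes of binary data
encoded = base64.b64encode(payload)  # becomes 400 characters
overhead = (len(encoded) - len(payload)) / float(len(payload))
# overhead is ~0.33, i.e. around a third larger
```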
+The encoding used is available as a message header, so the worker knows how to
+deserialize any task. If you use a custom serializer, this serializer must
+be available for the worker.
+
+The client uses the following order to decide which serializer
+to use when sending a task:
+
+    1. The ``serializer`` argument to ``apply_async``
+    2. The task's ``serializer`` attribute
+    3. The default :setting:`CELERY_TASK_SERIALIZER` setting.
+
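The precedence can be sketched as a simple fallback chain (an illustration of the rule above, not Celery's internal code; ``None`` stands for "not set"):

```python
# Illustration of the serializer resolution order described above:
# the first value that is set wins.
def choose_serializer(call_arg=None, task_attr=None, default="pickle"):
    # 1. the ``serializer`` argument to apply_async
    if call_arg is not None:
        return call_arg
    # 2. the task's ``serializer`` attribute
    if task_attr is not None:
        return task_attr
    # 3. the CELERY_TASK_SERIALIZER setting
    return default
```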
+
+Using the ``serializer`` argument to ``apply_async``:

.. code-block:: python
@@ -146,13 +198,13 @@ Task's ``serializer`` attribute, and finally the global default

Connections and connection timeouts.
====================================

-Currently there is no support for broker connection pools in celery,
-so this is something you need to be aware of when sending more than
-one task at a time, as ``apply_async``/``delay`` establishes and
-closes a connection every time.
+Currently there is no support for broker connection pools, so
+``apply_async`` establishes and closes a new connection every time
+it is called. This is something you need to be aware of when sending
+more than one task at a time.

-If you need to send more than one task at the same time, it's a good idea to
-establish the connection yourself and pass it to ``apply_async``:
+You can handle the connection manually by creating a
+publisher:

.. code-block:: python

@@ -171,9 +223,15 @@ establish the connection yourself and pass it to ``apply_async``:

    print([res.get() for res in results])
-The connection timeout is the number of seconds to wait before we give up
-establishing the connection. You can set this with the ``connect_timeout``
-argument to ``apply_async``:
+.. note::
+
+    This particular example is better expressed as a task set.
+    See :ref:`sets-taskset`. Task sets already reuse connections.
+
+
+The connection timeout is the number of seconds to wait before giving up
+on establishing the connection. You can set this by using the
+``connect_timeout`` argument to ``apply_async``:

.. code-block:: python
@@ -191,20 +249,20 @@ Routing options
===============

Celery uses the AMQP routing mechanisms to route tasks to different workers.
-You can route tasks using the following entities: exchange, queue and routing key.

Messages (tasks) are sent to exchanges, a queue binds to an exchange with a
routing key. Let's look at an example:
-Our application has a lot of tasks, some process video, others process images,
-and some gather collective intelligence about users. Some of these have
-higher priority than others so we want to make sure the high priority tasks
-get sent to powerful machines, while low priority tasks are sent to dedicated
-machines that can handle these at their own pace.
+Let's pretend we have an application with a lot of different tasks: some
+process video, others process images, and some gather collective intelligence
+about its users. Some of these tasks are more important, so we want to make
+sure the high priority tasks get sent to dedicated nodes.

-For the sake of example we have only one exchange called ``tasks``.
-There are different types of exchanges that matches the routing key in
-different ways, the exchange types are:
+For the sake of this example we have a single exchange called ``tasks``.
+There are different types of exchanges, each type interpreting the routing
+key in different ways, implementing different messaging scenarios.
+
+The most common types used with Celery are ``direct`` and ``topic``.

* direct
@@ -212,17 +270,15 @@ different ways, the exchange types are:

* topic

-    In the topic exchange the routing key is made up of words separated by dots (``.``).
-    Words can be matched by the wild cards ``*`` and ``#``, where ``*`` matches one
-    exact word, and ``#`` matches one or many.
+    In the topic exchange the routing key is made up of words separated by
+    dots (``.``). Words can be matched by the wild cards ``*`` and ``#``,
+    where ``*`` matches one exact word, and ``#`` matches zero or more words.

    For example, ``*.stock.#`` matches the routing keys ``usd.stock`` and
    ``euro.stock.db`` but not ``stock.nasdaq``.
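The wildcard semantics can be illustrated by translating a binding key into a regular expression. This is a simplified sketch for the common cases (``#`` as the whole key, or following at least one word); real AMQP brokers implement this matching themselves:

```python
import re

def topic_to_regex(pattern):
    """Translate an AMQP topic binding key into a regex (sketch)."""
    parts = []
    for i, word in enumerate(pattern.split(".")):
        sep = r"\." if i else ""
        if word == "#":
            # '#' matches zero or more words, absorbing its dot
            parts.append("(?:%s[^.]+)*" % sep if i else r"(?:[^.]+\.?)*")
        elif word == "*":
            parts.append(sep + r"[^.]+")  # exactly one word
        else:
            parts.append(sep + re.escape(word))
    return "^" + "".join(parts) + "$"

def topic_matches(pattern, routing_key):
    return re.match(topic_to_regex(pattern), routing_key) is not None
```

With this sketch, ``topic_matches("*.stock.#", "usd.stock")`` and ``topic_matches("*.stock.#", "euro.stock.db")`` hold, while ``"stock.nasdaq"`` does not match.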
-(there are also other exchange types, but these are not used by celery)
-
-So, we create three queues, ``video``, ``image`` and ``lowpri`` that bind to
-our ``tasks`` exchange. For the queues we use the following binding keys::
+We create three queues, ``video``, ``image`` and ``lowpri`` that bind to
+the ``tasks`` exchange. For the queues we use the following binding keys::

    video: video.#
    image: image.#
@@ -245,7 +301,7 @@ listen to different queues:

Later, if the crop task is consuming a lot of resources,
-we can bind some new workers to handle just the ``"image.crop"`` task,
+we can bind new workers to handle just the ``"image.crop"`` task,
by creating a new queue that binds to ``"image.crop"``.
.. seealso::

@@ -257,20 +313,20 @@ by creating a new queue that binds to ``"image.crop``".

AMQP options
============

-.. warning::
-    The ``mandatory`` and ``immediate`` flags are not supported by
-    :mod:`amqplib` at this point.
-
* mandatory

-This sets the delivery to be mandatory. An exception will be raised
+This sets the delivery to be mandatory. An exception will be raised
if there are no running workers able to take on the task.

+Not supported by :mod:`amqplib`.
+
* immediate

Request immediate delivery. Will raise an exception
if the task cannot be routed to a worker immediately.

+Not supported by :mod:`amqplib`.
+
* priority

A number between ``0`` and ``9``, where ``0`` is the highest priority.