@@ -0,0 +1,238 @@
+=================
+ Executing Tasks
+=================
+
+Executing tasks is done with ``apply_async``, and its shortcut ``delay``.
+
+``delay`` is simple and convenient, as it looks like calling a regular
+function:
+
+.. code-block:: python
+
+    MyTask.delay(arg1, arg2, kwarg1="x", kwarg2="y")
+
+The same thing using ``apply_async`` is written like this:
+
+.. code-block:: python
+
+    MyTask.apply_async(args=[arg1, arg2], kwargs={"kwarg1": "x", "kwarg2": "y"})
+
+But ``delay`` doesn't give you as much control as ``apply_async``.
+With ``apply_async`` you can override the execution options available as
+attributes on the ``Task`` class: ``routing_key``, ``exchange``, ``immediate``,
+``mandatory``, ``priority``, and ``serializer``. In addition you can set a
+countdown or an eta, provide a custom broker connection, or change the broker
+connection timeout.
+
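+For example, a single ``apply_async`` call can combine several of these
+options; the values here are purely illustrative:
+
+.. code-block:: python
+
+    MyTask.apply_async(args=[arg1, arg2],
+                       kwargs={"kwarg1": "x", "kwarg2": "y"},
+                       countdown=10,
+                       serializer="json",
+                       priority=0)
+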
+Let's go over these in more detail. The following examples use this simple
+task, which adds two numbers:
+
+.. code-block:: python
+
+    from celery.task.base import Task
+
+    class AddTask(Task):
+
+        def run(self, x, y):
+            return x + y
+
+
+ETA and countdown
+-----------------
+
+The ETA (estimated time of arrival) lets you set a specific date and time
+after which your task should be executed. ``countdown`` is a shortcut to set
+this by seconds into the future.
+
+.. code-block:: python
+
+    >>> result = AddTask.apply_async(args=[10, 10], countdown=3)
+    >>> result.get()    # this takes at least 3 seconds to return
+    20
+
+Note that your task is guaranteed to be executed at some time *after* the
+specified date and time has passed, but not necessarily at that exact time.
+
+While ``countdown`` is an integer, ``eta`` must be a ``datetime`` object,
+specifying an exact date and time in the future. This is good if you already
+have a ``datetime`` object and need to modify it with a ``timedelta``, or when
+specifying the time in seconds is not very readable.
+
+.. code-block:: python
+
+    from datetime import datetime, timedelta
+
+    def quickban(username):
+        """Ban user for 24 hours."""
+        ban(username)
+        tomorrow = datetime.now() + timedelta(days=1)
+        UnbanTask.apply_async(args=[username], eta=tomorrow)
+
+
+Serializer
+----------
+
+The default serializer used is :mod:`pickle`, but you can change this by
+setting the ``CELERY_SERIALIZER`` configuration directive. There is built-in
+support for ``pickle``, ``JSON`` and ``YAML``, and you can add your own
+custom serializers by registering them in the carrot serializer registry.
+
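+As a rough sketch of what registering a custom serializer could look like
+(this assumes carrot exposes a ``registry.register()`` method taking an
+encoder, a decoder, a content type and a content encoding; treat the exact
+names and signature as an assumption rather than the definitive API):
+
+.. code-block:: python
+
+    # Hypothetical sketch: the exact carrot registry API is an assumption.
+    import msgpack
+    from carrot.serialization import registry
+
+    registry.register("msgpack", msgpack.packb, msgpack.unpackb,
+                      content_type="application/x-msgpack",
+                      content_encoding="binary")
+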
+You don't have to do any work on the worker receiving the task; the
+serialization method is sent with the message, so the worker knows how to
+deserialize any task (of course, if you use a custom serializer, it must also
+be registered in the worker).
+
+When sending a task the serialization method is taken from the following
+places, in order: the ``serializer`` argument to ``apply_async``, the
+Task's ``serializer`` attribute, and finally the global default
+``CELERY_SERIALIZER`` configuration directive.
+
+.. code-block:: python
+
+    >>> AddTask.apply_async(args=[10, 10], serializer="json")
+
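+To illustrate the first two of these, a task class can set its own default
+via the ``serializer`` attribute, while a single call can still override it
+(a sketch re-using the example task from before):
+
+.. code-block:: python
+
+    class AddTask(Task):
+        # All deliveries of this task use JSON by default...
+        serializer = "json"
+
+        def run(self, x, y):
+            return x + y
+
+    # ...unless a single call overrides it:
+    AddTask.apply_async(args=[10, 10], serializer="pickle")
+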
+Connections and connection timeouts
+------------------------------------
+
+Currently there is no support for broker connection pooling in celery, but
+this might change in the future. This is something you need to be aware of
+when sending more than one task at a time, as ``apply_async`` establishes and
+closes a connection every time.
+
+If you need to send more than one task at the same time, it's a good idea to
+establish the connection yourself and pass it to ``apply_async``:
+
+.. code-block:: python
+
+    from carrot.connection import DjangoBrokerConnection
+
+    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
+
+    results = []
+    connection = DjangoBrokerConnection()
+    try:
+        for args in numbers:
+            res = AddTask.apply_async(args=args, connection=connection)
+            results.append(res)
+    finally:
+        connection.close()
+
+    print([res.get() for res in results])
+
+
+In Python 2.5 and above you can use the ``with`` statement with carrot
+connections:
+
+.. code-block:: python
+
+    from __future__ import with_statement
+    from carrot.connection import DjangoBrokerConnection
+
+    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
+
+    results = []
+    with DjangoBrokerConnection() as connection:
+        for args in numbers:
+            res = AddTask.apply_async(args=args, connection=connection)
+            results.append(res)
+
+    print([res.get() for res in results])
+
+
+*NOTE* A ``TaskSet`` already reuses the same connection for all of its tasks,
+but not if you need to execute more than one ``TaskSet``.
+
+The connection timeout is the number of seconds to wait before giving up on
+establishing the connection. You can set this with the ``connect_timeout``
+argument to ``apply_async``:
+
+.. code-block:: python
+
+    AddTask.apply_async([10, 10], connect_timeout=3)
+
+or, if you handle the connection manually, with the connection object's
+``connect_timeout`` argument:
+
+.. code-block:: python
+
+    connection = DjangoBrokerConnection(connect_timeout=3)
+
+
+Routing options
+---------------
+
+Celery uses the AMQP routing mechanisms to route tasks to different workers.
+You can route tasks using the following entities: exchange, queue and routing key.
+
+Messages (tasks) are sent to exchanges, and a queue binds to an exchange with a
+routing key. Let's look at an example:
+
+Our application has a lot of tasks: some process video, others process images,
+and some gather collective intelligence about users. Some of these have
+higher priority than others, so we want to make sure the high priority tasks
+get sent to powerful machines, while low priority tasks are sent to dedicated
+machines that can handle these at their own pace, uninterrupted.
+
+For the sake of this example we have only one exchange, called ``tasks``.
+There are different types of exchanges, each matching the routing key in a
+different way. The exchange types are:
+
+* direct
+
+    Matches the routing key exactly.
+
+* topic
+
+    In the topic exchange the routing key is made up of words separated by
+    dots (``.``). Words can be matched by the wildcards ``*`` and ``#``, where
+    ``*`` matches exactly one word and ``#`` matches zero or more words (see
+    the sketch below the list).
+
+    For example, ``*.stock.#`` matches the routing keys ``usd.stock`` and
+    ``euro.stock.db`` but not ``stock.nasdaq``.
+
+(There are also other exchange types, but these are not used by celery.)
+
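+To make the wildcard rules concrete, here is a small standalone sketch (plain
+Python, not part of celery or carrot) that emulates how a topic exchange
+matches a routing key against a binding key:
+
+.. code-block:: python
+
+    def topic_matches(binding_key, routing_key):
+        """True if routing_key matches binding_key, where "*" matches
+        exactly one word and "#" matches zero or more words."""
+        return _match(binding_key.split("."), routing_key.split("."))
+
+    def _match(pattern, words):
+        if not pattern:
+            return not words
+        head, rest = pattern[0], pattern[1:]
+        if head == "#":
+            # "#" may swallow any number of words, including none.
+            return any(_match(rest, words[i:]) for i in range(len(words) + 1))
+        if not words:
+            return False
+        return head in ("*", words[0]) and _match(rest, words[1:])
+
+    # Reproduces the examples above:
+    #   topic_matches("*.stock.#", "usd.stock")      -> True
+    #   topic_matches("*.stock.#", "euro.stock.db")  -> True
+    #   topic_matches("*.stock.#", "stock.nasdaq")   -> False
+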
+So, we create three queues, ``video``, ``image`` and ``lowpri``, that bind to
+our ``tasks`` exchange. For the queues we use the following binding keys::
+
+    video: video.#
+    image: image.#
+    lowpri: misc.#
+
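+How these queues are declared depends on your setup. As a rough sketch,
+assuming carrot's ``Consumer`` accepts ``queue``, ``exchange``,
+``exchange_type`` and ``routing_key`` arguments and declares the binding for
+you (treat the exact names as an assumption, not as the definitive API),
+declaring the ``video`` queue could look like this:
+
+.. code-block:: python
+
+    # Hypothetical sketch: the exact carrot arguments are an assumption.
+    from carrot.connection import DjangoBrokerConnection
+    from carrot.messaging import Consumer
+
+    connection = DjangoBrokerConnection()
+    consumer = Consumer(connection=connection, queue="video",
+                        exchange="tasks", exchange_type="topic",
+                        routing_key="video.#")
+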
+Now we can send our tasks to different worker machines, by making the workers
+listen to different queues:
+
+.. code-block:: python
+
+    >>> CompressVideoTask.apply_async(args=[filename],
+    ...     routing_key="video.compress")
+
+    >>> ImageRotateTask.apply_async(args=[filename, 360],
+    ...     routing_key="image.rotate")
+
+    >>> ImageCropTask.apply_async(args=[filename, selection],
+    ...     routing_key="image.crop")
+
+    >>> UpdateRecommendationsTask.apply_async(routing_key="misc.recommend")
+
+
+Later, if the image crop task suddenly turns out to be consuming a lot of
+resources, we can bind some new workers to handle just the ``"image.crop"``
+tasks, by creating a new queue that binds to ``"image.crop"``.
+
+
+AMQP options
+------------
+
+* mandatory
+
+    This sets the delivery to be mandatory. An exception will be raised
+    if there are no running workers able to take on the task.
+
+* immediate
+
+    Request immediate delivery. An exception will be raised
+    if the task cannot be routed to a worker immediately.
+
+* priority
+
+    A number between ``0`` and ``9``, where ``0`` is the highest priority.
+    Note that RabbitMQ does not implement AMQP priorities, and maybe your
+    broker doesn't either; please consult your broker's documentation for
+    more information.
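+
+As with the other execution options, these can be overridden per call to
+``apply_async``; the values below are purely illustrative:
+
+.. code-block:: python
+
+    AddTask.apply_async(args=[10, 10],
+                        mandatory=True,
+                        immediate=False,
+                        priority=0)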
|