=================
 Executing Tasks
=================

.. contents::
    :local:

Basics
======

Executing tasks is done with :meth:`~celery.task.base.Task.apply_async`,
and its shortcut: :meth:`~celery.task.base.Task.delay`.

``delay`` is simple and convenient, as it looks like calling a regular
function:

.. code-block:: python

    Task.delay(arg1, arg2, kwarg1="x", kwarg2="y")

The same thing using ``apply_async`` is written like this:

.. code-block:: python

    Task.apply_async(args=[arg1, arg2], kwargs={"kwarg1": "x", "kwarg2": "y"})

You can also execute a task by name using :func:`~celery.execute.send_task`,
if you don't have access to the task's class::

    >>> from celery.execute import send_task
    >>> result = send_task("tasks.add", [2, 2])
    >>> result.get()
    4

While ``delay`` is convenient, it doesn't give you as much control as using
``apply_async``. With ``apply_async`` you can override the execution options
available as attributes on the ``Task`` class: ``routing_key``, ``exchange``,
``immediate``, ``mandatory``, ``priority``, and ``serializer``. In addition,
you can set a countdown/eta or provide a custom broker connection.

Let's go over these in more detail. The following examples use this simple
task, which adds together two numbers:

.. code-block:: python

    @task
    def add(x, y):
        return x + y

ETA and countdown
=================

The ETA (estimated time of arrival) lets you set a specific date and time that
is the earliest time at which your task will execute. ``countdown`` is
a shortcut to set this by seconds into the future.

.. code-block:: python

    >>> result = add.apply_async(args=[10, 10], countdown=3)
    >>> result.get()    # this takes at least 3 seconds to return
    20

Note that your task is guaranteed to be executed at some time *after* the
specified date and time has passed, but not necessarily at that exact time.

While ``countdown`` is an integer, ``eta`` must be a
:class:`~datetime.datetime` object, specifying an exact date and time in the
future. This is useful if you already have a :class:`~datetime.datetime`
object and need to modify it with a :class:`~datetime.timedelta`, or when
specifying time in seconds is not very readable.

.. code-block:: python

    from datetime import datetime, timedelta

    def quickban(username):
        """Ban user for 24 hours."""
        ban(username)
        tomorrow = datetime.now() + timedelta(days=1)
        UnbanTask.apply_async(args=[username], eta=tomorrow)

Serializers
===========

Data passed between celery and workers has to be serialized to be
transferred. The default serializer is :mod:`pickle`, but you can change this
for each task. There is built-in support for :mod:`pickle`, ``JSON`` and
``YAML``, and you can add your own custom serializers by registering them
into the carrot serializer registry.

The default serializer (pickle) supports Python objects, like ``datetime`` and
any custom datatypes you define yourself. But since pickle has poor support
outside of the Python language, you need to choose another serializer if you
need to communicate with other languages. In that case, ``JSON`` is a very
popular choice.
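
For instance, pickle round-trips a ``datetime`` unchanged, while the stdlib
``json`` module rejects it out of the box. A minimal illustration in plain
Python, independent of celery:

```python
import json
import pickle
from datetime import datetime

payload = {"when": datetime(2010, 6, 2, 14, 30)}

# pickle can serialize and restore the datetime object as-is.
assert pickle.loads(pickle.dumps(payload)) == payload

# JSON has no datetime type, so the value must be converted first
# (e.g. to an ISO 8601 string) before using the JSON serializer.
try:
    json.dumps(payload)
except TypeError as exc:
    print("JSON cannot serialize datetime:", exc)
```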

The serialization method is sent with the message, so the worker knows how to
deserialize any task. Of course, if you use a custom serializer, this must
also be registered in the worker.

When sending a task, the serialization method is taken from the following
places in order: the ``serializer`` argument to ``apply_async``, the
task's ``serializer`` attribute, and finally the global default
``CELERY_SERIALIZER`` configuration directive.

.. code-block:: python

    >>> add.apply_async(args=[10, 10], serializer="json")
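
The lookup order above amounts to "first value that is set wins". A sketch of
that precedence in plain Python; ``resolve_serializer`` and its parameter
names are hypothetical, not part of the celery API:

```python
def resolve_serializer(call_serializer=None, task_serializer=None,
                       global_default="pickle"):
    """Return the first serializer that is set, mirroring the order:
    apply_async argument, then Task attribute, then CELERY_SERIALIZER."""
    for candidate in (call_serializer, task_serializer, global_default):
        if candidate is not None:
            return candidate

# The apply_async argument takes precedence over the Task attribute:
print(resolve_serializer(call_serializer="json", task_serializer="yaml"))
# -> json
```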

Connections and connection timeouts
===================================

Currently there is no support for broker connection pools in celery,
so this is something you need to be aware of when sending more than
one task at a time, as ``apply_async``/``delay`` establishes and
closes a connection every time.

If you need to send more than one task at the same time, it's a good idea to
establish the connection yourself and pass it to ``apply_async``:

.. code-block:: python

    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
    results = []
    publisher = add.get_publisher()
    try:
        for args in numbers:
            res = add.apply_async(args=args, publisher=publisher)
            results.append(res)
    finally:
        publisher.close()
        publisher.connection.close()

    print([res.get() for res in results])

The connection timeout is the number of seconds to wait before giving up on
establishing the connection. You can set this with the ``connect_timeout``
argument to ``apply_async``:

.. code-block:: python

    add.apply_async([10, 10], connect_timeout=3)

Or if you handle the connection manually:

.. code-block:: python

    publisher = add.get_publisher(connect_timeout=3)

Routing options
===============

Celery uses the AMQP routing mechanisms to route tasks to different workers.
You can route tasks using the following entities: exchange, queue and routing
key.

Messages (tasks) are sent to exchanges, and a queue binds to an exchange with
a routing key. Let's look at an example:

Say our application has a lot of tasks: some process video, others process
images, and some gather collective intelligence about users. Some of these
tasks have higher priority than others, so we want to make sure the high
priority tasks get sent to powerful machines, while low priority tasks are
sent to dedicated machines that can handle them at their own pace.

For the sake of this example we have a single exchange called ``tasks``.
There are different types of exchanges, each matching the routing key in a
different way. The exchange types are:

* direct

    Matches the routing key exactly.

* topic

    In the topic exchange the routing key is made up of words separated by
    dots (``.``). Words can be matched by the wild cards ``*`` and ``#``,
    where ``*`` matches one exact word, and ``#`` matches zero or more words.

    For example, ``*.stock.#`` matches the routing keys ``usd.stock`` and
    ``euro.stock.db`` but not ``stock.nasdaq``.

There are also other exchange types, but these are not used by celery.
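
The topic matching rules can be sketched in plain Python. ``topic_matches``
is an illustrative helper written for this guide, not part of celery, carrot
or any broker:

```python
import re

def topic_matches(pattern, routing_key):
    """Return True if routing_key matches the topic pattern, where "*"
    matches exactly one word and "#" matches zero or more words."""
    words = [w if w in ("*", "#") else re.escape(w)
             for w in pattern.split(".")]
    regex = r"\.".join(r"[^.]+" if w == "*" else w for w in words)
    # Fold the adjoining dot into "#" so it can also match zero words.
    regex = regex.replace(r"\.#", r"(?:\.[^.]+)*")
    regex = regex.replace(r"#\.", r"(?:[^.]+\.)*")
    regex = regex.replace("#", ".*")  # a bare "#" matches everything
    return re.match("^" + regex + "$", routing_key) is not None

print(topic_matches("*.stock.#", "usd.stock"))      # True
print(topic_matches("*.stock.#", "euro.stock.db"))  # True
print(topic_matches("*.stock.#", "stock.nasdaq"))   # False
```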

So, we create three queues, ``video``, ``image`` and ``lowpri``, that bind to
our ``tasks`` exchange. For the queues we use the following binding keys::

    video: video.#
    image: image.#
    lowpri: misc.#
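
These bindings can be declared in the celery configuration with the
``CELERY_QUEUES`` setting. A sketch, assuming the dictionary format used by
celery at the time of writing (check your version's documentation for the
exact keys):

```python
# Three queues bound to the "tasks" topic exchange with the
# binding keys listed above.
CELERY_QUEUES = {
    "video": {
        "exchange": "tasks",
        "exchange_type": "topic",
        "binding_key": "video.#",
    },
    "image": {
        "exchange": "tasks",
        "exchange_type": "topic",
        "binding_key": "image.#",
    },
    "lowpri": {
        "exchange": "tasks",
        "exchange_type": "topic",
        "binding_key": "misc.#",
    },
}
```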

Now we can send our tasks to different worker machines, by making the workers
listen to different queues:

.. code-block:: python

    >>> CompressVideoTask.apply_async(args=[filename],
    ...                               routing_key="video.compress")

    >>> ImageRotateTask.apply_async(args=[filename, 360],
    ...                             routing_key="image.rotate")

    >>> ImageCropTask.apply_async(args=[filename, selection],
    ...                           routing_key="image.crop")

    >>> UpdateRecommendationsTask.apply_async(routing_key="misc.recommend")

Later, if the crop task is consuming a lot of resources,
we can bind some new workers to handle just the ``"image.crop"`` task,
by creating a new queue that binds to ``"image.crop"``.

AMQP options
============

.. note::

    The ``mandatory`` and ``immediate`` flags are not supported by
    ``amqplib`` at this point.

* mandatory

    This sets the delivery to be mandatory. An exception will be raised
    if there are no running workers able to take on the task.

* immediate

    Request immediate delivery. Will raise an exception
    if the task cannot be routed to a worker immediately.

* priority

    A number between ``0`` and ``9``, where ``0`` is the highest priority.

    Note that RabbitMQ does not implement AMQP priorities, and maybe your
    broker does not either; consult your broker's documentation for more
    information.
|