| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 | ================= Executing Tasks=================Executing tasks is done with ``apply_async``, and it's shortcut ``delay``.``delay`` is simple and convenient, as it looks like calling a regularfunction:.. code-block:: python    MyTask.delay(arg1, arg2, kwarg1="x", kwarg2="y")The same thing using ``apply_async`` is written like this:.. code-block:: python    AddTask.delay(args=[arg1, arg2], kwargs={"kwarg1": "x", "kwarg2": "y"})But ``delay`` doesn't give you as much control as using ``apply_async``.With ``apply_async`` you can override the execution options available as attributes onthe ``Task`` class; ``routing_key``, ``exchange``, ``immediate``, ``mandatory``,``priority``, and ``serializer``.  In addition you can set a countdown or an eta, providea custom broker connection or change the broker connection timeout.Let's go over these in more detail. The following examples uses this simpletask, used to add two numbers:.. code-block:: python    class AddTask(Task):        def run(self, x, y):            return x + yETA and countdown-----------------The ETA (estimated time of arrival) lets you set a specific date and time forwhen after, your task should execute. ``countdown`` is a shortcut to set thisby seconds into the future... code-block:: python    >>> result = AddTask.apply_async(args=[10, 10], countdown=3)    >>> result.get()    # this takes at least 3 seconds to return    20Note that your task is guaranteed to be executed at some time *after* thespecified date and time has passed, but not necessarily at that exact time.While ``countdown`` is an integer, ``eta`` must be a ``datetime`` object,specifying an exact date and time in the future. This is good if you alreadyhave a ``datatime`` object and need to modify it with a ``timedelta``, or whenusing time in seconds is not very readable... code-block:: python    from datetime import datetime, timedelta    def quickban(username):        """Ban user for 24 hours."""        ban(username)        tomorrow = datetime.now() + timedelta(days=1)        UnbanTask.apply_async(args=[username], eta=tomorrow)Serializer----------The default serializer used is :mod:`pickle`, but you can change this default bychanging the ``CELERY_SERIALIZER`` configuration directive. There is built-insupport for using ``pickle``, ``JSON`` and ``YAML``, and you can add your owncustom serializers by registering them in the carrot serializer registry.You don't have to do any work on the worker receiving the task, theserialization method is sent with the message so the worker knows how todeserialize any task (of course, if you use a custom serializer, this must also beregistered in the worker.)When sending a task the serializition method is taken from the followingplaces in order: The ``serializer`` argument to ``apply_async``, theTask's ``serializer`` attribute, and finally the global default ``CELERY_SERIALIZER``configuration directive... code-block:: python    >>> AddTask.apply_async(args=[10, 10], serializer="JSON")Connections and connection timeouts.------------------------------------Currently there is no support for broker connection pooling in celery, butthis might change in the future. This is something you need to be aware ofwhen sending more than one task at a time, as ``apply_async`` establishes andcloses a connection every time.If you need to send more than one task at the same time, it's a good idea toestablish the connection yourself and pass it to ``apply_async``:.. code-block:: python    from carrot.connection import DjangoBrokerConnection    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]    results = []    connection = DjangoBrokerConnection()    try:        for args in numbers:            res = AddTask.apply_async(args=args, connection=connection)            results.append(res)    finally:        connection.close()    print([res.get() for res in results])In Python 2.5 and above, you can use the ``with`` statement:.. code-block:: python    from __future__ import with_statement    from carrot.connection import DjangoBrokerConnection    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]    results = []    with DjangoBrokerConnection() as connection:        for args in numbers:            res = AddTask.apply_async(args=args, connection=connection)            results.append(res)    print([res.get() for res in results])*NOTE* TaskSets already re-uses the same connection, but not if you need toexecute more than one TaskSet.The connection timeout is the number of seconds to wait before we give up onestablishing the connection, you can set this with the ``connect_timeout``argument to ``apply_async``:.. code-block:: python    AddTask.apply_async([10, 10], connect_timeout=3)or if you handle your connection manually by using the connection objects``timeout`` argument:.. code-block:: python    connection = DjangoAMQPConnection(timeout=3)Routing options---------------Celery uses the AMQP routing mechanisms to route tasks to different workers.You can route tasks using the following entitites: exchange, queue and routing key.Messages (tasks) are sent to exchanges, a queue binds to an exchange with arouting key. Let's look at an example:Our application has a lot of tasks, some process video, others process images,and some gathers collective intelligence about users. Some of these havehigher priority than others so we want to make sure the high priority tasksget sent to powerful machines, while low priority tasks are sent to dedicatedmachines that can handle these at their own pace, uninterrupted.For the sake of example we have only one exchange called ``tasks``.There are different types of exchanges that matches the routing key indifferent ways, the exchange types are:* direct    Matches the routing key exactly.* topic    In the topic exchange the routing key is made up of words separated by dots (``.``).    Words can be matched by the wildcars ``*`` and ``#``, where ``*`` matches one    exact word, and ``#`` matches one or many.    For example, ``*.stock.#`` matches the routing keys ``usd.stock`` and    ``euro.stock.db`` but not ``stock.nasdaq``.(there are also other exchange types, but these are not used by celery)So, we create three queues, ``video``, ``image`` and ``lowpri`` that binds toour ``tasks`` exchange, for the queues we use the following binding keys::    video: video.#    image: image.#    lowpri: misc.#Now we can send our tasks to different worker machines, by making the workerslisten to different queues:.. code-block:: python    >>> CompressVideoTask.apply_async(args=[filename],    ...                               routing_key="video.compress")    >>> ImageRotateTask.apply_async(args=[filename, 360],                                    routing_key="image.rotate")    >>> ImageCropTask.apply_async(args=[filename, selection],                                  routing_key="image.crop")    >>> UpdateReccomendationsTask.apply_async(routing_key="misc.recommend")Later, if suddenly the image crop task is consuming a lot of resources,we can bind some new workers to handle just the ``"image.crop"`` task,by creating a new queue that binds to ``"image.crop``".AMQP options------------* mandatoryThis sets the delivery to be mandatory. An exception will be raisedif there are no running workers able to take on the task.* immediateRequest immediate delivery. Will raise an exceptionif the task cannot be routed to a worker immediately.* priorityA number between ``0`` and ``9``, where ``0`` is the highest priority.Note that RabbitMQ does not implement AMQP priorities, and maybe your brokerdoes not either, please consult your brokers documentation for moreinformation.
 |