=================
Executing Tasks
=================

Executing tasks is done with ``apply_async``, and its shortcut ``delay``.

``delay`` is simple and convenient, as it looks like calling a regular
function:

.. code-block:: python

    MyTask.delay(arg1, arg2, kwarg1="x", kwarg2="y")

The same thing using ``apply_async`` is written like this:

.. code-block:: python

    MyTask.apply_async(args=[arg1, arg2], kwargs={"kwarg1": "x", "kwarg2": "y"})

But ``delay`` doesn't give you as much control as ``apply_async``.
With ``apply_async`` you can override the execution options available as
attributes on the ``Task`` class: ``routing_key``, ``exchange``, ``immediate``,
``mandatory``, ``priority``, and ``serializer``. In addition, you can set a
countdown or an ETA, provide a custom broker connection, or change the broker
connection timeout.

Let's go over these in more detail. The following examples use this simple
task, which adds two numbers:

.. code-block:: python

    from celery.task import Task


    class AddTask(Task):

        def run(self, x, y):
            return x + y

ETA and countdown
-----------------

The ETA (estimated time of arrival) lets you set a specific date and time
after which your task should be executed. ``countdown`` is a shortcut to set
the ETA as a number of seconds into the future.

.. code-block:: python

    >>> result = AddTask.apply_async(args=[10, 10], countdown=3)
    >>> result.get()    # this takes at least 3 seconds to return
    20

Note that your task is guaranteed to be executed at some time *after* the
specified date and time has passed, but not necessarily at that exact time.

While ``countdown`` is an integer, ``eta`` must be a ``datetime`` object
specifying an exact date and time in the future. This is useful if you already
have a ``datetime`` object and need to modify it with a ``timedelta``, or when
specifying the time in seconds is not very readable.

.. code-block:: python

    from datetime import datetime, timedelta

    # ``ban`` and ``UnbanTask`` are assumed to be defined elsewhere
    # in your application.
    def quickban(username):
        """Ban user for 24 hours."""
        ban(username)
        tomorrow = datetime.now() + timedelta(days=1)
        UnbanTask.apply_async(args=[username], eta=tomorrow)
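
The relationship between ``countdown`` and ``eta`` can be sketched in plain
Python. ``countdown_to_eta`` and ``is_due`` are hypothetical helpers written
only for illustration; they are not part of Celery. The sketch assumes, as
described above, that a countdown is just an ETA measured from the current
time, and that the ETA is a lower bound on when the task may run.

.. code-block:: python

    from datetime import datetime, timedelta


    def countdown_to_eta(countdown, now=None):
        """Hypothetical helper: the ETA a countdown of N seconds stands for."""
        if now is None:
            now = datetime.now()
        return now + timedelta(seconds=countdown)


    def is_due(eta, now=None):
        """Hypothetical check: True once the ETA has passed.

        The ETA is a lower bound, so a worker may run the task any time after.
        """
        if now is None:
            now = datetime.now()
        return now >= eta


    start = datetime(2009, 7, 1, 12, 0, 0)
    eta = countdown_to_eta(3, now=start)
    print(eta)                                            # 2009-07-01 12:00:03
    print(is_due(eta, now=start))                         # False
    print(is_due(eta, now=start + timedelta(seconds=5)))  # True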

Serializer
----------

The default serializer is :mod:`pickle`, but you can change this default with
the ``CELERY_SERIALIZER`` configuration directive. There is built-in support
for ``pickle``, ``JSON`` and ``YAML``, and you can add your own custom
serializers by registering them in the carrot serializer registry.

You don't have to do any extra work on the worker receiving the task: the
serialization method is sent with the message, so the worker knows how to
deserialize any task. (If you use a custom serializer, it must also be
registered in the worker.)
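
The idea of sending the serialization method along with the message can be
sketched with only the standard library. The ``SERIALIZERS`` registry, the
content-type labels, and the ``encode_message``/``decode_message`` helpers are
hypothetical stand-ins chosen for this illustration. Carrot's real registry
works differently in detail.

.. code-block:: python

    import json
    import pickle


    def _json_encode(obj):
        return json.dumps(obj).encode("utf-8")


    def _json_decode(data):
        return json.loads(data.decode("utf-8"))


    # Hypothetical registry: content type -> (encoder, decoder).
    # The content-type strings are illustrative labels for this sketch.
    SERIALIZERS = {
        "application/json": (_json_encode, _json_decode),
        "application/x-python-serialize": (pickle.dumps, pickle.loads),
    }


    def encode_message(body, content_type):
        """Sending side: serialize the body and record the method used."""
        encoder, _ = SERIALIZERS[content_type]
        return {"content_type": content_type, "body": encoder(body)}


    def decode_message(message):
        """Worker side: choose the decoder named in the message itself."""
        _, decoder = SERIALIZERS[message["content_type"]]
        return decoder(message["body"])


    task = {"task": "AddTask", "args": [10, 10], "kwargs": {}}
    message = encode_message(task, "application/json")
    print(decode_message(message) == task)  # True

Because the worker looks the decoder up from the message, it needs no
configuration about which serializer a given client chose. It only needs
that serializer to be registered.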

When sending a task, the serialization method is taken from the following
places, in order: the ``serializer`` argument to ``apply_async``, the task's
``serializer`` attribute, and finally the global default ``CELERY_SERIALIZER``
configuration directive.

.. code-block:: python

    >>> AddTask.apply_async(args=[10, 10], serializer="json")
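
The lookup order above can be sketched as a small function. ``PlainTask``,
``ReportTask`` and ``resolve_serializer`` are hypothetical names used only for
this illustration. The module-level ``CELERY_SERIALIZER`` constant merely
stands in for the configuration directive of the same name.

.. code-block:: python

    # Stand-in for the global CELERY_SERIALIZER configuration directive.
    CELERY_SERIALIZER = "pickle"


    class PlainTask(object):
        serializer = None      # hypothetical task without an override


    class ReportTask(object):
        serializer = "json"    # hypothetical task with a per-task override


    def resolve_serializer(task_cls, serializer=None):
        """apply_async argument first, then Task attribute, then global default."""
        if serializer is not None:
            return serializer
        task_serializer = getattr(task_cls, "serializer", None)
        if task_serializer is not None:
            return task_serializer
        return CELERY_SERIALIZER


    print(resolve_serializer(PlainTask))                       # pickle
    print(resolve_serializer(ReportTask))                      # json
    print(resolve_serializer(ReportTask, serializer="yaml"))   # yaml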

Connections and connection timeouts
-----------------------------------

Currently there is no support for broker connection pooling in Celery, but
this might change in the future. This is something you need to be aware of
when sending more than one task at a time, as ``apply_async`` establishes and
closes a connection every time it is called.

If you need to send more than one task at the same time, it's a good idea to
establish the connection yourself and pass it to ``apply_async``:

.. code-block:: python

    from carrot.connection import DjangoBrokerConnection

    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
    results = []
    connection = DjangoBrokerConnection()
    try:
        # All tasks are published over this single connection.
        for args in numbers:
            res = AddTask.apply_async(args=args, connection=connection)
            results.append(res)
    finally:
        connection.close()

    print([res.get() for res in results])

In Python 2.5 and above you can use the ``with`` statement with carrot
connections:

.. code-block:: python

    # Only needed on Python 2.5; the with statement is built in from 2.6.
    from __future__ import with_statement
    from carrot.connection import DjangoBrokerConnection

    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
    results = []
    with DjangoBrokerConnection() as connection:
        for args in numbers:
            res = AddTask.apply_async(args=args, connection=connection)
            results.append(res)
    print([res.get() for res in results])
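
The ``with`` form above relies on the connection object supporting the
context-manager protocol. For an object that only offers a ``close()`` method,
the standard library's ``contextlib.closing`` gives the same "always close"
guarantee. In this sketch, ``FakeConnection`` is a hypothetical stand-in for a
broker connection, used only so the example runs without a broker.

.. code-block:: python

    from contextlib import closing


    class FakeConnection(object):
        """Hypothetical stand-in for a broker connection; it only tracks state."""

        def __init__(self):
            self.closed = False
            self.sent = []

        def send(self, message):
            if self.closed:
                raise RuntimeError("connection is closed")
            self.sent.append(message)

        def close(self):
            self.closed = True


    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
    connection = FakeConnection()
    # closing() calls connection.close() on exit, even if an exception is
    # raised, and every message below goes over this one connection.
    with closing(connection) as conn:
        for args in numbers:
            conn.send(args)

    print(len(connection.sent), connection.closed)  # 4 True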

*NOTE* A TaskSet already reuses a single connection, but the connection is
not shared if you need to execute more than one TaskSet.

The connection timeout is the number of seconds to wait before giving up on
establishing the connection. You can set this with the ``connect_timeout``
argument to ``apply_async``:

.. code-block:: python

    AddTask.apply_async(args=[10, 10], connect_timeout=3)

Or, if you manage the connection yourself, use the connection object's
``timeout`` argument:

.. code-block:: python

    from carrot.connection import DjangoBrokerConnection

    connection = DjangoBrokerConnection(timeout=3)
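
A connection timeout of this kind bounds how long the initial connect may
take. As a rough standard-library analogy (not carrot's implementation),
``socket.create_connection`` accepts a ``timeout`` in seconds and raises an
``OSError`` subclass if the connection cannot be established in time.
``can_connect`` is a hypothetical helper. A local listening socket takes the
place of a real broker.

.. code-block:: python

    import socket


    def can_connect(host, port, timeout=3):
        """Try to open a TCP connection, giving up after ``timeout`` seconds."""
        try:
            # A timeout (socket.timeout) or a refused connection both
            # surface as OSError subclasses on Python 3.
            sock = socket.create_connection((host, port), timeout=timeout)
        except OSError:
            return False
        sock.close()
        return True


    # A local listening socket, so the example does not need a real broker.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    port = server.getsockname()[1]
    print(can_connect("127.0.0.1", port, timeout=3))  # True
    server.close()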

Routing options
---------------

Celery uses the AMQP routing mechanisms to route tasks to different workers.
You can route tasks using the following entities: exchange, queue and routing
key. Messages (tasks) are sent to exchanges, and a queue binds to an exchange
with a routing key. Let's look at an example:

Our application has a lot of tasks: some process video, others process images,
and some gather collective intelligence about users. Some of these have
higher priority than others, so we want to make sure the high priority tasks
get sent to powerful machines, while low priority tasks are sent to dedicated
machines that can handle them at their own pace, uninterrupted.

For the sake of example we have only one exchange, called ``tasks``.
There are different types of exchanges that match the routing key in
different ways. The exchange types are:

* direct

    Matches the routing key exactly.

* topic

    In the topic exchange the routing key is made up of words separated by
    dots (``.``). Words can be matched by the wildcards ``*`` and ``#``,
    where ``*`` matches exactly one word, and ``#`` matches zero or more words.

    For example, ``*.stock.#`` matches the routing keys ``usd.stock`` and
    ``euro.stock.db`` but not ``stock.nasdaq``.

(There are also other exchange types, but these are not used by Celery.)
  124. For example, ``*.stock.#`` matches the routing keys ``usd.stock`` and
  125. ``euro.stock.db`` but not ``stock.nasdaq``.
  126. (there are also other exchange types, but these are not used by celery)
  127. So, we create three queues, ``video``, ``image`` and ``lowpri`` that binds to
  128. our ``tasks`` exchange, for the queues we use the following binding keys::
  129. video: video.#
  130. image: image.#
  131. lowpri: misc.#
  132. Now we can send our tasks to different worker machines, by making the workers
  133. listen to different queues:
  134. .. code-block:: python
  135. >>> CompressVideoTask.apply_async(args=[filename],
  136. ... routing_key="video.compress")
  137. >>> ImageRotateTask.apply_async(args=[filename, 360],
  138. routing_key="image.rotate")
  139. >>> ImageCropTask.apply_async(args=[filename, selection],
  140. routing_key="image.crop")
  141. >>> UpdateReccomendationsTask.apply_async(routing_key="misc.recommend")
  142. Later, if suddenly the image crop task is consuming a lot of resources,
  143. we can bind some new workers to handle just the ``"image.crop"`` task,
  144. by creating a new queue that binds to ``"image.crop``".
  145. AMQP options
  146. ------------
  147. * mandatory
  148. This sets the delivery to be mandatory. An exception will be raised
  149. if there are no running workers able to take on the task.
  150. * immediate
  151. Request immediate delivery. Will raise an exception
  152. if the task cannot be routed to a worker immediately.
  153. * priority
  154. A number between ``0`` and ``9``, where ``0`` is the highest priority.
  155. Note that RabbitMQ does not implement AMQP priorities, and maybe your broker
  156. does not either, please consult your brokers documentation for more
  157. information.