.. _guide-executing:

=================
Executing Tasks
=================

.. contents::
    :local:

.. _executing-basics:

Basics
======

Executing tasks is done with :meth:`~celery.task.Base.Task.apply_async`,
and its shortcut: :meth:`~celery.task.Base.Task.delay`.

``delay`` is simple and convenient, as it looks like calling a regular
function:

.. code-block:: python

    Task.delay(arg1, arg2, kwarg1="x", kwarg2="y")

The same thing using ``apply_async`` is written like this:

.. code-block:: python

    Task.apply_async(args=[arg1, arg2], kwargs={"kwarg1": "x", "kwarg2": "y"})

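Here is a minimal sketch of how a ``delay``-style shortcut can forward its arguments to ``apply_async``. The ``SketchTask`` class is hypothetical and only records what it would send. It is not Celery's ``Task`` class, and a real task publishes a message instead:

.. code-block:: python

    class SketchTask:
        """Hypothetical stand-in for a task class; records calls, sends nothing."""

        def __init__(self):
            self.sent = []

        def apply_async(self, args=None, kwargs=None, **options):
            # A real task would publish a message here; the sketch records it.
            message = {
                "args": list(args or []),
                "kwargs": dict(kwargs or {}),
                "options": options,
            }
            self.sent.append(message)
            return message

        def delay(self, *args, **kwargs):
            # The shortcut collects positional and keyword arguments and
            # forwards them unchanged, without any execution options.
            return self.apply_async(args=args, kwargs=kwargs)


    task = SketchTask()
    a = task.delay(1, 2, kwarg1="x", kwarg2="y")
    b = task.apply_async(args=[1, 2], kwargs={"kwarg1": "x", "kwarg2": "y"})
    print(a == b)  # True

Because ``delay`` cannot take execution options, anything beyond plain arguments has to go through ``apply_async``.
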
While ``delay`` is convenient, it doesn't give you as much control as using
``apply_async``. With ``apply_async`` you can override the execution options
available as attributes on the ``Task`` class: ``routing_key``, ``exchange``,
``immediate``, ``mandatory``, ``priority``, and ``serializer``.
In addition, you can set a countdown, an ETA or an expiry time, or provide a
custom broker connection.

Let's go over these in more detail. The following examples use this simple
task, which adds together two numbers:

.. code-block:: python

    @task
    def add(x, y):
        return x + y

.. note::

    You can also execute a task by name using
    :func:`~celery.execute.send_task`, if you don't have access to the
    task class::

        >>> from celery.execute import send_task
        >>> result = send_task("tasks.add", [2, 2])
        >>> result.get()
        4

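Sending by name works because the message only has to carry the task's name and arguments. The worker resolves the name against the tasks registered in its own process. The following minimal sketch illustrates that idea, assuming a plain dictionary as the registry. ``REGISTRY``, ``make_message`` and ``worker_execute`` are hypothetical names, and real messages carry more fields than shown here:

.. code-block:: python

    def add(x, y):
        return x + y


    # Hypothetical worker-side registry mapping task names to callables.
    REGISTRY = {"tasks.add": add}


    def make_message(name, args=None, kwargs=None):
        # The client only needs the task name, not the function object.
        return {"task": name, "args": list(args or []), "kwargs": dict(kwargs or {})}


    def worker_execute(message):
        # The worker looks the name up in its own registry and calls it.
        func = REGISTRY[message["task"]]
        return func(*message["args"], **message["kwargs"])


    print(worker_execute(make_message("tasks.add", [2, 2])))  # 4
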
.. _executing-eta:

ETA and countdown
=================

The ETA (estimated time of arrival) lets you set a specific date and time that
is the earliest time at which your task will execute. ``countdown`` is
a shortcut to set the ETA as a number of seconds into the future.

.. code-block:: python

    >>> result = add.apply_async(args=[10, 10], countdown=3)
    >>> result.get()    # this takes at least 3 seconds to return
    20

Note that your task is guaranteed to be executed at some time *after* the
specified date and time have passed, but not necessarily at that exact time.

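Conceptually, a countdown is a relative ETA. The sketch below converts one into the other and shows the "not before the ETA" rule. It assumes the countdown is simply added to the current local time. ``countdown_to_eta`` and ``is_due`` are hypothetical helpers, not Celery functions:

.. code-block:: python

    from datetime import datetime, timedelta


    def countdown_to_eta(countdown, now=None):
        """Turn a countdown in seconds into an absolute ETA (hypothetical helper)."""
        if now is None:
            now = datetime.now()
        return now + timedelta(seconds=countdown)


    def is_due(eta, now=None):
        # The task may run once the ETA has passed: possibly later, never earlier.
        if now is None:
            now = datetime.now()
        return now >= eta


    sent_at = datetime(2010, 1, 1, 12, 0, 0)
    eta = countdown_to_eta(3, now=sent_at)
    print(eta)                                               # 2010-01-01 12:00:03
    print(is_due(eta, now=sent_at))                          # False
    print(is_due(eta, now=sent_at + timedelta(seconds=5)))   # True
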
While ``countdown`` is an integer, ``eta`` must be a :class:`~datetime.datetime`
object, specifying an exact date and time in the future. This is useful if you
already have a :class:`~datetime.datetime` object and need to modify it with a
:class:`~datetime.timedelta`, or when a time in seconds is not very readable.

.. code-block:: python

    from datetime import datetime, timedelta

    def add_tomorrow():
        """Add 10 + 10 at this time tomorrow."""
        tomorrow = datetime.now() + timedelta(days=1)
        return add.apply_async(args=[10, 10], eta=tomorrow)

.. _executing-expiration:

Expiration
==========

The ``expires`` argument defines an optional expiry time,
either as seconds after the task is published, or as a specific date and time
using :class:`~datetime.datetime`.

>>> # Task expires after one minute from now.
>>> add.apply_async(args=[10, 10], expires=60)
>>> # Also supports datetime
>>> from datetime import datetime, timedelta
>>> add.apply_async(args=[10, 10],
...                 expires=datetime.now() + timedelta(days=1))

When a worker receives a task that has expired, it will mark
the task as :state:`REVOKED` (:exc:`~celery.exceptions.TaskRevokedError`).

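The two accepted forms of ``expires`` can be normalized into one absolute deadline when the task is published. A worker can then compare that deadline with the time it receives the task. The following minimal sketch assumes naive local ``datetime`` values, as in the examples above. ``resolve_expires`` and ``is_expired`` are hypothetical helpers, not Celery's implementation:

.. code-block:: python

    from datetime import datetime, timedelta


    def resolve_expires(expires, published_at):
        """Normalize ``expires`` (seconds or datetime) to an absolute datetime."""
        if expires is None:
            return None  # no expiry time was set
        if isinstance(expires, datetime):
            return expires
        # Otherwise treat it as a number of seconds after publishing.
        return published_at + timedelta(seconds=expires)


    def is_expired(deadline, received_at):
        # A task without a deadline never expires.
        return deadline is not None and received_at > deadline


    published = datetime(2010, 1, 1, 12, 0, 0)
    deadline = resolve_expires(60, published)
    print(deadline)                                                 # 2010-01-01 12:01:00
    print(is_expired(deadline, published + timedelta(seconds=30)))  # False
    print(is_expired(deadline, published + timedelta(minutes=2)))   # True
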
.. _executing-serializers:

Serializers
===========

Data passed between celery and workers has to be serialized to be
transferred. The default serializer is :mod:`pickle`, but you can
change this for each task. There is built-in support for using
:mod:`pickle`, ``JSON``, ``YAML`` and ``msgpack``. You can also add your own
custom serializers by registering them into the Carrot serializer registry.

The default serializer (pickle) supports Python objects, like ``datetime`` and
any custom datatypes you define yourself. But since pickle has poor support
outside of the Python language, you need to choose another serializer if you
need to communicate with other languages. In that case, ``JSON`` is a very
popular choice.

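You can see the trade-off with the standard library alone. :mod:`pickle` round-trips a ``datetime`` unchanged, while :mod:`json` rejects it until it is converted to a portable form such as an ISO 8601 string. This sketch calls :mod:`pickle` and :mod:`json` directly and does not go through the serializer registry:

.. code-block:: python

    import json
    import pickle
    from datetime import datetime

    payload = {"args": [10, 10], "when": datetime(2010, 1, 1, 12, 0)}

    # pickle handles arbitrary Python objects, including datetime.
    restored = pickle.loads(pickle.dumps(payload))
    print(restored == payload)  # True

    # json only handles basic types such as dicts, lists, strings,
    # numbers, booleans and None.
    try:
        json.dumps(payload)
    except TypeError as exc:
        print("json refused:", type(exc).__name__)  # json refused: TypeError

    # Converting the datetime to a string first makes the payload JSON-safe
    # and readable from other languages.
    portable = dict(payload, when=payload["when"].isoformat())
    print(json.dumps(portable, sort_keys=True))
    # {"args": [10, 10], "when": "2010-01-01T12:00:00"}
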
The serialization method is sent with the message, so the worker knows how to
deserialize any task. Of course, if you use a custom serializer, this must
also be registered in the worker.

When sending a task, the serialization method is taken from the following
places, in order: the ``serializer`` argument to ``apply_async``, the
Task's ``serializer`` attribute, and finally the global default
:setting:`CELERY_TASK_SERIALIZER` configuration directive.

.. code-block:: python

    >>> add.apply_async(args=[10, 10], serializer="json")

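The lookup order amounts to "the first value that is set wins". This minimal sketch assumes ``None`` means "not set". ``resolve_serializer`` is a hypothetical helper, and the ``"pickle"`` default stands in for the :setting:`CELERY_TASK_SERIALIZER` setting:

.. code-block:: python

    # Stands in for the global CELERY_TASK_SERIALIZER setting.
    DEFAULT_SERIALIZER = "pickle"


    def resolve_serializer(call_serializer=None, task_serializer=None,
                           default=DEFAULT_SERIALIZER):
        """Return the first serializer that is set, in the documented order."""
        for candidate in (call_serializer, task_serializer, default):
            if candidate is not None:
                return candidate
        raise ValueError("no serializer configured")


    print(resolve_serializer("json", "yaml"))  # json   (apply_async argument)
    print(resolve_serializer(None, "yaml"))    # yaml   (Task attribute)
    print(resolve_serializer())                # pickle (global default)
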
.. _executing-connections:

Connections and connection timeouts
===================================

Currently there is no support for broker connection pools in celery, and
``apply_async``/``delay`` establishes and closes a connection every time it
is called. Keep this in mind when sending more than one task at a time.

If you need to send more than one task at the same time, it's a good idea to
create a publisher yourself and pass it to ``apply_async``, so that all the
tasks are sent over the same connection:

.. code-block:: python

    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]

    results = []
    publisher = add.get_publisher()
    try:
        for args in numbers:
            res = add.apply_async(args=args, publisher=publisher)
            results.append(res)
    finally:
        # Close the publisher and its underlying broker connection.
        publisher.close()
        publisher.connection.close()

    print([res.get() for res in results])

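A toy model that counts opened connections shows why reusing the publisher helps. ``FakeConnection`` and ``send`` are hypothetical stand-ins, not Celery or Carrot classes. They only mimic opening a connection per call versus reusing one:

.. code-block:: python

    class FakeConnection:
        """Hypothetical broker connection that counts how many were opened."""

        opened = 0

        def __init__(self):
            FakeConnection.opened += 1
            self.closed = False
            self.published = []

        def publish(self, message):
            self.published.append(message)

        def close(self):
            self.closed = True


    def send(message, connection=None):
        # Without a connection, open and close one just for this message,
        # mirroring the per-call behaviour described above.
        if connection is None:
            conn = FakeConnection()
            try:
                conn.publish(message)
            finally:
                conn.close()
        else:
            connection.publish(message)


    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]

    FakeConnection.opened = 0
    for args in numbers:
        send(args)
    per_call = FakeConnection.opened
    print(per_call)  # 4: one connection per task

    FakeConnection.opened = 0
    shared = FakeConnection()
    try:
        for args in numbers:
            send(args, connection=shared)
    finally:
        shared.close()
    reused = FakeConnection.opened
    print(reused)  # 1: a single reused connection
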
The connection timeout is the number of seconds to wait before giving up
on establishing the connection. You can set this with the ``connect_timeout``
argument to ``apply_async``:

.. code-block:: python

    add.apply_async([10, 10], connect_timeout=3)

Or if you handle the connection manually:

.. code-block:: python

    publisher = add.get_publisher(connect_timeout=3)

.. _executing-routing:

Routing options
===============

Celery uses the AMQP routing mechanisms to route tasks to different workers.
You can route tasks using the following entities: exchange, queue and routing key.

Messages (tasks) are sent to exchanges, and queues bind to an exchange with a
binding key. Let's look at an example:

Our application has a lot of tasks: some process video, others process images,
and some gather collective intelligence about users. Some of these have
higher priority than others, so we want to make sure the high priority tasks
get sent to powerful machines, while low priority tasks are sent to dedicated
machines that can handle these at their own pace.

For the sake of example we have only one exchange, called ``tasks``.
There are different types of exchanges that match the routing key in
different ways. The exchange types are:

* direct

  Matches the routing key exactly.

* topic

  In the topic exchange the routing key is made up of words separated by
  dots (``.``). Words can be matched by the wildcards ``*`` and ``#``,
  where ``*`` matches exactly one word, and ``#`` matches zero or more words.

  For example, ``*.stock.#`` matches the routing keys ``usd.stock`` and
  ``euro.stock.db`` but not ``stock.nasdaq``.

(There are also other exchange types, but these are not used by celery.)

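The wildcard rules above can be expressed as a small recursive matcher over the dot-separated words. This is a sketch of the matching semantics only, not of how the broker implements them. ``topic_matches`` is a hypothetical name:

.. code-block:: python

    def topic_matches(binding_key, routing_key):
        """Return True if ``routing_key`` matches the topic ``binding_key``.

        ``*`` matches exactly one word and ``#`` matches zero or more words.
        """
        return _match(binding_key.split("."), routing_key.split("."))


    def _match(pattern, words):
        if not pattern:
            # Pattern exhausted: it matches only if every word was consumed.
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # Let "#" swallow 0, 1, 2, ... words and try the rest of the pattern.
            return any(_match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        if head == "*" or head == words[0]:
            return _match(rest, words[1:])
        return False


    for key in ("usd.stock", "euro.stock.db", "stock.nasdaq"):
        print(key, topic_matches("*.stock.#", key))
    # usd.stock True
    # euro.stock.db True
    # stock.nasdaq False

The ``usd.stock`` case only matches because ``#`` may stand for zero words.
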
So, we create three queues, ``video``, ``image`` and ``lowpri``, that bind to
our ``tasks`` exchange. For the queues we use the following binding keys::

    video: video.#
    image: image.#
    lowpri: misc.#

Now we can send our tasks to different worker machines, by making the workers
listen to different queues:

.. code-block:: python

    >>> add.apply_async(args=[filename],
    ...                 routing_key="video.compress")

    >>> add.apply_async(args=[filename, 360],
    ...                 routing_key="image.rotate")

    >>> add.apply_async(args=[filename, selection],
    ...                 routing_key="image.crop")
    >>> add.apply_async(routing_key="misc.recommend")

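To check which queue each of these messages lands in, a sketch can apply the bindings above to the routing keys. Every binding in this example has the form ``<word>.#``, and ``#`` also matches zero words, so comparing the first word is enough here. ``BINDINGS`` and ``queues_for`` are hypothetical names for illustration only:

.. code-block:: python

    # Binding keys from the example above, as queue name -> binding key.
    BINDINGS = {"video": "video.#", "image": "image.#", "lowpri": "misc.#"}


    def queues_for(routing_key):
        """Return the queues a topic exchange would deliver ``routing_key`` to.

        Simplified: assumes every binding key has the form "<word>.#".
        """
        first_word = routing_key.split(".")[0]
        return sorted(queue for queue, binding in BINDINGS.items()
                      if binding.split(".")[0] == first_word)


    for key in ("video.compress", "image.rotate", "image.crop", "misc.recommend"):
        print(key, "->", queues_for(key))
    # video.compress -> ['video']
    # image.rotate -> ['image']
    # image.crop -> ['image']
    # misc.recommend -> ['lowpri']

A topic exchange delivers a message to every queue whose binding matches, which is why ``queues_for`` returns a list rather than a single queue.
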
Later, if the crop task is consuming a lot of resources,
we can bind some new workers to handle just the ``"image.crop"`` task,
by creating a new queue that binds to ``"image.crop"``.

.. seealso::

    To find out more about routing, please see :ref:`guide-routing`.

.. _executing-amq-opts:

AMQP options
============

.. warning::

    The ``mandatory`` and ``immediate`` flags are not supported by
    :mod:`amqplib` at this point.

* mandatory

  This sets the delivery to be mandatory. An exception will be raised
  if there are no running workers able to take on the task.

* immediate

  Request immediate delivery. Will raise an exception
  if the task cannot be routed to a worker immediately.

* priority

  A number between ``0`` and ``9``, where ``0`` is the highest priority.

  .. note::

      RabbitMQ does not yet support AMQP priorities.