.. _guide-executing:

=================
 Executing Tasks
=================

.. contents::
    :local:

.. _executing-basics:

Basics
======

Executing a task is done with :meth:`~@Task.apply_async`,
or its shortcut: :meth:`~@Task.delay`.

:meth:`~@Task.delay` is simple and convenient, as it looks like calling a regular
function:

.. code-block:: python

    Task.delay(arg1, arg2, kwarg1="x", kwarg2="y")

The same using :meth:`~@Task.apply_async` is written like this:

.. code-block:: python

    Task.apply_async(args=[arg1, arg2], kwargs={"kwarg1": "x", "kwarg2": "y"})

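As a rough mental model, `delay` packs its positional and keyword arguments into the `args` and `kwargs` of `apply_async` and passes no execution options. The sketch below illustrates that relationship with a hypothetical stand-in class (`SketchTask` is not part of Celery and sends no messages); it only records what `apply_async` received, so the two call styles can be compared.

```python
class SketchTask:
    """Hypothetical stand-in for a task: records calls instead of sending them."""

    def __init__(self):
        self.sent = []

    def apply_async(self, args=None, kwargs=None, **options):
        # Keep the task's own arguments separate from execution options
        # (countdown, expires, serializer, ...).
        call = {
            "args": tuple(args or ()),
            "kwargs": dict(kwargs or {}),
            "options": options,
        }
        self.sent.append(call)
        return call

    def delay(self, *args, **kwargs):
        # Shortcut: every keyword belongs to the task, so no options are passed.
        return self.apply_async(args, kwargs)


task = SketchTask()
a = task.delay(1, 2, kwarg1="x", kwarg2="y")
b = task.apply_async(args=[1, 2], kwargs={"kwarg1": "x", "kwarg2": "y"})
print(a == b)  # True
```

Because `delay` treats every keyword argument as an argument for the task itself, execution options such as `countdown` or `expires` can only be given through `apply_async`.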
While `delay` is convenient, it doesn't give you as much control as using
`apply_async`. With `apply_async` you can override the execution options
available as attributes on the :class:`~@Task` class (see :ref:`task-options`).
In addition, you can set a countdown or ETA, set a task expiry, provide a custom
broker connection, and more.

Let's go over these in more detail. All the examples use a simple task
called `add`, which returns the sum of two positional arguments:

.. code-block:: python

    @celery.task()
    def add(x, y):
        return x + y

.. note::

    If the task is not registered in the current process,
    you can still execute it by name using the :meth:`@send_task`
    method of the `celery` app instance:

    .. code-block:: python

        >>> result = celery.send_task("tasks.add", [2, 2])
        >>> result.get()
        4

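Executing by name works because the message only needs to carry the task name and its arguments; whichever worker consumes it looks the name up in its own task registry. The dictionary-based registry below is a hypothetical toy (`REGISTRY`, `register` and `execute_by_name` are invented for illustration, not Celery APIs) that models only that lookup step, not message transport.

```python
# Hypothetical worker-side registry: task name -> callable.
REGISTRY = {}


def register(name):
    """Decorator that records a function under an explicit task name."""
    def decorator(func):
        REGISTRY[name] = func
        return func
    return decorator


@register("tasks.add")
def add(x, y):
    return x + y


def execute_by_name(name, args=(), kwargs=None):
    # The sender only knows the name; resolution happens where the task lives.
    try:
        func = REGISTRY[name]
    except KeyError:
        raise LookupError(f"no task registered under {name!r}") from None
    return func(*args, **(kwargs or {}))


print(execute_by_name("tasks.add", [2, 2]))  # 4
```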
.. _executing-eta:

ETA and countdown
=================

The ETA (estimated time of arrival) lets you set a specific date and time that
is the earliest time at which your task will be executed. `countdown` is
a shortcut to set the ETA as a number of seconds into the future.

.. code-block:: python

    >>> result = add.apply_async(args=[10, 10], countdown=3)
    >>> result.get()    # this takes at least 3 seconds to return
    20

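In other words, `countdown=3` asks for the same earliest start time as an `eta` three seconds after the call is made. The arithmetic is shown below as a small helper; `countdown_to_eta` is a hypothetical name used for illustration, not a Celery function.

```python
from datetime import datetime, timedelta, timezone


def countdown_to_eta(countdown, now=None):
    """Return the earliest execution time implied by a countdown in seconds."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now + timedelta(seconds=countdown)


start = datetime(2012, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
print(countdown_to_eta(3, now=start))  # 2012-01-01 12:00:03+00:00
```

Either form only sets a lower bound: the worker will not start the task earlier, but it may start it later.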
The task is guaranteed to be executed at some time *after* the
specified date and time, but not necessarily at that exact time.
Deadlines may be missed because many items are waiting in the queue,
or because of heavy network latency. To make sure your tasks
are executed in a timely manner, you should monitor queue lengths. Use
Munin, or similar tools, to receive alerts, so appropriate action can be
taken to ease the workload. See :ref:`monitoring-munin`.

While `countdown` is an integer, `eta` must be a :class:`~datetime.datetime`
object, specifying an exact date and time (including millisecond precision
and timezone information):

.. code-block:: python

    >>> from datetime import datetime, timedelta

    >>> tomorrow = datetime.now() + timedelta(days=1)
    >>> add.apply_async(args=[10, 10], eta=tomorrow)

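`datetime.now()` returns a *naive* datetime, that is, one without any `tzinfo`, so the example above relies on the client and the worker agreeing about local time. If you want the timezone carried on the object itself, the standard library can build an aware datetime as sketched below; whether your Celery version honours the offset of an aware `eta` is an assumption to check against the documentation for that version.

```python
from datetime import datetime, timedelta, timezone

naive = datetime.now() + timedelta(days=1)
aware = datetime.now(timezone.utc) + timedelta(days=1)

# A naive datetime has no offset; an aware one records its timezone.
print(naive.tzinfo)       # None
print(aware.tzinfo)       # UTC
print(aware.utcoffset())  # 0:00:00
```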
.. _executing-expiration:

Expiration
==========

The `expires` argument defines an optional expiry time,
either as seconds after task publish, or a specific date and time using
:class:`~datetime.datetime`:

.. code-block:: python

    >>> # Task expires after one minute from now.
    >>> add.apply_async(args=[10, 10], expires=60)

    >>> # Also supports datetime
    >>> from datetime import datetime, timedelta
    >>> add.apply_async(args=[10, 10],
    ...                 expires=datetime.now() + timedelta(days=1))

When a worker receives an expired task, it will mark
the task as :state:`REVOKED` (:exc:`~@TaskRevokedError`).

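Both accepted forms of `expires` reduce to one absolute deadline that can be compared with the time the worker receives the message. The helpers below are a hypothetical sketch of that check (`expiry_time` and `is_expired` are invented names, and treating a message received exactly at the deadline as still valid is an assumption); the real worker revokes the task rather than returning a boolean.

```python
from datetime import datetime, timedelta, timezone


def expiry_time(expires, published_at):
    """Normalize `expires` (seconds or datetime) to an absolute datetime."""
    if expires is None:
        return None
    if isinstance(expires, datetime):
        return expires
    return published_at + timedelta(seconds=expires)


def is_expired(expires, published_at, received_at):
    """True if a message received at `received_at` is past its deadline."""
    deadline = expiry_time(expires, published_at)
    return deadline is not None and received_at > deadline


published = datetime(2012, 1, 1, 12, 0, tzinfo=timezone.utc)
print(is_expired(60, published, published + timedelta(seconds=30)))   # False
print(is_expired(60, published, published + timedelta(seconds=90)))   # True
print(is_expired(None, published, published + timedelta(days=365)))  # False
```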
.. _executing-serializers:

Serializers
===========

Data transferred between clients and workers needs to be serialized.
The default serializer is :mod:`pickle`, but you can
change this globally or for each individual task.

There is built-in support for :mod:`pickle`, `JSON`, `YAML`
and `msgpack`, and you can also add your own custom serializers by registering
them into the Kombu serializer registry (see `Kombu: Serialization of Data`_).

.. _`Kombu: Serialization of Data`:
    http://packages.python.org/kombu/introduction.html#serialization-of-data

Each option has its advantages and disadvantages.

json -- JSON is supported in many programming languages, is now
    a standard part of Python (since 2.6), and is fairly fast to decode
    using the modern Python libraries such as :mod:`cjson` or :mod:`simplejson`.

    The primary disadvantage to JSON is that it limits you to the following
    data types: strings, Unicode, integers, floats, booleans, null,
    dictionaries, and lists. Decimals and dates are notably missing.

    Also, binary data will be transferred using Base64 encoding, which will
    cause the transferred data to be about 33% larger than an encoding which
    supports native binary types.

    However, if your data fits inside the above constraints and you need
    cross-language support, JSON is probably your best choice.

    See http://json.org for more information.

pickle -- If you have no desire to support any language other than
    Python, then using the pickle encoding will gain you support for
    all built-in Python data types (and instances of classes the worker
    can import), smaller messages when sending binary files, and a slight
    speedup over JSON processing.

    See http://docs.python.org/library/pickle.html for more information.

yaml -- YAML has many of the same characteristics as json,
    except that it natively supports more data types (including dates,
    recursive references, etc.).

    However, the Python libraries for YAML are a good bit slower than the
    libraries for JSON.

    If you need a more expressive set of data types and need to maintain
    cross-language compatibility, then YAML may be a better fit than the above.

    See http://yaml.org/ for more information.

msgpack -- msgpack is a binary serialization format that is closer to JSON
    in features. It is very young, however, and support should be considered
    experimental at this point.

    See http://msgpack.org/ for more information.

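Two of the trade-offs above can be checked directly with the standard library: `pickle` round-trips `Decimal` and `date` values unchanged, `json` rejects them until you convert them yourself, and Base64 turns every 3 bytes of binary data into 4 characters. This is an illustration using Python 3's `json`, `pickle` and `base64` modules, not the Kombu serializers themselves; the payload is an arbitrary example.

```python
import base64
import json
import pickle
from datetime import date
from decimal import Decimal

payload = {"amount": Decimal("9.99"), "day": date(2012, 1, 1)}

# pickle keeps these types intact (but only Python can read the result).
print(pickle.loads(pickle.dumps(payload)) == payload)  # True

# json refuses them unless they are converted first.
try:
    json.dumps(payload)
except TypeError as exc:
    print("json:", exc)

converted = {"amount": str(payload["amount"]), "day": payload["day"].isoformat()}
print(json.dumps(converted, sort_keys=True))  # {"amount": "9.99", "day": "2012-01-01"}

# Binary data has to be Base64-encoded for JSON: 3 bytes become 4 characters.
raw = bytes(range(256)) * 12  # 3072 bytes
encoded = base64.b64encode(raw)
print(len(raw), len(encoded), len(encoded) / len(raw))  # 3072 4096 1.3333333333333333
```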
The encoding used is available as a message header, so the worker knows how to
deserialize any task. If you use a custom serializer, this serializer must
be available to the worker.

The client uses the following order to decide which serializer
to use when sending a task:

1. The `serializer` argument to :meth:`~@Task.apply_async`
2. The :attr:`@-Task.serializer` attribute
3. The default :setting:`CELERY_TASK_SERIALIZER` setting.

Example using the `serializer` argument to :meth:`~@Task.apply_async`:

.. code-block:: python

    >>> add.apply_async(args=[10, 10], serializer="json")

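The precedence list amounts to "first one that is set wins". A minimal sketch, assuming a hypothetical `resolve_serializer` helper (not a Celery function) whose `default_serializer` parameter stands in for the :setting:`CELERY_TASK_SERIALIZER` setting, with `pickle` as its default to match the default stated above:

```python
def resolve_serializer(call_serializer=None, task_serializer=None,
                       default_serializer="pickle"):
    """Pick a serializer name by precedence.

    1. the `serializer` argument passed to apply_async
    2. the task's `serializer` attribute
    3. the configured default (stand-in for CELERY_TASK_SERIALIZER)
    """
    for candidate in (call_serializer, task_serializer, default_serializer):
        if candidate:
            return candidate
    raise ValueError("no serializer configured")


print(resolve_serializer())                                # pickle
print(resolve_serializer(task_serializer="yaml"))          # yaml
print(resolve_serializer("json", task_serializer="yaml"))  # json
```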
.. _executing-connections:

Connections
===========

.. admonition:: Automatic Pool Support

    Since version 2.3 there is support for automatic connection pools,
    so you don't have to manually handle connections and publishers
    to reuse connections.

    The connection pool is enabled by default since version 2.5.

    See the :setting:`BROKER_POOL_LIMIT` setting for more information.

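The idea behind a bounded connection pool can be sketched with the standard library: at most `limit` connections are handed out at once, released connections are reused, and `acquire(block=True)` waits for a free slot while `block=False` fails immediately. `TinyPool` and its `factory` argument are hypothetical; the real pool is provided by Kombu and has a richer interface.

```python
import queue
import threading
from contextlib import contextmanager


class TinyPool:
    """Hypothetical bounded pool: at most `limit` connections in use at once."""

    def __init__(self, factory, limit):
        self._factory = factory                 # callable that opens a connection
        self._idle = queue.LifoQueue()          # released connections for reuse
        self._slots = threading.Semaphore(limit)
        self.created = 0

    @contextmanager
    def acquire(self, block=True, timeout=None):
        # Wait for a free slot, or fail at once when block=False.
        if not self._slots.acquire(blocking=block, timeout=timeout if block else None):
            raise RuntimeError("connection pool limit reached")
        conn = None
        try:
            try:
                conn = self._idle.get_nowait()  # reuse an idle connection...
            except queue.Empty:
                conn = self._factory()          # ...or open one lazily
                self.created += 1
            yield conn
        finally:
            if conn is not None:
                self._idle.put(conn)            # return it to the pool
            self._slots.release()


pool = TinyPool(factory=object, limit=2)

# Sequential use keeps reusing a single connection.
for _ in range(5):
    with pool.acquire(block=True) as conn:
        pass
print(pool.created)  # 1

# With both slots taken, a non-blocking acquire fails instead of waiting.
limit_hit = False
with pool.acquire() as first, pool.acquire() as second:
    try:
        with pool.acquire(block=False):
            pass
    except RuntimeError:
        limit_hit = True
print(pool.created, limit_hit)  # 2 True
```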
You can handle the connection manually by creating a
publisher:

.. code-block:: python

    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]

    results = []
    with add.app.pool.acquire(block=True) as connection:
        with add.get_publisher(connection) as publisher:
            # Reuse the same publisher (and connection) for every call.
            for args in numbers:
                res = add.apply_async(args=args, publisher=publisher)
                results.append(res)

    print([res.get() for res in results])

Though this particular example is much better expressed as a group:

.. code-block:: python

    >>> from celery import group

    >>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
    >>> res = group(add.subtask(n) for n in numbers).apply_async()

    >>> res.get()
    [4, 8, 16, 32]

.. _executing-routing:

Routing options
===============

Celery uses the AMQP routing mechanisms to route tasks to different workers.

Messages (tasks) are sent to exchanges, and a queue binds to an exchange with a
routing key. Let's look at an example:

Let's pretend we have an application with a lot of different tasks: some
process video, others process images, and some gather collective intelligence
about its users. Some of these tasks are more important, so we want to make
sure the high priority tasks get sent to dedicated nodes.

For the sake of this example we have a single exchange called `tasks`.
There are different types of exchanges, each type interpreting the routing
key in different ways, implementing different messaging scenarios.

The most common types used with Celery are `direct` and `topic`.

* direct

    Matches the routing key exactly.

* topic

    In the topic exchange the routing key is made up of words separated by
    dots (`.`). Words can be matched by the wild cards `*` and `#`,
    where `*` matches exactly one word, and `#` matches zero or more words.

    For example, `*.stock.#` matches the routing keys `usd.stock` and
    `euro.stock.db` but not `stock.nasdaq`.

We create three queues, `video`, `image` and `lowpri`, that bind to
the `tasks` exchange. For the queues we use the following binding keys::

    video: video.#
    image: image.#
    lowpri: misc.#

Now we can send our tasks to different worker machines by making the workers
listen to different queues:

.. code-block:: python

    >>> add.apply_async(args=[filename],
    ...                 routing_key="video.compress")

    >>> add.apply_async(args=[filename, 360],
    ...                 routing_key="image.rotate")

    >>> add.apply_async(args=[filename, selection],
    ...                 routing_key="image.crop")
    >>> add.apply_async(routing_key="misc.recommend")

Later, if the crop task is consuming a lot of resources,
we can bind new workers to handle just the `"image.crop"` task,
by creating a new queue that binds to `"image.crop"`.

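To make the matching rules concrete, here is a small model of a topic exchange: a matcher where `*` consumes exactly one word and `#` consumes zero or more, plus the three bindings above. `topic_matches` and `route` are hypothetical helpers, not RabbitMQ or Kombu code (a `direct` exchange would simply compare the two keys for equality). One consequence is worth knowing: a topic exchange delivers a message to *every* queue whose binding matches, so if `image.#` stays bound while a new queue is bound to `image.crop`, crop messages end up in both queues.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches an AMQP-style topic binding key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' either consumes no more words, or one word and tries again.
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False
        if pattern[p] in ("*", words[w]):
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


def route(bindings, routing_key):
    """Names of all queues whose binding key matches the routing key."""
    return sorted(q for q, key in bindings.items() if topic_matches(key, routing_key))


bindings = {"video": "video.#", "image": "image.#", "lowpri": "misc.#"}
for key in ("video.compress", "image.rotate", "image.crop", "misc.recommend"):
    print(key, "->", route(bindings, key))

# A dedicated crop queue, with image.# still bound, receives copies too.
bindings["crop"] = "image.crop"
print("image.crop ->", route(bindings, "image.crop"))  # ['crop', 'image']
```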
.. seealso::

    To find out more about routing, please see :ref:`guide-routing`.

.. _executing-amq-opts:

AMQP options
============

* mandatory

    This sets the delivery to be mandatory. An exception will be raised
    if the task cannot be routed to any queue.

    Not supported by :mod:`amqplib`.

* immediate

    Request immediate delivery. Will raise an exception
    if the task cannot be routed to a worker immediately.

    Not supported by :mod:`amqplib`.

* priority

    A number between `0` and `9`, where `0` is the highest priority.

    .. note::

        RabbitMQ does not yet support AMQP priorities.