calling.rst 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. .. _guide-calling:
  2. ===============
  3. Calling Tasks
  4. ===============
  5. .. contents::
  6. :local:
  7. :depth: 1
  8. .. _calling-basics:
  9. Basics
  10. ======
  11. This document describes Celery's uniform "Calling API"
  12. used by task instances and the :ref:`canvas <guide-canvas>`.
  13. The API defines a standard set of execution options, as well as three methods:
  14. - ``apply_async(args[, kwargs[, ...]])``
  15. Sends a task message.
  16. - ``delay(*args, **kwargs)``
  17. Shortcut to send a task message, but does not support execution
  18. options.
  19. - ``apply()``
  20. Does not send a message but executes the task inline instead.
  21. .. _calling-cheat:
  22. .. topic:: Quick Cheat Sheet
  23. - ``T.delay(arg, kwarg=value)``
  24. always a shortcut to ``.apply_async``.
  25. - ``T.apply_async((arg, ), {"kwarg": value})``
  26. - ``T.apply_async(countdown=10)``
  27. executes 10 seconds from now.
  28. - ``T.apply_async(eta=now + timedelta(seconds=10))``
  29. executes 10 seconds from now, specifed using ``eta``
  30. - ``T.apply_async(countdown=60, expires=120)``
  31. executes in one minute from now, but expires after 2 minutes.
  32. - ``T.apply_async(expires=now + timedelta(days=2))``
  33. expires in 2 days, set using :class:`~datetime.datetime`.
  34. Example
  35. -------
  36. The :meth:`~@Task.delay` method is convenient as it looks like calling a regular
  37. function:
  38. .. code-block:: python
  39. task.delay(arg1, arg2, kwarg1="x", kwarg2="y")
  40. Using :meth:`~@Task.apply_async` instead we have to write:
  41. .. code-block:: python
  42. task.apply_async(args=[arg1, arg2], kwargs={"kwarg1": "x", "kwarg2": "y"})
  43. .. sidebar:: Tip
  44. If the task is not registered in the current process
  45. you can use :meth:`~@send_task` to call the task by name instead.
  46. So `delay` is clearly convenient, but if you want to set additional execution
  47. options you have to use ``apply_async``.
  48. The rest of this document will go into the task execution
  49. options in detail. All examples use a task
  50. called `add`, returning the sum of two arguments:
  51. .. code-block:: python
  52. @celery.task()
  53. def add(x, y):
  54. return x + y
  55. .. topic:: There's another way...
  56. You will learn more about this later while reading about the :ref:`Canvas
  57. <guide-canvas>`, but :class:`~celery.subtask`'s are objects used to pass around
  58. the signature of a task invocation, (for example to send it over the
  59. network), and they also support the Calling API:
  60. .. code-block:: python
  61. task.s(arg1, arg2, kwarg1="x", kwargs2="y").apply_async()
  62. .. _calling-links:
  63. Linking (callbacks/errbacks)
  64. ============================
  65. Celery supports linking tasks together so that one task follows another.
  66. The callback task will be applied with the result of the parent task
  67. as a partial argument:
  68. .. code-block:: python
  69. add.apply_async((2, 2), link=add.s(16))
  70. .. sidebar:: What is ``s``?
  71. The ``add.s`` call used here is called a subtask, we talk
  72. more about subtasks in the :ref:`canvas guide <guide-canvas>`,
  73. where you can also learn about :class:`~celery.chain`, which
  74. is a simpler way to chain tasks together.
  75. In practice the ``link`` execution option is considered an internal
  76. primitive, and you will probably not use it directly, but
  77. rather use chains instead.
  78. Here the result of the first task (4) will be sent to a new
  79. task that adds 16 to the previous result, forming the expression
  80. :math:`(2 + 2) + 16 = 20`
  81. You can also cause a callback to be applied if task raises an exception
  82. (*errback*), but this behaves differently from a regular callback
  83. in that it will be passed the id of the parent task, not the result.
  84. This is because it may not always be possible to serialize
  85. the exception raised, and so this way the error callback requires
  86. a result backend to be enabled, and the task must retrieve the result
  87. of the task instead.
  88. This is an example error callback:
  89. .. code-block:: python
  90. @celery.task()
  91. def error_handler(uuid):
  92. result = AsyncResult(uuid)
  93. exc = result.get(propagate=False)
  94. print("Task %r raised exception: %r\n%r" % (
  95. exc, result.traceback))
  96. it can be added to the task using the ``link_error`` execution
  97. option:
  98. .. code-block:: python
  99. add.apply_async((2, 2), link_error=error_handler.s())
  100. In addition, both the ``link`` and ``link_error`` options can be expressed
  101. as a list::
  102. add.apply_async((2, 2), link=[add.s(16), other_task.s()])
  103. The callbacks/errbacks will then be called in order, and all
  104. callbacks will be called with the return value of the parent task
  105. as a partial argument.
  106. .. _calling-eta:
  107. ETA and countdown
  108. =================
  109. The ETA (estimated time of arrival) lets you set a specific date and time that
  110. is the earliest time at which your task will be executed. `countdown` is
  111. a shortcut to set eta by seconds into the future.
  112. .. code-block:: python
  113. >>> result = add.apply_async((2, 2), countdown=3)
  114. >>> result.get() # this takes at least 3 seconds to return
  115. 20
  116. The task is guaranteed to be executed at some time *after* the
  117. specified date and time, but not necessarily at that exact time.
  118. Possible reasons for broken deadlines may include many items waiting
  119. in the queue, or heavy network latency. To make sure your tasks
  120. are executed in a timely manner you should monitor the queue for congestion. Use
  121. Munin, or similar tools, to receive alerts, so appropriate action can be
  122. taken to ease the workload. See :ref:`monitoring-munin`.
  123. While `countdown` is an integer, `eta` must be a :class:`~datetime.datetime`
  124. object, specifying an exact date and time (including millisecond precision,
  125. and timezone information):
  126. .. code-block:: python
  127. >>> from datetime import datetime, timedelta
  128. >>> tomorrow = datetime.utcnow() + timedelta(days=1)
  129. >>> add.apply_async((2, 2), eta=tomorrow)
  130. .. _calling-expiration:
  131. Expiration
  132. ==========
  133. The `expires` argument defines an optional expiry time,
  134. either as seconds after task publish, or a specific date and time using
  135. :class:`~datetime.datetime`:
  136. .. code-block:: python
  137. >>> # Task expires after one minute from now.
  138. >>> add.apply_async((10, 10), expires=60)
  139. >>> # Also supports datetime
  140. >>> from datetime import datetime, timedelta
  141. >>> add.apply_async((10, 10), kwargs,
  142. ... expires=datetime.now() + timedelta(days=1)
  143. When a worker receives an expired task it will mark
  144. the task as :state:`REVOKED` (:exc:`~@TaskRevokedError`).
  145. .. _calling-retry:
  146. Message Sending Retry
  147. =====================
  148. Celery will automatically retry sending messages in the event of connection
  149. failure, and retry behavior can be configured -- like how often to retry, or a maximum
  150. number of retries -- or disabled all together.
  151. To disable retry you can set the ``retry`` execution option to :const:`False`:
  152. .. code-block:: python
  153. add.apply_async((2, 2), retry=False)
  154. .. topic:: Related Settings
  155. .. hlist::
  156. :columns: 2
  157. - :setting:`CELERY_TASK_PUBLISH_RETRY`
  158. - :setting:`CELERY_TASK_PUBLISH_RETRY_POLICY`
  159. Retry Policy
  160. ------------
  161. A retry policy is a mapping that controls how retries behave,
  162. and can contain the following keys:
  163. - `max_retries`
  164. Maximum number of retries before giving up, in this case the
  165. exception that caused the retry to fail will be raised.
  166. A value of 0 or :const:`None` means it will retry forever.
  167. The default is to retry 3 times.
  168. - `interval_start`
  169. Defines the number of seconds (float or integer) to wait between
  170. retries. Default is 0, which means the first retry will be
  171. instantaneous.
  172. - `interval_step`
  173. On each consecutive retry this number will be added to the retry
  174. delay (float or integer). Default is 0.2.
  175. - `interval_max`
  176. Maximum number of seconds (float or integer) to wait between
  177. retries. Default is 0.2.
  178. For example, the default policy correlates to:
  179. .. code-block:: python
  180. add.apply_async((2, 2), retry=True, retry_policy={
  181. "max_retries": 3,
  182. "interval_start": 0,
  183. "interval_step": 0.2,
  184. "interval_max": 0.2,
  185. })
  186. the maximum time spent retrying will be 0.4 seconds. It is set relatively
  187. short by default because a connection failure could lead to a retry pile effect
  188. if the broker connection is down: e.g. many web server processes waiting
  189. to retry blocking other incoming requests.
  190. .. _calling-serializers:
  191. Serializers
  192. ===========
  193. .. sidebar:: Security
  194. The pickle module allows for execution of arbitrary functions,
  195. please see the :ref:`security guide <guide-security>`.
  196. Celery also comes with a special serializer that uses
  197. cryptography to sign your messages.
  198. Data transferred between clients and workers needs to be serialized,
  199. so every message in Celery has a ``content_type`` header that
  200. describes the serialization method used to encode it.
  201. The default serializer is :mod:`pickle`, but you can
  202. change this using the :setting:`CELERY_TASK_SERIALIZER` setting,
  203. or for each individual task, or even per message.
  204. There's built-in support for :mod:`pickle`, `JSON`, `YAML`
  205. and `msgpack`, and you can also add your own custom serializers by registering
  206. them into the Kombu serializer registry (see `Kombu: Serialization of Data`_).
  207. .. _`Kombu: Serialization of Data`:
  208. http://packages.python.org/kombu/introduction.html#serialization-of-data
  209. Each option has its advantages and disadvantages.
  210. json -- JSON is supported in many programming languages, is now
  211. a standard part of Python (since 2.6), and is fairly fast to decode
  212. using the modern Python libraries such as :mod:`cjson` or :mod:`simplejson`.
  213. The primary disadvantage to JSON is that it limits you to the following
  214. data types: strings, Unicode, floats, boolean, dictionaries, and lists.
  215. Decimals and dates are notably missing.
  216. Also, binary data will be transferred using Base64 encoding, which will
  217. cause the transferred data to be around 34% larger than an encoding which
  218. supports native binary types.
  219. However, if your data fits inside the above constraints and you need
  220. cross-language support, the default setting of JSON is probably your
  221. best choice.
  222. See http://json.org for more information.
  223. pickle -- If you have no desire to support any language other than
  224. Python, then using the pickle encoding will gain you the support of
  225. all built-in Python data types (except class instances), smaller
  226. messages when sending binary files, and a slight speedup over JSON
  227. processing.
  228. See http://docs.python.org/library/pickle.html for more information.
  229. yaml -- YAML has many of the same characteristics as json,
  230. except that it natively supports more data types (including dates,
  231. recursive references, etc.)
  232. However, the Python libraries for YAML are a good bit slower than the
  233. libraries for JSON.
  234. If you need a more expressive set of data types and need to maintain
  235. cross-language compatibility, then YAML may be a better fit than the above.
  236. See http://yaml.org/ for more information.
  237. msgpack -- msgpack is a binary serialization format that is closer to JSON
  238. in features. It is very young however, and support should be considered
  239. experimental at this point.
  240. See http://msgpack.org/ for more information.
  241. The encoding used is available as a message header, so the worker knows how to
  242. deserialize any task. If you use a custom serializer, this serializer must
  243. be available for the worker.
  244. The following order is used to decide which serializer
  245. to use when sending a task:
  246. 1. The `serializer` execution option.
  247. 2. The :attr:`@-Task.serializer` attribute
  248. 3. The :setting:`CELERY_TASK_SERIALIZER` setting.
  249. Example setting a custom serializer for a single task invocation:
  250. .. code-block:: python
  251. >>> add.apply_async((10, 10), serializer="json")
  252. .. _calling-compression:
  253. Compression
  254. ===========
  255. Celery can compress the messages using either *gzip*, or *bzip2*.
  256. You can also create your own compression schemes and register
  257. them in the :func:`kombu compression registry <kombu.compression.register>`.
  258. The following order is used to decide which compression scheme
  259. to use when sending a task:
  260. 1. The `compression` execution option.
  261. 2. The :attr:`@-Task.compression` attribute.
  262. 3. The :setting:`CELERY_MESSAGE_COMPRESSION` attribute.
  263. Example specifying the compression used when calling a task::
  264. >>> add.apply_async((2, 2), compression="zlib")
  265. .. _calling-connections:
  266. Connections
  267. ===========
  268. .. sidebar:: Automatic Pool Support
  269. Since version 2.3 there is support for automatic connection pools,
  270. so you don't have to manually handle connections and publishers
  271. to reuse connections.
  272. The connection pool is enabled by default since version 2.5.
  273. See the :setting:`BROKER_POOL_LIMIT` setting for more information.
  274. You can handle the connection manually by creating a
  275. publisher:
  276. .. code-block:: python
  277. results = []
  278. with add.app.pool.acquire(block=True) as connection:
  279. with add.get_publisher(connection) as publisher:
  280. try:
  281. for args in numbers:
  282. res = add.apply_async((2, 2), publisher=publisher)
  283. results.append(res)
  284. print([res.get() for res in results])
  285. Though this particular example is much better expressed as a group:
  286. .. code-block:: python
  287. >>> from celery import group
  288. >>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
  289. >>> res = group(add.subtask(n) for i in numbers).apply_async()
  290. >>> res.get()
  291. [4, 8, 16, 32]
  292. .. _calling-routing:
  293. Routing options
  294. ===============
  295. Celery can route tasks to different queues.
  296. Simple routing (name <-> name) is accomplished using the ``queue`` option::
  297. add.apply_async(queue="priority.high")
  298. You can then assign workers to the ``priority.high`` queue by using
  299. the workers :option:`-Q` argument::
  300. $ celery worker -l info -Q celery,priority.high
  301. .. seealso::
  302. Hard-coding queue names in code is not recommended, the best practice
  303. is to use configuration routers (:setting:`CELERY_ROUTES`).
  304. To find out more about routing, please see :ref:`guide-routing`.
  305. Advanced Options
  306. ----------------
  307. These options are for advanced users who want to take use of
  308. AMQP's full routing capabilities. Interested parties may read the
  309. :ref:`routing guide <guide-routing>`.
  310. - exchange
  311. Name of exchange (or a :class:`kombu.entity.Exchange`) to
  312. send the message to.
  313. - routing_key
  314. Routing key used to determine.
  315. - mandatory
  316. This sets the delivery to be mandatory. An exception will be raised
  317. if there are no running workers able to take on the task.
  318. Not supported by :mod:`amqplib`.
  319. - immediate
  320. Request immediate delivery. Will raise an exception
  321. if the task cannot be routed to a worker immediately.
  322. Not supported by :mod:`amqplib`.
  323. - priority
  324. A number between `0` and `9`, where `0` is the highest priority.
  325. Supported by: redis, beanstalk