
.. _guide-calling:

===============
 Calling Tasks
===============

.. contents::
    :local:
    :depth: 1

.. _calling-basics:

Basics
======

This document describes Celery's uniform "Calling API"
used by task instances and the :ref:`canvas <guide-canvas>`.

The API defines a standard set of execution options, as well as three methods:

    - ``apply_async(args[, kwargs[, …]])``

        Sends a task message.

    - ``delay(*args, **kwargs)``

        Shortcut to send a task message, but doesn't support execution
        options.

    - *calling* (``__call__``)

        Applying an object supporting the calling API (e.g., ``add(2, 2)``)
        means that the task will not be executed by a worker, but in the current
        process instead (a message won't be sent).
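In rough terms, the three styles compare like this for the ``add`` task used
throughout this document (a sketch, not additional API):

.. code-block:: python

    add.apply_async((2, 2))   # sends a task message; supports execution options
    add.delay(2, 2)           # sends a task message; shortcut, no options
    add(2, 2)                 # runs locally in the current process, returns 4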
.. _calling-cheat:

.. topic:: Quick Cheat Sheet

    - ``T.delay(arg, kwarg=value)``
        Star arguments shortcut to ``.apply_async``.
        (``.delay(*args, **kwargs)`` calls ``.apply_async(args, kwargs)``).

    - ``T.apply_async((arg,), {'kwarg': value})``

    - ``T.apply_async(countdown=10)``
        executes 10 seconds from now.

    - ``T.apply_async(eta=now + timedelta(seconds=10))``
        executes 10 seconds from now, specified using ``eta``

    - ``T.apply_async(countdown=60, expires=120)``
        executes in one minute from now, but expires after 2 minutes.

    - ``T.apply_async(expires=now + timedelta(days=2))``
        expires in 2 days, set using :class:`~datetime.datetime`.

Example
-------

The :meth:`~@Task.delay` method is convenient as it looks like calling a regular
function:

.. code-block:: python

    task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

Using :meth:`~@Task.apply_async` instead you have to write:

.. code-block:: python

    task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
.. sidebar:: Tip

    If the task isn't registered in the current process
    you can use :meth:`~@send_task` to call the task by name instead.
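    For instance, a minimal sketch, assuming a task registered on the
    workers under the hypothetical name ``'proj.tasks.add'``:

    .. code-block:: python

        # call a task by name, without having its function importable locally
        app.send_task('proj.tasks.add', args=(2, 2), kwargs={})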
So `delay` is clearly convenient, but if you want to set additional execution
options you have to use ``apply_async``.
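For example, the same kind of call with a couple of execution options added
might look like this, using the ``add`` task introduced below (``lopri`` is
just an illustrative queue name):

.. code-block:: python

    add.apply_async((2, 2), queue='lopri', countdown=10)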
The rest of this document will go into the task execution
options in detail. All examples use a task
called `add`, returning the sum of two arguments:

.. code-block:: python

    @app.task
    def add(x, y):
        return x + y

.. topic:: There's another way…

    You'll learn more about this later while reading about the :ref:`Canvas
    <guide-canvas>`, but :class:`~celery.signature` objects are used to pass around
    the signature of a task invocation (for example to send it over the
    network), and they also support the Calling API:

    .. code-block:: python

        task.s(arg1, arg2, kwarg1='x', kwarg2='y').apply_async()
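    Signatures accept the same execution options as ``apply_async``;
    for instance (a brief sketch):

    .. code-block:: python

        add.s(2, 2).apply_async(countdown=10)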
.. _calling-links:

Linking (callbacks/errbacks)
============================

Celery supports linking tasks together so that one task follows another.
The callback task will be applied with the result of the parent task
as a partial argument:

.. code-block:: python

    add.apply_async((2, 2), link=add.s(16))

.. sidebar:: What's ``s``?

    The ``add.s`` call used here is called a signature. If you
    don't know what they are you should read about them in the
    :ref:`canvas guide <guide-canvas>`.
    There you can also learn about :class:`~celery.chain`: a simpler
    way to chain tasks together.

    In practice the ``link`` execution option is considered an internal
    primitive, and you'll probably not use it directly, but
    use chains instead.

Here the result of the first task (4) will be sent to a new
task that adds 16 to the previous result, forming the expression
:math:`(2 + 2) + 16 = 20`.
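The same computation expressed as a chain, which is the recommended approach,
would look roughly like this:

.. code-block:: python

    from celery import chain

    # (2 + 2) + 16 = 20, without using the low-level link option directly
    chain(add.s(2, 2), add.s(16)).apply_async()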
You can also cause a callback to be applied if a task raises an exception
(an *errback*), but this behaves differently from a regular callback
in that it will be passed the id of the parent task, not the result.
This is because it may not always be possible to serialize
the exception raised, so the error callback requires a result backend
to be enabled, and it must retrieve the result of the parent task itself.
This is an example error callback:

.. code-block:: python

    from celery.result import AsyncResult

    @app.task
    def error_handler(uuid):
        result = AsyncResult(uuid)
        exc = result.get(propagate=False)
        print('Task {0} raised exception: {1!r}\n{2!r}'.format(
              uuid, exc, result.traceback))

It can be added to the task using the ``link_error`` execution
option:

.. code-block:: python

    add.apply_async((2, 2), link_error=error_handler.s())
In addition, both the ``link`` and ``link_error`` options can be expressed
as a list:

.. code-block:: python

    add.apply_async((2, 2), link=[add.s(16), other_task.s()])

The callbacks/errbacks will then be called in order, and all
callbacks will be called with the return value of the parent task
as a partial argument.
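Putting the pieces together, a call with both a callback list and an error
callback might look like this sketch (reusing ``error_handler`` and the
hypothetical ``other_task`` from above):

.. code-block:: python

    add.apply_async(
        (2, 2),
        link=[add.s(16), other_task.s()],
        link_error=[error_handler.s()],
    )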
.. _calling-on-message:

On message
==========

Celery supports catching all state changes by setting the ``on_message`` callback.

For example, for long-running tasks to send task progress you can do something like this:

.. code-block:: python

    import time

    @app.task(bind=True)
    def hello(self, a, b):
        time.sleep(1)
        self.update_state(state="PROGRESS", meta={'progress': 50})
        time.sleep(1)
        self.update_state(state="PROGRESS", meta={'progress': 90})
        time.sleep(1)
        return 'hello world: %i' % (a + b)

.. code-block:: python

    def on_raw_message(body):
        print(body)

    r = hello.apply_async((4, 6))
    print(r.get(on_message=on_raw_message, propagate=False))

This will generate output like:

.. code-block:: text

    {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7', 'result': {'progress': 50}, 'children': [], 'status': 'PROGRESS', 'traceback': None}
    {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7', 'result': {'progress': 90}, 'children': [], 'status': 'PROGRESS', 'traceback': None}
    {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7', 'result': 'hello world: 10', 'children': [], 'status': 'SUCCESS', 'traceback': None}
    hello world: 10
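Rather than printing the raw message body, a callback can react only to the
``PROGRESS`` states sent by ``update_state()`` above; a small sketch using the
fields shown in the output:

.. code-block:: python

    def on_progress(body):
        # 'status' and 'result' are the fields visible in the raw output above
        if body.get('status') == 'PROGRESS':
            print('{0}% done'.format(body['result']['progress']))

    r = hello.apply_async((4, 6))
    print(r.get(on_message=on_progress, propagate=False))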
.. _calling-eta:

ETA and Countdown
=================

The ETA (estimated time of arrival) lets you set a specific date and time that
is the earliest time at which your task will be executed. `countdown` is
a shortcut to set ETA by seconds into the future.

.. code-block:: pycon

    >>> result = add.apply_async((2, 2), countdown=3)
    >>> result.get()    # this takes at least 3 seconds to return
    4

The task is guaranteed to be executed at some time *after* the
specified date and time, but not necessarily at that exact time.
Possible reasons for broken deadlines may include many items waiting
in the queue, or heavy network latency. To make sure your tasks
are executed in a timely manner you should monitor the queue for congestion. Use
Munin, or similar tools, to receive alerts, so appropriate action can be
taken to ease the workload. See :ref:`monitoring-munin`.

While `countdown` is an integer, `eta` must be a :class:`~datetime.datetime`
object, specifying an exact date and time (including millisecond precision,
and timezone information):

.. code-block:: pycon

    >>> from datetime import datetime, timedelta

    >>> tomorrow = datetime.utcnow() + timedelta(days=1)
    >>> add.apply_async((2, 2), eta=tomorrow)
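Since ``eta`` is interpreted together with timezone information, a
timezone-aware datetime is the less ambiguous choice; a sketch using the
standard library's UTC timezone:

.. code-block:: pycon

    >>> from datetime import datetime, timedelta, timezone

    >>> tomorrow = datetime.now(timezone.utc) + timedelta(days=1)
    >>> add.apply_async((2, 2), eta=tomorrow)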
.. _calling-expiration:

Expiration
==========

The `expires` argument defines an optional expiry time,
either as seconds after task publish, or a specific date and time using
:class:`~datetime.datetime`:

.. code-block:: pycon

    >>> # Task expires after one minute from now.
    >>> add.apply_async((10, 10), expires=60)

    >>> # Also supports datetime
    >>> from datetime import datetime, timedelta
    >>> add.apply_async((10, 10),
    ...                 expires=datetime.now() + timedelta(days=1))

When a worker receives an expired task it will mark
the task as :state:`REVOKED` (:exc:`~@TaskRevokedError`).
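As a rough illustration of that behavior (assuming a result backend is
configured, and that a worker only picks the message up after it has
expired):

.. code-block:: pycon

    >>> res = add.apply_async((10, 10), expires=1)
    >>> # once a worker has seen the expired message:
    >>> res.state
    'REVOKED'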
.. _calling-retry:

Message Sending Retry
=====================

Celery will automatically retry sending messages in the event of connection
failure, and retry behavior can be configured -- like how often to retry, or a maximum
number of retries -- or disabled altogether.

To disable retry you can set the ``retry`` execution option to :const:`False`:

.. code-block:: python

    add.apply_async((2, 2), retry=False)

.. topic:: Related Settings

    .. hlist::
        :columns: 2

        - :setting:`task_publish_retry`
        - :setting:`task_publish_retry_policy`

Retry Policy
------------

A retry policy is a mapping that controls how retries behave,
and can contain the following keys:

- `max_retries`

    Maximum number of retries before giving up, in this case the
    exception that caused the retry to fail will be raised.

    A value of :const:`None` means it will retry forever.

    The default is to retry 3 times.

- `interval_start`

    Defines the number of seconds (float or integer) to wait between
    retries. Default is 0 (the first retry will be instantaneous).

- `interval_step`

    On each consecutive retry this number will be added to the retry
    delay (float or integer). Default is 0.2.

- `interval_max`

    Maximum number of seconds (float or integer) to wait between
    retries. Default is 0.2.

For example, the default policy correlates to:

.. code-block:: python

    add.apply_async((2, 2), retry=True, retry_policy={
        'max_retries': 3,
        'interval_start': 0,
        'interval_step': 0.2,
        'interval_max': 0.2,
    })

With this policy the maximum time spent retrying will be 0.4 seconds. It's set
relatively short by default because a connection failure could lead to a retry
pile effect if the broker connection is down -- for example, many web server
processes waiting to retry, blocking other incoming requests.
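Instead of passing ``retry_policy`` on every call, the related settings shown
above can set an app-wide default; a sketch with illustrative values:

.. code-block:: python

    app.conf.task_publish_retry = True
    app.conf.task_publish_retry_policy = {
        'max_retries': 5,
        'interval_start': 0,
        'interval_step': 0.5,
        'interval_max': 2.0,
    }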
.. _calling-connection-errors:

Connection Error Handling
=========================

When you send a task and the message transport connection is lost, or
the connection cannot be initiated, an :exc:`~kombu.exceptions.OperationalError`
error will be raised:

.. code-block:: pycon

    >>> from proj.tasks import add
    >>> add.delay(2, 2)
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "celery/app/task.py", line 388, in delay
        return self.apply_async(args, kwargs)
      File "celery/app/task.py", line 503, in apply_async
        **options
      File "celery/app/base.py", line 662, in send_task
        amqp.send_task_message(P, name, message, **options)
      File "celery/backends/rpc.py", line 275, in on_task_call
        maybe_declare(self.binding(producer.channel), retry=True)
      File "/opt/celery/kombu/kombu/messaging.py", line 204, in _get_channel
        channel = self._channel = channel()
      File "/opt/celery/py-amqp/amqp/connection.py", line 272, in connect
        self.transport.connect()
      File "/opt/celery/py-amqp/amqp/transport.py", line 100, in connect
        self._connect(self.host, self.port, self.connect_timeout)
      File "/opt/celery/py-amqp/amqp/transport.py", line 141, in _connect
        self.sock.connect(sa)
    kombu.exceptions.OperationalError: [Errno 61] Connection refused
If you have :ref:`retries <calling-retry>` enabled this will only happen after
retries are exhausted, or immediately if retries are disabled.
You can handle this error too:

.. code-block:: pycon

    >>> from celery.utils.log import get_logger
    >>> logger = get_logger(__name__)

    >>> try:
    ...     add.delay(2, 2)
    ... except add.OperationalError as exc:
    ...     logger.exception('Sending task raised: %r', exc)
.. _calling-serializers:

Serializers
===========

.. sidebar:: Security

    The pickle module allows for execution of arbitrary functions,
    please see the :ref:`security guide <guide-security>`.

    Celery also comes with a special serializer that uses
    cryptography to sign your messages.

Data transferred between clients and workers needs to be serialized,
so every message in Celery has a ``content_type`` header that
describes the serialization method used to encode it.

The default serializer is `JSON`, but you can
change this using the :setting:`task_serializer` setting,
or for each individual task, or even per message.

There's built-in support for `JSON`, :mod:`pickle`, `YAML`
and ``msgpack``, and you can also add your own custom serializers by registering
them into the Kombu serializer registry.

.. seealso::

    :ref:`Message Serialization <kombu:guide-serialization>` in the Kombu user
    guide.
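As a sketch of registering a custom serializer with Kombu (the name
``'myjson'`` and the encoder/decoder pair here are purely illustrative):

.. code-block:: python

    import json

    from kombu.serialization import register

    # register a hypothetical 'myjson' serializer with the Kombu registry
    register(
        'myjson',
        encoder=json.dumps,
        decoder=json.loads,
        content_type='application/x-myjson',
        content_encoding='utf-8',
    )

Remember that the workers must also be configured to accept the new content
type (see the :setting:`accept_content` setting) before you send messages
encoded with it.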
Each option has its advantages and disadvantages.

json -- JSON is supported in many programming languages, is now
    a standard part of Python (since 2.6), and is fairly fast to decode
    using the modern Python libraries, such as :pypi:`simplejson`.

    The primary disadvantage to JSON is that it limits you to the following
    data types: strings, Unicode, floats, Boolean, dictionaries, and lists.
    Decimals and dates are notably missing.

    Binary data will be transferred using Base64 encoding,
    increasing the size of the transferred data by 34% compared to an encoding
    format where native binary types are supported.

    However, if your data fits inside the above constraints and you need
    cross-language support, the default setting of JSON is probably your
    best choice.

    See http://json.org for more information.

pickle -- If you have no desire to support any language other than
    Python, then using the pickle encoding will gain you the support of
    all built-in Python data types (except class instances), smaller
    messages when sending binary files, and a slight speedup over JSON
    processing.

    See :mod:`pickle` for more information.

yaml -- YAML has many of the same characteristics as json,
    except that it natively supports more data types (including dates,
    recursive references, etc.).

    However, the Python libraries for YAML are a good bit slower than the
    libraries for JSON.

    If you need a more expressive set of data types and need to maintain
    cross-language compatibility, then YAML may be a better fit than the above.

    See http://yaml.org/ for more information.

msgpack -- msgpack is a binary serialization format that's closer to JSON
    in features. It's very young however, and support should be considered
    experimental at this point.

    See http://msgpack.org/ for more information.
The encoding used is available as a message header, so the worker knows how to
deserialize any task. If you use a custom serializer, this serializer must
be available for the worker.

The following order is used to decide the serializer
used when sending a task:

    1. The `serializer` execution option.
    2. The :attr:`@-Task.serializer` attribute.
    3. The :setting:`task_serializer` setting.

Example setting a custom serializer for a single task invocation:

.. code-block:: pycon

    >>> add.apply_async((10, 10), serializer='json')
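The other two levels in that order can be set like this; a brief sketch:

.. code-block:: python

    # per-task default, via the Task.serializer attribute
    @app.task(serializer='json')
    def add(x, y):
        return x + y

    # app-wide default, via the task_serializer setting
    app.conf.task_serializer = 'json'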
.. _calling-compression:

Compression
===========

Celery can compress the messages using either *gzip*, or *bzip2*.
You can also create your own compression schemes and register
them in the :func:`kombu compression registry <kombu.compression.register>`.

The following order is used to decide the compression scheme
used when sending a task:

    1. The `compression` execution option.
    2. The :attr:`@-Task.compression` attribute.
    3. The :setting:`task_compression` setting.

Example specifying the compression used when calling a task::

    >>> add.apply_async((2, 2), compression='zlib')
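Mirroring the serializer example above, the other two levels can be set
like this (a sketch):

.. code-block:: python

    # per-task default, via the Task.compression attribute
    @app.task(compression='gzip')
    def add(x, y):
        return x + y

    # app-wide default, via the task_compression setting
    app.conf.task_compression = 'gzip'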
.. _calling-connections:

Connections
===========

.. sidebar:: Automatic Pool Support

    Since version 2.3 there's support for automatic connection pools,
    so you don't have to manually handle connections and publishers
    to reuse connections.

    The connection pool is enabled by default since version 2.5.

    See the :setting:`broker_pool_limit` setting for more information.

You can handle the connection manually by creating a
publisher:

.. code-block:: python

    numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
    results = []
    with add.app.pool.acquire(block=True) as connection:
        with add.get_publisher(connection) as publisher:
            for args in numbers:
                res = add.apply_async(args, publisher=publisher)
                results.append(res)
    print([res.get() for res in results])

Though this particular example is much better expressed as a group:

.. code-block:: pycon

    >>> from celery import group

    >>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
    >>> res = group(add.s(i, j) for i, j in numbers).apply_async()

    >>> res.get()
    [4, 8, 16, 32]
.. _calling-routing:

Routing options
===============

Celery can route tasks to different queues.

Simple routing (name <-> name) is accomplished using the ``queue`` option::

    add.apply_async(queue='priority.high')

You can then assign workers to the ``priority.high`` queue by using
the worker's :option:`-Q <celery worker -Q>` argument:

.. code-block:: console

    $ celery -A proj worker -l info -Q celery,priority.high

.. seealso::

    Hard-coding queue names in code isn't recommended; the best practice
    is to use configuration routers (:setting:`task_routes`).

    To find out more about routing, please see :ref:`guide-routing`.
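A minimal ``task_routes`` entry of that kind might look like this sketch
(assuming the task lives in a ``proj.tasks`` module):

.. code-block:: python

    app.conf.task_routes = {
        'proj.tasks.add': {'queue': 'priority.high'},
    }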
Advanced Options
----------------

These options are for advanced users who want to make use of
AMQP's full routing capabilities. Interested parties may read the
:ref:`routing guide <guide-routing>`.

- exchange

    Name of exchange (or a :class:`kombu.entity.Exchange`) to
    send the message to.

- routing_key

    Routing key used by the exchange to determine which queue(s) the
    message is delivered to.

- priority

    A number between `0` and `255`, where `255` is the highest priority.

    Supported by: RabbitMQ, Redis (priority reversed, 0 is highest).
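A combined sketch using these options (the exchange and routing key names are
purely illustrative, and assume matching exchange declarations and queue
bindings already exist on the broker):

.. code-block:: python

    add.apply_async((2, 2),
                    exchange='media',
                    routing_key='media.video',
                    priority=9)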