.. _message-protocol:

===================
 Message Protocol
===================

.. contents::
    :local:

.. _message-protocol-task:
.. _internals-task-message-protocol:

Task messages
=============

.. _message-protocol-task-v2:

Version 2
---------

Definition
~~~~~~~~~~

.. code-block:: python

    # Schema notation: each value is written as ``type name``.
    properties = {
        'correlation_id': uuid task_id,
        'content_type': string mimetype,
        'content_encoding': string encoding,

        # optional
        'reply_to': string queue_or_url,
    }
    headers = {
        'lang': string 'py',
        'task': string task,
        'id': uuid task_id,
        'root_id': uuid root_id,
        'parent_id': uuid parent_id,
        'group': uuid group_id,

        # optional
        'meth': string method_name,
        'shadow': string alias_name,
        'eta': iso8601 ETA,
        'expires': iso8601 expires,
        'retries': int retries,
        'timelimit': (soft, hard),
        'argsrepr': str repr(args),
        'kwargsrepr': str repr(kwargs),
        'origin': str nodename,
    }

    body = (
        object[] args,
        Mapping kwargs,
        Mapping embed {
            'callbacks': Signature[] callbacks,
            'errbacks': Signature[] errbacks,
            'chain': Signature[] chain,
            'chord': Signature chord_callback,
        }
    )

Example
~~~~~~~

This example sends a task message using version 2 of the protocol:

.. code-block:: python

    # Send a single add(2, 2) call. Nothing is embedded (no callbacks,
    # errbacks, chain, or chord), so the third body element is None.

    import json
    import os
    import socket
    from uuid import uuid4

    task_id = str(uuid4())
    args = (2, 2)
    kwargs = {}
    # basic_publish stands in for the publish call of your AMQP client.
    basic_publish(
        message=json.dumps((args, kwargs, None)),
        application_headers={
            'lang': 'py',
            'task': 'proj.tasks.add',
            'id': task_id,
            'argsrepr': repr(args),
            'kwargsrepr': repr(kwargs),
            # os.getpid() returns an int, so convert it before joining.
            'origin': '@'.join([str(os.getpid()), socket.gethostname()]),
        },
        properties={
            'correlation_id': task_id,
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
        },
    )

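
On the receiving side, a consumer that understands the Python body layout
could unpack the message roughly as follows. This is a minimal sketch that
assumes a JSON-encoded body; ``unpack_body`` is a hypothetical helper written
for this illustration and is not part of Celery's API.

.. code-block:: python

    import json


    def unpack_body(raw, content_type='application/json',
                    content_encoding='utf-8'):
        """Decode a version 2 body into (args, kwargs, embed).

        Hypothetical helper for illustration; only JSON is handled.
        """
        if content_type != 'application/json':
            raise ValueError(f'unsupported content type: {content_type!r}')
        if isinstance(raw, bytes):
            raw = raw.decode(content_encoding)
        args, kwargs, embed = json.loads(raw)
        # The sender may pass None when nothing is embedded.
        return args, kwargs, embed or {}


    args, kwargs, embed = unpack_body(json.dumps(((2, 2), {}, None)))

Note that JSON has no tuple type, so ``args`` comes back as a list.
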
Changes from version 1
~~~~~~~~~~~~~~~~~~~~~~

- The protocol version is detected by the presence of a ``task`` message
  header.

- Support for multiple languages via the ``lang`` header.

    A worker may redirect the message to a worker that supports
    the language.

- Meta-data moved to headers.

    This means that workers/intermediates can inspect the message
    and make decisions based on the headers without decoding
    the payload (that may be language specific, for example serialized by the
    Python specific pickle serializer).

- Always UTC

    There's no ``utc`` flag anymore, so any time information missing timezone
    will be expected to be in UTC time.

- Body is only for language specific data.

    - Python stores args/kwargs and embedded signatures in body.

    - If a message uses raw encoding then the raw data
      will be passed as a single argument to the function.

    - Java/C, etc. can use a Thrift/protobuf document as the body.

- ``origin`` is the name of the node sending the task.

- Dispatches to an actor based on the ``task`` and ``meth`` headers.

    ``meth`` is unused by Python, but may be used in the future
    to specify class+method pairs.

- Chain gains a dedicated field.

    Reducing the chain into a recursive ``callbacks`` argument
    causes problems when the recursion limit is exceeded.

    This is fixed in the new message protocol by specifying
    a list of signatures; each task then pops a task off the list
    when sending the next message:

    .. code-block:: python

        from celery.canvas import maybe_signature

        # execute_task, message and embed stand for the worker's own state.
        execute_task(message)
        chain = embed['chain']
        if chain:
            # Pop the next signature; the rest of the list travels with it.
            sig = maybe_signature(chain.pop())
            sig.apply_async(chain=chain)

- ``correlation_id`` replaces ``task_id`` field.

- The ``root_id`` and ``parent_id`` fields help keep track of workflows.

- ``shadow`` lets you specify a different name for logs and monitors.
  It can be used for concepts like tasks that call a function
  specified as an argument:

    .. code-block:: python

        from celery import Task
        from celery.utils.imports import qualname

        class PickleTask(Task):

            def unpack_args(self, fun, args=()):
                return fun, args

            def apply_async(self, args, kwargs, **options):
                fun, real_args = self.unpack_args(*args)
                return super(PickleTask, self).apply_async(
                    (fun, real_args, kwargs), shadow=qualname(fun), **options
                )

        # ``app`` is your Celery application instance.
        @app.task(base=PickleTask)
        def call(fun, args, kwargs):
            return fun(*args, **kwargs)

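
Two of the points above, version detection and meta-data living in the
headers, can be sketched together. This is an illustration only:
``protocol_version`` and ``task_name`` are hypothetical helper names, and
``decode_body`` is assumed to be a caller-supplied callable that deserializes
a version 1 body into a mapping.

.. code-block:: python

    def protocol_version(headers):
        """Return 2 if the ``task`` header is present, else 1."""
        return 2 if 'task' in (headers or {}) else 1


    def task_name(headers, decode_body):
        """Return the task name, decoding the body only for version 1.

        ``decode_body`` is a callable returning the deserialized
        version 1 body; it is never called for version 2 messages.
        """
        if protocol_version(headers) == 2:
            return headers['task']
        return decode_body()['task']


    v2_headers = {'lang': 'py', 'task': 'proj.tasks.add'}
    v1_body = {'task': 'proj.tasks.add', 'id': 'some-id'}

    name_from_v2 = task_name(v2_headers, decode_body=lambda: v1_body)
    name_from_v1 = task_name({}, decode_body=lambda: v1_body)

An intermediate that only routes on the task name therefore never needs a
deserializer for version 2 payloads.
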
.. _message-protocol-task-v1:
.. _task-message-protocol-v1:

Version 1
---------

In version 1 of the protocol all fields are stored in the message body,
meaning workers and intermediate consumers must deserialize the payload
to read the fields.

Message body
~~~~~~~~~~~~

* ``task``
    :`string`:

    Name of the task. **required**

* ``id``
    :`string`:

    Unique id of the task (UUID). **required**

* ``args``
    :`list`:

    List of arguments. Will be an empty list if not provided.

* ``kwargs``
    :`dictionary`:

    Dictionary of keyword arguments. Will be an empty dictionary if not
    provided.

* ``retries``
    :`int`:

    Current number of times this task has been retried.
    Defaults to `0` if not specified.

* ``eta``
    :`string` (ISO 8601):

    Estimated time of arrival. This is the date and time in ISO 8601
    format. If not provided the message isn't scheduled, but will be
    executed as soon as possible.

* ``expires``
    :`string` (ISO 8601):

    .. versionadded:: 2.0.2

    Expiration date. This is the date and time in ISO 8601 format.
    If not provided the message will never expire. The message
    is treated as expired if the expiration date has passed by the time
    the message is received.

* ``taskset``
    :`string`:

    The group this task is part of (if any).

* ``chord``
    :`Signature`:

    .. versionadded:: 2.3

    Signifies that this task is one of the header parts of a chord. The value
    of this key is the body of the chord that should be executed when all of
    the tasks in the header have returned.

* ``utc``
    :`bool`:

    .. versionadded:: 2.5

    If true time uses the UTC timezone, if not the current local timezone
    should be used.

* ``callbacks``
    :`<list>Signature`:

    .. versionadded:: 3.0

    A list of signatures to call if the task exited successfully.

* ``errbacks``
    :`<list>Signature`:

    .. versionadded:: 3.0

    A list of signatures to call if an error occurs while executing the task.

* ``timelimit``
    :`<tuple>(float, float)`:

    .. versionadded:: 3.1

    Task execution time limit settings. This is a tuple of soft and hard time
    limit values (`int`/`float` or :const:`None` for no limit).

    Example value specifying a soft time limit of 3 seconds, and a hard time
    limit of 10 seconds::

        {'timelimit': (3.0, 10.0)}

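
The required fields and defaults listed above can be applied to a decoded
version 1 body along these lines. This is a minimal sketch:
``normalize_v1_body`` is a hypothetical helper name, and only the defaults
stated in the field list (``args``, ``kwargs``, ``retries``) are filled in.

.. code-block:: python

    def normalize_v1_body(body):
        """Return a copy of a version 1 body with defaults filled in."""
        missing = [key for key in ('task', 'id') if key not in body]
        if missing:
            raise ValueError(
                'missing required fields: ' + ', '.join(missing))
        normalized = dict(body)
        normalized.setdefault('args', [])
        normalized.setdefault('kwargs', {})
        normalized.setdefault('retries', 0)
        return normalized


    message = normalize_v1_body({
        'task': 'celery.task.PingTask',
        'id': '4cc7438e-afd4-4f8f-a2f3-f46567e7ca77',
    })

Optional fields without a documented default, such as ``eta`` and
``expires``, are left absent rather than set to a placeholder.
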
Example message
~~~~~~~~~~~~~~~

This is an example invocation of the `celery.task.PingTask` task in JSON
format:

.. code-block:: javascript

    {"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
     "task": "celery.task.PingTask",
     "args": [],
     "kwargs": {},
     "retries": 0,
     "eta": "2009-11-17T12:30:56.527191"}

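
A consumer could decode the example message and interpret its ``eta``
according to the ``utc`` flag described in the field list. The sketch below
uses only the standard library; ``parse_eta`` is a hypothetical helper, and
treating a missing ``utc`` flag as false is an assumption of this example.

.. code-block:: python

    import json
    from datetime import datetime, timezone

    raw = '''{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
     "task": "celery.task.PingTask",
     "args": [],
     "kwargs": {},
     "retries": 0,
     "eta": "2009-11-17T12:30:56.527191"}'''


    def parse_eta(body):
        """Return the ETA as an aware datetime, or None if unscheduled."""
        eta = body.get('eta')
        if eta is None:
            return None
        parsed = datetime.fromisoformat(eta)
        if parsed.tzinfo is None:
            if body.get('utc'):
                parsed = parsed.replace(tzinfo=timezone.utc)
            else:
                # Interpret the naive value in the current local timezone.
                parsed = parsed.astimezone()
        return parsed


    body = json.loads(raw)
    eta = parse_eta(body)
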
Task Serialization
------------------

Several types of serialization formats are supported using the
`content_type` message header.

The MIME-types supported by default are shown in the following table.

=============== =================================
Scheme          MIME Type
=============== =================================
json            application/json
yaml            application/x-yaml
pickle          application/x-python-serialize
msgpack         application/x-msgpack
=============== =================================

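
A consumer can use the table above to pick a decoder from the message's
`content_type`. The following is a sketch under stated assumptions:
``MIME_TYPES``, ``DECODERS`` and ``decode`` are hypothetical names, and only
the JSON decoder is registered because it is the only one available in the
standard library without further setup.

.. code-block:: python

    import json

    # Scheme names and MIME types copied from the table above.
    MIME_TYPES = {
        'json': 'application/json',
        'yaml': 'application/x-yaml',
        'pickle': 'application/x-python-serialize',
        'msgpack': 'application/x-msgpack',
    }


    def _decode_json(payload, content_encoding):
        if isinstance(payload, bytes):
            payload = payload.decode(content_encoding)
        return json.loads(payload)


    # Only JSON is registered in this sketch; other decoders would be
    # added here by the application.
    DECODERS = {
        MIME_TYPES['json']: _decode_json,
    }


    def decode(payload, content_type, content_encoding='utf-8'):
        """Decode ``payload`` using the decoder for ``content_type``."""
        try:
            decoder = DECODERS[content_type]
        except KeyError:
            raise ValueError(f'no decoder registered for {content_type!r}')
        return decoder(payload, content_encoding)


    decoded = decode(b'[[2, 2], {}, null]', 'application/json')

Rejecting unregistered content types, rather than guessing, keeps a consumer
from deserializing formats it was never meant to accept.
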
.. _message-protocol-event:

Event Messages
==============

Event messages are always JSON serialized and can contain arbitrary message
body fields.

Since version 4.0, the body can consist of either a single mapping (one event),
or a list of mappings (multiple events).

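
Because the body may be either shape, a consumer can normalize it before
processing. This is a minimal sketch; ``iter_events`` is a hypothetical
helper name.

.. code-block:: python

    import json


    def iter_events(raw_body):
        """Yield each event mapping from a JSON event message body."""
        decoded = json.loads(raw_body)
        if isinstance(decoded, dict):
            # A single mapping is one event.
            decoded = [decoded]
        for event in decoded:
            yield event


    single = list(iter_events('{"type": "task-succeeded"}'))
    batch = list(iter_events(
        '[{"type": "task-started"}, {"type": "task-succeeded"}]'))
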
There are also standard fields that must always be present in an event
message:

Standard body fields
--------------------

- *string* ``type``

    The type of event. This is a string containing the *category* and
    *action* separated by a dash delimiter (e.g., ``task-succeeded``).

- *string* ``hostname``

    The fully qualified hostname of the host where the event occurred.

- *unsigned long long* ``clock``

    The logical clock value for this event (Lamport time-stamp).

- *float* ``timestamp``

    The UNIX time-stamp of the time at which the event occurred.

- *signed short* ``utcoffset``

    This field describes the timezone of the originating host, and is
    specified as the number of hours ahead of/behind UTC (e.g., -2 or
    +1).

- *unsigned long long* ``pid``

    The process id of the process the event originated in.

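
The standard fields can be checked, and the ``type`` split into its category
and action, as in the sketch below. ``STANDARD_FIELDS`` and ``split_type``
are hypothetical names; splitting on the first dash is an assumption that
holds for types such as ``task-succeeded``.

.. code-block:: python

    STANDARD_FIELDS = (
        'type', 'hostname', 'clock', 'timestamp', 'utcoffset', 'pid',
    )


    def split_type(event):
        """Validate the standard fields and return (category, action)."""
        missing = [name for name in STANDARD_FIELDS if name not in event]
        if missing:
            raise ValueError(
                'missing standard fields: ' + ', '.join(missing))
        category, _, action = event['type'].partition('-')
        return category, action


    event = {
        'type': 'task-succeeded',
        'hostname': 'worker1@george.vandelay.com',
        'pid': 6335,
        'clock': 393912923921,
        'timestamp': 1401717709.101747,
        'utcoffset': -1,
    }
    category, action = split_type(event)
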
Standard event types
--------------------

For a list of standard event types and their fields see the
:ref:`event-reference`.

Example message
---------------

These are the message fields for a ``task-succeeded`` event:

.. code-block:: python

    properties = {
        'routing_key': 'task.succeeded',
        'exchange': 'celeryev',
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
        'delivery_mode': 1,
    }
    headers = {
        'hostname': 'worker1@george.vandelay.com',
    }
    body = {
        'type': 'task-succeeded',
        'hostname': 'worker1@george.vandelay.com',
        'pid': 6335,
        'clock': 393912923921,
        'timestamp': 1401717709.101747,
        'utcoffset': -1,
        'uuid': '9011d855-fdd1-4f8f-adb3-a413b499eafb',
        'retval': '4',
        'runtime': 0.0003212,
    }