.. _message-protocol:

===================
Message Protocol
===================

.. contents::
    :local:

.. _message-protocol-task:
.. _internals-task-message-protocol:

Task messages
=============

.. _message-protocol-task-v2:

Version 2
---------

Definition
~~~~~~~~~~

.. code-block:: python

    # protocol v2 implies UTC=True
    # 'task' header existing means protocol is v2

    properties = {
        'correlation_id': uuid task_id,
        'content_type': string mimetype,
        'content_encoding': string encoding,

        # optional
        'reply_to': string queue_or_url,
    }
    headers = {
        'lang': string 'py',
        'task': string task,
        'id': uuid task_id,
        'root_id': uuid root_id,
        'parent_id': uuid parent_id,
        'group': uuid group_id,

        # optional
        'meth': string method_name,
        'shadow': string alias_name,
        'eta': iso8601 eta,
        'expires': iso8601 expires,
        'retries': int retries,
        'timelimit': (soft, hard),
    }

    body = (
        object[] args,
        Mapping kwargs,
        Mapping embed {
            'callbacks': Signature[] callbacks,
            'errbacks': Signature[] errbacks,
            'chain': Signature[] chain,
            'chord': Signature chord_callback,
        }
    )

Example
~~~~~~~

This example sends a task message using version 2 of the protocol:

.. code-block:: python

    import json
    from uuid import uuid4

    # body is (args, kwargs, embed): calls add(2, 2) with no embedded signatures
    # basic_publish stands in for the publish call of your AMQP client
    task_id = str(uuid4())
    basic_publish(
        message=json.dumps(([2, 2], {}, None)),
        application_headers={
            'lang': 'py',
            'task': 'proj.tasks.add',
            'id': task_id,
        },
        properties={
            'correlation_id': task_id,
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
        },
    )

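
As a minimal sketch of how the three parts fit together, the helper below
assembles the properties, headers and body of a version 2 message using only
the standard library. The name ``build_task_message_v2`` is hypothetical and
not part of Celery, and defaulting ``root_id`` to the task's own id is an
assumption made for illustration:

.. code-block:: python

    import json
    import uuid


    def build_task_message_v2(task, args=(), kwargs=None, root_id=None,
                              parent_id=None, group=None):
        """Return (properties, headers, body) laid out as in the definition."""
        task_id = str(uuid.uuid4())
        properties = {
            'correlation_id': task_id,
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
        }
        headers = {
            'lang': 'py',
            'task': task,
            'id': task_id,
            # Assumption: a task with no enclosing workflow is its own root.
            'root_id': root_id or task_id,
            'parent_id': parent_id,
            'group': group,
        }
        # No embedded signatures in this sketch.
        embed = {'callbacks': None, 'errbacks': None,
                 'chain': None, 'chord': None}
        body = json.dumps((list(args), kwargs or {}, embed))
        return properties, headers, body


    properties, headers, body = build_task_message_v2('proj.tasks.add', (2, 2))

Because all routing metadata ends up in ``headers``, a consumer can read the
task name and id without decoding ``body``.
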
Changes from version 1
~~~~~~~~~~~~~~~~~~~~~~

- The protocol version is detected by the presence of a ``task`` message
  header.

- Support for multiple languages via the ``lang`` header.

  A worker may redirect the message to a worker that supports
  the language.

- Metadata moved to headers.

  This means that workers/intermediates can inspect the message
  and make decisions based on the headers without decoding
  the payload (which may be language-specific, e.g. serialized by the
  Python-specific pickle serializer).

- Body is only for language-specific data.

  - Python stores args/kwargs and embedded signatures in body.

  - If a message uses raw encoding then the raw data
    will be passed as a single argument to the function.

  - Java/C, etc. can use a thrift/protobuf document as the body.

- Dispatches to actor based on ``task``, ``meth`` headers.

  ``meth`` is unused by Python, but may be used in the future
  to specify class+method pairs.

- Chain gains a dedicated field.

  Reducing the chain into a recursive ``callbacks`` argument
  causes problems when the recursion limit is exceeded.

  This is fixed in the new message protocol by specifying
  a list of signatures; each task then pops a signature off the list
  when sending the next message::

      execute_task(message)
      chain = embed['chain']
      if chain:
          sig = maybe_signature(chain.pop())
          sig.apply_async(chain=chain)

- ``correlation_id`` replaces the ``task_id`` field.

- The ``root_id`` and ``parent_id`` fields help keep track of workflows.

- ``shadow`` lets you specify a different name for logs and monitors.
  This can be used for e.g. meta tasks that call any function::

      from celery import Task
      from celery.utils.imports import qualname

      class PickleTask(Task):
          abstract = True

          def unpack_args(self, fun, args=()):
              return fun, args

          def apply_async(self, args, kwargs, **options):
              fun, real_args = self.unpack_args(*args)
              return super(PickleTask, self).apply_async(
                  (fun, real_args, kwargs), shadow=qualname(fun), **options
              )

      @app.task(base=PickleTask)
      def call(fun, args, kwargs):
          return fun(*args, **kwargs)

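
Two of the changes above, header-based version detection and the flat chain
list, can be sketched without Celery. Both function names are hypothetical,
and signatures are represented here as plain strings purely for illustration.
Since the next task is taken with ``pop()``, the sketch assumes the chain list
is stored with the next task last:

.. code-block:: python

    def protocol_version(headers, payload):
        """Guess the task protocol version from a decoded message."""
        # Version 2 carries the task name in the message headers.
        if 'task' in (headers or {}):
            return 2
        # Version 1 keeps every field, including the name, in the body.
        if isinstance(payload, dict) and 'task' in payload:
            return 1
        raise ValueError('not a task message')


    def next_in_chain(embed):
        """Pop the next signature off the chain without recursion.

        Returns (next_signature, remaining_chain); next_signature is None
        once the chain is exhausted.
        """
        chain = list(embed.get('chain') or [])
        if not chain:
            return None, []
        return chain.pop(), chain


    version = protocol_version({'task': 'proj.tasks.add'}, ([2, 2], {}, None))
    step, remaining = next_in_chain({'chain': ['add(8)', 'add(4)']})

Each hop only shortens a list, so the length of a chain is no longer bounded
by the recursion limit.
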
.. _message-protocol-task-v1:
.. _task-message-protocol-v1:

Version 1
---------

In version 1 of the protocol all fields are stored in the message body,
which means workers and intermediate consumers must deserialize the payload
to read the fields.

Message body
~~~~~~~~~~~~

* task
    :`string`:

    Name of the task. **required**

* id
    :`string`:

    Unique id of the task (UUID). **required**

* args
    :`list`:

    List of arguments. Will be an empty list if not provided.

* kwargs
    :`dictionary`:

    Dictionary of keyword arguments. Will be an empty dictionary if not
    provided.

* retries
    :`int`:

    Current number of times this task has been retried.
    Defaults to `0` if not specified.

* eta
    :`string` (ISO 8601):

    Estimated time of arrival. This is the date and time in ISO 8601
    format. If not provided the message is not scheduled, but will be
    executed as soon as possible.

* expires
    :`string` (ISO 8601):

    .. versionadded:: 2.0.2

    Expiration date. This is the date and time in ISO 8601 format.
    If not provided the message will never expire. The message
    is considered expired if the expiration date has passed by the
    time the message is received.

* taskset
    :`string`:

    The taskset this task is part of (if any).

* chord
    :`Signature`:

    .. versionadded:: 2.3

    Signifies that this task is one of the header parts of a chord. The value
    of this key is the body of the chord that should be executed when all of
    the tasks in the header have returned.

* utc
    :`bool`:

    .. versionadded:: 2.5

    If true, times use the UTC timezone; otherwise the current local timezone
    should be used.

* callbacks
    :`<list>Signature`:

    .. versionadded:: 3.0

    A list of signatures to call if the task exited successfully.

* errbacks
    :`<list>Signature`:

    .. versionadded:: 3.0

    A list of signatures to call if an error occurs while executing the task.

* timelimit
    :`<tuple>(float, float)`:

    .. versionadded:: 3.1

    Task execution time limit settings. This is a tuple of hard and soft time
    limit values (`int`/`float` or :const:`None` for no limit).

    Example value specifying a soft time limit of 3 seconds, and a hard time
    limit of 10 seconds::

        {'timelimit': (3.0, 10.0)}

Example message
~~~~~~~~~~~~~~~

This is an example invocation of a `celery.task.ping` task in JSON
format:

.. code-block:: javascript

    {"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
     "task": "celery.task.PingTask",
     "args": [],
     "kwargs": {},
     "retries": 0,
     "eta": "2009-11-17T12:30:56.527191"}

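
The defaults and the expiry rule described for the version 1 body can be
sketched as follows. ``load_v1_message`` and ``is_expired`` are hypothetical
helper names, not Celery functions, and the sketch assumes timestamps are
naive ISO 8601 strings compared against a naive ``now``:

.. code-block:: python

    import json
    from datetime import datetime

    V1_DEFAULTS = {'args': [], 'kwargs': {}, 'retries': 0}


    def load_v1_message(raw):
        """Decode a version 1 JSON body and fill in the documented defaults."""
        message = json.loads(raw)
        for field in ('task', 'id'):
            if field not in message:
                raise KeyError('missing required field: %s' % field)
        for field, default in V1_DEFAULTS.items():
            # Copy the default so messages never share a mutable list/dict.
            message.setdefault(field, type(default)(default))
        return message


    def is_expired(message, now):
        """True if the message has an ``expires`` date that ``now`` has passed."""
        expires = message.get('expires')
        if expires is None:
            return False  # no expiration date: the message never expires
        return now > datetime.fromisoformat(expires)


    message = load_v1_message(
        '{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",'
        ' "task": "celery.task.PingTask",'
        ' "eta": "2009-11-17T12:30:56.527191"}'
    )

Note that every one of these checks needs the decoded body, which is the cost
of keeping all fields in the payload.
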
Task Serialization
------------------

Several types of serialization formats are supported using the
`content_type` message header.

The MIME-types supported by default are shown in the following table.

=============== =================================
Scheme          MIME Type
=============== =================================
json            application/json
yaml            application/x-yaml
pickle          application/x-python-serialize
msgpack         application/x-msgpack
=============== =================================

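
A consumer can pick a decoder by looking up the ``content_type`` value. The
registry below is a sketch under stated assumptions: ``DECODERS`` and
``decode_body`` are hypothetical names, and only the two formats available in
the Python standard library are registered (the yaml and msgpack entries would
need third-party packages):

.. code-block:: python

    import json
    import pickle

    # Decoders keyed by MIME type, taken from the table above.
    DECODERS = {
        'application/json': lambda data: json.loads(data.decode('utf-8')),
        # Unpickling can execute arbitrary code: only accept pickle
        # from trusted producers.
        'application/x-python-serialize': pickle.loads,
    }


    def decode_body(data, content_type):
        """Decode raw message bytes according to their content type."""
        try:
            decoder = DECODERS[content_type]
        except KeyError:
            raise ValueError(
                'unsupported content type: %r' % content_type) from None
        return decoder(data)


    args, kwargs, embed = decode_body(b'[[2, 2], {}, null]', 'application/json')
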
.. _message-protocol-event:

Event Messages
==============

Event messages are always JSON serialized and can contain arbitrary message
body fields.

Since version 3.2, the body can consist of either a single mapping (one event),
or a list of mappings (multiple events).

There are also standard fields that must always be present in an event
message:

Standard body fields
--------------------

- *string* ``type``

    The type of event. This is a string containing the *category* and
    *action* separated by a dash delimiter (e.g. ``task-succeeded``).

- *string* ``hostname``

    The fully qualified hostname of where the event occurred.

- *unsigned long long* ``clock``

    The logical clock value for this event (Lamport timestamp).

- *float* ``timestamp``

    The UNIX timestamp corresponding to the time when the event occurred.

- *signed short* ``utcoffset``

    This field describes the timezone of the originating host, and is
    specified as the number of hours ahead of/behind UTC. E.g. ``-2`` or
    ``+1``.

- *unsigned long long* ``pid``

    The process id of the process the event originated in.

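
The rules above (a body that is one mapping or a list of mappings, six
mandatory fields, and a ``category-action`` type string) can be checked with a
few lines of Python. This is only a sketch; ``iter_events`` and
``REQUIRED_FIELDS`` are hypothetical names, not part of Celery:

.. code-block:: python

    REQUIRED_FIELDS = ('type', 'hostname', 'clock',
                       'timestamp', 'utcoffset', 'pid')


    def iter_events(body):
        """Yield (category, action, event) for each event in a decoded body."""
        # The body is either a single mapping or a list of mappings.
        events = [body] if isinstance(body, dict) else body
        for event in events:
            missing = [name for name in REQUIRED_FIELDS if name not in event]
            if missing:
                raise ValueError(
                    'event is missing fields: %s' % ', '.join(missing))
            # Split at the first dash: 'task-succeeded' -> 'task', 'succeeded'
            category, _, action = event['type'].partition('-')
            yield category, action, event


    event = {
        'type': 'task-succeeded',
        'hostname': 'worker1@george.vandelay.com',
        'pid': 6335,
        'clock': 393912923921,
        'timestamp': 1401717709.101747,
        'utcoffset': -1,
    }
    parsed = list(iter_events(event))
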
Standard event types
--------------------

For a list of standard event types and their fields see the
:ref:`event-reference`.

Example message
---------------

These are the message fields for a ``task-succeeded`` event:

.. code-block:: python

    properties = {
        'routing_key': 'task.succeeded',
        'exchange': 'celeryev',
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
        'delivery_mode': 1,
    }
    headers = {
        'hostname': 'worker1@george.vandelay.com',
    }
    body = {
        'type': 'task-succeeded',
        'hostname': 'worker1@george.vandelay.com',
        'pid': 6335,
        'clock': 393912923921,
        'timestamp': 1401717709.101747,
        'utcoffset': -1,
        'uuid': '9011d855-fdd1-4f8f-adb3-a413b499eafb',
        'retval': '4',
        'runtime': 0.0003212,
    }