protov2.rst 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. =======================================
  2. Task Message Protocol v2 Draft Spec.
  3. =======================================
  4. Notes
  5. =====
  6. - Support for multiple languages via the ``lang`` header.
  7. Worker may redirect the message to a worker that supports
  8. the language.
  9. - Metadata moved to headers.
  10. This means that workers/intermediates can inspect the message
  11. and make decisions based on the headers without decoding
  12. the payload (which may be language specific, e.g. serialized by the
  13. Python specific pickle serializer).
  14. - Body is only for language specific data.
  15. - Python stores args/kwargs in body.
  16. - If a message uses raw encoding then the raw data
  17. will be passed as a single argument to the function.
  18. - Java/C, etc can use a thrift/protobuf document as the body
  19. - Dispatches to actor based on ``c_type``, ``c_meth`` headers
  20. c_meth is unsued by python, but may be used in the future
  21. to specify class+method pairs.
  22. - Chain gains a dedicated field.
  23. Reducing the chain into a recursive ``callbacks`` argument
  24. causes problems when the recursion limit is exceeded.
  25. This is fixed in the new message protocol by specifying
  26. a list of chords, each task will then pop a chain off the list
  27. when sending the next message::
  28. execute_task(message)
  29. chain = message.headers['chain']
  30. if chain:
  31. sig = maybe_signature(chain.pop())
  32. sig.apply_async(chain=chain)
  33. - ``correlation_id`` replaces task_id ``field``.
  34. Undecided
  35. ---------
  36. - May consider moving callbacks/errbacks/chain into body.
  37. It's unknown if huge lists in headers will cause messaging overhead,
  38. but the downside of moving them into the body is that intermiediates
  39. will not be able to introspect these values.
  40. Definition
  41. ==========
  42. .. code-block:: python
  43. # protocol v2 implies UTC=True
  44. # 'class' header existing means protocol is v2
  45. properties = {
  46. 'correlation_id': (uuid)task_id,
  47. 'content_type': (string)mime,
  48. 'content_encoding': (string)encoding,
  49. # optional
  50. 'reply_to': (string)queue_or_url,
  51. }
  52. headers = {
  53. 'lang': (string)'py'
  54. 'c_type': (string)task,
  55. # optional
  56. 'c_meth': (string)'',
  57. 'eta': (iso8601)eta,
  58. 'expires'; (iso8601)expires,
  59. 'callbacks': (list)Signature,
  60. 'errbacks': (list)Signature,
  61. 'chain': (list)Signature, # non-recursive, reversed list of signatures
  62. 'group': (uuid)group_id,
  63. 'chord': (uuid)chord_id,
  64. 'retries': (int)retries,
  65. 'time'
  66. }
  67. body = (args, kwargs)
  68. Example
  69. =======
  70. .. code-block:: python
  71. # chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8
  72. task_id = uuid()
  73. basic_publish(
  74. message=json.dumps([[2, 2], {}]),
  75. application_headers={
  76. 'lang': 'py',
  77. 'c_type': 'proj.tasks.add',
  78. 'chain': [
  79. # reversed chain list
  80. {'task': 'proj.tasks.add', 'args': (8, )},
  81. {'task': 'proj.tasks.add', 'args': (4, )},
  82. ]
  83. }
  84. properties={
  85. 'correlation_id': task_id,
  86. 'content_type': 'application/json',
  87. 'content_encoding': 'utf-8',
  88. }
  89. )