=======================================
 Task Message Protocol v2 Draft Spec.
=======================================

Notes
=====

- Support for multiple languages via the ``lang`` header.

  A worker may redirect the message to another worker that supports
  the language.

- Metadata moved to headers.

  This means that workers/intermediates can inspect the message
  and make decisions based on the headers without decoding
  the payload (which may be language specific, e.g. serialized by the
  Python-specific pickle serializer).

- Body is only for language-specific data.

  - Python stores args/kwargs in body.

  - If a message uses raw encoding then the raw data
    will be passed as a single argument to the function.

  - Java/C, etc. can use a thrift/protobuf document as the body.

- Dispatches to actor based on ``c_type``, ``c_meth`` headers.

  ``c_meth`` is unused by Python, but may be used in the future
  to specify class+method pairs.

- Chain gains a dedicated field.

  Reducing the chain into a recursive ``callbacks`` argument
  causes problems when the recursion limit is exceeded.

  This is fixed in the new message protocol by specifying
  a list of signatures; each task then pops a task off the list
  when sending the next message::

      execute_task(message)
      chain = message.headers['chain']
      if chain:
          sig = maybe_signature(chain.pop())
          sig.apply_async(chain=chain)

- ``correlation_id`` replaces ``task_id`` field.
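
The chain handling described above can be modelled without a broker.
The following is only a sketch under stated assumptions: the ``TASKS``
registry, the dict-shaped messages and the ``run`` helper are hypothetical,
and it assumes that the result of each task is prepended to the arguments
of the next signature. It shows that the chain is consumed iteratively,
so its length is not bounded by the recursion limit.

.. code-block:: python

    # Hypothetical in-process model of the ``chain`` header.
    # No broker or Celery API is involved.

    def add(x, y):
        return x + y

    TASKS = {'proj.tasks.add': add}  # assumed task registry


    def execute_task(message):
        """Run the task named in the ``c_type`` header and return its result."""
        args, kwargs = message['body']
        return TASKS[message['headers']['c_type']](*args, **kwargs)


    def run(message):
        """Process a message and everything left in its chain, iteratively."""
        while True:
            result = execute_task(message)
            # Copy the list so the caller's message is left untouched.
            chain = list(message['headers'].get('chain') or [])
            if not chain:
                return result
            # The list is reversed, so the next task is the last element.
            sig = chain.pop()
            message = {
                'headers': {'c_type': sig['task'], 'chain': chain},
                # Assumption: the previous result becomes the first argument.
                'body': (
                    (result,) + tuple(sig.get('args', ())),
                    dict(sig.get('kwargs', {})),
                ),
            }


    first = {
        'headers': {
            'c_type': 'proj.tasks.add',
            'chain': [
                {'task': 'proj.tasks.add', 'args': (8,)},
                {'task': 'proj.tasks.add', 'args': (4,)},
            ],
        },
        'body': ((2, 2), {}),
    }
    result = run(first)
    print(result)  # 2 + 2 + 4 + 8 == 16

In a real deployment each step would be a separate message sent to a
worker; the loop here only stands in for that hand-off.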

Undecided
---------

- May consider moving callbacks/errbacks/chain into body.

  Will huge lists in headers cause overhead?
  The downside of keeping them in the body is that intermediates
  won't be able to introspect these values.
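
To make the trade-off concrete, the sketch below shows what an
intermediary can learn while the values stay in the headers. The
``summarize`` helper and the dict-shaped message are hypothetical and only
illustrate the idea; the body is treated as an opaque byte string and is
never deserialized.

.. code-block:: python

    def summarize(message):
        """Report what can be learned from the headers alone."""
        headers = message['headers']
        return {
            'lang': headers.get('lang'),
            'task': headers.get('c_type'),
            'pending_chain': len(headers.get('chain') or ()),
            'has_errbacks': bool(headers.get('errbacks')),
        }


    message = {
        'headers': {
            'lang': 'py',
            'c_type': 'proj.tasks.add',
            'chain': [
                {'task': 'proj.tasks.add', 'args': (8,)},
                {'task': 'proj.tasks.add', 'args': (4,)},
            ],
        },
        # Opaque payload: the intermediary does not know the serializer.
        'body': b'\x80\x04not-decoded',
    }
    summary = summarize(message)
    print(summary)

If the chain were moved into the body, ``pending_chain`` could not be
computed without first decoding a possibly language-specific payload.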

Definition
==========

.. code-block:: python

    # protocol v2 implies UTC=True
    # 'class' header existing means protocol is v2

    properties = {
        'correlation_id': (uuid)task_id,
        'content_type': (string)mime,
        'content_encoding': (string)encoding,

        # optional
        'reply_to': (string)queue_or_url,
    }
    headers = {
        'lang': (string)'py',
        'c_type': (string)task,

        # optional
        'c_meth': (string)'',
        'eta': (iso8601)eta,
        'expires': (iso8601)expires,
        'callbacks': (list)Signature,
        'errbacks': (list)Signature,
        'chain': (list)Signature,  # non-recursive, reversed list of signatures
        'group': (uuid)group_id,
        'chord': (uuid)chord_id,
        'retries': (int)retries,
        'timelimit': (tuple)(soft, hard),
    }

    body = (args, kwargs)
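
The ``(type)value`` notation above is descriptive rather than executable.
As a rough illustration, the fields could be filled in from plain Python
values as follows. The ``build_message`` helper is hypothetical, JSON
with UTF-8 encoding is assumed for the content type, and only a subset of
the optional headers is covered.

.. code-block:: python

    import uuid
    from datetime import datetime, timezone


    def build_message(task, args=(), kwargs=None, eta=None, expires=None,
                      retries=0):
        """Return (properties, headers, body) shaped like the definition."""
        properties = {
            'correlation_id': str(uuid.uuid4()),
            'content_type': 'application/json',  # assumed serializer
            'content_encoding': 'utf-8',
        }
        headers = {'lang': 'py', 'c_type': task, 'retries': retries}
        # Protocol v2 implies UTC, so timestamps are normalized first.
        if eta is not None:
            headers['eta'] = eta.astimezone(timezone.utc).isoformat()
        if expires is not None:
            headers['expires'] = expires.astimezone(timezone.utc).isoformat()
        body = (tuple(args), dict(kwargs or {}))
        return properties, headers, body


    eta = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    properties, headers, body = build_message('proj.tasks.add', (2, 2), eta=eta)
    print(headers['eta'])  # 2024-01-01T12:00:00+00:00

Optional headers that are not set are simply omitted, so a consumer
should read them with a default.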

Example
=======

.. code-block:: python

    import json

    # chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8

    # uuid() and basic_publish() are placeholders for the client
    # library's id generator and publish call.
    task_id = uuid()
    basic_publish(
        message=json.dumps([[2, 2], {}]),
        application_headers={
            'lang': 'py',
            'c_type': 'proj.tasks.add',
            'chain': [
                # reversed chain list
                {'task': 'proj.tasks.add', 'args': (8, )},
                {'task': 'proj.tasks.add', 'args': (4, )},
            ]
        },
        properties={
            'correlation_id': task_id,
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
        }
    )