.. _protov2draft:

========================================
 Task Message Protocol v2 (Draft Spec.)
========================================

Notes
=====

- Support for multiple languages via the ``lang`` header.

  A worker may redirect the message to another worker that supports
  the language.

- Metadata moved to headers.

  This means that workers/intermediates can inspect the message
  and make decisions based on the headers without decoding
  the payload (which may be language specific, e.g. serialized by the
  Python-specific pickle serializer).

- Body is only for language-specific data.

  - Python stores args/kwargs in body.

  - If a message uses raw encoding then the raw data
    will be passed as a single argument to the function.

  - Java/C, etc. can use a thrift/protobuf document as the body.

- Dispatches to actor based on ``c_type``, ``c_meth`` headers.

  ``c_meth`` is unused by Python, but may be used in the future
  to specify class+method pairs.

- Chain gains a dedicated field.

  Reducing the chain into a recursive ``callbacks`` argument
  causes problems when the recursion limit is exceeded.

  This is fixed in the new message protocol by specifying
  a list of signatures; each task then pops a signature off the list
  when sending the next message::

      execute_task(message)
      chain = message.headers['chain']
      if chain:
          sig = maybe_signature(chain.pop())
          sig.apply_async(chain=chain)

- ``correlation_id`` replaces ``task_id`` field.

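
The flat chain list described above can be sketched in plain Python.
This is a minimal illustration rather than Celery's implementation:
``TASKS``, ``run_chain`` and the dict-based signatures are hypothetical
stand-ins, and handing the message to the next worker is simulated with a
loop instead of a broker. It assumes that the list is stored reversed, so
that ``pop()`` yields the next task, and that each result is passed as the
first argument of the following task.

.. code-block:: python

    import operator

    # Hypothetical task registry; a real worker would resolve names itself.
    TASKS = {'proj.tasks.add': operator.add}


    def run_chain(first, chain):
        """Simulate workers handing a flat, reversed chain to each other."""
        chain = list(chain)  # work on a copy, leave the caller's list alone
        sig = first
        result = None
        while True:
            args = tuple(sig['args'])
            if result is not None:
                # the previous result becomes the first argument
                args = (result,) + args
            result = TASKS[sig['task']](*args)
            if not chain:
                return result
            # pop the next signature; the rest of the list travels on as-is
            sig = chain.pop()


    first = {'task': 'proj.tasks.add', 'args': (2, 2)}
    chain = [
        # reversed chain list
        {'task': 'proj.tasks.add', 'args': (8,)},
        {'task': 'proj.tasks.add', 'args': (4,)},
    ]
    total = run_chain(first, chain)  # 2 + 2 + 4 + 8

Because every step only pops one item and forwards the remainder, the
nesting depth stays constant however long the chain is.
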

Undecided
---------

- May consider moving callbacks/errbacks/chain into body.

  Will huge lists in headers cause overhead?
  The downside of keeping them in the body is that intermediates
  won't be able to introspect these values.

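
As a rough illustration of what header introspection allows, the sketch
below lets an intermediate pick a destination from the ``lang`` header and
check the length of ``chain`` without ever decoding the body. The function
name ``route``, the returned queue names and the ``max_chain`` threshold
are assumptions made for this example; none of them is part of the draft.

.. code-block:: python

    def route(headers, body, supported=('py',), max_chain=1000):
        """Choose a destination from headers only; body stays opaque."""
        # Only possible while the chain is kept in the headers.
        if len(headers.get('chain') or ()) > max_chain:
            return 'rejected'
        lang = headers['lang']
        if lang in supported:
            return 'local'
        # redirect to a worker pool that supports the language
        return 'lang.' + lang


    destination = route({'lang': 'java'}, b'\x00\x01 opaque payload')

If the chain were moved into the body, the length check in this sketch
would require deserializing a payload the intermediate may not understand.
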

Definition
==========

.. code-block:: python

    # protocol v2 implies UTC=True
    # 'class' header existing means protocol is v2

    properties = {
        'correlation_id': (uuid)task_id,
        'content_type': (string)mime,
        'content_encoding': (string)encoding,

        # optional
        'reply_to': (string)queue_or_url,
    }
    headers = {
        'lang': (string)'py',
        'c_type': (string)task,

        # optional
        'c_meth': (string)'',
        'eta': (iso8601)eta,
        'expires': (iso8601)expires,
        'callbacks': (list)Signature,
        'errbacks': (list)Signature,
        'chain': (list)Signature,  # non-recursive, reversed list of signatures
        'group': (uuid)group_id,
        'chord': (uuid)chord_id,
        'retries': (int)retries,
        'timelimit': (tuple)(soft, hard),
    }

    body = (args, kwargs)

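
For concreteness, one way a Python client might assemble these three parts
is sketched below. ``build_message`` is a hypothetical helper, not an
existing Celery function. It assumes JSON serialization, UTC timestamps
(following the comment in the definition), and that optional headers are
simply left out when unset.

.. code-block:: python

    import json
    import uuid
    from datetime import datetime, timedelta, timezone


    def build_message(task, args=(), kwargs=None, countdown=None, chain=None):
        """Return (properties, headers, body) for a v2-style task message."""
        properties = {
            'correlation_id': str(uuid.uuid4()),
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
        }
        headers = {'lang': 'py', 'c_type': task}
        if countdown is not None:
            # ISO 8601 timestamp in UTC
            eta = datetime.now(timezone.utc) + timedelta(seconds=countdown)
            headers['eta'] = eta.isoformat()
        if chain:
            headers['chain'] = list(chain)
        # body carries only the language-specific call arguments
        body = json.dumps([list(args), kwargs or {}])
        return properties, headers, body


    properties, headers, body = build_message(
        'proj.tasks.add', (2, 2), countdown=10,
    )

Everything a router or scheduler needs (task name, ETA, chain) ends up in
``headers``, while ``body`` holds nothing but the arguments.
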

Example
=======

.. code-block:: python

    import json

    from celery import uuid

    # chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8

    # basic_publish is a placeholder for the transport's publish call.
    task_id = uuid()
    basic_publish(
        message=json.dumps([[2, 2], {}]),
        application_headers={
            'lang': 'py',
            'c_type': 'proj.tasks.add',
            'chain': [
                # reversed chain list
                {'task': 'proj.tasks.add', 'args': (8, )},
                {'task': 'proj.tasks.add', 'args': (4, )},
            ]
        },
        properties={
            'correlation_id': task_id,
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
        }
    )