.. _protov2draft:

========================================
 Task Message Protocol v2 (Draft Spec.)
========================================

Notes
=====

- Support for multiple languages via the ``lang`` header.

  A worker may redirect the message to a worker that supports
  the language.

- Metadata moved to headers.

  This means that workers/intermediates can inspect the message
  and make decisions based on the headers without decoding
  the payload (which may be language-specific, e.g. serialized by the
  Python-specific pickle serializer).

- Body is only for language-specific data.

  - Python stores args/kwargs in body.

  - If a message uses raw encoding then the raw data
    will be passed as a single argument to the function.

  - Java/C, etc. can use a thrift/protobuf document as the body.

- Dispatches to actor based on ``c_type``, ``c_meth`` headers.

  ``c_meth`` is unused by Python, but may be used in the future
  to specify class+method pairs.

- Chain gains a dedicated field.

  Reducing the chain into a recursive ``callbacks`` argument
  causes problems when the recursion limit is exceeded.

  This is fixed in the new message protocol by specifying
  a list of signatures; each task then pops the next signature
  off the list when sending the next message::

      execute_task(message)
      chain = message.headers['chain']
      if chain:
          sig = maybe_signature(chain.pop())
          sig.apply_async(chain=chain)

- ``correlation_id`` replaces ``task_id`` field.

- ``c_shadow`` lets you specify a different name for logs and monitors.
  This can be used for e.g. meta tasks that call any function::

      from celery import Task
      from celery.utils.imports import qualname

      class PickleTask(Task):
          abstract = True

          def unpack_args(self, fun, args=()):
              return fun, args

          def apply_async(self, args, kwargs, **options):
              fun, real_args = self.unpack_args(*args)
              return super(PickleTask, self).apply_async(
                  (fun, real_args, kwargs), shadow=qualname(fun), **options
              )

      # app is assumed to be an existing Celery application instance.
      @app.task(base=PickleTask)
      def call(fun, args, kwargs):
          return fun(*args, **kwargs)
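
The chain handling described in the notes above can be sketched without a broker. The following is a minimal in-memory simulation, not Celery's implementation: the ``TASKS`` registry and the ``execute`` and ``run`` helpers are hypothetical names introduced for illustration. It also assumes that the result of one step is prepended to the arguments of the next signature.

.. code-block:: python

    from collections import deque

    # Hypothetical task registry, keyed by the value of the c_type header.
    TASKS = {'add': lambda x, y: x + y}


    def execute(message, queue):
        """Run one task, then enqueue the next chain step (if any)."""
        headers = message['headers']
        result = TASKS[headers['c_type']](*message['body'])
        # Copy the list so the caller's message is not mutated.
        chain = list(headers.get('chain') or [])
        if chain:
            # The list is reversed, so the next step is the last element.
            sig = chain.pop()
            queue.append({
                'headers': {'c_type': sig['task'], 'chain': chain},
                # Assumption: the previous result becomes the first argument.
                'body': (result,) + tuple(sig['args']),
            })
        return result


    def run(message):
        """Process messages one at a time until the chain is exhausted."""
        queue = deque([message])
        result = None
        while queue:
            result = execute(queue.popleft(), queue)
        return result


    first = {
        'headers': {
            'c_type': 'add',
            # Reversed chain: add(4) runs before add(8).
            'chain': [
                {'task': 'add', 'args': (8,)},
                {'task': 'add', 'args': (4,)},
            ],
        },
        'body': (2, 2),
    }
    final = run(first)

Each step pops a single signature and forwards the remainder, so the call depth stays constant no matter how long the chain is.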

Undecided
---------

- May consider moving callbacks/errbacks/chain into body.

  Will huge lists in headers cause overhead?
  The downside of keeping them in the body is that intermediates
  won't be able to introspect these values.
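
To make the introspection trade-off concrete, here is a rough sketch of what an intermediate could do with headers alone, leaving the body as opaque bytes. The ``summarize`` and ``needs_redirect`` helpers are hypothetical and only assume the header names used in this draft.

.. code-block:: python

    def summarize(headers, body):
        """Describe a message using only its headers; the body is never decoded."""
        return {
            'lang': headers.get('lang'),
            'task': headers.get('c_type'),
            'chain_steps': len(headers.get('chain') or []),
            'body_bytes': len(body),
        }


    def needs_redirect(headers, supported_langs=('py',)):
        """Return True when this worker cannot handle the message language."""
        return headers.get('lang') not in supported_langs


    headers = {
        'lang': 'py',
        'c_type': 'proj.tasks.add',
        'chain': [{'task': 'proj.tasks.add', 'args': (8,)}],
    }
    opaque_body = b'[[2, 2], {}]'  # could equally be pickle or protobuf bytes
    summary = summarize(headers, opaque_body)

If ``chain`` lived in the body instead, ``chain_steps`` could not be computed without decoding a possibly language-specific payload.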

Definition
==========

.. code-block:: python

    # protocol v2 implies UTC=True
    # 'class' header existing means protocol is v2

    properties = {
        'correlation_id': (uuid)task_id,
        'content_type': (string)mime,
        'content_encoding': (string)encoding,

        # optional
        'reply_to': (string)queue_or_url,
    }
    headers = {
        'lang': (string)'py',
        'c_type': (string)task,

        # optional
        'c_meth': (string)unused,
        'c_shadow': (string)replace_name,
        'eta': (iso8601)eta,
        'expires': (iso8601)expires,
        'callbacks': (list)Signature,
        'errbacks': (list)Signature,
        'chain': (list)Signature,  # non-recursive, reversed list of signatures
        'group': (uuid)group_id,
        'chord': (uuid)chord_id,
        'retries': (int)retries,
        'timelimit': (tuple)(soft, hard),
    }

    body = (args, kwargs)
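
As an illustration of the definition above, the sketch below assembles the three parts of a message from plain Python values. ``build_message`` is a hypothetical helper and not part of any library. It assumes JSON content, a timezone-aware ``eta``, and a chain given in execution order, which it reverses as the definition requires.

.. code-block:: python

    import uuid
    from datetime import datetime, timezone


    def build_message(task, args=(), kwargs=None, eta=None, chain=None):
        """Return (properties, headers, body) following the draft layout."""
        properties = {
            'correlation_id': str(uuid.uuid4()),
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
        }
        headers = {'lang': 'py', 'c_type': task}
        if eta is not None:
            # Protocol v2 implies UTC, so normalise before formatting.
            headers['eta'] = eta.astimezone(timezone.utc).isoformat()
        if chain:
            # Stored reversed so that the next step is obtained with pop().
            headers['chain'] = list(reversed(chain))
        body = (list(args), kwargs or {})
        return properties, headers, body


    properties, headers, body = build_message(
        'proj.tasks.add',
        args=(2, 2),
        eta=datetime(2030, 1, 1, tzinfo=timezone.utc),
        chain=[
            {'task': 'proj.tasks.add', 'args': (4,)},
            {'task': 'proj.tasks.add', 'args': (8,)},
        ],
    )

Optional headers are only added when a value is supplied, so a minimal message carries just ``lang`` and ``c_type``.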

Example
=======

.. code-block:: python

    import json

    from celery.utils import uuid

    # chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8

    # basic_publish stands in for the AMQP client's publish call.
    task_id = uuid()
    basic_publish(
        message=json.dumps([[2, 2], {}]),
        application_headers={
            'lang': 'py',
            'c_type': 'proj.tasks.add',
            'chain': [
                # reversed chain list
                {'task': 'proj.tasks.add', 'args': (8, )},
                {'task': 'proj.tasks.add', 'args': (4, )},
            ]
        },
        properties={
            'correlation_id': task_id,
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
        }
    )
  106. )