.. _protov2draft:

========================================
 Task Message Protocol v2 (Draft Spec.)
========================================

Notes
=====

- Support for multiple languages via the ``lang`` header.

    A worker may redirect the message to a worker that supports
    the language.

- Metadata moved to headers.

    This means that workers/intermediates can inspect the message
    and make decisions based on the headers without decoding
    the payload (which may be language specific, e.g. serialized by the
    Python-specific pickle serializer).

- Body is only for language-specific data.

    - Python stores args/kwargs in the body.

    - If a message uses raw encoding then the raw data
      will be passed as a single argument to the function.

    - Java/C, etc. can use a thrift/protobuf document as the body.

- Dispatches to an actor based on the ``task`` and ``meth`` headers.

    ``meth`` is unused by Python, but may be used in the future
    to specify class+method pairs.

- Chain gains a dedicated field.

    Reducing the chain into a recursive ``callbacks`` argument
    causes problems when the recursion limit is exceeded.

    This is fixed in the new message protocol by specifying
    a list of signatures. Each task then pops a signature off the list
    when sending the next message::

        from celery.canvas import maybe_signature

        # pseudocode: run the current task first
        execute_task(message)
        chain = message.headers['chain']
        if chain:
            # the list is reversed, so pop() yields the next task to run
            sig = maybe_signature(chain.pop())
            # the remaining signatures travel along with the next message
            sig.apply_async(chain=chain)

- ``correlation_id`` replaces the ``task_id`` field.

- The ``root_id`` and ``parent_id`` fields help keep track of workflows.

- ``shadow`` lets you specify a different name for logs and monitors.
  This can be used, for example, by meta tasks that call an arbitrary
  function::

        from celery import Task
        from celery.utils.imports import qualname

        # ``app`` is assumed to be an existing Celery application instance

        class PickleTask(Task):
            abstract = True

            def unpack_args(self, fun, args=()):
                return fun, args

            def apply_async(self, args, kwargs, **options):
                fun, real_args = self.unpack_args(*args)
                # report the wrapped function's name instead of ``call``
                return super(PickleTask, self).apply_async(
                    (fun, real_args, kwargs), shadow=qualname(fun), **options
                )

        @app.task(base=PickleTask)
        def call(fun, args, kwargs):
            return fun(*args, **kwargs)

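
As a rough illustration of the "metadata moved to headers" note above, the sketch below shows how an intermediate might pick an action from the headers alone, leaving the body undecoded. The function ``route_by_lang``, the ``SUPPORTED_LANGS`` set and the returned action strings are hypothetical and not part of the protocol; treating a missing ``lang`` header as an unsupported language is also an assumption.

```python
# Hypothetical sketch: not part of Celery or of this specification.
SUPPORTED_LANGS = {"py"}  # languages this worker can execute (assumed)


def route_by_lang(headers, body):
    """Return an action for a message using only its headers.

    ``body`` is accepted but never decoded, so it may hold pickle,
    thrift, protobuf or raw bytes.
    """
    lang = headers.get("lang")
    if lang in SUPPORTED_LANGS:
        return ("execute", headers["task"])
    # Assumption: unknown or missing languages are handed to another worker.
    return ("redirect", lang)


opaque_body = b"\x80\x04..."  # placeholder bytes, never decoded here
print(route_by_lang({"lang": "py", "task": "proj.tasks.add"}, opaque_body))
print(route_by_lang({"lang": "java", "task": "proj.Add"}, opaque_body))
```

Because the decision depends only on ``headers``, the same check could run in a proxy or router that has no deserializer for the payload.
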
Undecided
---------

- May consider moving callbacks/errbacks/chain into body.

    Will huge lists in headers cause overhead?
    The downside of keeping them in the body is that intermediates
    won't be able to introspect these values.

Definition
==========

.. code-block:: python

    # protocol v2 implies UTC=True
    # 'class' header existing means protocol is v2

    properties = {
        'correlation_id': (uuid)task_id,
        'content_type': (string)mime,
        'content_encoding': (string)encoding,

        # optional
        'reply_to': (string)queue_or_url,
    }
    headers = {
        'lang': (string)'py',
        'task': (string)task,
        'id': (uuid)task_id,
        'root_id': (uuid)root_id,
        'parent_id': (uuid)parent_id,

        # optional
        'meth': (string)unused,
        'shadow': (string)replace_name,
        'eta': (iso8601)eta,
        'expires': (iso8601)expires,
        'callbacks': (list)Signature,
        'errbacks': (list)Signature,
        'chain': (list)Signature,  # non-recursive, reversed list of signatures
        'group': (uuid)group_id,
        'chord': (uuid)chord_id,
        'retries': (int)retries,
        'timelimit': (tuple)(soft, hard),
    }

    body = (args, kwargs)

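
The definition above is written in a schema-like notation rather than executable Python. A minimal sketch of how a client might assemble the three parts with plain dictionaries follows. The helper ``build_message`` is hypothetical, as are its defaults: it assumes a JSON body, that ``root_id`` falls back to the task's own id when the task starts a workflow, and that ``eta`` is a timezone-aware ``datetime``.

```python
import json
import uuid
from datetime import datetime, timezone


def build_message(task, args=(), kwargs=None, eta=None,
                  root_id=None, parent_id=None, chain=None):
    """Assemble (properties, headers, body) for one task message.

    Hypothetical helper for illustration; not a Celery API.
    """
    task_id = str(uuid.uuid4())
    properties = {
        "correlation_id": task_id,
        "content_type": "application/json",
        "content_encoding": "utf-8",
    }
    headers = {
        "lang": "py",
        "task": task,
        "id": task_id,
        # Assumption: a task without a root is the root of its workflow.
        "root_id": root_id or task_id,
        "parent_id": parent_id,
    }
    if eta is not None:
        # Protocol v2 implies UTC, so normalise before formatting.
        headers["eta"] = eta.astimezone(timezone.utc).isoformat()
    if chain:
        headers["chain"] = list(chain)
    body = json.dumps([list(args), kwargs or {}])
    return properties, headers, body


props, hdrs, body = build_message(
    "proj.tasks.add", (2, 2),
    eta=datetime(2030, 1, 1, tzinfo=timezone.utc),
)
print(hdrs["eta"])
print(body)
```

Optional headers are only added when a value is supplied, which keeps small messages small.
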
Example
=======

.. code-block:: python

    # chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8

    import json

    # ``uuid`` and ``basic_publish`` stand in for the client library's
    # id generator and publish call

    task_id = uuid()
    basic_publish(
        message=json.dumps([[2, 2], {}]),
        application_headers={
            'lang': 'py',
            'task': 'proj.tasks.add',
            'chain': [
                # reversed chain list
                {'task': 'proj.tasks.add', 'args': (8, )},
                {'task': 'proj.tasks.add', 'args': (4, )},
            ]
        },
        properties={
            'correlation_id': task_id,
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
        }
    )
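
To make the reversed chain list concrete, the sketch below walks the example message in a single process. It is a simplification under stated assumptions: a real worker would publish a new message for each step instead of looping, and the ``REGISTRY`` mapping and ``run_message`` function are hypothetical names. It also assumes the usual chain behaviour in which the previous result is passed as the first argument of the next task.

```python
import json


def add(x, y):
    return x + y


# Hypothetical task registry mapping task names to callables.
REGISTRY = {"proj.tasks.add": add}


def run_message(headers, body):
    """Run a task and then every signature in its ``chain`` header."""
    args, kwargs = json.loads(body)
    result = REGISTRY[headers["task"]](*args, **kwargs)

    # Copy so the caller's header list is left untouched.
    chain = list(headers.get("chain") or [])
    while chain:
        # The list is reversed, so pop() returns the next task to run.
        sig = chain.pop()
        fun = REGISTRY[sig["task"]]
        result = fun(result, *sig.get("args", ()), **sig.get("kwargs", {}))
    return result


headers = {
    "lang": "py",
    "task": "proj.tasks.add",
    "chain": [
        {"task": "proj.tasks.add", "args": (8,)},
        {"task": "proj.tasks.add", "args": (4,)},
    ],
}
print(run_message(headers, json.dumps([[2, 2], {}])))  # prints 16
```

The loop replaces recursion with repeated ``pop()`` calls, so the depth of the chain is limited only by the size of the list and not by the interpreter's recursion limit.
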