123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- .. _protov2draft:
- ========================================
- Task Message Protocol v2 (Draft Spec.)
- ========================================
- Notes
- =====
- - Support for multiple languages via the ``lang`` header.
- Worker may redirect the message to a worker that supports
- the language.
- - Metadata moved to headers.
- This means that workers/intermediates can inspect the message
- and make decisions based on the headers without decoding
- the payload (which may be language specific, e.g. serialized by the
- Python specific pickle serializer).
- - Body is only for language specific data.
- - Python stores args/kwargs in body.
- - If a message uses raw encoding then the raw data
- will be passed as a single argument to the function.
- - Java/C, etc. can use a thrift/protobuf document as the body
- - Dispatches to actor based on ``c_type``, ``c_meth`` headers
- ``c_meth`` is unused by python, but may be used in the future
- to specify class+method pairs.
- - Chain gains a dedicated field.
- Reducing the chain into a recursive ``callbacks`` argument
- causes problems when the recursion limit is exceeded.
- This is fixed in the new message protocol by specifying
- a list of signatures, each task will then pop a task off the list
- when sending the next message::
- execute_task(message)
- chain = message.headers['chain']
- if chain:
- sig = maybe_signature(chain.pop())
- sig.apply_async(chain=chain)
- - ``correlation_id`` replaces ``task_id`` field.
- - ``c_shadow`` lets you specify a different name for logs, monitors
- can be used for e.g. meta tasks that calls any function::
- from celery.utils.imports import qualname
- class PickleTask(Task):
- abstract = True
- def unpack_args(self, fun, args=()):
- return fun, args
- def apply_async(self, args, kwargs, **options):
- fun, real_args = self.unpack_args(*args)
- return super(PickleTask, self).apply_async(
- (fun, real_args, kwargs), shadow=qualname(fun), **options
- )
- @app.task(base=PickleTask)
- def call(fun, args, kwargs):
- return fun(*args, **kwargs)
- Undecided
- ---------
- - May consider moving callbacks/errbacks/chain into body.
- Will huge lists in headers cause overhead?
- The downside of keeping them in the body is that intermediates
- won't be able to introspect these values.
- Definition
- ==========
- .. code-block:: python
- # protocol v2 implies UTC=True
- # 'class' header existing means protocol is v2
- properties = {
- 'correlation_id': (uuid)task_id,
- 'content_type': (string)mime,
- 'content_encoding': (string)encoding,
- # optional
- 'reply_to': (string)queue_or_url,
- }
- headers = {
- 'lang': (string)'py'
- 'c_type': (string)task,
- # optional
- 'c_meth': (string)unused,
- 'c_shadow': (string)replace_name,
- 'eta': (iso8601)eta,
- 'expires'; (iso8601)expires,
- 'callbacks': (list)Signature,
- 'errbacks': (list)Signature,
- 'chain': (list)Signature, # non-recursive, reversed list of signatures
- 'group': (uuid)group_id,
- 'chord': (uuid)chord_id,
- 'retries': (int)retries,
- 'timelimit': (tuple)(soft, hard),
- }
- body = (args, kwargs)
- Example
- =======
- .. code-block:: python
- # chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8
- task_id = uuid()
- basic_publish(
- message=json.dumps([[2, 2], {}]),
- application_headers={
- 'lang': 'py',
- 'c_type': 'proj.tasks.add',
- 'chain': [
- # reversed chain list
- {'task': 'proj.tasks.add', 'args': (8, )},
- {'task': 'proj.tasks.add', 'args': (4, )},
- ]
- }
- properties={
- 'correlation_id': task_id,
- 'content_type': 'application/json',
- 'content_encoding': 'utf-8',
- }
- )
|