.. _message-protocol:

===================
 Message Protocol
===================

.. contents::
    :local:

.. _message-protocol-task:
.. _internals-task-message-protocol:

Task messages
=============

.. _message-protocol-task-v2:

Version 2
---------

Definition
~~~~~~~~~~

.. code-block:: python

    # protocol v2 implies UTC=True
    # 'task' header existing means protocol is v2

    properties = {
        'correlation_id': uuid task_id,
        'content_type': string mimetype,
        'content_encoding': string encoding,

        # optional
        'reply_to': string queue_or_url,
    }
    headers = {
        'lang': string 'py',
        'task': string task,
        'id': uuid task_id,
        'root_id': uuid root_id,
        'parent_id': uuid parent_id,
        'group': uuid group_id,

        # optional
        'meth': string method_name,
        'shadow': string alias_name,
        'eta': iso8601 eta,
        'expires': iso8601 expires,
        'retries': int retries,
        'timelimit': (soft, hard),
    }

    body = (
        object[] args,
        Mapping kwargs,
        Mapping embed {
            'callbacks': Signature[] callbacks,
            'errbacks': Signature[] errbacks,
            'chain': Signature[] chain,
            'chord': Signature chord_callback,
        }
    )

Example
~~~~~~~

This example sends a task message using version 2 of the protocol:

.. code-block:: python

    import json

    from celery.utils import uuid

    # A single call to proj.tasks.add(2, 2) with no embedded signatures.
    # basic_publish stands in for the publish call of the transport in use.

    task_id = uuid()
    basic_publish(
        message=json.dumps(([2, 2], {}, None)),
        application_headers={
            'lang': 'py',
            'task': 'proj.tasks.add',
            'id': task_id,
        },
        properties={
            'correlation_id': task_id,
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
        }
    )

Changes from version 1
~~~~~~~~~~~~~~~~~~~~~~

- The protocol version is detected by the presence of a ``task``
  message header.

- Support for multiple languages via the ``lang`` header.

    A worker may redirect the message to a worker that supports
    the language.

- Metadata moved to headers.

    This means that workers/intermediates can inspect the message
    and make decisions based on the headers without decoding
    the payload (which may be language specific, e.g. serialized by the
    Python specific pickle serializer).

- Body is only for language specific data.

    - Python stores args/kwargs and embedded signatures in body.

    - If a message uses raw encoding then the raw data
      will be passed as a single argument to the function.

    - Java/C, etc. can use a Thrift/Protobuf document as the body.

- Dispatches to actor based on the ``task`` and ``meth`` headers.

    ``meth`` is unused by Python, but may be used in the future
    to specify class+method pairs.

- Chain gains a dedicated field.

    Reducing the chain into a recursive ``callbacks`` argument
    causes problems when the recursion limit is exceeded.

    This is fixed in the new message protocol by specifying
    a list of signatures. Each task then pops a task off the list
    when sending the next message::

        execute_task(message)
        chain = embed['chain']
        if chain:
            sig = maybe_signature(chain.pop())
            sig.apply_async(chain=chain)

- ``correlation_id`` replaces the ``task_id`` field.

- The ``root_id`` and ``parent_id`` fields help keep track of workflows.

- ``shadow`` lets you specify a different name for logs and monitors.
  This can be used for e.g. meta tasks that call any function::

    from celery import Task
    from celery.utils.imports import qualname

    class PickleTask(Task):
        abstract = True

        def unpack_args(self, fun, args=()):
            return fun, args

        def apply_async(self, args, kwargs, **options):
            fun, real_args = self.unpack_args(*args)
            return super(PickleTask, self).apply_async(
                (fun, real_args, kwargs), shadow=qualname(fun), **options
            )

    @app.task(base=PickleTask)
    def call(fun, args, kwargs):
        return fun(*args, **kwargs)

.. _message-protocol-task-v1:
.. _task-message-protocol-v1:

Version 1
---------

In version 1 of the protocol all fields are stored in the message body,
which means workers and intermediate consumers must deserialize the payload
to read the fields.

Message body
~~~~~~~~~~~~

* task
    :`string`:

    Name of the task. **required**

* id
    :`string`:

    Unique id of the task (UUID). **required**

* args
    :`list`:

    List of arguments. Will be an empty list if not provided.

* kwargs
    :`dictionary`:

    Dictionary of keyword arguments. Will be an empty dictionary if not
    provided.

* retries
    :`int`:

    Current number of times this task has been retried.
    Defaults to `0` if not specified.

* eta
    :`string` (ISO 8601):

    Estimated time of arrival. This is the date and time in ISO 8601
    format. If not provided the message is not scheduled, but will be
    executed as soon as possible.

* expires
    :`string` (ISO 8601):

    .. versionadded:: 2.0.2

    Expiration date. This is the date and time in ISO 8601 format.
    If not provided the message will never expire. The message
    will be expired when the message is received and the expiration date
    has been exceeded.

* taskset
    :`string`:

    The taskset this task is part of (if any).

* chord
    :`Signature`:

    .. versionadded:: 2.3

    Signifies that this task is one of the header parts of a chord. The value
    of this key is the body of the chord that should be executed when all of
    the tasks in the header have returned.

* utc
    :`bool`:

    .. versionadded:: 2.5

    If true, times use the UTC timezone; if not, the current local timezone
    should be used.

* callbacks
    :`<list>Signature`:

    .. versionadded:: 3.0

    A list of signatures to call if the task exited successfully.

* errbacks
    :`<list>Signature`:

    .. versionadded:: 3.0

    A list of signatures to call if an error occurs while executing the task.

* timelimit
    :`<tuple>(float, float)`:

    .. versionadded:: 3.1

    Task execution time limit settings. This is a tuple of hard and soft time
    limit values (`int`/`float` or :const:`None` for no limit).

    Example value specifying a soft time limit of 3 seconds, and a hard time
    limit of 10 seconds::

        {'timelimit': (3.0, 10.0)}

Example message
~~~~~~~~~~~~~~~

This is an example invocation of a `celery.task.PingTask` task in JSON
format:

.. code-block:: javascript

    {"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
     "task": "celery.task.PingTask",
     "args": [],
     "kwargs": {},
     "retries": 0,
     "eta": "2009-11-17T12:30:56.527191"}

Task Serialization
------------------

Several types of serialization formats are supported using the
`content_type` message header.

The MIME-types supported by default are shown in the following table.

    =============== =================================
         Scheme                 MIME Type
    =============== =================================
    json            application/json
    yaml            application/x-yaml
    pickle          application/x-python-serialize
    msgpack         application/x-msgpack
    =============== =================================
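
As a rough illustration of how a consumer might combine the ``content_type``
property with the version rules described earlier, the sketch below decodes a
JSON message and reads the task name and id from the headers (version 2) or
from the body (version 1). The names ``DECODERS``, ``protocol_version`` and
``decode_task`` are hypothetical helpers written for this example and are not
part of Celery. Only the ``application/json`` row of the table is handled,
using the standard library; the other formats are left out on purpose.

```python
import json

# Hypothetical decoder table for this sketch: only the JSON MIME type from
# the table is wired up; yaml, pickle and msgpack are deliberately omitted.
DECODERS = {
    'application/json': json.loads,
}


def protocol_version(headers):
    """Return 2 when the ``task`` header is present, otherwise 1."""
    return 2 if 'task' in headers else 1


def decode_task(properties, headers, raw_body):
    """Return ``(task_name, task_id, args, kwargs)`` for a v1 or v2 message."""
    content_type = properties['content_type']
    try:
        decode = DECODERS[content_type]
    except KeyError:
        raise ValueError('unsupported content type: %r' % (content_type,))
    body = decode(raw_body)
    if protocol_version(headers) == 2:
        # v2: metadata lives in the headers, the body is (args, kwargs, embed).
        args, kwargs, _embed = body
        return headers['task'], headers['id'], args, kwargs
    # v1: every field, including the task name and id, lives in the body.
    return (body['task'], body['id'],
            body.get('args', []), body.get('kwargs', {}))


# Usage: a v2 message carrying the same payload as the version 2 example.
v2_result = decode_task(
    {'content_type': 'application/json', 'content_encoding': 'utf-8'},
    {'lang': 'py', 'task': 'proj.tasks.add', 'id': 'example-id'},
    json.dumps(([2, 2], {}, None)),
)
```

Note that the version 2 branch never needs the body to learn which task to
run, which is the point of moving metadata into the headers: an intermediate
could route on ``headers['task']`` without calling a decoder at all.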