======================================= Task Message Protocol v2 Draft Spec. ======================================= Notes ===== - Support for multiple languages via the ``lang`` header. Worker may redirect the message to a worker that supports the language. - Metadata moved to headers. This means that workers/intermediates can inspect the message and make decisions based on the headers without decoding the payload (which may be language specific, e.g. serialized by the Python specific pickle serializer). - Body is only for language specific data. - Python stores args/kwargs in body. - If a message uses raw encoding then the raw data will be passed as a single argument to the function. - Java/C, etc. can use a thrift/protobuf document as the body - Dispatches to actor based on ``c_type``, ``c_meth`` headers ``c_meth`` is unused by python, but may be used in the future to specify class+method pairs. - Chain gains a dedicated field. Reducing the chain into a recursive ``callbacks`` argument causes problems when the recursion limit is exceeded. This is fixed in the new message protocol by specifying a list of signatures, each task will then pop a task off the list when sending the next message:: execute_task(message) chain = message.headers['chain'] if chain: sig = maybe_signature(chain.pop()) sig.apply_async(chain=chain) - ``correlation_id`` replaces ``task_id`` field. Undecided --------- - May consider moving callbacks/errbacks/chain into body. Will huge lists in headers cause overhead? The downside of keeping them in the body is that intermediates won't be able to introspect these values. Definition ========== .. code-block:: python # protocol v2 implies UTC=True # 'class' header existing means protocol is v2 properties = { 'correlation_id': (uuid)task_id, 'content_type': (string)mime, 'content_encoding': (string)encoding, # optional 'reply_to': (string)queue_or_url, } headers = { 'lang': (string)'py' 'c_type': (string)task, # optional 'c_meth': (string)'', 'eta': (iso8601)eta, 'expires'; (iso8601)expires, 'callbacks': (list)Signature, 'errbacks': (list)Signature, 'chain': (list)Signature, # non-recursive, reversed list of signatures 'group': (uuid)group_id, 'chord': (uuid)chord_id, 'retries': (int)retries, 'timelimit': (tuple)(soft, hard), } body = (args, kwargs) Example ======= .. code-block:: python # chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8 task_id = uuid() basic_publish( message=json.dumps([[2, 2], {}]), application_headers={ 'lang': 'py', 'c_type': 'proj.tasks.add', 'chain': [ # reversed chain list {'task': 'proj.tasks.add', 'args': (8, )}, {'task': 'proj.tasks.add', 'args': (4, )}, ] } properties={ 'correlation_id': task_id, 'content_type': 'application/json', 'content_encoding': 'utf-8', } )