======================================= Task Message Protocol v2 Draft Spec. ======================================= Notes ===== - Support for multiple languages via the ``lang`` header. Worker may redirect the message to a worker that supports the language. - Metadata moved to headers. This means that workers/intermediates can inspect the message and make decisions based on the headers without decoding the payload (which may be language specific, e.g. serialized by the Python specific pickle serializer). - Body is only for language specific data. - Python stores args/kwargs in body. - If a message uses raw encoding then the raw data will be passed as a single argument to the function. - Java/C, etc can use a thrift/protobuf document as the body - Dispatches to actor based on ``c_type``, ``c_meth`` headers c_meth is unsued by python, but may be used in the future to specify class+method pairs. - Chain gains a dedicated field. Reducing the chain into a recursive ``callbacks`` argument causes problems when the recursion limit is exceeded. This is fixed in the new message protocol by specifying a list of chords, each task will then pop a chain off the list when sending the next message:: execute_task(message) chain = message.headers['chain'] if chain: sig = maybe_signature(chain.pop()) sig.apply_async(chain=chain) - ``correlation_id`` replaces task_id ``field``. Undecided --------- - May consider moving callbacks/errbacks/chain into body. It's unknown if huge lists in headers will cause messaging overhead, but the downside of moving them into the body is that intermiediates will not be able to introspect these values. Definition ========== .. code-block:: python # protocol v2 implies UTC=True # 'class' header existing means protocol is v2 properties = { 'correlation_id': (uuid)task_id, 'content_type': (string)mime, 'content_encoding': (string)encoding, # optional 'reply_to': (string)queue_or_url, } headers = { 'lang': (string)'py' 'c_type': (string)task, # optional 'c_meth': (string)'', 'eta': (iso8601)eta, 'expires'; (iso8601)expires, 'callbacks': (list)Signature, 'errbacks': (list)Signature, 'chain': (list)Signature, # non-recursive, reversed list of signatures 'group': (uuid)group_id, 'chord': (uuid)chord_id, 'retries': (int)retries, 'time' } body = (args, kwargs) Example ======= .. code-block:: python # chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8 task_id = uuid() basic_publish( message=json.dumps([[2, 2], {}]), application_headers={ 'lang': 'py', 'c_type': 'proj.tasks.add', 'chain': [ # reversed chain list {'task': 'proj.tasks.add', 'args': (8, )}, {'task': 'proj.tasks.add', 'args': (4, )}, ] } properties={ 'correlation_id': task_id, 'content_type': 'application/json', 'content_encoding': 'utf-8', } )