|  | @@ -1,48 +1,115 @@
 | 
	
		
			
				|  |  | -# protocol v2 implies UTC=True
 | 
	
		
			
				|  |  | -# 'class' header existing means protocol is v2
 | 
	
		
			
				|  |  | +=======================================
 | 
	
		
			
				|  |  | + Task Message Protocol v2 Draft Spec.
 | 
	
		
			
				|  |  | +=======================================
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -properties = {
 | 
	
		
			
				|  |  | -    'correlation_id': (uuid)task_id,
 | 
	
		
			
				|  |  | -    'content_type': (string)mime,
 | 
	
		
			
				|  |  | -    'content_encoding': (string)encoding,
 | 
	
		
			
				|  |  | +Notes
 | 
	
		
			
				|  |  | +=====
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    # optional
 | 
	
		
			
				|  |  | -    'reply_to': (string)queue_or_url,
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -headers = {
 | 
	
		
			
				|  |  | -    'lang': (string)'py'
 | 
	
		
			
				|  |  | -    'class': (string)task,
 | 
	
		
			
				|  |  | +- Support for multiple languages via the ``lang`` header.
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    # optional
 | 
	
		
			
				|  |  | -    'method': (string)'',
 | 
	
		
			
				|  |  | -    'eta': (iso8601)eta,
 | 
	
		
			
				|  |  | -    'expires'; (iso8601)expires,
 | 
	
		
			
				|  |  | -    'callbacks': (list)Signature,
 | 
	
		
			
				|  |  | -    'errbacks': (list)Signature,
 | 
	
		
			
				|  |  | -    'chain': (list)Signature,  # non-recursive
 | 
	
		
			
				|  |  | -    'group': (uuid)group_id,
 | 
	
		
			
				|  |  | -    'chord': (uuid)chord_id,
 | 
	
		
			
				|  |  | -    'retries': (int)retries,
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | +    Worker may redirect the message to a worker that supports
 | 
	
		
			
				|  |  | +    the language.
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -body = (args, kwargs)
 | 
	
		
			
				|  |  | +- Metadata moved to headers.
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    This means that workers/intermediates can inspect the message
 | 
	
		
			
				|  |  | +    and make decisions based on the headers without decoding
 | 
	
		
			
				|  |  | +    the payload (which may be language specific, e.g. serialized by the
 | 
	
		
			
				|  |  | +    Python specific pickle serializer).
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +- Body is only for language specific data.
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    - Python stores args/kwargs in body.
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -Example:
 | 
	
		
			
				|  |  | +    - If a message uses raw encoding then the raw data
 | 
	
		
			
				|  |  | +      will be passed as a single argument to the function.
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    # chain: add(add(add(2, 2), 4), 8) = 2 + 2 + 4 + 8
 | 
	
		
			
				|  |  | +    - Java/C, etc can use a thrift/protobuf document as the body
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +- Dispatches to actor based on ``c_type``, ``c_meth`` headers
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    c_meth is unsued by python, but may be used in the future
 | 
	
		
			
				|  |  | +    to specify class+method pairs.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +- Chain gains a dedicated field.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Reducing the chain into a recursive ``callbacks`` argument
 | 
	
		
			
				|  |  | +    causes problems when the recursion limit is exceeded.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    This is fixed in the new message protocol by specifying
 | 
	
		
			
				|  |  | +    a list of chords, each task will then pop a chain off the list
 | 
	
		
			
				|  |  | +    when sending the next message::
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        execute_task(message)
 | 
	
		
			
				|  |  | +        chain = message.headers['chain']
 | 
	
		
			
				|  |  | +        if chain:
 | 
	
		
			
				|  |  | +            sig = maybe_signature(chain.pop())
 | 
	
		
			
				|  |  | +            sig.apply_async(chain=chain)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +- ``correlation_id`` replaces task_id ``field``.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +Undecided
 | 
	
		
			
				|  |  | +---------
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +- May consider moving callbacks/errbacks/chain into body.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    It's unknown if huge lists in headers will cause messaging overhead,
 | 
	
		
			
				|  |  | +    but the downside of moving them into the body is that intermiediates
 | 
	
		
			
				|  |  | +    will not be able to introspect these values.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +Definition
 | 
	
		
			
				|  |  | +==========
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +.. code-block:: python
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # protocol v2 implies UTC=True
 | 
	
		
			
				|  |  | +    # 'class' header existing means protocol is v2
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    properties = {
 | 
	
		
			
				|  |  | +        'correlation_id': (uuid)task_id,
 | 
	
		
			
				|  |  | +        'content_type': (string)mime,
 | 
	
		
			
				|  |  | +        'content_encoding': (string)encoding,
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # optional
 | 
	
		
			
				|  |  | +        'reply_to': (string)queue_or_url,
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    headers = {
 | 
	
		
			
				|  |  | +        'lang': (string)'py'
 | 
	
		
			
				|  |  | +        'c_type': (string)task,
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # optional
 | 
	
		
			
				|  |  | +        'c_meth': (string)'',
 | 
	
		
			
				|  |  | +        'eta': (iso8601)eta,
 | 
	
		
			
				|  |  | +        'expires'; (iso8601)expires,
 | 
	
		
			
				|  |  | +        'callbacks': (list)Signature,
 | 
	
		
			
				|  |  | +        'errbacks': (list)Signature,
 | 
	
		
			
				|  |  | +        'chain': (list)Signature,  # non-recursive, reversed list of signatures
 | 
	
		
			
				|  |  | +        'group': (uuid)group_id,
 | 
	
		
			
				|  |  | +        'chord': (uuid)chord_id,
 | 
	
		
			
				|  |  | +        'retries': (int)retries,
 | 
	
		
			
				|  |  | +        'time'
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    body = (args, kwargs)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +Example
 | 
	
		
			
				|  |  | +=======
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +.. code-block:: python
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      task_id = uuid()
 | 
	
		
			
				|  |  |      basic_publish(
 | 
	
		
			
				|  |  |          message=json.dumps([[2, 2], {}]),
 | 
	
		
			
				|  |  |          application_headers={
 | 
	
		
			
				|  |  |              'lang': 'py',
 | 
	
		
			
				|  |  | -            'class': 'proj.tasks.add',
 | 
	
		
			
				|  |  | +            'c_type': 'proj.tasks.add',
 | 
	
		
			
				|  |  |              'chain': [
 | 
	
		
			
				|  |  | -                {'task': 'proj.tasks.add', 'args': (4, )},
 | 
	
		
			
				|  |  | +                # reversed chain list
 | 
	
		
			
				|  |  |                  {'task': 'proj.tasks.add', 'args': (8, )},
 | 
	
		
			
				|  |  | +                {'task': 'proj.tasks.add', 'args': (4, )},
 | 
	
		
			
				|  |  |              ]
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          properties={
 |