Browse Source

Task proto2: Custom headers were ignored

Ask Solem 10 năm trước cách đây
mục cha
commit
961429539e
2 tập tin đã thay đổi với 26 bổ sung11 xóa
  1. 6 4
      celery/app/amqp.py
  2. 20 7
      docs/userguide/signals.rst

+ 6 - 4
celery/app/amqp.py

@@ -417,7 +417,9 @@ class AMQP(object):
                          compression=None, declare=None,
                          headers=None, **kwargs):
             retry = default_retry if retry is None else retry
-            headers, properties, body, sent_event = message
+            headers2, properties, body, sent_event = message
+            if headers:
+                headers2.update(headers)
             if kwargs:
                 properties.update(kwargs)
 
@@ -448,7 +450,7 @@ class AMQP(object):
                 send_before_publish(
                     sender=name, body=body,
                     exchange=exchange, routing_key=routing_key,
-                    declare=declare, headers=headers,
+                    declare=declare, headers=headers2,
                     properties=kwargs,  retry_policy=retry_policy,
                 )
             ret = producer.publish(
@@ -459,11 +461,11 @@ class AMQP(object):
                 compression=compression or default_compressor,
                 retry=retry, retry_policy=_rp,
                 delivery_mode=delivery_mode, declare=declare,
-                headers=headers,
+                headers=headers2,
                 **properties
             )
             if after_receivers:
-                send_after_publish(sender=name, body=body,
+                send_after_publish(sender=name, body=body, headers=headers2,
                                    exchange=exchange, routing_key=routing_key)
             if sent_receivers:  # XXX deprecated
                 send_task_sent(sender=name, task_id=body['id'], task=name,

+ 20 - 7
docs/userguide/signals.rst

@@ -28,9 +28,12 @@ Example connecting to the :signal:`after_task_publish` signal:
     from celery.signals import after_task_publish
 
     @after_task_publish.connect
-    def task_sent_handler(sender=None, body=None, **kwargs):
-        print('after_task_publish for task id {body[id]}'.format(
-            body=body,
+    def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
+        # information about task are located in headers for task messages
+        # using the task protocol version 2.
+        info = headers if 'task' in headers else body
+        print('after_task_publish for task id {info[id]}'.format(
+            info=info,
         ))
 
 
@@ -44,9 +47,12 @@ is published:
 .. code-block:: python
 
     @after_task_publish.connect(sender='proj.tasks.add')
-    def task_sent_handler(sender=None, body=None, **kwargs):
-        print('after_task_publish for task id {body[id]}'.format(
-            body=body,
+    def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
+        # information about task are located in headers for task messages
+        # using the task protocol version 2.
+        info = headers if 'task' in headers else body
+        print('after_task_publish for task id {info[id]}'.format(
+            info=info,
         ))
 
 Signals use the same implementation as django.core.dispatch. As a result other
@@ -123,9 +129,16 @@ Sender is the name of the task being sent.
 
 Provides arguments:
 
+* headers
+
+    The task message headers, see :ref:`task-message-protocol-v2`
+    and :ref:`task-message-protocol-v1`.
+    for a reference of possible fields that can be defined.
+
 * body
 
-    The task message body, see :ref:`task-message-protocol-v1`
+    The task message body, see :ref:`task-message-protocol-v2`
+    and :ref:`task-message-protocol-v1`.
     for a reference of possible fields that can be defined.
 
 * exchange