Browse Source

Fixed TypeError due to change of task protocol (#3714)

If the body is a tuple, protocol version 2 is used.  Therefore collect
the parameters according to:
https://github.com/celery/celery/blob/v4.0.2/docs/internals/protocol.rst#definition

Otherwise protocol version 1 is still in use.

Fixes celery/celery#3707
Michael 8 years ago
parent
commit
fb9ad6c9d4
2 changed files with 18 additions and 3 deletions
  1. 12 3
      celery/app/amqp.py
  2. 6 0
      t/unit/app/test_amqp.py

+ 12 - 3
celery/app/amqp.py

@@ -561,9 +561,18 @@ class AMQP(object):
                 send_after_publish(sender=name, body=body, headers=headers2,
                                    exchange=exchange, routing_key=routing_key)
             if sent_receivers:  # XXX deprecated
-                send_task_sent(sender=name, task_id=body['id'], task=name,
-                               args=body['args'], kwargs=body['kwargs'],
-                               eta=body['eta'], taskset=body['taskset'])
+                if isinstance(body, tuple):  # protocol version 2
+                    send_task_sent(
+                        sender=name, task_id=headers2['id'], task=name,
+                        args=body[0], kwargs=body[1],
+                        eta=headers2['eta'], taskset=headers2['group'],
+                    )
+                else:  # protocol version 1
+                    send_task_sent(
+                        sender=name, task_id=body['id'], task=name,
+                        args=body['args'], kwargs=body['kwargs'],
+                        eta=body['eta'], taskset=body['taskset'],
+                    )
             if sent_event:
                 evd = event_dispatcher or default_evd
                 exname = exchange

+ 6 - 0
t/unit/app/test_amqp.py

@@ -284,6 +284,12 @@ class test_AMQP:
         )
         assert prod.publish.call_args[1]['delivery_mode'] == 33
 
+    def test_send_task_message__with_receivers(self):
+        from case import patch
+        mocked_receiver = ((Mock(), Mock()), Mock())
+        with patch('celery.signals.task_sent.receivers', [mocked_receiver]):
+            self.app.amqp.send_task_message(Mock(), 'foo', self.simple_message)
+
     def test_routes(self):
         r1 = self.app.amqp.routes
         r2 = self.app.amqp.routes