Browse Source

Fixes celery amqp when using pyamqp://. Closes #2013

Ask Solem 10 years ago
parent
commit
4fc01fa7eb
1 changed files with 11 additions and 0 deletions
  1. 11 0
      celery/bin/amqp.py

+ 11 - 0
celery/bin/amqp.py

@@ -182,6 +182,16 @@ class AMQShell(cmd.Cmd):
         'basic.ack': Spec(('delivery_tag', int)),
     }
 
+    def _prepare_spec(self, conn):
+        # XXX Hack to fix Issue #2013
+        from amqp import Connection, Message
+        if isinstance(conn.connection, Connection):
+            self.amqp['basic.publish'] = Spec(('msg', Message),
+                                              ('exchange', str),
+                                              ('routing_key', str),
+                                              ('mandatory', bool, 'no'),
+                                              ('immediate', bool, 'no'))
+
     def __init__(self, *args, **kwargs):
         self.connect = kwargs.pop('connect')
         self.silent = kwargs.pop('silent', False)
@@ -298,6 +308,7 @@ class AMQShell(cmd.Cmd):
     def _reconnect(self):
         """Re-establish connection to the AMQP server."""
         self.conn = self.connect(self.conn)
+        self._prepare_spec(self.conn)
         self.chan = self.conn.default_channel
         self.needs_reconnect = False