Browse Source

Adds app.amqp.autoexchange to specify the exchange for automatic queues.

The main purpose for this addition is to be able to use the AMQP 'anonymous
exchange':

    app = Celery()
    app.amqp.autoqueue = lambda name: None

The anon exchange can be used to bypass routing and send messages to a queue
directly by name.  The name of the queue is specified in the `routing_key`.
Using the anon exchange will speed up message publish.
Ask Solem 11 years ago
parent
commit
3c03d6f213
1 changed files with 17 additions and 4 deletions
  1. 17 4
      celery/app/amqp.py

+ 17 - 4
celery/app/amqp.py

@@ -50,12 +50,13 @@ class Queues(dict):
     _consume_from = None
 
     def __init__(self, queues=None, default_exchange=None,
-                 create_missing=True, ha_policy=None):
+                 create_missing=True, ha_policy=None, autoexchange=None):
         dict.__init__(self)
         self.aliases = WeakValueDictionary()
         self.default_exchange = default_exchange
         self.create_missing = create_missing
         self.ha_policy = ha_policy
+        self.autoexchange = Exchange if autoexchange is None else autoexchange
         if isinstance(queues, (tuple, list)):
             queues = dict((q.name, q) for q in queues)
         for name, q in items(queues or {}):
@@ -156,7 +157,7 @@ class Queues(dict):
             self._consume_from.pop(queue, None)
 
     def new_missing(self, name):
-        return Queue(name, Exchange(name), name)
+        return Queue(name, self.autoexchange(name), name)
 
     @property
     def consume_from(self):
@@ -337,13 +338,20 @@ class AMQP(object):
     #: set by the :attr:`producer_pool`.
     _producer_pool = None
 
+    # Exchange class/function used when defining automatic queues.
+    # E.g. you can use ``autoexchange = lambda n: None`` to use the
+    # amqp default exchange, which is a shortcut to bypass routing
+    # and instead send directly to the queue named in the routing key.
+    autoexchange = None
+
     def __init__(self, app):
         self.app = app
 
     def flush_routes(self):
         self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)
 
-    def Queues(self, queues, create_missing=None, ha_policy=None):
+    def Queues(self, queues, create_missing=None, ha_policy=None,
+               autoexchange=None):
         """Create new :class:`Queues` instance, using queue defaults
         from the current configuration."""
         conf = self.app.conf
@@ -355,7 +363,12 @@ class AMQP(object):
             queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
                             exchange=self.default_exchange,
                             routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
-        return Queues(queues, self.default_exchange, create_missing, ha_policy)
+        autoexchange = (self.autoexchange if autoexchange is None
+                        else autoexchange)
+        return Queues(
+            queues, self.default_exchange, create_missing,
+            ha_policy, autoexchange,
+        )
 
     def Router(self, queues=None, create_missing=None):
         """Returns the current task router."""