Browse Source

Adds a CELERY_QUEUE_HA_POLICY setting for easy configuration of x-ha-policy

Ask Solem 12 years ago
parent
commit
5440e4d746
3 changed files with 47 additions and 3 deletions
  1. 20 3
      celery/app/amqp.py
  2. 1 0
      celery/app/defaults.py
  3. 26 0
      docs/configuration.rst

+ 20 - 3
celery/app/amqp.py

@@ -37,6 +37,7 @@ class Queues(dict):
                              added automatically, but if disabled
                              the occurrence of unknown queues
                              in `wanted` will raise :exc:`KeyError`.
+    :keyword ha_policy: Default HA policy for queues with none set.
 
 
     """
@@ -45,11 +46,12 @@ class Queues(dict):
     _consume_from = None
 
     def __init__(self, queues=None, default_exchange=None,
-            create_missing=True):
+            create_missing=True, ha_policy=None):
         dict.__init__(self)
         self.aliases = WeakValueDictionary()
         self.default_exchange = default_exchange
         self.create_missing = create_missing
+        self.ha_policy = ha_policy
         if isinstance(queues, (tuple, list)):
             queues = dict((q.name, q) for q in queues)
         for name, q in (queues or {}).iteritems():
@@ -86,6 +88,10 @@ class Queues(dict):
         """
         if not isinstance(queue, Queue):
             return self.add_compat(queue, **kwargs)
+        if self.ha_policy:
+            if queue.queue_arguments is None:
+                queue.queue_arguments = {}
+            self._set_ha_policy(queue.queue_arguments)
         self[queue.name] = queue
         return queue
 
@@ -94,9 +100,18 @@ class Queues(dict):
         options.setdefault('routing_key', options.get('binding_key'))
         if options['routing_key'] is None:
             options['routing_key'] = name
+        if self.ha_policy is not None:
+            self._set_ha_policy(options.setdefault('queue_arguments', {}))
         q = self[name] = entry_to_queue(name, **options)
         return q
 
+    def _set_ha_policy(self, args):
+        policy = self.ha_policy
+        if isinstance(policy, (list, tuple)):
+            return args.update({'x-ha-policy': 'nodes',
+                                'x-ha-policy-params': list(policy)})
+        args['x-ha-policy'] = policy
+
     def format(self, indent=0, indent_first=True):
         """Format routing table into string for log dumps."""
         active = self.consume_from
@@ -273,17 +288,19 @@ class AMQP(object):
     def flush_routes(self):
         self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)
 
-    def Queues(self, queues, create_missing=None):
+    def Queues(self, queues, create_missing=None, ha_policy=None):
         """Create new :class:`Queues` instance, using queue defaults
         from the current configuration."""
         conf = self.app.conf
         if create_missing is None:
             create_missing = conf.CELERY_CREATE_MISSING_QUEUES
+        if ha_policy is None:
+            ha_policy = conf.CELERY_QUEUE_HA_POLICY
         if not queues and conf.CELERY_DEFAULT_QUEUE:
             queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
                             exchange=self.default_exchange,
                             routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
-        return Queues(queues, self.default_exchange, create_missing)
+        return Queues(queues, self.default_exchange, create_missing, ha_policy)
 
     def Router(self, queues=None, create_missing=None):
         """Returns the current task router."""

+ 1 - 0
celery/app/defaults.py

@@ -152,6 +152,7 @@ NAMESPACES = {
         'REDIRECT_STDOUTS': Option(True, type='bool'),
         'REDIRECT_STDOUTS_LEVEL': Option('WARNING'),
         'QUEUES': Option(type='dict'),
+        'QUEUE_HA_POLICY': Option(None, type='string'),
         'SECURITY_KEY': Option(type='string'),
         'SECURITY_CERTIFICATE': Option(type='string'),
         'SECURITY_CERT_STORE': Option(type='string'),

+ 26 - 0
docs/configuration.rst

@@ -623,6 +623,32 @@ A list of routers, or a single router used to route tasks to queues.
 When deciding the final destination of a task the routers are consulted
 in order.  See :ref:`routers` for more information.
 
+.. setting:: CELERY_QUEUE_HA_POLICY
+
+CELERY_QUEUE_HA_POLICY
+~~~~~~~~~~~~~~~~~~~~~~
+:brokers: RabbitMQ
+
+This will set the default HA policy for a queue, and the value
+can either be a string (usually ``all``):
+
+.. code-block:: python
+
+    CELERY_QUEUE_HA_POLICY = 'all'
+
+Using 'all' will replicate the queue to all current nodes,
+Or you can give it a list of nodes to replicate to:
+
+.. code-block:: python
+
+    CELERY_QUEUE_HA_POLICY = ['rabbit@host1', 'rabbit@host2']
+
+
+Using a list will implicitly set ``x-ha-policy`` to 'nodes' and
+``x-ha-policy-params`` to the given list of nodes.
+
+See http://www.rabbitmq.com/ha.html for more information.
+
 .. setting:: CELERY_WORKER_DIRECT
 
 CELERY_WORKER_DIRECT