Browse Source

Merge branch 'pymonger/master'

Ask Solem 9 years ago
parent
commit
1099345539
3 changed files with 54 additions and 3 deletions
  1. 18 3
      celery/app/amqp.py
  2. 1 0
      celery/app/defaults.py
  3. 35 0
      celery/tests/app/test_amqp.py

+ 18 - 3
celery/app/amqp.py

@@ -49,6 +49,7 @@ class Queues(dict):
                              the occurrence of unknown queues
                              in `wanted` will raise :exc:`KeyError`.
     :keyword ha_policy: Default HA policy for queues with none set.
+    :keyword max_priority: Default x-max-priority for queues with none set.
 
 
     """
@@ -57,13 +58,15 @@ class Queues(dict):
     _consume_from = None
 
     def __init__(self, queues=None, default_exchange=None,
-                 create_missing=True, ha_policy=None, autoexchange=None):
+                 create_missing=True, ha_policy=None, autoexchange=None,
+                 max_priority=None):
         dict.__init__(self)
         self.aliases = WeakValueDictionary()
         self.default_exchange = default_exchange
         self.create_missing = create_missing
         self.ha_policy = ha_policy
         self.autoexchange = Exchange if autoexchange is None else autoexchange
+        self.max_priority = max_priority
         if isinstance(queues, (tuple, list)):
             queues = {q.name: q for q in queues}
         for name, q in items(queues or {}):
@@ -109,6 +112,10 @@ class Queues(dict):
             if queue.queue_arguments is None:
                 queue.queue_arguments = {}
             self._set_ha_policy(queue.queue_arguments)
+        if self.max_priority is not None:
+            if queue.queue_arguments is None:
+                queue.queue_arguments = {}
+            self._set_max_priority(queue.queue_arguments)
         self[queue.name] = queue
         return queue
 
@@ -119,6 +126,8 @@ class Queues(dict):
             options['routing_key'] = name
         if self.ha_policy is not None:
             self._set_ha_policy(options.setdefault('queue_arguments', {}))
+        if self.max_priority is not None:
+            self._set_max_priority(options.setdefault('queue_arguments', {}))
         q = self[name] = Queue.from_dict(name, **options)
         return q
 
@@ -129,6 +138,10 @@ class Queues(dict):
                                 'x-ha-policy-params': list(policy)})
         args['x-ha-policy'] = policy
 
+    def _set_max_priority(self, args):
+        if 'x-max-priority' not in args and self.max_priority is not None:
+            return args.update({'x-max-priority': self.max_priority})
+
     def format(self, indent=0, indent_first=True):
         """Format routing table into string for log dumps."""
         active = self.consume_from
@@ -227,7 +240,7 @@ class AMQP(object):
         return self._create_task_sender()
 
     def Queues(self, queues, create_missing=None, ha_policy=None,
-               autoexchange=None):
+               autoexchange=None, max_priority=None):
         """Create new :class:`Queues` instance, using queue defaults
         from the current configuration."""
         conf = self.app.conf
@@ -235,6 +248,8 @@ class AMQP(object):
             create_missing = conf.CELERY_CREATE_MISSING_QUEUES
         if ha_policy is None:
             ha_policy = conf.CELERY_QUEUE_HA_POLICY
+        if max_priority is None:
+            max_priority = conf.CELERY_QUEUE_MAX_PRIORITY
         if not queues and conf.CELERY_DEFAULT_QUEUE:
             queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
                             exchange=self.default_exchange,
@@ -243,7 +258,7 @@ class AMQP(object):
                         else autoexchange)
         return self.queues_cls(
             queues, self.default_exchange, create_missing,
-            ha_policy, autoexchange,
+            ha_policy, autoexchange, max_priority,
         )
 
     def Router(self, queues=None, create_missing=None):

+ 1 - 0
celery/app/defaults.py

@@ -165,6 +165,7 @@ NAMESPACES = {
         'REDIRECT_STDOUTS_LEVEL': Option('WARNING'),
         'QUEUES': Option(type='dict'),
         'QUEUE_HA_POLICY': Option(None, type='string'),
+        'QUEUE_MAX_PRIORITY': Option(None, type='int'),
         'SECURITY_KEY': Option(type='string'),
         'SECURITY_CERTIFICATE': Option(type='string'),
         'SECURITY_CERT_STORE': Option(type='string'),

+ 35 - 0
celery/tests/app/test_amqp.py

@@ -134,3 +134,38 @@ class test_Queues(AppCase):
         q = Queues()
         q.add(Queue('foo', alias='barfoo'))
         self.assertIs(q['barfoo'], q['foo'])
+
+    def test_with_max_priority(self):
+        qs1 = Queues(max_priority=10)
+        qs1.add('foo')
+        self.assertEqual(qs1['foo'].queue_arguments, {'x-max-priority': 10})
+
+        q1 = Queue('xyx', queue_arguments={'x-max-priority': 3})
+        qs1.add(q1)
+        self.assertEqual(qs1['xyx'].queue_arguments, {
+            'x-max-priority': 3,
+        })
+
+        qs2 = Queues(ha_policy='all', max_priority=5)
+        qs2.add('bar')
+        self.assertEqual(qs2['bar'].queue_arguments, {
+            'x-ha-policy': 'all',
+            'x-max-priority': 5
+        })
+
+        q2 = Queue('xyx2', queue_arguments={'x-max-priority': 2})
+        qs2.add(q2)
+        self.assertEqual(qs2['xyx2'].queue_arguments, {
+            'x-ha-policy': 'all',
+            'x-max-priority': 2,
+        })
+
+        qs3 = Queues(max_priority=None)
+        qs3.add('foo2')
+        self.assertEqual(qs3['foo2'].queue_arguments, None)
+
+        q3 = Queue('xyx3', queue_arguments={'x-max-priority': 7})
+        qs3.add(q3)
+        self.assertEqual(qs3['xyx3'].queue_arguments, {
+            'x-max-priority': 7,
+        })