Browse Source

Use new kombu Queue.expires setting

Ask Solem 8 years ago
parent
commit
ff14064744
5 changed files with 29 additions and 46 deletions
  1. 8 11
      celery/backends/amqp.py
  2. 3 6
      celery/backends/rpc.py
  3. 7 16
      celery/events/__init__.py
  4. 3 8
      celery/utils/time.py
  5. 8 5
      t/unit/backends/test_amqp.py

+ 8 - 11
celery/backends/amqp.py

@@ -13,9 +13,7 @@ from celery import states
 from celery.exceptions import TimeoutError
 from celery.five import range, monotonic
 from celery.utils import deprecated
-from celery.utils.functional import dictfilter
 from celery.utils.log import get_logger
-from celery.utils.time import maybe_s_to_ms
 
 from .base import BaseBackend
 
@@ -80,9 +78,6 @@ class AMQPBackend(BaseBackend):
         )
         self.serializer = serializer or conf.result_serializer
         self.auto_delete = auto_delete
-        self.queue_arguments = dictfilter({
-            'x-expires': maybe_s_to_ms(self.expires),
-        })
 
     def _create_exchange(self, name, type='direct', delivery_mode=2):
         return self.Exchange(name=name,
@@ -93,12 +88,14 @@ class AMQPBackend(BaseBackend):
 
     def _create_binding(self, task_id):
         name = self.rkey(task_id)
-        return self.Queue(name=name,
-                          exchange=self.exchange,
-                          routing_key=name,
-                          durable=self.persistent,
-                          auto_delete=self.auto_delete,
-                          queue_arguments=self.queue_arguments)
+        return self.Queue(
+            name=name,
+            exchange=self.exchange,
+            routing_key=name,
+            durable=self.persistent,
+            auto_delete=self.auto_delete,
+            expires=self.expires,
+        )
 
     def revive(self, channel):
         pass

+ 3 - 6
celery/backends/rpc.py

@@ -16,8 +16,6 @@ from celery import current_task
 from celery import states
 from celery._state import task_join_will_block
 from celery.five import items, range
-from celery.utils.functional import dictfilter
-from celery.utils.time import maybe_s_to_ms
 
 from . import base
 from .async import AsyncBackendMixin, BaseResultConsumer
@@ -124,9 +122,6 @@ class BaseRPCBackend(base.Backend, AsyncBackendMixin):
         )
         self.serializer = serializer or conf.result_serializer
         self.auto_delete = auto_delete
-        self.queue_arguments = dictfilter({
-            'x-expires': maybe_s_to_ms(self.expires),
-        })
         self.result_consumer = self.ResultConsumer(
             self, self.app, self.accept,
             self._pending_results, self._pending_messages,
@@ -310,7 +305,9 @@ class RPCBackend(BaseRPCBackend):
     def binding(self):
         return self.Queue(
             self.oid, self.exchange, self.oid,
-            durable=False, auto_delete=True
+            durable=False,
+            auto_delete=True,
+            expires=self.expires,
         )
 
     @cached_property

+ 7 - 16
celery/events/__init__.py

@@ -24,9 +24,8 @@ from kombu.utils.objects import cached_property
 from celery import uuid
 from celery.app import app_or_default
 from celery.five import items
-from celery.utils.functional import dictfilter
 from celery.utils.nodenames import anon_nodename
-from celery.utils.time import adjust_timestamp, utcoffset, maybe_s_to_ms
+from celery.utils.time import adjust_timestamp, utcoffset
 
 __all__ = ['Events', 'Event', 'EventDispatcher', 'EventReceiver']
 
@@ -310,14 +309,17 @@ class EventReceiver(ConsumerMixin):
         self.queue_prefix = queue_prefix or self.app.conf.event_queue_prefix
         self.exchange = get_exchange(
             self.connection or self.app.connection_for_write())
+        if queue_ttl is None:
+            queue_ttl = self.app.conf.event_queue_ttl
+        if queue_expires is None:
+            queue_expires = self.app.conf.event_queue_expires
         self.queue = Queue(
             '.'.join([self.queue_prefix, self.node_id]),
             exchange=self.exchange,
             routing_key=self.routing_key,
             auto_delete=True, durable=False,
-            queue_arguments=self._get_queue_arguments(
-                ttl=queue_ttl, expires=queue_expires,
-            ),
+            message_ttl=queue_ttl,
+            expires=queue_expires,
         )
         self.clock = self.app.clock
         self.adjust_clock = self.clock.adjust
@@ -326,17 +328,6 @@ class EventReceiver(ConsumerMixin):
             accept = {self.app.conf.event_serializer, 'json'}
         self.accept = accept
 
-    def _get_queue_arguments(self, ttl=None, expires=None):
-        conf = self.app.conf
-        return dictfilter({
-            'x-message-ttl': maybe_s_to_ms(
-                ttl if ttl is not None else conf.event_queue_ttl,
-            ),
-            'x-expires': maybe_s_to_ms(
-                expires if expires is not None else conf.event_queue_expires,
-            ),
-        })
-
     def process(self, type, event):
         """Process event by dispatching to configured handler."""
         handler = self.handlers.get(type) or self.handlers.get('*')

+ 3 - 8
celery/utils/time.py

@@ -24,9 +24,9 @@ from .text import pluralize
 __all__ = [
     'LocalTimezone', 'timezone', 'maybe_timedelta',
     'delta_resolution', 'remaining', 'rate', 'weekday',
-    'humanize_seconds', 'maybe_iso8601', 'is_naive', 'make_aware',
-    'localize', 'to_utc', 'maybe_make_aware', 'ffwd', 'utcoffset',
-    'adjust_timestamp', 'maybe_s_to_ms',
+    'humanize_seconds', 'maybe_iso8601', 'is_naive',
+    'make_aware', 'localize', 'to_utc', 'maybe_make_aware',
+    'ffwd', 'utcoffset', 'adjust_timestamp',
 ]
 
 PY3 = sys.version_info[0] == 3
@@ -379,8 +379,3 @@ def utcoffset(time=_time, localtime=_time.localtime):
 def adjust_timestamp(ts, offset, here=utcoffset):
     """Adjust timestamp based on provided utcoffset."""
     return ts - (offset - here()) * 3600
-
-
-def maybe_s_to_ms(v):
-    """Convert seconds to milliseconds, but return None for None."""
-    return int(float(v) * 1000.0) if v is not None else v

+ 8 - 5
t/unit/backends/test_amqp.py

@@ -99,15 +99,18 @@ class test_AMQPBackend:
 
     def test_expires_is_int(self):
         b = self.create_backend(expires=48)
-        assert b.queue_arguments.get('x-expires') == 48 * 1000.0
+        q = b._create_binding('x1y2z3')
+        assert q.expires == 48
 
     def test_expires_is_float(self):
         b = self.create_backend(expires=48.3)
-        assert b.queue_arguments.get('x-expires') == 48.3 * 1000.0
+        q = b._create_binding('x1y2z3')
+        assert q.expires == 48.3
 
     def test_expires_is_timedelta(self):
         b = self.create_backend(expires=timedelta(minutes=1))
-        assert b.queue_arguments.get('x-expires') == 60 * 1000.0
+        q = b._create_binding('x1y2z3')
+        assert q.expires == 60
 
     @mock.sleepdeprived()
     def test_store_result_retries(self):
@@ -246,8 +249,8 @@ class test_AMQPBackend:
         app = self.app
         app.conf.result_expires = None
         b = self.create_backend(expires=None)
-        with pytest.raises(KeyError):
-            b.queue_arguments['x-expires']
+        q = b._create_binding('foo')
+        assert q.expires is None
 
     def test_process_cleanup(self):
         self.create_backend().process_cleanup()