Forráskód Böngészése

Adds settings CELERY_EVENT_QUEUE_TTL / CELERY_EVENT_QUEUE_EXPIRES. Closes #1499

Ask Solem 11 éve
szülő
commit
be9252b823

+ 2 - 0
celery/app/defaults.py

@@ -114,6 +114,8 @@ NAMESPACES = {
         'EAGER_PROPAGATES_EXCEPTIONS': Option(False, type='bool'),
         'EAGER_PROPAGATES_EXCEPTIONS': Option(False, type='bool'),
         'ENABLE_UTC': Option(True, type='bool'),
         'ENABLE_UTC': Option(True, type='bool'),
         'EVENT_SERIALIZER': Option('json'),
         'EVENT_SERIALIZER': Option('json'),
+        'EVENT_QUEUE_EXPIRES': Option(None, type='float'),
+        'EVENT_QUEUE_TTL': Option(None, type='float'),
         'IMPORTS': Option((), type='tuple'),
         'IMPORTS': Option((), type='tuple'),
         'INCLUDE': Option((), type='tuple'),
         'INCLUDE': Option((), type='tuple'),
         'IGNORE_RESULT': Option(False, type='bool'),
         'IGNORE_RESULT': Option(False, type='bool'),

+ 5 - 3
celery/backends/amqp.py

@@ -22,7 +22,9 @@ from kombu import Exchange, Queue, Producer, Consumer
 from celery import states
 from celery import states
 from celery.exceptions import TimeoutError
 from celery.exceptions import TimeoutError
 from celery.five import range
 from celery.five import range
+from celery.utils.functional import dictfilter
 from celery.utils.log import get_logger
 from celery.utils.log import get_logger
+from celery.utils.timeutils import maybe_s_to_ms
 
 
 from .base import BaseBackend
 from .base import BaseBackend
 
 
@@ -65,7 +67,6 @@ class AMQPBackend(BaseBackend):
         super(AMQPBackend, self).__init__(app, **kwargs)
         super(AMQPBackend, self).__init__(app, **kwargs)
         conf = self.app.conf
         conf = self.app.conf
         self._connection = connection
         self._connection = connection
-        self.queue_arguments = {}
         self.persistent = (conf.CELERY_RESULT_PERSISTENT if persistent is None
         self.persistent = (conf.CELERY_RESULT_PERSISTENT if persistent is None
                            else persistent)
                            else persistent)
         exchange = exchange or conf.CELERY_RESULT_EXCHANGE
         exchange = exchange or conf.CELERY_RESULT_EXCHANGE
@@ -78,8 +79,9 @@ class AMQPBackend(BaseBackend):
         self.expires = None
         self.expires = None
         if 'expires' not in kwargs or kwargs['expires'] is not None:
         if 'expires' not in kwargs or kwargs['expires'] is not None:
             self.expires = self.prepare_expires(kwargs.get('expires'))
             self.expires = self.prepare_expires(kwargs.get('expires'))
-        if self.expires:
-            self.queue_arguments['x-expires'] = int(self.expires * 1000)
+        self.queue_arguments = dictfilter({
+            'x-expires': maybe_s_to_ms(self.expires),
+        })
         self.mutex = threading.Lock()
         self.mutex = threading.Lock()
 
 
     def _create_exchange(self, name, type='direct', persistent=True):
     def _create_exchange(self, name, type='direct', persistent=True):

+ 11 - 2
celery/events/__init__.py

@@ -26,7 +26,8 @@ from kombu.utils import cached_property
 
 
 from celery.app import app_or_default
 from celery.app import app_or_default
 from celery.utils import uuid
 from celery.utils import uuid
-from celery.utils.timeutils import adjust_timestamp, utcoffset
+from celery.utils.functional import dictfilter
+from celery.utils.timeutils import adjust_timestamp, utcoffset, maybe_s_to_ms
 
 
 event_exchange = Exchange('celeryev', type='topic')
 event_exchange = Exchange('celeryev', type='topic')
 
 
@@ -267,9 +268,17 @@ class EventReceiver(ConsumerMixin):
                            exchange=self.exchange,
                            exchange=self.exchange,
                            routing_key=self.routing_key,
                            routing_key=self.routing_key,
                            auto_delete=True,
                            auto_delete=True,
-                           durable=False)
+                           durable=False,
+                           queue_arguments=self._get_queue_arguments())
         self.adjust_clock = self.app.clock.adjust
         self.adjust_clock = self.app.clock.adjust
 
 
+    def _get_queue_arguments(self):
+        conf = self.app.conf
+        return dictfilter({
+            'x-message-ttl': maybe_s_to_ms(conf.CELERY_EVENT_QUEUE_TTL),
+            'x-expires': maybe_s_to_ms(conf.CELERY_EVENT_QUEUE_EXPIRES),
+        })
+
     def process(self, type, event):
     def process(self, type, event):
         """Process the received event by dispatching it to the appropriate
         """Process the received event by dispatching it to the appropriate
         handler."""
         handler."""

+ 3 - 2
celery/utils/functional.py

@@ -295,6 +295,7 @@ class _regen(UserList, list):
         return list(self.__it)
         return list(self.__it)
 
 
 
 
-def dictfilter(d, **filterkeys):
-    d = dict(d, **filterkeys) if filterkeys else d
+def dictfilter(d=None, **kw):
+    """Removes keys which value is :const:`None`"""
+    d = kw if d is None else (dict(d, **kw) if kw else d)
     return dict((k, v) for k, v in items(d) if v is not None)
     return dict((k, v) for k, v in items(d) if v is not None)

+ 4 - 0
celery/utils/timeutils.py

@@ -360,3 +360,7 @@ def utcoffset():
 
 
 def adjust_timestamp(ts, offset, here=utcoffset):
 def adjust_timestamp(ts, offset, here=utcoffset):
     return ts - (offset - here()) * 3600
     return ts - (offset - here()) * 3600
+
+
+def maybe_s_to_ms(v):
+    return int(float(v) * 1000.0) if v is not None else v

+ 26 - 0
docs/configuration.rst

@@ -1470,6 +1470,32 @@ tracked before they are consumed by a worker.
 
 
 Disabled by default.
 Disabled by default.
 
 
+.. setting:: CELERY_EVENT_QUEUE_TTL
+
+CELERY_EVENT_QUEUE_TTL
+~~~~~~~~~~~~~~~~~~~~~~
+:transports supported: ``amqp``
+
+Message expiry time in seconds (int/float) for when messages sent to a monitor clients
+event queue is deleted (``x-message-ttl``)
+
+For example, if this value is set to 10 then a message delivered to this queue
+will be deleted after 10 seconds.
+
+Disabled by default.
+
+.. setting:: CELERY_EVENT_QUEUE_EXPIRES
+
+CELERY_EVENT_QUEUE_EXPIRES
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+:transports supported: ``amqp``
+
+
+Expiry time in seconds (int/float) for when a monitor clients
+event queue will be deleted (``x-expires``).
+
+Default is never, relying on the queue autodelete setting.
+
 .. setting:: CELERY_EVENT_SERIALIZER
 .. setting:: CELERY_EVENT_SERIALIZER
 
 
 CELERY_EVENT_SERIALIZER
 CELERY_EVENT_SERIALIZER