Преглед изворни кода

Gossip: Now sets x-message-ttl for event queue to heartbeat_interval s. (Issue #2005)

Ask Solem пре 10 година
родитељ
комит
ca4c9b00e0
2 измењених фајлова са 23 додато и 12 уклоњено
  1. 18 10
      celery/events/__init__.py
  2. 5 2
      celery/worker/consumer.py

+ 18 - 10
celery/events/__init__.py

@@ -297,7 +297,7 @@ class EventReceiver(ConsumerMixin):
 
     def __init__(self, channel, handlers=None, routing_key='#',
                  node_id=None, app=None, queue_prefix='celeryev',
-                 accept=None):
+                 accept=None, queue_ttl=None, queue_expires=None):
         self.app = app_or_default(app or self.app)
         self.channel = maybe_channel(channel)
         self.handlers = {} if handlers is None else handlers
@@ -305,12 +305,15 @@ class EventReceiver(ConsumerMixin):
         self.node_id = node_id or uuid()
         self.queue_prefix = queue_prefix
         self.exchange = get_exchange(self.connection or self.app.connection())
-        self.queue = Queue('.'.join([self.queue_prefix, self.node_id]),
-                           exchange=self.exchange,
-                           routing_key=self.routing_key,
-                           auto_delete=True,
-                           durable=False,
-                           queue_arguments=self._get_queue_arguments())
+        self.queue = Queue(
+            '.'.join([self.queue_prefix, self.node_id]),
+            exchange=self.exchange,
+            routing_key=self.routing_key,
+            auto_delete=True, durable=False,
+            queue_arguments=self._get_queue_arguments(
+                ttl=queue_ttl, expires=queue_expires,
+            ),
+        )
         self.clock = self.app.clock
         self.adjust_clock = self.clock.adjust
         self.forward_clock = self.clock.forward
@@ -318,11 +321,16 @@ class EventReceiver(ConsumerMixin):
             accept = {self.app.conf.CELERY_EVENT_SERIALIZER, 'json'}
         self.accept = accept
 
-    def _get_queue_arguments(self):
+    def _get_queue_arguments(self, ttl=None, expires=None):
         conf = self.app.conf
         return dictfilter({
-            'x-message-ttl': maybe_s_to_ms(conf.CELERY_EVENT_QUEUE_TTL),
-            'x-expires': maybe_s_to_ms(conf.CELERY_EVENT_QUEUE_EXPIRES),
+            'x-message-ttl': maybe_s_to_ms(
+                ttl if ttl is not None else conf.CELERY_EVENT_QUEUE_TTL,
+            ),
+            'x-expires': maybe_s_to_ms(
+                expires if expires is not None
+                else conf.CELERY_EVENT_QUEUE_EXPIRES,
+            ),
         })
 
     def process(self, type, event):

+ 5 - 2
celery/worker/consumer.py

@@ -685,7 +685,8 @@ class Gossip(bootsteps.ConsumerStep):
     )
     compatible_transports = {'amqp', 'redis'}
 
-    def __init__(self, c, without_gossip=False, interval=5.0, **kwargs):
+    def __init__(self, c, without_gossip=False,
+                 interval=5.0, heartbeat_interval=2.0, **kwargs):
         self.enabled = not without_gossip and self.compatible_transport(c.app)
         self.app = c.app
         c.gossip = self
@@ -704,6 +705,7 @@ class Gossip(bootsteps.ConsumerStep):
                 c._mutex = DummyLock()
             self.update_state = self.state.event
         self.interval = interval
+        self.heartbeat_interval = heartbeat_interval
         self._tref = None
         self.consensus_requests = defaultdict(list)
         self.consensus_replies = {}
@@ -802,7 +804,8 @@ class Gossip(bootsteps.ConsumerStep):
 
     def get_consumers(self, channel):
         self.register_timer()
-        ev = self.Receiver(channel, routing_key='worker.#')
+        ev = self.Receiver(channel, routing_key='worker.#',
+                           queue_ttl=self.heartbeat_interval)
         return [kombu.Consumer(
             channel,
             queues=[ev.queue],