Browse Source

Heartbeat check is now heartbeat / 2

Ask Solem 12 years ago
parent
commit
fa11f8a5a2
1 changed files with 15 additions and 10 deletions
  1. 15 10
      celery/worker/consumer.py

+ 15 - 10
celery/worker/consumer.py

@@ -100,6 +100,9 @@ from .heartbeat import Heart
 RUN = 0x1
 CLOSE = 0x2
 
+#: Heartbeat check is called every heartbeat_seconds' / rate'.
+AMQHEARTBEAT_RATE = 2.0
+
 #: Prefetch count can't exceed short.
 PREFETCH_COUNT_MAX = 0xFFFF
 
@@ -300,7 +303,7 @@ class Consumer(object):
     def __init__(self, ready_queue,
             init_callback=noop, send_events=False, hostname=None,
             initial_prefetch_count=2, pool=None, app=None,
-            timer=None, controller=None, hub=None, amq_heartbeat=None,
+            timer=None, controller=None, hub=None, amqheartbeat=None,
             **kwargs):
         self.app = app_or_default(app)
         self.connection = None
@@ -333,11 +336,11 @@ class Consumer(object):
             hub.on_init.append(self.on_poll_init)
         self.hub = hub
         self._quick_put = self.ready_queue.put
-        self.amq_heartbeat = amq_heartbeat
-        if self.amq_heartbeat is None:
-            self.amq_heartbeat = self.app.conf.BROKER_HEARTBEAT
+        self.amqheartbeat = amqheartbeat
+        if self.amqheartbeat is None:
+            self.amqheartbeat = self.app.conf.BROKER_HEARTBEAT
         if not hub:
-            self.amq_heartbeat = 0
+            self.amqheartbeat = 0
 
     def update_strategies(self):
         S = self.strategies
@@ -371,7 +374,8 @@ class Consumer(object):
         hub.update_readers(self.connection.eventmap)
         self.connection.transport.on_poll_init(hub.poller)
 
-    def consume_messages(self, sleep=sleep, min=min, Empty=Empty):
+    def consume_messages(self, sleep=sleep, min=min, Empty=Empty,
+            hbrate=AMQHEARTBEAT_RATE):
         """Consume messages forever (or until an exception is raised)."""
 
         with self.hub as hub:
@@ -383,7 +387,7 @@ class Consumer(object):
             fire_timers = hub.fire_timers
             scheduled = hub.timer._queue
             connection = self.connection
-            hb = self.amq_heartbeat
+            hb = self.amqheartbeat
             hbtick = getattr(connection.connection, 'heartbeat_tick')
             on_poll_start = connection.transport.on_poll_start
             strategies = self.strategies
@@ -392,7 +396,8 @@ class Consumer(object):
             keep_draining = connection.transport.nb_keep_draining
 
             if hb and hbtick is not None:
-                hub.timer.apply_interval(hb * 1000.0, hbtick)
+                hub.timer.apply_interval(
+                    hb * 1000.0 / hbrate, hbtick, (hbrate, ))
 
             def on_task_received(body, message):
                 if on_task_callbacks:
@@ -447,7 +452,7 @@ class Consumer(object):
                                             handlermap[fileno](fileno, event)
                                         except KeyError:
                                             pass
-                            except Empty:
+                            except (KeyError, Empty):
                                 continue
                             except socket.error:
                                 if self._state != CLOSE:  # pragma: no cover
@@ -743,7 +748,7 @@ class Consumer(object):
 
         # remember that the connection is lazy, it won't establish
         # until it's needed.
-        conn = self.app.connection(heartbeat=self.amq_heartbeat)
+        conn = self.app.connection(heartbeat=self.amqheartbeat)
         if not self.app.conf.BROKER_CONNECTION_RETRY:
             # retry disabled, just call connect directly.
             conn.connect()