Bläddra i källkod

Support AMQP heartbeats (if using kombu using_pyamqp branch)

Ask Solem 12 år sedan
förälder
incheckning
8ec2beb867
3 ändrade filer med 17 tillägg och 4 borttagningar
  1. 3 2
      celery/app/base.py
  2. 1 0
      celery/app/defaults.py
  3. 13 2
      celery/worker/consumer.py

+ 3 - 2
celery/app/base.py

@@ -197,7 +197,7 @@ class Celery(object):
     def connection(self, hostname=None, userid=None,
             password=None, virtual_host=None, port=None, ssl=None,
             insist=None, connect_timeout=None, transport=None,
-            transport_options=None, **kwargs):
+            transport_options=None, heartbeat=None, **kwargs):
         conf = self.conf
         return self.amqp.Connection(
                     hostname or conf.BROKER_HOST,
@@ -209,7 +209,8 @@ class Celery(object):
                     insist=self.either('BROKER_INSIST', insist),
                     ssl=self.either('BROKER_USE_SSL', ssl),
                     connect_timeout=self.either(
-                                'BROKER_CONNECTION_TIMEOUT', connect_timeout),
+                        'BROKER_CONNECTION_TIMEOUT', connect_timeout),
+                    heartbeat=heartbeat,
                     transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
                                            **transport_options or {}))
     broker_connection = connection

+ 1 - 0
celery/app/defaults.py

@@ -67,6 +67,7 @@ NAMESPACES = {
         'CONNECTION_TIMEOUT': Option(4, type='float'),
         'CONNECTION_RETRY': Option(True, type='bool'),
         'CONNECTION_MAX_RETRIES': Option(100, type='int'),
+        'HEARTBEAT': Option(3, type='int'),
         'POOL_LIMIT': Option(10, type='int'),
         'INSIST': Option(False, type='bool',
                          deprecate_by='2.4', remove_by='4.0'),

+ 13 - 2
celery/worker/consumer.py

@@ -300,7 +300,8 @@ class Consumer(object):
     def __init__(self, ready_queue,
             init_callback=noop, send_events=False, hostname=None,
             initial_prefetch_count=2, pool=None, app=None,
-            timer=None, controller=None, hub=None, **kwargs):
+            timer=None, controller=None, hub=None, amq_heartbeat=None,
+            **kwargs):
         self.app = app_or_default(app)
         self.connection = None
         self.task_consumer = None
@@ -332,6 +333,11 @@ class Consumer(object):
             hub.on_init.append(self.on_poll_init)
         self.hub = hub
         self._quick_put = self.ready_queue.put
+        self.amq_heartbeat = amq_heartbeat
+        if self.amq_heartbeat is None:
+            self.amq_heartbeat = self.app.conf.BROKER_HEARTBEAT
+        if not hub:
+            self.amq_heartbeat = 0
 
     def update_strategies(self):
         S = self.strategies
@@ -377,12 +383,17 @@ class Consumer(object):
             fire_timers = hub.fire_timers
             scheduled = hub.timer._queue
             connection = self.connection
+            hb = self.amq_heartbeat
+            hbtick = getattr(connection.connection, 'heartbeat_tick')
             on_poll_start = connection.transport.on_poll_start
             strategies = self.strategies
             drain_nowait = connection.drain_nowait
             on_task_callbacks = hub.on_task
             keep_draining = connection.transport.nb_keep_draining
 
+            if hb and hbtick is not None:
+                hub.timer.apply_interval(hb * 1000.0, hbtick)
+
             def on_task_received(body, message):
                 if on_task_callbacks:
                     [callback() for callback in on_task_callbacks]
@@ -732,7 +743,7 @@ class Consumer(object):
 
         # remember that the connection is lazy, it won't establish
         # until it's needed.
-        conn = self.app.connection()
+        conn = self.app.connection(heartbeat=self.amq_heartbeat)
         if not self.app.conf.BROKER_CONNECTION_RETRY:
             # retry disabled, just call connect directly.
             conn.connect()