Explorar o código

Add option to worker to control heartbeat interval.

Thanks to my colleague Craig Northway for the patch.
Matthew Duggan %!s(int64=11) %!d(string=hai) anos
pai
achega
3cbe86c9e5

+ 5 - 0
celery/bin/worker.py

@@ -86,6 +86,10 @@ The :program:`celery worker` command (previously known as ``celeryd``)
 
     Do not send event heartbeats.
 
+.. cmdoption:: --heartbeat-interval
+
+    Interval in seconds at which to send worker heartbeat
+
 .. cmdoption:: --purge
 
     Purges all waiting tasks before the daemon is started.
@@ -245,6 +249,7 @@ class worker(Command):
             Option('--without-gossip', action='store_true', default=False),
             Option('--without-mingle', action='store_true', default=False),
             Option('--without-heartbeat', action='store_true', default=False),
+            Option('--heartbeat-interval', type='int'),
             Option('-O', dest='optimization'),
             Option('-D', '--detach', action='store_true'),
         ) + daemon_options() + tuple(self.app.user_options['worker'])

+ 3 - 1
celery/tests/bin/test_worker.py

@@ -443,8 +443,10 @@ class test_funs(WorkerAppCase):
     def test_parse_options(self):
         cmd = worker()
         cmd.app = self.app
-        opts, args = cmd.parse_options('worker', ['--concurrency=512'])
+        opts, args = cmd.parse_options('worker', ['--concurrency=512',
+                                       '--heartbeat-interval=10'])
         self.assertEqual(opts.concurrency, 512)
+        self.assertEqual(opts.heartbeat_interval, 10)
 
     @disable_stdouts
     def test_main(self):

+ 20 - 1
celery/tests/worker/test_consumer.py

@@ -164,11 +164,30 @@ class test_Heart(AppCase):
         with patch('celery.worker.heartbeat.Heart') as hcls:
             h = Heart(c)
             self.assertTrue(h.enabled)
+            self.assertEqual(h.heartbeat_interval, None)
             self.assertIsNone(c.heart)
 
             h.start(c)
             self.assertTrue(c.heart)
-            hcls.assert_called_with(c.timer, c.event_dispatcher)
+            hcls.assert_called_with(c.timer, c.event_dispatcher,
+                                    h.heartbeat_interval)
+            c.heart.start.assert_called_with()
+
+    def test_start_heartbeat_interval(self):
+        c = Mock()
+        c.timer = Mock()
+        c.event_dispatcher = Mock()
+
+        with patch('celery.worker.heartbeat.Heart') as hcls:
+            h = Heart(c, False, 20)
+            self.assertTrue(h.enabled)
+            self.assertEqual(h.heartbeat_interval, 20)
+            self.assertIsNone(c.heart)
+
+            h.start(c)
+            self.assertTrue(c.heart)
+            hcls.assert_called_with(c.timer, c.event_dispatcher,
+                                    h.heartbeat_interval)
             c.heart.start.assert_called_with()
 
 

+ 5 - 2
celery/worker/consumer.py

@@ -534,12 +534,15 @@ class Events(bootsteps.StartStopStep):
 class Heart(bootsteps.StartStopStep):
     requires = (Events, )
 
-    def __init__(self, c, without_heartbeat=False, **kwargs):
+    def __init__(self, c, without_heartbeat=False, heartbeat_interval=None,
+                 **kwargs):
         self.enabled = not without_heartbeat
+        self.heartbeat_interval = heartbeat_interval
         c.heart = None
 
     def start(self, c):
-        c.heart = heartbeat.Heart(c.timer, c.event_dispatcher)
+        c.heart = heartbeat.Heart(c.timer, c.event_dispatcher,
+            self.heartbeat_interval)
         c.heart.start()
 
     def stop(self, c):