Bläddra i källkod

Merge pull request #1876 from epantry/sync-scheduled-tasks

setting for optionally syncing on every scheduled task execution
Ask Solem Hoel 11 år sedan
förälder
incheckning
153217cdc7
4 ändrade filer med 53 tillägg och 2 borttagningar
  1. 1 0
      celery/app/defaults.py
  2. 8 1
      celery/beat.py
  3. 34 1
      celery/tests/app/test_beat.py
  4. 10 0
      docs/configuration.rst

+ 1 - 0
celery/app/defaults.py

@@ -196,6 +196,7 @@ NAMESPACES = {
         'SCHEDULE': Option({}, type='dict'),
         'SCHEDULER': Option('celery.beat:PersistentScheduler'),
         'SCHEDULE_FILENAME': Option('celerybeat-schedule'),
+        'SYNC_EVERY': Option(0, type='int'),
         'MAX_LOOP_INTERVAL': Option(0, type='float'),
         'LOG_LEVEL': Option('INFO', deprecate_by='2.4', remove_by='4.0',
                             alt='--loglevel argument'),

+ 8 - 1
celery/beat.py

@@ -162,6 +162,7 @@ class Scheduler(object):
     sync_every = 3 * 60
 
     _last_sync = None
+    _sync_every = 0
 
     logger = logger  # compat
 
@@ -220,7 +221,11 @@ class Scheduler(object):
 
     def should_sync(self):
         return (not self._last_sync or
-                (monotonic() - self._last_sync) > self.sync_every)
+               (monotonic() - self._last_sync) > self.sync_every) \
+               or \
+               (self.app.conf.CELERYBEAT_SYNC_EVERY and
+                self._sync_every >= self.app.conf.CELERYBEAT_SYNC_EVERY)
+
 
     def reserve(self, entry):
         new_entry = self.schedule[entry.name] = next(entry)
@@ -247,6 +252,7 @@ class Scheduler(object):
                 "Couldn't apply scheduled task {0.name}: {exc}".format(
                     entry, exc=exc)), sys.exc_info()[2])
         finally:
+            self._sync_every += 1
             if self.should_sync():
                 self._do_sync()
         return result
@@ -263,6 +269,7 @@ class Scheduler(object):
             self.sync()
         finally:
             self._last_sync = monotonic()
+            self._sync_every = 0
 
     def sync(self):
         pass

+ 34 - 1
celery/tests/app/test_beat.py

@@ -162,7 +162,7 @@ class test_Scheduler(AppCase):
         scheduler.apply_async(scheduler.Entry(task=foo.name, app=self.app))
         self.assertTrue(foo.apply_async.called)
 
-    def test_apply_async_should_not_sync(self):
+    def test_should_sync(self):
 
         @self.app.task(shared=False)
         def not_sync():
@@ -181,6 +181,39 @@ class test_Scheduler(AppCase):
         s.apply_async(s.Entry(task=not_sync.name, app=self.app))
         self.assertFalse(s._do_sync.called)
 
+    def test_should_sync_increments_sync_every_counter(self):
+        self.app.conf.CELERYBEAT_SYNC_EVERY = 2
+
+        @self.app.task(shared=False)
+        def not_sync():
+            pass
+        not_sync.apply_async = Mock()
+
+        s = mScheduler(app=self.app)
+        s._do_sync = Mock()
+
+        s.apply_async(s.Entry(task=not_sync.name, app=self.app))
+        self.assertEqual(s._sync_every, 1)
+        s.apply_async(s.Entry(task=not_sync.name, app=self.app))
+        s._do_sync.assert_called_with()
+
+        self.app.conf.CELERYBEAT_SYNC_EVERY = 0
+
+    def test_sync_task_counter_resets_on_do_sync(self):
+        self.app.conf.CELERYBEAT_SYNC_EVERY = 1
+
+        @self.app.task(shared=False)
+        def not_sync():
+            pass
+        not_sync.apply_async = Mock()
+
+        s = mScheduler(app=self.app)
+
+        s.apply_async(s.Entry(task=not_sync.name, app=self.app))
+        self.assertEqual(s._sync_every, 0)
+
+        self.app.conf.CELERYBEAT_SYNC_EVERY = 0
+
     @patch('celery.app.base.Celery.send_task')
     def test_send_task(self, send_task):
         b = beat.Scheduler(app=self.app)

+ 10 - 0
docs/configuration.rst

@@ -1818,6 +1818,16 @@ Can also be set via the :option:`--schedule` argument to
 
 .. setting:: CELERYBEAT_MAX_LOOP_INTERVAL
 
+CELERYBEAT_SYNC_EVERY
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The number of async scheduled tasks that should be run prior to forcing a sync.
+Defaults to 0 (sync based on timing - default of 3 minutes as determined by
+scheduler.sync_every). If set to 1, beats will call sync after every task
+execution.
+
+.. setting:: CELERYBEAT_SYNC_EVERY
+
 CELERYBEAT_MAX_LOOP_INTERVAL
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~