Ask Solem пре 11 година
родитељ
комит
108adeef13
3 измењених фајлова са 27 додато и 19 уклоњено
  1. 16 10
      celery/beat.py
  2. 4 2
      celery/tests/app/test_beat.py
  3. 7 7
      docs/configuration.rst

+ 16 - 10
celery/beat.py

@@ -161,18 +161,24 @@ class Scheduler(object):
     #: How often to sync the schedule (3 minutes by default)
     #: How often to sync the schedule (3 minutes by default)
     sync_every = 3 * 60
     sync_every = 3 * 60
 
 
+    #: How many tasks can be called before a sync is forced.
+    sync_every_tasks = None
+
     _last_sync = None
     _last_sync = None
-    _sync_every = 0
+    _tasks_since_sync = 0
 
 
     logger = logger  # compat
     logger = logger  # compat
 
 
     def __init__(self, app, schedule=None, max_interval=None,
     def __init__(self, app, schedule=None, max_interval=None,
-                 Publisher=None, lazy=False, **kwargs):
+                 Publisher=None, lazy=False, sync_every_tasks=None, **kwargs):
         self.app = app
         self.app = app
         self.data = maybe_evaluate({} if schedule is None else schedule)
         self.data = maybe_evaluate({} if schedule is None else schedule)
         self.max_interval = (max_interval
         self.max_interval = (max_interval
                              or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
                              or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
                              or self.max_interval)
                              or self.max_interval)
+        self.sync_every_tasks = (
+            app.conf.CELERYBEAT_SYNC_EVERY if sync_every_tasks is None
+            else sync_every_tasks)
         self.Publisher = Publisher or app.amqp.TaskProducer
         self.Publisher = Publisher or app.amqp.TaskProducer
         if not lazy:
         if not lazy:
             self.setup_schedule()
             self.setup_schedule()
@@ -220,12 +226,12 @@ class Scheduler(object):
         return min(remaining_times + [self.max_interval])
         return min(remaining_times + [self.max_interval])
 
 
     def should_sync(self):
     def should_sync(self):
-        return (not self._last_sync or
-               (monotonic() - self._last_sync) > self.sync_every) \
-               or \
-               (self.app.conf.CELERYBEAT_SYNC_EVERY and
-                self._sync_every >= self.app.conf.CELERYBEAT_SYNC_EVERY)
-
+        return (
+            (not self._last_sync or
+               (monotonic() - self._last_sync) > self.sync_every) or
+            (self.sync_every_tasks and
+                self._tasks_since_sync >= self.sync_every_tasks)
+        )
 
 
     def reserve(self, entry):
     def reserve(self, entry):
         new_entry = self.schedule[entry.name] = next(entry)
         new_entry = self.schedule[entry.name] = next(entry)
@@ -252,7 +258,7 @@ class Scheduler(object):
                 "Couldn't apply scheduled task {0.name}: {exc}".format(
                 "Couldn't apply scheduled task {0.name}: {exc}".format(
                     entry, exc=exc)), sys.exc_info()[2])
                     entry, exc=exc)), sys.exc_info()[2])
         finally:
         finally:
-            self._sync_every += 1
+            self._tasks_since_sync += 1
             if self.should_sync():
             if self.should_sync():
                 self._do_sync()
                 self._do_sync()
         return result
         return result
@@ -269,7 +275,7 @@ class Scheduler(object):
             self.sync()
             self.sync()
         finally:
         finally:
             self._last_sync = monotonic()
             self._last_sync = monotonic()
-            self._sync_every = 0
+            self._tasks_since_sync = 0
 
 
     def sync(self):
     def sync(self):
         pass
         pass

+ 4 - 2
celery/tests/app/test_beat.py

@@ -190,10 +190,11 @@ class test_Scheduler(AppCase):
         not_sync.apply_async = Mock()
         not_sync.apply_async = Mock()
 
 
         s = mScheduler(app=self.app)
         s = mScheduler(app=self.app)
+        self.assertEqual(s.sync_every_tasks, 2)
         s._do_sync = Mock()
         s._do_sync = Mock()
 
 
         s.apply_async(s.Entry(task=not_sync.name, app=self.app))
         s.apply_async(s.Entry(task=not_sync.name, app=self.app))
-        self.assertEqual(s._sync_every, 1)
+        self.assertEqual(s._tasks_since_sync, 1)
         s.apply_async(s.Entry(task=not_sync.name, app=self.app))
         s.apply_async(s.Entry(task=not_sync.name, app=self.app))
         s._do_sync.assert_called_with()
         s._do_sync.assert_called_with()
 
 
@@ -208,9 +209,10 @@ class test_Scheduler(AppCase):
         not_sync.apply_async = Mock()
         not_sync.apply_async = Mock()
 
 
         s = mScheduler(app=self.app)
         s = mScheduler(app=self.app)
+        self.assertEqual(s.sync_every_tasks, 1)
 
 
         s.apply_async(s.Entry(task=not_sync.name, app=self.app))
         s.apply_async(s.Entry(task=not_sync.name, app=self.app))
-        self.assertEqual(s._sync_every, 0)
+        self.assertEqual(s._tasks_since_sync, 0)
 
 
         self.app.conf.CELERYBEAT_SYNC_EVERY = 0
         self.app.conf.CELERYBEAT_SYNC_EVERY = 0
 
 

+ 7 - 7
docs/configuration.rst

@@ -1816,17 +1816,18 @@ suffix `.db` may be appended to the file name (depending on Python version).
 Can also be set via the :option:`--schedule` argument to
 Can also be set via the :option:`--schedule` argument to
 :mod:`~celery.bin.beat`.
 :mod:`~celery.bin.beat`.
 
 
-.. setting:: CELERYBEAT_MAX_LOOP_INTERVAL
+.. setting:: CELERYBEAT_SYNC_EVERY
 
 
 CELERYBEAT_SYNC_EVERY
 CELERYBEAT_SYNC_EVERY
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+~~~~~~~~~~~~~~~~~~~~~
 
 
-The number of async scheduled tasks that should be run prior to forcing a sync.
+The number of periodic tasks that can be called before another database sync
+is issued.
 Defaults to 0 (sync based on timing - default of 3 minutes as determined by
 Defaults to 0 (sync based on timing - default of 3 minutes as determined by
-scheduler.sync_every). If set to 1, beats will call sync after every task
-execution.
+scheduler.sync_every). If set to 1, beat will call sync after every task
+message sent.
 
 
-.. setting:: CELERYBEAT_SYNC_EVERY
+.. setting:: CELERYBEAT_MAX_LOOP_INTERVAL
 
 
 CELERYBEAT_MAX_LOOP_INTERVAL
 CELERYBEAT_MAX_LOOP_INTERVAL
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -1834,7 +1835,6 @@ CELERYBEAT_MAX_LOOP_INTERVAL
 The maximum number of seconds :mod:`~celery.bin.beat` can sleep
 The maximum number of seconds :mod:`~celery.bin.beat` can sleep
 between checking the schedule.
 between checking the schedule.
 
 
-
 The default for this value is scheduler specific.
 The default for this value is scheduler specific.
 For the default celery beat scheduler the value is 300 (5 minutes),
 For the default celery beat scheduler the value is 300 (5 minutes),
 but for e.g. the django-celery database scheduler it is 5 seconds
 but for e.g. the django-celery database scheduler it is 5 seconds