Selaa lähdekoodia

Added CELERYD_ETA_SCHEDULER_PRECISION to control the minimum sleep interval between scheduler ticks.

A good value would be a float between 0 and 1. If the value is 0.8,
it means it will take at most 0.8s for a task with its eta met to be moved
to the ready queue.
Ask Solem 14 vuotta sitten
vanhempi
commit
6ca8aab89f
3 muutettua tiedostoa jossa 8 lisäystä ja 3 poistoa
  1. 2 0
      celery/conf.py
  2. 1 1
      celery/tests/test_worker_controllers.py
  3. 5 2
      celery/worker/controllers.py

+ 2 - 0
celery/conf.py

@@ -58,6 +58,7 @@ _DEFAULTS = {
     "CELERYD_LOG_LEVEL": "WARN",
     "CELERYD_LOG_FILE": None, # stderr
     "CELERYD_STATE_DB": None,
+    "CELERYD_ETA_SCHEDULER_PRECISION": 1,
     "CELERYBEAT_SCHEDULE_FILENAME": "celerybeat-schedule",
     "CELERYBEAT_MAX_LOOP_INTERVAL": 5 * 60, # five minutes.
     "CELERYBEAT_LOG_LEVEL": "INFO",
@@ -167,6 +168,7 @@ CELERYD_POOL = _get("CELERYD_POOL")
 CELERYD_LISTENER = _get("CELERYD_LISTENER")
 CELERYD_MEDIATOR = _get("CELERYD_MEDIATOR")
 CELERYD_ETA_SCHEDULER = _get("CELERYD_ETA_SCHEDULER")
+CELERYD_ETA_SCHEDULER_PRECISION = _get("CELERYD_ETA_SCHEDULER_PRECISION")
 
 # :--- Email settings                               <-   --   --- - ----- -- #
 ADMINS = _get("ADMINS")

+ 1 - 1
celery/tests/test_worker_controllers.py

@@ -117,7 +117,7 @@ class TestScheduleController(unittest.TestCase):
         try:
             for i in times:
                 c.on_iteration()
-                res = i is None and 1 or i
+                res = i or 1
                 self.assertEqual(slept[0], res)
         finally:
             time.sleep = old_sleep

+ 5 - 2
celery/worker/controllers.py

@@ -7,6 +7,7 @@ import time
 import threading
 from Queue import Empty as QueueEmpty
 
+from celery import conf
 from celery import log
 
 
@@ -101,10 +102,12 @@ class Mediator(BackgroundThread):
 class ScheduleController(BackgroundThread):
     """Schedules tasks with an ETA by moving them to the bucket queue."""
 
-    def __init__(self, eta_schedule, logger=None):
+    def __init__(self, eta_schedule, logger=None,
+            precision=None):
         super(ScheduleController, self).__init__()
         self.logger = logger or log.get_default_logger()
         self._scheduler = iter(eta_schedule)
+        self.precision = precision or conf.CELERYD_ETA_SCHEDULER_PRECISION
         self.debug = log.SilenceRepeated(self.logger.debug, max_iterations=10)
 
     def on_iteration(self):
@@ -112,4 +115,4 @@ class ScheduleController(BackgroundThread):
         delay = self._scheduler.next()
         self.debug("ScheduleController: Scheduler wake-up"
                 "ScheduleController: Next wake-up eta %s seconds..." % delay)
-        time.sleep(delay or 1)
+        time.sleep(delay or self.precision)