Преглед на файлове

when embedded inside celeryd, celerybeat now uses a tight loop interval (1 second), this because the service can't shutdown until all threads have woken up.

Ask Solem преди 15 години
родител
ревизия
0a09648fe3
променени са 2 файла, в които са добавени 12 реда и са изтрити 8 реда
  1. 8 7
      celery/beat.py
  2. 4 1
      celery/worker/__init__.py

+ 8 - 7
celery/beat.py

@@ -68,14 +68,13 @@ class Scheduler(UserDict):
         persistent schedule ``celery.beat.schedule``.
 
     """
-    interval = 1
 
     def __init__(self, **kwargs):
 
         attr_defaults = {"registry": lambda: {},
                          "schedule": lambda: {},
-                         "interval": lambda: self.interval,
-                         "logger": log.get_default_logger}
+                         "logger": log.get_default_logger,
+                         "max_interval": conf.CELERYBEAT_MAX_LOOP_INTERVAL}
 
         for attr_name, attr_default_gen in attr_defaults.items():
             if attr_name in kwargs:
@@ -102,7 +101,7 @@ class Scheduler(UserDict):
             if next_time_to_run:
                 remaining_times.append(next_time_to_run)
 
-        return min(remaining_times + [conf.CELERYBEAT_MAX_LOOP_INTERVAL])
+        return min(remaining_times + [self.max_interval])
 
     def get_task(self, name):
         try:
@@ -154,8 +153,10 @@ class ClockService(object):
     registry = registry.tasks
 
     def __init__(self, logger=None, is_detached=False,
+            max_interval=conf.CELERYBEAT_MAX_LOOP_INTERVAL,
             schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME):
         self.logger = logger
+        self.max_interval = max_interval
         self.schedule_filename = schedule_filename
         self._shutdown = threading.Event()
         self._stopped = threading.Event()
@@ -166,10 +167,11 @@ class ClockService(object):
         #atexit.register(schedule.close)
         scheduler = self.scheduler_cls(schedule=schedule,
                                        registry=self.registry,
-                                       logger=self.logger)
+                                       logger=self.logger,
+                                       max_interval=self.max_interval)
         self.logger.debug("ClockService: "
             "Ticking with max interval->%s, schedule->%s" % (
-                    humanize_seconds(conf.CELERYBEAT_MAX_LOOP_INTERVAL),
+                    humanize_seconds(self.max_interval),
                     self.schedule_filename))
 
         synced = [False]
@@ -181,7 +183,6 @@ class ClockService(object):
                 synced[0] = True
                 self._stopped.set()
 
-
         try:
             while True:
                 if self._shutdown.isSet():

+ 4 - 1
celery/worker/__init__.py

@@ -134,8 +134,11 @@ class WorkController(object):
 
         self.clockservice = None
         if self.embed_clockservice:
+            # Need a tight loop interval when embedded so the program
+            # can be stopped in a sensible short time.
             self.clockservice = ClockServiceThread(logger=self.logger,
-                                                is_detached=self.is_detached)
+                                                is_detached=self.is_detached,
+                                                max_interval=1)
 
         # The order is important here;
         #   the first in the list is the first to start,