Browse Source

celerybeat is running fine

Ask Solem 15 years ago
parent
commit
3c306c6e53
1 changed files with 47 additions and 12 deletions
  1. 47 12
      celery/beat.py

+ 47 - 12
celery/beat.py

@@ -57,19 +57,35 @@ class Scheduler(UserDict):
     """
     interval = 1
 
-    def __init__(self, registry=None, schedule=None, interval=None,
-            logger=None):
-        self.registry = registry or {}
-        self.data = schedule or {}
-        if interval is not None:
-            self.interval = interval
+    def __init__(self, **kwargs):
+
+        def _get_default_logger():
+            import multiprocessing
+            return multiprocessing.get_logger()
+
+        attr_defaults = {"registry": lambda: {},
+                         "schedule": lambda: {},
+                         "interval": lambda: self.interval,
+                         "logger": _get_default_logger}
+
+        for attr_name, attr_default_gen in attr_defaults.items():
+            if attr_name in kwargs:
+                attr_value = kwargs[attr_name]
+            else:
+                attr_value = attr_default_gen()
+            setattr(self, attr_name, attr_value)
+
         self.schedule_registry()
 
     def tick(self):
         """Run a tick, that is one iteration of the scheduler.
         Executes all due tasks."""
-        return [(entry.task, entry.execute())
-                    for entry in self.get_due_tasks()]
+        for entry in self.get_due_tasks():
+            self.logger.debug("Scheduler: Sending due task %s" % (
+                    entry.task.name))
+            result = entry.execute()
+            self.logger.debug("Scheduler: %s sent. id->%s" % (
+                    entry.task_name, result.task_id))
 
     def get_due_tasks(self):
         """Get all the schedule entries that are due to execution."""
@@ -79,6 +95,10 @@ class Scheduler(UserDict):
         """Add the current contents of the registry to the schedule."""
         periodic_tasks = self.registry.get_all_periodic()
         for name, task in self.registry.get_all_periodic().items():
+            if name not in self.schedule:
+                self.logger.debug(
+                        "Scheduler: Adding periodic task %s to schedule" % (
+                            task.name))
             self.schedule.setdefault(name, ScheduleEntry(task))
 
     @property
@@ -97,10 +117,24 @@ class ClockService(object):
         self._stopped = threading.Event()
 
     def start(self):
+        self.logger.info("ClockService: Starting...")
         schedule = shelve.open(filename=self.schedule_filename)
-        atexit.register(schedule.close)
+        #atexit.register(schedule.close)
         scheduler = self.scheduler_cls(schedule=schedule,
-                                       registry=self.registry)
+                                       registry=self.registry,
+                                       logger=self.logger)
+        self.logger.debug(
+                "ClockService: Ticking with interval->%d, schedule->%s" % (
+                    scheduler.interval, self.schedule_filename))
+
+        synced = [False]
+        def _stop():
+            if not synced[0]:
+                self.logger.debug("ClockService: Syncing schedule to disk...")
+                schedule.sync()
+                schedule.close()
+                synced[0] = True
+                self._stopped.set()
 
         try:
             while True:
@@ -108,9 +142,10 @@ class ClockService(object):
                     break
                 scheduler.tick()
                 time.sleep(scheduler.interval)
+        except (KeyboardInterrupt, SystemExit):
+            _stop()
         finally:
-            schedule.close()
-            self._stopped.set()
+            _stop()
 
     def stop(self, wait=False):
         self._shutdown.set()