瀏覽代碼

celery.beat: Refactored the persistent schedule into PersistentScheduler, also better merging of tasks on disk and in "registry".

Ask Solem 14 年之前
父節點
當前提交
3580f4c75d
共有 2 個文件被更改,包括 119 次插入59 次删除
  1. 110 51
      celery/beat.py
  2. 9 8
      celery/bin/celerybeat.py

+ 110 - 51
celery/beat.py

@@ -24,6 +24,14 @@ class SchedulingError(Exception):
     """An error occured while scheduling a task."""
 
 
+def maybe_schedule(s, relative=False):
+    if isinstance(s, int):
+        s = timedelta(seconds=s)
+    if isinstance(s, timedelta):
+        return schedule(s, relative)
+    return s
+
+
 class ScheduleEntry(object):
     """An entry in the scheduler.
 
@@ -61,9 +69,10 @@ class ScheduleEntry(object):
     """
 
     def __init__(self, name, schedule, args=(), kwargs={},
-            options={}, last_run_at=None, total_run_count=None):
+            options={}, last_run_at=None, total_run_count=None,
+            relative=False):
         self.name = name
-        self.schedule = schedule
+        self.schedule = maybe_schedule(schedule, relative)
         self.args = args
         self.kwargs = kwargs
         self.options = options
@@ -81,6 +90,18 @@ class ScheduleEntry(object):
                               datetime.now(),
                               self.total_run_count + 1)
 
+    def update(self, other):
+        """Update values from another entry.
+
+        Does only update "editable" fields (schedule, args,
+        kwargs, options).
+
+        """
+        self.schedule = other.schedule
+        self.args = other.args
+        self.kwargs = other.kwargs
+        self.options = other.options
+
     def is_due(self):
         """See :meth:`celery.task.base.PeriodicTask.is_due`."""
         return self.schedule.is_due(self.last_run_at)
@@ -114,19 +135,19 @@ class Scheduler(UserDict):
     """
     Entry = ScheduleEntry
 
-    def __init__(self, schedule=None, logger=None,
-            max_interval=None):
+    def __init__(self, schedule=None, logger=None, max_interval=None,
+            **kwargs):
+        UserDict.__init__(self)
+        if schedule is None:
+            schedule = self.dict_to_entries(conf.CELERYBEAT_SCHEDULE)
         self.data = schedule
-        if self.data is None:
-            self.data = {}
-        self.logger = logger or log.get_default_logger()
+        self.logger = logger or log.get_default_logger("celery.beat")
         self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
-
-        self.cleanup()
         self.setup_schedule()
 
     def maybe_due(self, entry, connection=None):
         is_due, next_time_to_run = entry.is_due()
+
         if is_due:
             self.logger.debug("Scheduler: Sending due task %s" % entry.name)
             try:
@@ -160,7 +181,7 @@ class Scheduler(UserDict):
         return min(remaining_times + [self.max_interval])
 
     def reserve(self, entry):
-        new_entry = self.schedule[entry.name] = entry.next()
+        new_entry = self[entry.name] = entry.next()
         return new_entry
 
     def apply_async(self, entry, **kwargs):
@@ -169,8 +190,6 @@ class Scheduler(UserDict):
         # forever.)
         entry = self.reserve(entry)
 
-        print("APPLYING: %s" % (entry, ))
-
         try:
             result = send_task(entry.name, entry.args, entry.kwargs,
                                **entry.options)
@@ -179,49 +198,88 @@ class Scheduler(UserDict):
                     entry.name, exc))
         return result
 
-    def maybe_schedule(self, s, relative=False):
-        if isinstance(s, int):
-            return timedelta(seconds=s)
-        if isinstance(s, timedelta):
-            return schedule(s, relative)
-        return s
-
     def setup_schedule(self):
-        self.data = self.dict_to_entries(conf.CELERYBEAT_SCHEDULE)
+        pass
 
-    def dict_to_entries(self, dict_):
-        entries = {}
-        for name, entry in dict_.items():
-            relative = entry.pop("relative", None)
-            entry["schedule"] = self.maybe_schedule(entry["schedule"],
-                                                    relative)
-            entries[name] = self.Entry(**entry)
-        return entries
-
-    def cleanup(self):
+    def sync(self):
         pass
 
-    @property
-    def schedule(self):
+    def close(self):
+        self.sync()
+
+    def dict_to_entries(self, dict_):
+        return dict((name, self.Entry(**entry))
+                        for name, entry in dict_.items())
+
+    def get_schedule(self):
         return self.data
 
+    def _set_schedule(self, schedule):
+        self.data = schedule
+
+    def _get_schedule(self):
+        return self.get_schedule()
+
+    schedule = property(_get_schedule, _set_schedule)
+
+
+class PersistentScheduler(Scheduler):
+    persistence = shelve
+
+    _store = None
 
-class ClockService(object):
-    scheduler_cls = Scheduler
-    open_schedule = lambda self, filename: shelve.open(filename)
+    def __init__(self, *args, **kwargs):
+        self.schedule_filename = kwargs.get("schedule_filename")
+        Scheduler.__init__(self, *args, **kwargs)
+
+    def setup_schedule(self):
+        self._store = self.persistence.open(self.schedule_filename)
+        self._diskmerge(self._store, conf.CELERYBEAT_SCHEDULE)
+        self.sync()
+        self.schedule = self._store
+
+    def _diskmerge(self, a, b):
+        A, B = set(a), set(b)
+
+        # Remove items from disk not in the schedule anymore.
+        for key in A ^ B:
+            a.pop(key, None)
+
+        # Update and add new items in the schedule
+        for key in B:
+            entry = self.Entry(**b[key])
+            if a.get(key):
+                a[key].update(entry)
+            else:
+                a[key] = entry
+
+    def sync(self):
+        if self._store is not None:
+            self.logger.debug("CeleryBeat: Syncing schedule to disk...")
+            self._store.sync()
+
+    def close(self):
+        self.sync()
+        self._store.close()
+
+
+class Service(object):
+    scheduler_cls = PersistentScheduler
 
     def __init__(self, logger=None,
             max_interval=conf.CELERYBEAT_MAX_LOOP_INTERVAL,
             schedule=conf.CELERYBEAT_SCHEDULE,
             schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME,
             scheduler_cls=None):
-        self.logger = logger or log.get_default_logger()
         self.max_interval = max_interval
         self.scheduler_cls = scheduler_cls or self.scheduler_cls
-        self._shutdown = threading.Event()
-        self._stopped = threading.Event()
+        self.logger = logger or log.get_default_logger(name="celery.beat")
         self.schedule = schedule
+        self.schedule_filename = schedule_filename
+
         self._scheduler = None
+        self._shutdown = threading.Event()
+        self._stopped = threading.Event()
         silence = self.max_interval < 60 and 10 or 1
         self.debug = log.SilenceRepeated(self.logger.debug,
                                          max_iterations=silence)
@@ -237,19 +295,18 @@ class ClockService(object):
 
         try:
             try:
-                while True:
-                    if self._shutdown.isSet():
-                        break
+                while not self._shutdown.isSet():
                     interval = self.scheduler.tick()
                     self.debug("Celerybeat: Waking up %s." % (
                             humanize_seconds(interval, prefix="in ")))
                     time.sleep(interval)
             except (KeyboardInterrupt, SystemExit):
-                self.sync()
+                self._shutdown.set()
         finally:
             self.sync()
 
     def sync(self):
+        self.scheduler.close()
         self._stopped.set()
 
     def stop(self, wait=False):
@@ -260,45 +317,47 @@ class ClockService(object):
     @property
     def scheduler(self):
         if self._scheduler is None:
+            filename = self.schedule_filename
             self._scheduler = instantiate(self.scheduler_cls,
                                           schedule=self.schedule,
+                                          schedule_filename=filename,
                                           logger=self.logger,
                                           max_interval=self.max_interval)
         return self._scheduler
 
 
 class _Threaded(threading.Thread):
-    """Embedded clock service using threading."""
+    """Embedded task scheduler using threading."""
 
     def __init__(self, *args, **kwargs):
         super(_Threaded, self).__init__()
-        self.clockservice = ClockService(*args, **kwargs)
+        self.service = Service(*args, **kwargs)
         self.setDaemon(True)
 
     def run(self):
-        self.clockservice.start()
+        self.service.start()
 
     def stop(self):
-        self.clockservice.stop(wait=True)
+        self.service.stop(wait=True)
 
 
 class _Process(multiprocessing.Process):
-    """Embedded clock service using multiprocessing."""
+    """Embedded task scheduler using multiprocessing."""
 
     def __init__(self, *args, **kwargs):
         super(_Process, self).__init__()
-        self.clockservice = ClockService(*args, **kwargs)
+        self.service = Service(*args, **kwargs)
 
     def run(self):
         platform.reset_signal("SIGTERM")
-        self.clockservice.start(embedded_process=True)
+        self.service.start(embedded_process=True)
 
     def stop(self):
-        self.clockservice.stop()
+        self.service.stop()
         self.terminate()
 
 
-def EmbeddedClockService(*args, **kwargs):
+def EmbeddedService(*args, **kwargs):
     """Return embedded clock service.
 
     :keyword thread: Run threaded instead of as a separate process.

+ 9 - 8
celery/bin/celerybeat.py

@@ -10,7 +10,7 @@
 
 .. cmdoption:: -S, --scheduler
 
-    Scheduler class to use. Default is celery.beat.Scheduler
+    Scheduler class to use. Default is celery.beat.PersistentScheduler
 
 .. cmdoption:: -f, --logfile
 
@@ -27,10 +27,10 @@ import optparse
 import traceback
 
 import celery
+from celery import beat
 from celery import conf
 from celery import platform
 from celery.log import emergency_error
-from celery.beat import ClockService
 from celery.utils import info
 
 STARTUP_INFO_FMT = """
@@ -53,7 +53,8 @@ OPTION_LIST = (
     optparse.make_option('-S', '--scheduler',
             default=None,
             action="store", dest="scheduler_cls",
-            help="Scheduler class. Default is celery.beat.Scheduler"),
+            help="Scheduler class. Default is "
+                 "celery.beat.PersistentScheduler"),
     optparse.make_option('-f', '--logfile', default=conf.CELERYBEAT_LOG_FILE,
             action="store", dest="logfile",
             help="Path to log file."),
@@ -65,7 +66,7 @@ OPTION_LIST = (
 
 
 class Beat(object):
-    ClockService = ClockService
+    Service = beat.Service
 
     def __init__(self, loglevel=conf.CELERYBEAT_LOG_LEVEL,
             logfile=conf.CELERYBEAT_LOG_FILE,
@@ -94,10 +95,10 @@ class Beat(object):
     def start_scheduler(self):
         from celery.log import setup_logger
         logger = setup_logger(self.loglevel, self.logfile, name="celery.beat")
-        beat = self.ClockService(logger=logger,
-                                 max_interval=self.max_interval,
-                                 scheduler_cls=self.scheduler_cls,
-                                 schedule_filename=self.schedule)
+        beat = self.Service(logger=logger,
+                            max_interval=self.max_interval,
+                            scheduler_cls=self.scheduler_cls,
+                            schedule_filename=self.schedule)
 
         try:
             self.install_sync_handler(beat)