Преглед на файлове

Celerybeat fixes: Schedule is no longer a subclass of UserDict, and the implementation is cleaner

Ask Solem преди 14 години
родител
ревизия
fc426d5fb3
променени са 3 файла, в които са добавени 39 реда и са изтрити 50 реда
  1. 27 37
      celery/beat.py
  2. 8 13
      celery/tests/test_app/test_beat.py
  3. 4 0
      celery/tests/test_task/test_task.py

+ 27 - 37
celery/beat.py

@@ -15,6 +15,7 @@ except ImportError:
 
 from datetime import datetime
 
+from celery import __version__
 from celery import platforms
 from celery import registry
 from celery import signals
@@ -22,7 +23,6 @@ from celery.app import app_or_default
 from celery.log import SilenceRepeated
 from celery.schedules import maybe_schedule, crontab
 from celery.utils import cached_property, instantiate, maybe_promise
-from celery.utils.compat import UserDict
 from celery.utils.timeutils import humanize_seconds
 
 
@@ -115,7 +115,7 @@ class ScheduleEntry(object):
                                                    self.schedule)
 
 
-class Scheduler(UserDict):
+class Scheduler(object):
     """Scheduler for periodic tasks.
 
     :keyword schedule: see :attr:`schedule`.
@@ -139,7 +139,6 @@ class Scheduler(UserDict):
 
     def __init__(self, schedule=None, logger=None, max_interval=None,
             app=None, Publisher=None, lazy=False, **kwargs):
-        UserDict.__init__(self)
         if schedule is None:
             schedule = {}
         self.app = app_or_default(app)
@@ -243,30 +242,34 @@ class Scheduler(UserDict):
     def _maybe_entry(self, name, entry):
         if isinstance(entry, self.Entry):
             return entry
-        return self.Entry(name, **entry)
+        return self.Entry(**dict(entry, name=name))
 
     def update_from_dict(self, dict_):
-        self.update(dict((name, self._maybe_entry(name, entry))
-                            for name, entry in dict_.items()))
+        self.schedule.update(dict((name, self._maybe_entry(name, entry))
+                                for name, entry in dict_.items()))
 
     def merge_inplace(self, b):
-        A, B = set(self.keys()), set(b.keys())
+        schedule = self.schedule
+        A, B = set(schedule.keys()), set(b.keys())
 
         # Remove items from disk not in the schedule anymore.
         for key in A ^ B:
-            self.pop(key, None)
+            schedule.pop(key, None)
 
         # Update and add new items in the schedule
         for key in B:
-            entry = self.Entry(**dict(b[key]))
-            if self.get(key):
-                self[key].update(entry)
+            entry = self.Entry(**dict(b[key], name=key))
+            if schedule.get(key):
+                schedule[key].update(entry)
             else:
-                self[key] = entry
+                schedule[key] = entry
 
     def get_schedule(self):
         return self.data
 
+    def set_schedule(self, schedule):
+        self.data = schedule
+
     @cached_property
     def connection(self):
         return self.app.broker_connection()
@@ -295,32 +298,22 @@ class PersistentScheduler(Scheduler):
 
     def setup_schedule(self):
         self._store = self.persistence.open(self.schedule_filename,
-                writeback=True)
+                                            writeback=True)
+        if "__version__" not in self._store:
+            self._store.clear()   # remove schedule at 2.2.2 upgrade.
+        entries = self._store.setdefault("entries", {})
         self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
-        self.install_default_entries(self._store)
+        self.install_default_entries(self.schedule)
+        self._store["__version__"] = __version__
         self.sync()
+        self.logger.debug("Current schedule:\n" +
+                          "\n".join(repr(entry)
+                                    for entry in entries.itervalues()))
 
-    def get_schedule(self):
-        return self._store
-
-    def merge_inplace(self, b):
-        A, B = set(self._store.keys()), set(b.keys())
-
-        # Remove items from disk not in the schedule anymore.
-        for key in A ^ B:
-            self._store.pop(key, None)
 
-        # Update and add new items in the schedule
-        for key in B:
-            entry = self.Entry(**dict(b[key]))
-            if self._store.get(key):
-                self._store[key].update(entry)
-            else:
-                self._store[key] = entry
+    def get_schedule(self):
+        return self._store["entries"]
 
-    def update_from_dict(self, dict_):
-        self._store.update(dict((name, self._maybe_entry(name, entry))
-                            for name, entry in dict_.items()))
     def sync(self):
         if self._store is not None:
             self.logger.debug("CeleryBeat: Syncing schedule to disk...")
@@ -339,7 +332,7 @@ class Service(object):
     scheduler_cls = PersistentScheduler
 
     def __init__(self, logger=None,
-            max_interval=None, schedule=None, schedule_filename=None,
+            max_interval=None, schedule_filename=None,
             scheduler_cls=None, app=None):
         self.app = app_or_default(app)
         self.max_interval = max_interval or \
@@ -347,7 +340,6 @@ class Service(object):
         self.scheduler_cls = scheduler_cls or self.scheduler_cls
         self.logger = logger or self.app.log.get_default_logger(
                                                 name="celery.beat")
-        self.schedule = schedule or self.app.conf.CELERYBEAT_SCHEDULE
         self.schedule_filename = schedule_filename or \
                                     self.app.conf.CELERYBEAT_SCHEDULE_FILENAME
 
@@ -397,8 +389,6 @@ class Service(object):
                                 logger=self.logger,
                                 max_interval=self.max_interval,
                                 lazy=lazy)
-        if not lazy:
-            scheduler.update_from_dict(self.schedule)
         return scheduler
 
     @cached_property

+ 8 - 13
celery/tests/test_app/test_beat.py

@@ -233,12 +233,6 @@ class test_Scheduler(unittest.TestCase):
         scheduler.setup_schedule()
         scheduler.close()
 
-    def test_set_schedule(self):
-        scheduler = mScheduler()
-        a, b = scheduler.schedule, {}
-        scheduler.schedule = b
-        self.assertIs(scheduler.schedule, b)
-
     def test_merge_inplace(self):
         a = mScheduler()
         b = mScheduler()
@@ -246,11 +240,11 @@ class test_Scheduler(unittest.TestCase):
                             "bar": {"schedule": mocked_schedule(True, 20)}})
         b.update_from_dict({"bar": {"schedule": mocked_schedule(True, 40)},
                             "baz": {"schedule": mocked_schedule(True, 10)}})
-        a.merge_inplace(b)
+        a.merge_inplace(b.schedule)
 
-        self.assertNotIn("foo", a)
-        self.assertIn("baz", a)
-        self.assertEqual(a["bar"].schedule._next_run_at, 40)
+        self.assertNotIn("foo", a.schedule)
+        self.assertIn("baz", a.schedule)
+        self.assertEqual(a.schedule["bar"].schedule._next_run_at, 40)
 
 
 class test_Service(unittest.TestCase):
@@ -275,10 +269,11 @@ class test_Service(unittest.TestCase):
 
     def test_start(self):
         s, sh = self.get_service()
-        self.assertIsInstance(s.schedule, dict)
+        schedule = s.scheduler.schedule
+        self.assertIsInstance(schedule, dict)
         self.assertIsInstance(s.scheduler, beat.Scheduler)
-        scheduled = s.schedule.keys()
-        for task_name in sh.keys():
+        scheduled = schedule.keys()
+        for task_name in sh["entries"].keys():
             self.assertIn(task_name, scheduled)
 
         s.sync()

+ 4 - 0
celery/tests/test_task/test_task.py

@@ -525,6 +525,10 @@ class TestPeriodicTask(unittest.TestCase):
         self.assertEqual(remaining,
                          p.timedelta_seconds(p.run_every.run_every))
 
+    def test_schedule_repr(self):
+        p = MyPeriodic()
+        self.assertTrue(repr(p.run_every))
+
 
 class EveryMinutePeriodic(task.PeriodicTask):
     run_every = crontab()