ソースを参照

New tests for the new celery.beat (92% coverage)

Ask Solem 14 年 前
コミット
289b1a59a2
3 ファイル変更255 行追加46 行削除
  1. 45 39
      celery/beat.py
  2. 12 0
      celery/schedules.py
  3. 198 7
      celery/tests/test_beat.py

+ 45 - 39
celery/beat.py

@@ -70,16 +70,14 @@ class ScheduleEntry(object):
         self.last_run_at = last_run_at or datetime.now()
         self.total_run_count = total_run_count or 0
 
-    def next(self):
+    def next(self, last_run_at=None):
         """Returns a new instance of the same class, but with
         its date and count fields updated."""
-        return self.__class__(name=self.name,
-                              schedule=self.schedule,
-                              args=self.args,
-                              kwargs=self.kwargs,
-                              options=self.options,
-                              last_run_at=datetime.now(),
-                              total_run_count=self.total_run_count + 1)
+        last_run_at = last_run_at or datetime.now()
+        total_run_count = self.total_run_count + 1
+        return self.__class__(**dict(self,
+                                     last_run_at=last_run_at,
+                                     total_run_count=total_run_count))
 
     def update(self, other):
         """Update values from another entry.
@@ -97,6 +95,9 @@ class ScheduleEntry(object):
         """See :meth:`celery.task.base.PeriodicTask.is_due`."""
         return self.schedule.is_due(self.last_run_at)
 
+    def __iter__(self):
+        return vars(self).iteritems()
+
     def __repr__(self):
         return "<Entry: %s(*%s, **%s) {%s}>" % (self.name,
                                                 self.args,
@@ -130,9 +131,9 @@ class Scheduler(UserDict):
             **kwargs):
         UserDict.__init__(self)
         if schedule is None:
-            schedule = self.dict_to_entries(conf.CELERYBEAT_SCHEDULE)
+            schedule = {}
         self.data = schedule
-        self.logger = logger or log.get_default_logger("celery.beat")
+        self.logger = logger or log.get_default_logger(name="celery.beat")
         self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
         self.setup_schedule()
 
@@ -182,13 +183,16 @@ class Scheduler(UserDict):
         entry = self.reserve(entry)
 
         try:
-            result = send_task(entry.name, entry.args, entry.kwargs,
-                               connection=connection, **entry.options)
+            result = self.send_task(entry.name, entry.args, entry.kwargs,
+                                    connection=connection, **entry.options)
         except Exception, exc:
             raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
                     entry.name, exc))
         return result
 
+    def send_task(self, *args, **kwargs): # pragma: no cover
+        return send_task(*args, **kwargs)
+
     def setup_schedule(self):
         pass
 
@@ -198,21 +202,37 @@ class Scheduler(UserDict):
     def close(self):
         self.sync()
 
-    def dict_to_entries(self, dict_):
-        return dict((name, self.Entry(**entry))
-                        for name, entry in dict_.items())
+    def add(self, **kwargs):
+        entry = self.Entry(**kwargs)
+        self[entry.name] = entry
+        return entry
+
+    def update_from_dict(self, dict_):
+        self.update(dict((name, self.Entry(name, **entry))
+                            for name, entry in dict_.items()))
+
+    def merge_inplace(self, b):
+        A, B = set(self.keys()), set(b.keys())
+
+        # Remove items from disk not in the schedule anymore.
+        for key in A ^ B:
+            self.pop(key, None)
+
+        # Update and add new items in the schedule
+        for key in B:
+            entry = self.Entry(**dict(b[key]))
+            if self.get(key):
+                self[key].update(entry)
+            else:
+                self[key] = entry
 
     def get_schedule(self):
         return self.data
 
-    def _set_schedule(self, schedule):
-        self.data = schedule
-
-    def _get_schedule(self):
+    @property
+    def schedule(self):
         return self.get_schedule()
 
-    schedule = property(_get_schedule, _set_schedule)
-
 
 class PersistentScheduler(Scheduler):
     persistence = shelve
@@ -225,24 +245,10 @@ class PersistentScheduler(Scheduler):
 
     def setup_schedule(self):
         self._store = self.persistence.open(self.schedule_filename)
-        self._diskmerge(self._store, conf.CELERYBEAT_SCHEDULE)
+        self.data = self._store
+        self.merge_inplace(conf.CELERYBEAT_SCHEDULE)
         self.sync()
-        self.schedule = self._store
-
-    def _diskmerge(self, a, b):
-        A, B = set(a), set(b)
-
-        # Remove items from disk not in the schedule anymore.
-        for key in A ^ B:
-            a.pop(key, None)
-
-        # Update and add new items in the schedule
-        for key in B:
-            entry = self.Entry(**b[key])
-            if a.get(key):
-                a[key].update(entry)
-            else:
-                a[key] = entry
+        self.data = self._store
 
     def sync(self):
         if self._store is not None:
@@ -310,10 +316,10 @@ class Service(object):
         if self._scheduler is None:
             filename = self.schedule_filename
             self._scheduler = instantiate(self.scheduler_cls,
-                                          schedule=self.schedule,
                                           schedule_filename=filename,
                                           logger=self.logger,
                                           max_interval=self.max_interval)
+            self._scheduler.update_from_dict(self.schedule)
         return self._scheduler
 
 

+ 12 - 0
celery/schedules.py

@@ -30,6 +30,11 @@ class schedule(object):
             return True, timedelta_seconds(self.run_every)
         return False, rem
 
+    def __eq__(self, other):
+        if isinstance(other, schedule):
+            return self.run_every == other.run_every
+        return self.run_every == other
+
 
 class crontab_parser(object):
     """Parser for crontab expressions. Any expression of the form 'groups' (see
@@ -230,6 +235,13 @@ class crontab(schedule):
                    now.minute in self.minute)
         return due, when
 
+    def __eq__(self, other):
+        if isinstance(other, crontab):
+            return (other.day_of_week == self.day_of_week and
+                    other.hour == self.hour and
+                    other.minute == self.minute)
+        return other is self
+
 
 def maybe_schedule(s, relative=False):
     if isinstance(s, int):

+ 198 - 7
celery/tests/test_beat.py

@@ -1,16 +1,12 @@
 import logging
 import unittest2 as unittest
+
 from datetime import datetime, timedelta
 
-from celery import log
 from celery import beat
-from celery import conf
-from celery.task.base import Task
+from celery.result import AsyncResult
 from celery.schedules import schedule
 from celery.utils import gen_unique_id
-from celery.task.base import PeriodicTask
-from celery.registry import TaskRegistry
-from celery.result import AsyncResult
 
 
 class Object(object):
@@ -42,6 +38,194 @@ class MockService(object):
         self.stopped = True
 
 
+class test_ScheduleEntry(unittest.TestCase):
+    Entry = beat.ScheduleEntry
+
+    def create_entry(self, **kwargs):
+        entry = dict(name="celery.unittest.add",
+                     schedule=schedule(timedelta(seconds=10)),
+                     args=(2, 2),
+                     options={"routing_key": "cpu"})
+        return self.Entry(**dict(entry, **kwargs))
+
+    def test_next(self):
+        entry = self.create_entry(schedule=10)
+        self.assertTrue(entry.last_run_at)
+        self.assertIsInstance(entry.last_run_at, datetime)
+        self.assertEqual(entry.total_run_count, 0)
+
+        next_run_at = entry.last_run_at + timedelta(seconds=10)
+        next = entry.next(next_run_at)
+        self.assertGreaterEqual(next.last_run_at, next_run_at)
+        self.assertEqual(next.total_run_count, 1)
+
+    def test_is_due(self):
+        entry = self.create_entry(schedule=timedelta(seconds=10))
+        due1, next_time_to_run1 = entry.is_due()
+        self.assertFalse(due1)
+        self.assertGreater(next_time_to_run1, 9)
+
+        next_run_at = entry.last_run_at - timedelta(seconds=10)
+        next = entry.next(next_run_at)
+        due2, next_time_to_run2 = next.is_due()
+        self.assertTrue(due2)
+        self.assertGreater(next_time_to_run2, 9)
+
+    def test_repr(self):
+        entry = self.create_entry()
+        self.assertIn("<Entry:", repr(entry))
+
+    def test_update(self):
+        entry = self.create_entry()
+        self.assertEqual(entry.schedule, timedelta(seconds=10))
+        self.assertTupleEqual(entry.args, (2, 2))
+        self.assertDictEqual(entry.kwargs, {})
+        self.assertDictEqual(entry.options, {"routing_key": "cpu"})
+
+        entry2 = self.create_entry(schedule=timedelta(minutes=20),
+                                   args=(16, 16),
+                                   kwargs={"callback": "foo.bar.baz"},
+                                   options={"routing_key": "urgent"})
+        entry.update(entry2)
+        self.assertEqual(entry.schedule, schedule(timedelta(minutes=20)))
+        self.assertTupleEqual(entry.args, (16, 16))
+        self.assertDictEqual(entry.kwargs, {"callback": "foo.bar.baz"})
+        self.assertDictEqual(entry.options, {"routing_key": "urgent"})
+
+
+class MockLogger(logging.Logger):
+
+    def __init__(self, *args, **kwargs):
+        self.logged = []
+        logging.Logger.__init__(self, *args, **kwargs)
+
+    def _log(self, level, msg, args, **kwargs):
+        self.logged.append((level, msg))
+
+
+class mScheduler(beat.Scheduler):
+
+    def __init__(self, *args, **kwargs):
+        self.sent = []
+        beat.Scheduler.__init__(self, *args, **kwargs)
+        self.logger = MockLogger("celery.beat", logging.ERROR)
+
+    def send_task(self, name=None, args=None, kwargs=None, **options):
+        self.sent.append({"name": name,
+                          "args": args,
+                          "kwargs": kwargs,
+                          "options": options})
+        return AsyncResult(gen_unique_id())
+
+
+class mSchedulerSchedulingError(mScheduler):
+
+    def send_task(self, *args, **kwargs):
+        raise beat.SchedulingError("Could not apply task")
+
+
+class mSchedulerRuntimeError(mScheduler):
+
+    def maybe_due(self, *args, **kwargs):
+        raise RuntimeError("dict modified while itervalues")
+
+
+class mocked_schedule(schedule):
+
+    def __init__(self, is_due, next_run_at):
+        self._is_due = is_due
+        self._next_run_at = next_run_at
+        self.run_every = timedelta(seconds=1)
+
+    def is_due(self, last_run_at):
+        return self._is_due, self._next_run_at
+
+
+always_due = mocked_schedule(True, 1)
+always_pending = mocked_schedule(False, 1)
+
+
+class test_Scheduler(unittest.TestCase):
+
+    def test_due_tick(self):
+        scheduler = mScheduler()
+        scheduler.add(name="test_due_tick",
+                      schedule=always_due,
+                      args=(1, 2),
+                      kwargs={"foo": "bar"})
+        self.assertEqual(scheduler.tick(), 1)
+
+    def test_due_tick_SchedulingError(self):
+        scheduler = mSchedulerSchedulingError()
+        scheduler.add(name="test_due_tick_SchedulingError",
+                      schedule=always_due)
+        self.assertEqual(scheduler.tick(), 1)
+        self.assertTrue(scheduler.logger.logged[0])
+        level, msg = scheduler.logger.logged[0]
+        self.assertEqual(level, logging.ERROR)
+        self.assertIn("Couldn't apply scheduled task", msg)
+
+    def test_due_tick_RuntimeError(self):
+        scheduler = mSchedulerRuntimeError()
+        scheduler.add(name="test_due_tick_RuntimeError",
+                      schedule=always_due)
+        self.assertEqual(scheduler.tick(), scheduler.max_interval)
+
+    def test_pending_tick(self):
+        scheduler = mScheduler()
+        scheduler.add(name="test_pending_tick",
+                      schedule=always_pending)
+        self.assertEqual(scheduler.tick(), 1)
+
+    def test_honors_max_interval(self):
+        scheduler = mScheduler()
+        maxi = scheduler.max_interval
+        scheduler.add(name="test_honors_max_interval",
+                      schedule=mocked_schedule(False, maxi * 4))
+        self.assertEqual(scheduler.tick(), maxi)
+
+    def test_ticks(self):
+        scheduler = mScheduler()
+        nums = [600, 300, 650, 120, 250, 36]
+        expected = min(nums, scheduler.max_interval)
+
+        s = dict(("test_ticks%s" % i,
+                 {"schedule": mocked_schedule(False, j)})
+                    for i, j in enumerate(nums))
+        scheduler.update_from_dict(s)
+        self.assertEqual(scheduler.tick(), min(nums))
+
+    def test_schedule_no_remain(self):
+        scheduler = mScheduler()
+        scheduler.add(name="test_schedule_no_remain",
+                      schedule=mocked_schedule(False, None))
+        self.assertEqual(scheduler.tick(), scheduler.max_interval)
+
+    def test_interface(self):
+        scheduler = mScheduler()
+        scheduler.sync()
+        scheduler.setup_schedule()
+        scheduler.close()
+
+    def test_set_schedule(self):
+        scheduler = mScheduler()
+        a, b = scheduler.schedule, {}
+        scheduler.schedule = b
+        self.assertIs(scheduler.schedule, b)
+
+    def test_merge_inplace(self):
+        a = mScheduler()
+        b = mScheduler()
+        a.update_from_dict({"foo": {"schedule": mocked_schedule(True, 10)},
+                            "bar": {"schedule": mocked_schedule(True, 20)}})
+        b.update_from_dict({"bar": {"schedule": mocked_schedule(True, 40)},
+                            "baz": {"schedule": mocked_schedule(True, 10)}})
+        a.merge_inplace(b)
+
+        self.assertNotIn("foo", a)
+        self.assertIn("baz", a)
+        self.assertEqual(a["bar"].schedule._next_run_at, 40)
+
 
 class test_Service(unittest.TestCase):
 
@@ -67,8 +251,15 @@ class test_Service(unittest.TestCase):
         s.stop(wait=True)
         self.assertTrue(s._shutdown.isSet())
 
+        p = s.scheduler._store
+        s.scheduler._store = None
+        try:
+            s.scheduler.sync()
+        finally:
+            s.scheduler._store = p
+
 
-class TestEmbeddedService(unittest.TestCase):
+class test_EmbeddedService(unittest.TestCase):
 
     def test_start_stop_process(self):
         s = beat.EmbeddedService()