Browse Source

Removed some bad tests for celery.beat. Need to write new ones.

Ask Solem 14 years ago
parent
commit
78779b7070
2 changed files with 36 additions and 168 deletions
  1. 11 12
      celery/beat.py
  2. 25 156
      celery/tests/test_beat.py

+ 11 - 12
celery/beat.py

@@ -68,9 +68,8 @@ class ScheduleEntry(object):
 
     """
 
-    def __init__(self, name, schedule, args=(), kwargs={},
-            options={}, last_run_at=None, total_run_count=None,
-            relative=False):
+    def __init__(self, name, last_run_at=None, total_run_count=None,
+            schedule=None, args=(), kwargs={}, options={}, relative=False):
         self.name = name
         self.schedule = maybe_schedule(schedule, relative)
         self.args = args
@@ -82,13 +81,13 @@ class ScheduleEntry(object):
     def next(self):
         """Returns a new instance of the same class, but with
         its date and count fields updated."""
-        return self.__class__(self.name,
-                              self.schedule,
-                              self.args,
-                              self.kwargs,
-                              self.options,
-                              datetime.now(),
-                              self.total_run_count + 1)
+        return self.__class__(name=self.name,
+                              schedule=self.schedule,
+                              args=self.args,
+                              kwargs=self.kwargs,
+                              options=self.options,
+                              last_run_at=datetime.now(),
+                              total_run_count=self.total_run_count + 1)
 
     def update(self, other):
         """Update values from another entry.
@@ -184,7 +183,7 @@ class Scheduler(UserDict):
         new_entry = self[entry.name] = entry.next()
         return new_entry
 
-    def apply_async(self, entry, **kwargs):
+    def apply_async(self, entry, connection=None, **kwargs):
         # Update timestamps and run counts before we actually execute,
         # so we have that done if an exception is raised (doesn't schedule
         # forever.)
@@ -192,7 +191,7 @@ class Scheduler(UserDict):
 
         try:
             result = send_task(entry.name, entry.args, entry.kwargs,
-                               **entry.options)
+                               connection=connection, **entry.options)
         except Exception, exc:
             raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
                     entry.name, exc))

+ 25 - 156
celery/tests/test_beat.py

@@ -5,12 +5,18 @@ from datetime import datetime, timedelta
 from celery import log
 from celery import beat
 from celery import conf
+from celery.task.base import Task
+from celery.schedules import schedule
 from celery.utils import gen_unique_id
 from celery.task.base import PeriodicTask
 from celery.registry import TaskRegistry
 from celery.result import AsyncResult
 
 
+class Object(object):
+    pass
+
+
 class MockShelve(dict):
     closed = False
     synced = False
@@ -22,7 +28,7 @@ class MockShelve(dict):
         self.synced = True
 
 
-class MockClockService(object):
+class MockService(object):
     started = False
     stopped = False
 
@@ -36,177 +42,40 @@ class MockClockService(object):
         self.stopped = True
 
 
-class DuePeriodicTask(PeriodicTask):
-    run_every = timedelta(seconds=1)
-    applied = False
-
-    def is_due(self, *args, **kwargs):
-        return True, 100
-
-    @classmethod
-    def apply_async(self, *args, **kwargs):
-        self.applied = True
-        return AsyncResult(gen_unique_id())
-
-
-class DuePeriodicTaskRaising(PeriodicTask):
-    run_every = timedelta(seconds=1)
-    applied = False
-
-    def is_due(self, *args, **kwargs):
-        return True, 0
-
-    @classmethod
-    def apply_async(self, *args, **kwargs):
-        raise Exception("FoozBaaz")
-
-
-class PendingPeriodicTask(PeriodicTask):
-    run_every = timedelta(seconds=1)
-    applied = False
-
-    def is_due(self, *args, **kwargs):
-        return False, 100
-
-    @classmethod
-    def apply_async(self, *args, **kwargs):
-        self.applied = True
-        return AsyncResult(gen_unique_id())
-
-
-class AdditionalTask(PeriodicTask):
-    run_every = timedelta(days=7)
-
-    @classmethod
-    def apply_async(self, *args, **kwargs):
-        raise Exception("FoozBaaz")
-
 
-class TestScheduleEntry(unittest.TestCase):
-
-    def test_constructor(self):
-        s = beat.ScheduleEntry(DuePeriodicTask.name)
-        self.assertEqual(s.name, DuePeriodicTask.name)
-        self.assertIsInstance(s.last_run_at, datetime)
-        self.assertEqual(s.total_run_count, 0)
-
-        now = datetime.now()
-        s = beat.ScheduleEntry(DuePeriodicTask.name, now, 300)
-        self.assertEqual(s.name, DuePeriodicTask.name)
-        self.assertEqual(s.last_run_at, now)
-        self.assertEqual(s.total_run_count, 300)
-
-    def test_next(self):
-        s = beat.ScheduleEntry(DuePeriodicTask.name, None, 300)
-        n = s.next()
-        self.assertEqual(n.name, s.name)
-        self.assertEqual(n.total_run_count, 301)
-        self.assertGreater(n.last_run_at, s.last_run_at)
-
-    def test_is_due(self):
-        due = beat.ScheduleEntry(DuePeriodicTask.name)
-        pending = beat.ScheduleEntry(PendingPeriodicTask.name)
-
-        self.assertTrue(due.is_due(DuePeriodicTask())[0])
-        self.assertFalse(pending.is_due(PendingPeriodicTask())[0])
-
-
-class TestScheduler(unittest.TestCase):
-
-    def setUp(self):
-        self.registry = TaskRegistry()
-        self.registry.register(DuePeriodicTask)
-        self.registry.register(PendingPeriodicTask)
-        self.scheduler = beat.Scheduler(self.registry,
-                                        max_interval=0.0001,
-                                        logger=log.get_default_logger())
-
-    def test_constructor(self):
-        s = beat.Scheduler()
-        self.assertIsInstance(s.registry, TaskRegistry)
-        self.assertIsInstance(s.schedule, dict)
-        self.assertIsInstance(s.logger, logging.Logger)
-        self.assertEqual(s.max_interval, conf.CELERYBEAT_MAX_LOOP_INTERVAL)
-
-    def test_cleanup(self):
-        self.scheduler.schedule["fbz"] = beat.ScheduleEntry("fbz")
-        self.scheduler.cleanup()
-        self.assertNotIn("fbz", self.scheduler.schedule)
-
-    def test_schedule_registry(self):
-        self.registry.register(AdditionalTask)
-        self.scheduler.schedule_registry()
-        self.assertIn(AdditionalTask.name, self.scheduler.schedule)
-
-    def test_apply_async(self):
-        due_task = self.registry[DuePeriodicTask.name]
-        self.scheduler.apply_async(self.scheduler[due_task.name])
-        self.assertTrue(due_task.applied)
-
-    def test_apply_async_raises_SchedulingError_on_error(self):
-        self.registry.register(AdditionalTask)
-        self.scheduler.schedule_registry()
-        add_task = self.registry[AdditionalTask.name]
-        self.assertRaises(beat.SchedulingError,
-                          self.scheduler.apply_async,
-                          self.scheduler[add_task.name])
-
-    def test_is_due(self):
-        due = self.scheduler[DuePeriodicTask.name]
-        pending = self.scheduler[PendingPeriodicTask.name]
-
-        self.assertTrue(self.scheduler.is_due(due)[0])
-        self.assertFalse(self.scheduler.is_due(pending)[0])
-
-    def test_tick(self):
-        self.scheduler.schedule.pop(DuePeriodicTaskRaising.name, None)
-        self.registry.pop(DuePeriodicTaskRaising.name, None)
-        self.assertEqual(self.scheduler.tick(),
-                            self.scheduler.max_interval)
-
-    def test_quick_schedulingerror(self):
-        self.registry.register(DuePeriodicTaskRaising)
-        self.scheduler.schedule_registry()
-        self.assertEqual(self.scheduler.tick(),
-                            self.scheduler.max_interval)
-
-
-class TestClockService(unittest.TestCase):
+class test_Service(unittest.TestCase):
 
     def test_start(self):
-        s = beat.ClockService()
         sh = MockShelve()
-        s.open_schedule = lambda *a, **kw: sh
 
-        self.assertIsInstance(s.schedule, dict)
+        class PersistentScheduler(beat.PersistentScheduler):
+            persistence = Object()
+            persistence.open = lambda *a, **kw: sh
+
+        s = beat.Service(scheduler_cls=PersistentScheduler)
         self.assertIsInstance(s.schedule, dict)
         self.assertIsInstance(s.scheduler, beat.Scheduler)
-        self.assertIsInstance(s.scheduler, beat.Scheduler)
-
-        self.assertIs(s.schedule, sh)
-        self.assertIs(s._schedule, sh)
+        self.assertListEqual(s.schedule.keys(), sh.keys())
 
-        s._in_sync = False
         s.sync()
         self.assertTrue(sh.closed)
         self.assertTrue(sh.synced)
         self.assertTrue(s._stopped.isSet())
         s.sync()
-
         s.stop(wait=False)
         self.assertTrue(s._shutdown.isSet())
         s.stop(wait=True)
         self.assertTrue(s._shutdown.isSet())
 
 
-class TestEmbeddedClockService(unittest.TestCase):
+class TestEmbeddedService(unittest.TestCase):
 
     def test_start_stop_process(self):
-        s = beat.EmbeddedClockService()
+        s = beat.EmbeddedService()
         from multiprocessing import Process
         self.assertIsInstance(s, Process)
-        self.assertIsInstance(s.clockservice, beat.ClockService)
-        s.clockservice = MockClockService()
+        self.assertIsInstance(s.service, beat.Service)
+        s.service = MockService()
 
         class _Popen(object):
             terminated = False
@@ -215,22 +84,22 @@ class TestEmbeddedClockService(unittest.TestCase):
                 self.terminated = True
 
         s.run()
-        self.assertTrue(s.clockservice.started)
+        self.assertTrue(s.service.started)
 
         s._popen = _Popen()
         s.stop()
-        self.assertTrue(s.clockservice.stopped)
+        self.assertTrue(s.service.stopped)
         self.assertTrue(s._popen.terminated)
 
     def test_start_stop_threaded(self):
-        s = beat.EmbeddedClockService(thread=True)
+        s = beat.EmbeddedService(thread=True)
         from threading import Thread
         self.assertIsInstance(s, Thread)
-        self.assertIsInstance(s.clockservice, beat.ClockService)
-        s.clockservice = MockClockService()
+        self.assertIsInstance(s.service, beat.Service)
+        s.service = MockService()
 
         s.run()
-        self.assertTrue(s.clockservice.started)
+        self.assertTrue(s.service.started)
 
         s.stop()
-        self.assertTrue(s.clockservice.stopped)
+        self.assertTrue(s.service.stopped)