Browse Source

All tests passing again

Ask Solem 14 years ago
parent
commit
7ddf622a5b
5 changed files with 44 additions and 40 deletions
  1. 2 10
      celery/beat.py
  2. 9 1
      celery/schedules.py
  3. 7 3
      celery/task/base.py
  4. 23 23
      celery/tests/test_bin/test_celerybeat.py
  5. 3 3
      celery/tests/test_worker.py

+ 2 - 10
celery/beat.py

@@ -7,14 +7,14 @@ import time
 import shelve
 import threading
 import multiprocessing
-from datetime import datetime, timedelta
+from datetime import datetime
 from UserDict import UserDict
 
 from celery import log
 from celery import conf
 from celery import platform
 from celery.execute import send_task
-from celery.schedules import schedule
+from celery.schedules import maybe_schedule
 from celery.messaging import establish_connection
 from celery.utils import instantiate
 from celery.utils.info import humanize_seconds
@@ -24,14 +24,6 @@ class SchedulingError(Exception):
     """An error occured while scheduling a task."""
 
 
-def maybe_schedule(s, relative=False):
-    if isinstance(s, int):
-        s = timedelta(seconds=s)
-    if isinstance(s, timedelta):
-        return schedule(s, relative)
-    return s
-
-
 class ScheduleEntry(object):
     """An entry in the scheduler.
 

+ 9 - 1
celery/schedules.py

@@ -1,4 +1,4 @@
-from datetime import datetime
+from datetime import datetime, timedelta
 from pyparsing import (Word, Literal, ZeroOrMore, Optional,
                        Group, StringEnd, alphas)
 
@@ -229,3 +229,11 @@ class crontab(schedule):
                    now.hour in self.hour and
                    now.minute in self.minute)
         return due, when
+
+
+def maybe_schedule(s, relative=False):
+    if isinstance(s, int):
+        s = timedelta(seconds=s)
+    if isinstance(s, timedelta):
+        return schedule(s, relative)
+    return s

+ 7 - 3
celery/task/base.py

@@ -12,7 +12,7 @@ from celery.messaging import TaskPublisher, TaskConsumer
 from celery.messaging import establish_connection as _establish_connection
 from celery.registry import tasks
 from celery.result import BaseAsyncResult, EagerResult
-from celery.schedules import schedule
+from celery.schedules import maybe_schedule
 from celery.utils.timeutils import timedelta_seconds
 
 from celery.task.sets import TaskSet, subtask
@@ -616,9 +616,13 @@ class PeriodicTask(Task):
         if not hasattr(self, "run_every"):
             raise NotImplementedError(
                     "Periodic tasks must have a run_every attribute")
+        self.run_every = maybe_schedule(self.run_every, self.relative)
 
-        warnings.warn(PERIODIC_DEPRECATION_TEXT,
-                        PendingDeprecationWarning)
+        # Periodic task classes is pending deprecation.
+        warnings.warn(PendingDeprecationWarning(PERIODIC_DEPRECATION_TEXT))
+
+        # For backward compatibility, add the periodic task to the
+        # configuration schedule instead.
         conf.CELERYBEAT_SCHEDULE[self.name] = {
                 "name": self.name,
                 "schedule": self.run_every,

+ 23 - 23
celery/tests/test_bin/test_celerybeat.py

@@ -2,12 +2,12 @@ import logging
 import sys
 import unittest2 as unittest
 
+from celery import beat
 from celery import platform
-from celery.beat import ClockService
-from celery.bin import celerybeat as beat
+from celery.bin import celerybeat as celerybeat
 
 
-class MockClockService(ClockService):
+class MockService(beat.Service):
     started = False
     in_sync = False
 
@@ -18,15 +18,15 @@ class MockClockService(ClockService):
         self.__class__.in_sync = True
 
 
-class MockBeat(beat.Beat):
+class MockBeat(celerybeat.Beat):
     running = False
 
     def run(self):
         self.__class__.running = True
 
 
-class MockBeat2(beat.Beat):
-    ClockService = MockClockService
+class MockBeat2(celerybeat.Beat):
+    Service = MockService
 
     def install_sync_handler(self, b):
         pass
@@ -35,29 +35,29 @@ class MockBeat2(beat.Beat):
 class test_Beat(unittest.TestCase):
 
     def test_loglevel_string(self):
-        b = beat.Beat(loglevel="DEBUG")
+        b = celerybeat.Beat(loglevel="DEBUG")
         self.assertEqual(b.loglevel, logging.DEBUG)
 
-        b2 = beat.Beat(loglevel=logging.DEBUG)
+        b2 = celerybeat.Beat(loglevel=logging.DEBUG)
         self.assertEqual(b2.loglevel, logging.DEBUG)
 
     def test_init_loader(self):
-        b = beat.Beat()
+        b = celerybeat.Beat()
         b.init_loader()
 
     def test_startup_info(self):
-        b = beat.Beat()
+        b = celerybeat.Beat()
         self.assertIn("@stderr", b.startup_info())
 
     def test_process_title(self):
-        b = beat.Beat()
+        b = celerybeat.Beat()
         b.set_process_title()
 
     def test_run(self):
         b = MockBeat2()
-        MockClockService.started = False
+        MockService.started = False
         b.run()
-        self.assertTrue(MockClockService.started)
+        self.assertTrue(MockService.started)
 
     def psig(self, fun, *args, **kwargs):
         handlers = {}
@@ -73,39 +73,39 @@ class test_Beat(unittest.TestCase):
             platform.install_signal_handler = p
 
     def test_install_sync_handler(self):
-        b = beat.Beat()
-        clock = MockClockService()
-        MockClockService.in_sync = False
+        b = celerybeat.Beat()
+        clock = MockService()
+        MockService.in_sync = False
         handlers = self.psig(b.install_sync_handler, clock)
         self.assertRaises(SystemExit, handlers["SIGINT"],
                           "SIGINT", object())
-        self.assertTrue(MockClockService.in_sync)
-        MockClockService.in_sync = False
+        self.assertTrue(MockService.in_sync)
+        MockService.in_sync = False
 
 
 class test_div(unittest.TestCase):
 
     def setUp(self):
-        self.prev, beat.Beat = beat.Beat, MockBeat
+        self.prev, celerybeat.Beat = celerybeat.Beat, MockBeat
 
     def tearDown(self):
-        beat.Beat = self.prev
+        celerybeat.Beat = self.prev
 
     def test_main(self):
         sys.argv = [sys.argv[0], "-s", "foo"]
         try:
-            beat.main()
+            celerybeat.main()
             self.assertTrue(MockBeat.running)
         finally:
             MockBeat.running = False
 
     def test_run_celerybeat(self):
         try:
-            beat.run_celerybeat()
+            celerybeat.run_celerybeat()
             self.assertTrue(MockBeat.running)
         finally:
             MockBeat.running = False
 
     def test_parse_options(self):
-        options = beat.parse_options(["-s", "foo"])
+        options = celerybeat.parse_options(["-s", "foo"])
         self.assertEqual(options.schedule, "foo")

+ 3 - 3
celery/tests/test_worker.py

@@ -473,11 +473,11 @@ class test_WorkController(unittest.TestCase):
         self.assertTrue(worker.mediator)
         self.assertTrue(worker.components)
 
-    def test_with_embedded_clockservice(self):
+    def test_with_embedded_celerybeat(self):
         worker = WorkController(concurrency=1, loglevel=0,
                                 embed_clockservice=True)
-        self.assertTrue(worker.clockservice)
-        self.assertIn(worker.clockservice, worker.components)
+        self.assertTrue(worker.beat)
+        self.assertIn(worker.beat, worker.components)
 
     def test_process_task(self):
         worker = self.worker