Browse Source

91% coverage for celery.worker.scheduler

Ask Solem 15 years ago
parent
commit
1048eb69a5
2 changed files with 48 additions and 5 deletions
  1. 43 4
      celery/tests/test_worker_scheduler.py
  2. 5 1
      celery/worker/scheduler.py

+ 43 - 4
celery/tests/test_worker_scheduler.py

@@ -1,23 +1,24 @@
 from __future__ import generators
 
 import unittest2 as unittest
-from Queue import Queue, Empty
+
 from datetime import datetime, timedelta
+from Queue import Queue, Empty
 
 from celery.worker.scheduler import Scheduler
 
 
 class MockItem(object):
-    is_revoked = False
 
-    def __init__(self, value):
+    def __init__(self, value, revoked=False):
         self.task_id = value
+        self.is_revoked = revoked
 
     def revoked(self):
         return self.is_revoked
 
 
-class TestScheduler(unittest.TestCase):
+class test_Scheduler(unittest.TestCase):
 
     def test_sched_and_run_now(self):
         ready_queue = Queue()
@@ -35,6 +36,44 @@ class TestScheduler(unittest.TestCase):
         self.assertTrue(callback_called[0])
         self.assertEqual(ready_queue.get_nowait().task_id, "foo")
 
+    def test_sched_revoked(self):
+        ready_queue = Queue()
+        sched = Scheduler(ready_queue)
+        now = datetime.now()
+
+        callback_called = [False]
+        def callback():
+            callback_called[0] = True
+
+        sched.enter(MockItem("foo", revoked=True), eta=now, callback=callback)
+        remaining = iter(sched).next()
+        self.assertFalse(callback_called[0])
+        self.assertRaises(Empty, ready_queue.get_nowait)
+        self.assertFalse(sched.queue)
+        sched.clear()
+
+    def test_sched_clear(self):
+        ready_queue = Queue()
+        sched = Scheduler(ready_queue)
+        sched.enter(MockItem("foo"), eta=datetime.now(), callback=None)
+        self.assertFalse(sched.empty())
+        sched.clear()
+        self.assertTrue(sched.empty())
+
+    def test_sched_info(self):
+        ready_queue = Queue()
+        sched = Scheduler(ready_queue)
+        item = MockItem("foo")
+        sched.enter(item, eta=10, callback=None)
+        self.assertDictEqual({"eta": 10, "priority": 0,
+                              "item": item}, sched.info().next())
+
+    def test_sched_queue(self):
+        ready_queue = Queue()
+        sched = Scheduler(ready_queue)
+        sched.enter(MockItem("foo"), eta=datetime.now(), callback=None)
+        self.assertTrue(sched.queue)
+
     def test_sched_run_later(self):
         ready_queue = Queue()
         sched = Scheduler(ready_queue)

+ 5 - 1
celery/worker/scheduler.py

@@ -3,6 +3,8 @@ from __future__ import generators
 import time
 import heapq
 
+from datetime import datetime
+
 from celery import log
 
 DEFAULT_MAX_INTERVAL = 2
@@ -34,7 +36,9 @@ class Scheduler(object):
             This callback takes no arguments.
 
         """
-        eta = eta and time.mktime(eta.timetuple()) or time.time()
+        if isinstance(eta, datetime):
+            eta = time.mktime(eta.timetuple())
+        eta = eta or time.time()
         heapq.heappush(self._queue, (eta, priority, item, callback))
 
     def __iter__(self):