# test_worker_scheduler.py (1.3 KB)

import unittest
from datetime import datetime, timedelta

try:
    from Queue import Queue, Empty  # Python 2 module name
except ImportError:
    from queue import Queue, Empty  # Python 3 module name

from celery.worker.scheduler import Scheduler


  5. class TestScheduler(unittest.TestCase):
  6. def test_sched_and_run_now(self):
  7. ready_queue = Queue()
  8. sched = Scheduler(ready_queue)
  9. now = datetime.now()
  10. callback_called = [False]
  11. def callback():
  12. callback_called[0] = True
  13. sched.enter("foo", eta=now, callback=callback)
  14. remaining = iter(sched).next()
  15. self.assertEquals(remaining, 0)
  16. self.assertTrue(callback_called[0])
  17. self.assertEquals(ready_queue.get_nowait(), "foo")
  18. def test_sched_run_later(self):
  19. ready_queue = Queue()
  20. sched = Scheduler(ready_queue)
  21. now = datetime.now()
  22. callback_called = [False]
  23. def callback():
  24. callback_called[0] = True
  25. eta = now + timedelta(seconds=10)
  26. sched.enter("foo", eta=eta, callback=callback)
  27. remaining = iter(sched).next()
  28. self.assertTrue(remaining > 7)
  29. self.assertFalse(callback_called[0])
  30. self.assertRaises(Empty, ready_queue.get_nowait)
  31. def test_empty_queue_yields_None(self):
  32. ready_queue = Queue()
  33. sched = Scheduler(ready_queue)
  34. self.assertTrue(iter(sched).next() is None)