# test_worker_scheduler.py (1.5 KB)
  1. from __future__ import generators
  2. import unittest
  3. from Queue import Queue, Empty
  4. from datetime import datetime, timedelta
  5. from celery.worker.scheduler import Scheduler
  6. class MockItem(object):
  7. def __init__(self, value):
  8. self.task_id = value
  9. class TestScheduler(unittest.TestCase):
  10. def test_sched_and_run_now(self):
  11. ready_queue = Queue()
  12. sched = Scheduler(ready_queue)
  13. now = datetime.now()
  14. callback_called = [False]
  15. def callback():
  16. callback_called[0] = True
  17. sched.enter(MockItem("foo"), eta=now, callback=callback)
  18. remaining = iter(sched).next()
  19. self.assertEquals(remaining, 0)
  20. self.assertTrue(callback_called[0])
  21. self.assertEquals(ready_queue.get_nowait().task_id, "foo")
  22. def test_sched_run_later(self):
  23. ready_queue = Queue()
  24. sched = Scheduler(ready_queue)
  25. now = datetime.now()
  26. callback_called = [False]
  27. def callback():
  28. callback_called[0] = True
  29. eta = now + timedelta(seconds=10)
  30. sched.enter(MockItem("foo"), eta=eta, callback=callback)
  31. remaining = iter(sched).next()
  32. self.assertTrue(remaining > 7 or remaining == sched.max_interval)
  33. self.assertFalse(callback_called[0])
  34. self.assertRaises(Empty, ready_queue.get_nowait)
  35. def test_empty_queue_yields_None(self):
  36. ready_queue = Queue()
  37. sched = Scheduler(ready_queue)
  38. self.assertTrue(iter(sched).next() is None)