123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- from __future__ import generators
- import unittest
- from Queue import Queue, Empty
- from datetime import datetime, timedelta
- from celery.worker.scheduler import Scheduler
- class MockItem(object):
- def __init__(self, value):
- self.task_id = value
- class TestScheduler(unittest.TestCase):
- def test_sched_and_run_now(self):
- ready_queue = Queue()
- sched = Scheduler(ready_queue)
- now = datetime.now()
- callback_called = [False]
- def callback():
- callback_called[0] = True
- sched.enter(MockItem("foo"), eta=now, callback=callback)
- remaining = iter(sched).next()
- self.assertEquals(remaining, 0)
- self.assertTrue(callback_called[0])
- self.assertEquals(ready_queue.get_nowait().task_id, "foo")
- def test_sched_run_later(self):
- ready_queue = Queue()
- sched = Scheduler(ready_queue)
- now = datetime.now()
- callback_called = [False]
- def callback():
- callback_called[0] = True
- eta = now + timedelta(seconds=10)
- sched.enter(MockItem("foo"), eta=eta, callback=callback)
- remaining = iter(sched).next()
- self.assertTrue(remaining > 7 or remaining == sched.max_interval)
- self.assertFalse(callback_called[0])
- self.assertRaises(Empty, ready_queue.get_nowait)
- def test_empty_queue_yields_None(self):
- ready_queue = Queue()
- sched = Scheduler(ready_queue)
- self.assertTrue(iter(sched).next() is None)
|