# test_worker_controllers.py (1.3 KB)

  1. import unittest
  2. import time
  3. import multiprocessing
  4. from Queue import Queue, Empty
  5. from datetime import datetime, timedelta
  6. from celery.worker.controllers import Mediator, PeriodicWorkController
  7. class TestMediator(unittest.TestCase):
  8. def test_mediator_start__stop(self):
  9. bucket_queue = Queue()
  10. m = Mediator(bucket_queue, lambda t: t)
  11. m.start()
  12. self.assertFalse(m._shutdown.isSet())
  13. self.assertFalse(m._stopped.isSet())
  14. m.stop()
  15. m.join()
  16. self.assertTrue(m._shutdown.isSet())
  17. self.assertTrue(m._stopped.isSet())
  18. class TestPeriodicWorkController(unittest.TestCase):
  19. def test_process_hold_queue(self):
  20. bucket_queue = Queue()
  21. hold_queue = Queue()
  22. m = PeriodicWorkController(bucket_queue, hold_queue)
  23. m.process_hold_queue()
  24. hold_queue.put(("task1", datetime.now() - timedelta(days=1)))
  25. m.process_hold_queue()
  26. self.assertRaises(Empty, hold_queue.get_nowait)
  27. self.assertEquals(bucket_queue.get_nowait(), "task1")
  28. tomorrow = datetime.now() + timedelta(days=1)
  29. hold_queue.put(("task2", tomorrow))
  30. m.process_hold_queue()
  31. self.assertRaises(Empty, bucket_queue.get_nowait)
  32. self.assertEquals(hold_queue.get_nowait(), ("task2", tomorrow))