# test_worker_controllers.py
  1. import unittest
  2. import time
  3. import multiprocessing
  4. from Queue import Queue, Empty
  5. from datetime import datetime, timedelta
  6. from celery.worker.controllers import Mediator, PeriodicWorkController
  7. from celery.worker.controllers import InfinityThread
  8. class MyInfinityThread(InfinityThread):
  9. def on_iteration(self):
  10. import time
  11. time.sleep(1)
  12. class TestInfinityThread(unittest.TestCase):
  13. def test_on_iteration(self):
  14. self.assertRaises(NotImplementedError, InfinityThread().on_iteration)
  15. def test_run(self):
  16. t = MyInfinityThread()
  17. t._shutdown.set()
  18. t.run()
  19. self.assertTrue(t._stopped.isSet())
  20. def test_start_stop(self):
  21. t = MyInfinityThread()
  22. t.start()
  23. self.assertFalse(t._shutdown.isSet())
  24. self.assertFalse(t._stopped.isSet())
  25. t.stop()
  26. self.assertTrue(t._shutdown.isSet())
  27. self.assertTrue(t._stopped.isSet())
  28. class TestMediator(unittest.TestCase):
  29. def test_mediator_start__stop(self):
  30. bucket_queue = Queue()
  31. m = Mediator(bucket_queue, lambda t: t)
  32. m.start()
  33. self.assertFalse(m._shutdown.isSet())
  34. self.assertFalse(m._stopped.isSet())
  35. m.stop()
  36. m.join()
  37. self.assertTrue(m._shutdown.isSet())
  38. self.assertTrue(m._stopped.isSet())
  39. def test_mediator_on_iteration(self):
  40. bucket_queue = Queue()
  41. got = {}
  42. def mycallback(value):
  43. got["value"] = value
  44. m = Mediator(bucket_queue, mycallback)
  45. bucket_queue.put("George Constanza")
  46. m.on_iteration()
  47. self.assertEquals(got["value"], "George Constanza")
  48. class TestPeriodicWorkController(unittest.TestCase):
  49. def test_process_hold_queue(self):
  50. bucket_queue = Queue()
  51. hold_queue = Queue()
  52. m = PeriodicWorkController(bucket_queue, hold_queue)
  53. m.process_hold_queue()
  54. hold_queue.put(("task1", datetime.now() - timedelta(days=1)))
  55. m.process_hold_queue()
  56. self.assertRaises(Empty, hold_queue.get_nowait)
  57. self.assertEquals(bucket_queue.get_nowait(), "task1")
  58. tomorrow = datetime.now() + timedelta(days=1)
  59. hold_queue.put(("task2", tomorrow))
  60. m.process_hold_queue()
  61. self.assertRaises(Empty, bucket_queue.get_nowait)
  62. self.assertEquals(hold_queue.get_nowait(), ("task2", tomorrow))
  63. def test_run_periodic_tasks(self):
  64. bucket_queue = Queue()
  65. hold_queue = Queue()
  66. m = PeriodicWorkController(bucket_queue, hold_queue)
  67. m.run_periodic_tasks()
  68. def test_on_iteration(self):
  69. bucket_queue = Queue()
  70. hold_queue = Queue()
  71. m = PeriodicWorkController(bucket_queue, hold_queue)
  72. m.on_iteration()