test_worker_controllers.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. import time
  2. import unittest
  3. from Queue import Queue
  4. from celery.utils import gen_unique_id
  5. from celery.worker.controllers import Mediator
  6. from celery.worker.controllers import BackgroundThread, ScheduleController
  7. class MockTask(object):
  8. task_id = 1234
  9. task_name = "mocktask"
  10. acked = False
  11. def __init__(self, value, **kwargs):
  12. self.value = value
  13. def on_ack(self):
  14. self.acked = True
  15. class MyBackgroundThread(BackgroundThread):
  16. def on_iteration(self):
  17. time.sleep(1)
  18. class TestBackgroundThread(unittest.TestCase):
  19. def test_on_iteration(self):
  20. self.assertRaises(NotImplementedError,
  21. BackgroundThread().on_iteration)
  22. def test_run(self):
  23. t = MyBackgroundThread()
  24. t._shutdown.set()
  25. t.run()
  26. self.assertTrue(t._stopped.isSet())
  27. def test_start_stop(self):
  28. t = MyBackgroundThread()
  29. t.start()
  30. self.assertFalse(t._shutdown.isSet())
  31. self.assertFalse(t._stopped.isSet())
  32. t.stop()
  33. self.assertTrue(t._shutdown.isSet())
  34. self.assertTrue(t._stopped.isSet())
  35. class TestMediator(unittest.TestCase):
  36. def test_mediator_start__stop(self):
  37. ready_queue = Queue()
  38. m = Mediator(ready_queue, lambda t: t)
  39. m.start()
  40. self.assertFalse(m._shutdown.isSet())
  41. self.assertFalse(m._stopped.isSet())
  42. m.stop()
  43. m.join()
  44. self.assertTrue(m._shutdown.isSet())
  45. self.assertTrue(m._stopped.isSet())
  46. def test_mediator_on_iteration(self):
  47. ready_queue = Queue()
  48. got = {}
  49. def mycallback(value):
  50. got["value"] = value.value
  51. m = Mediator(ready_queue, mycallback)
  52. ready_queue.put(MockTask("George Constanza"))
  53. m.on_iteration()
  54. self.assertEquals(got["value"], "George Constanza")
  55. def test_mediator_on_iteration_revoked(self):
  56. ready_queue = Queue()
  57. got = {}
  58. def mycallback(value):
  59. got["value"] = value.value
  60. m = Mediator(ready_queue, mycallback)
  61. t = MockTask("Jerry Seinfeld")
  62. t.task_id = gen_unique_id()
  63. from celery.worker.revoke import revoked
  64. revoked.add(t.task_id)
  65. ready_queue.put(t)
  66. m.on_iteration()
  67. self.assertTrue("value" not in got)
  68. self.assertTrue(t.acked)
  69. class TestScheduleController(unittest.TestCase):
  70. def test_on_iteration(self):
  71. times = range(10) + [None]
  72. c = ScheduleController(times)
  73. import time
  74. slept = [None]
  75. def _sleep(count):
  76. slept[0] = count
  77. old_sleep = time.sleep
  78. time.sleep = _sleep
  79. try:
  80. for i in times:
  81. c.on_iteration()
  82. res = i is None and 1 or i
  83. self.assertEquals(slept[0], res)
  84. finally:
  85. time.sleep = old_sleep