test_worker_controllers.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. import time
  2. import unittest2 as unittest
  3. from Queue import Queue
  4. from celery.utils import gen_unique_id
  5. from celery.worker.controllers import Mediator
  6. from celery.worker.controllers import BackgroundThread, ScheduleController
  7. from celery.worker.revoke import revoked as revoked_tasks
  8. class MockTask(object):
  9. task_id = 1234
  10. task_name = "mocktask"
  11. acked = False
  12. def __init__(self, value, **kwargs):
  13. self.value = value
  14. def on_ack(self):
  15. self.acked = True
  16. def revoked(self):
  17. if self.task_id in revoked_tasks:
  18. self.on_ack()
  19. return True
  20. return False
  21. class MyBackgroundThread(BackgroundThread):
  22. def on_iteration(self):
  23. time.sleep(1)
  24. class TestBackgroundThread(unittest.TestCase):
  25. def test_on_iteration(self):
  26. self.assertRaises(NotImplementedError,
  27. BackgroundThread().on_iteration)
  28. def test_run(self):
  29. t = MyBackgroundThread()
  30. t._shutdown.set()
  31. t.run()
  32. self.assertTrue(t._stopped.isSet())
  33. def test_start_stop(self):
  34. t = MyBackgroundThread()
  35. t.start()
  36. self.assertFalse(t._shutdown.isSet())
  37. self.assertFalse(t._stopped.isSet())
  38. t.stop()
  39. self.assertTrue(t._shutdown.isSet())
  40. self.assertTrue(t._stopped.isSet())
  41. class TestMediator(unittest.TestCase):
  42. def test_mediator_start__stop(self):
  43. ready_queue = Queue()
  44. m = Mediator(ready_queue, lambda t: t)
  45. m.start()
  46. self.assertFalse(m._shutdown.isSet())
  47. self.assertFalse(m._stopped.isSet())
  48. m.stop()
  49. m.join()
  50. self.assertTrue(m._shutdown.isSet())
  51. self.assertTrue(m._stopped.isSet())
  52. def test_mediator_on_iteration(self):
  53. ready_queue = Queue()
  54. got = {}
  55. def mycallback(value):
  56. got["value"] = value.value
  57. m = Mediator(ready_queue, mycallback)
  58. ready_queue.put(MockTask("George Constanza"))
  59. m.on_iteration()
  60. self.assertEqual(got["value"], "George Constanza")
  61. def test_mediator_on_iteration_revoked(self):
  62. ready_queue = Queue()
  63. got = {}
  64. def mycallback(value):
  65. got["value"] = value.value
  66. m = Mediator(ready_queue, mycallback)
  67. t = MockTask("Jerry Seinfeld")
  68. t.task_id = gen_unique_id()
  69. revoked_tasks.add(t.task_id)
  70. ready_queue.put(t)
  71. m.on_iteration()
  72. self.assertNotIn("value", got)
  73. self.assertTrue(t.acked)
  74. class TestScheduleController(unittest.TestCase):
  75. def test_on_iteration(self):
  76. times = range(10) + [None]
  77. c = ScheduleController(times)
  78. import time
  79. slept = [None]
  80. def _sleep(count):
  81. slept[0] = count
  82. old_sleep = time.sleep
  83. time.sleep = _sleep
  84. try:
  85. for i in times:
  86. c.on_iteration()
  87. res = i is None and 1 or i
  88. self.assertEqual(slept[0], res)
  89. finally:
  90. time.sleep = old_sleep