test_worker_controllers.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. import unittest
  2. import time
  3. import multiprocessing
  4. from Queue import Queue, Empty
  5. from datetime import datetime, timedelta
  6. from celery.worker.controllers import Mediator, PeriodicWorkController
  7. from celery.worker.controllers import BackgroundThread
  8. class MockTask(object):
  9. task_id = 1234
  10. task_name = "mocktask"
  11. def __init__(self, value, **kwargs):
  12. self.value = value
  13. class MyBackgroundThread(BackgroundThread):
  14. def on_iteration(self):
  15. import time
  16. time.sleep(1)
  17. class TestBackgroundThread(unittest.TestCase):
  18. def test_on_iteration(self):
  19. self.assertRaises(NotImplementedError,
  20. BackgroundThread().on_iteration)
  21. def test_run(self):
  22. t = MyBackgroundThread()
  23. t._shutdown.set()
  24. t.run()
  25. self.assertTrue(t._stopped.isSet())
  26. def test_start_stop(self):
  27. t = MyBackgroundThread()
  28. t.start()
  29. self.assertFalse(t._shutdown.isSet())
  30. self.assertFalse(t._stopped.isSet())
  31. t.stop()
  32. self.assertTrue(t._shutdown.isSet())
  33. self.assertTrue(t._stopped.isSet())
  34. class TestMediator(unittest.TestCase):
  35. def test_mediator_start__stop(self):
  36. bucket_queue = Queue()
  37. m = Mediator(bucket_queue, lambda t: t)
  38. m.start()
  39. self.assertFalse(m._shutdown.isSet())
  40. self.assertFalse(m._stopped.isSet())
  41. m.stop()
  42. m.join()
  43. self.assertTrue(m._shutdown.isSet())
  44. self.assertTrue(m._stopped.isSet())
  45. def test_mediator_on_iteration(self):
  46. bucket_queue = Queue()
  47. got = {}
  48. def mycallback(value):
  49. got["value"] = value.value
  50. m = Mediator(bucket_queue, mycallback)
  51. bucket_queue.put(MockTask("George Constanza"))
  52. m.on_iteration()
  53. self.assertEquals(got["value"], "George Constanza")
  54. class TestPeriodicWorkController(unittest.TestCase):
  55. def test_process_hold_queue(self):
  56. bucket_queue = Queue()
  57. hold_queue = Queue()
  58. m = PeriodicWorkController(bucket_queue, hold_queue)
  59. m.process_hold_queue()
  60. scratchpad = {}
  61. def on_accept():
  62. scratchpad["accepted"] = True
  63. hold_queue.put((MockTask("task1"),
  64. datetime.now() - timedelta(days=1),
  65. on_accept))
  66. m.process_hold_queue()
  67. self.assertRaises(Empty, hold_queue.get_nowait)
  68. self.assertTrue(scratchpad.get("accepted"))
  69. self.assertEquals(bucket_queue.get_nowait().value, "task1")
  70. tomorrow = datetime.now() + timedelta(days=1)
  71. hold_queue.put((MockTask("task2"), tomorrow, on_accept))
  72. m.process_hold_queue()
  73. self.assertRaises(Empty, bucket_queue.get_nowait)
  74. value, eta, on_accept = hold_queue.get_nowait()
  75. self.assertEquals(value.value, "task2")
  76. self.assertEquals(eta, tomorrow)
  77. def test_run_periodic_tasks(self):
  78. bucket_queue = Queue()
  79. hold_queue = Queue()
  80. m = PeriodicWorkController(bucket_queue, hold_queue)
  81. m.run_periodic_tasks()
  82. def test_on_iteration(self):
  83. bucket_queue = Queue()
  84. hold_queue = Queue()
  85. m = PeriodicWorkController(bucket_queue, hold_queue)
  86. m.on_iteration()