test_worker_mediator.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. from __future__ import absolute_import
  2. import sys
  3. from Queue import Queue
  4. from mock import Mock, patch
  5. from celery.utils import uuid
  6. from celery.worker.mediator import Mediator
  7. from celery.worker.state import revoked as revoked_tasks
  8. from celery.tests.utils import Case
  9. class MockTask(object):
  10. hostname = "harness.com"
  11. id = 1234
  12. name = "mocktask"
  13. def __init__(self, value, **kwargs):
  14. self.value = value
  15. on_ack = Mock()
  16. def revoked(self):
  17. if self.id in revoked_tasks:
  18. self.on_ack()
  19. return True
  20. return False
  21. class test_Mediator(Case):
  22. def test_mediator_start__stop(self):
  23. ready_queue = Queue()
  24. m = Mediator(ready_queue, lambda t: t)
  25. m.start()
  26. self.assertFalse(m._is_shutdown.isSet())
  27. self.assertFalse(m._is_stopped.isSet())
  28. m.stop()
  29. m.join()
  30. self.assertTrue(m._is_shutdown.isSet())
  31. self.assertTrue(m._is_stopped.isSet())
  32. def test_mediator_body(self):
  33. ready_queue = Queue()
  34. got = {}
  35. def mycallback(value):
  36. got["value"] = value.value
  37. m = Mediator(ready_queue, mycallback)
  38. ready_queue.put(MockTask("George Costanza"))
  39. m.body()
  40. self.assertEqual(got["value"], "George Costanza")
  41. @patch("os._exit")
  42. def test_mediator_crash(self, _exit):
  43. ms = [None]
  44. class _Mediator(Mediator):
  45. def body(self):
  46. try:
  47. raise KeyError("foo")
  48. finally:
  49. ms[0]._is_shutdown.set()
  50. ready_queue = Queue()
  51. ms[0] = m = _Mediator(ready_queue, None)
  52. ready_queue.put(MockTask("George Constanza"))
  53. stderr = Mock()
  54. p, sys.stderr = sys.stderr, stderr
  55. try:
  56. m.run()
  57. finally:
  58. sys.stderr = p
  59. self.assertTrue(_exit.call_count)
  60. self.assertTrue(stderr.write.call_count)
  61. def test_mediator_body_exception(self):
  62. ready_queue = Queue()
  63. def mycallback(value):
  64. raise KeyError("foo")
  65. m = Mediator(ready_queue, mycallback)
  66. ready_queue.put(MockTask("Elaine M. Benes"))
  67. m.body()
  68. def test_run(self):
  69. ready_queue = Queue()
  70. condition = [None]
  71. def mycallback(value):
  72. condition[0].set()
  73. m = Mediator(ready_queue, mycallback)
  74. condition[0] = m._is_shutdown
  75. ready_queue.put(MockTask("Elaine M. Benes"))
  76. m.run()
  77. self.assertTrue(m._is_shutdown.isSet())
  78. self.assertTrue(m._is_stopped.isSet())
  79. def test_mediator_body_revoked(self):
  80. ready_queue = Queue()
  81. got = {}
  82. def mycallback(value):
  83. got["value"] = value.value
  84. m = Mediator(ready_queue, mycallback)
  85. t = MockTask("Jerry Seinfeld")
  86. t.id = uuid()
  87. revoked_tasks.add(t.id)
  88. ready_queue.put(t)
  89. m.body()
  90. self.assertNotIn("value", got)
  91. self.assertTrue(t.on_ack.call_count)