# test_worker_mediator.py (2.3 KB)
  1. from celery.tests.utils import unittest
  2. from Queue import Queue
  3. from celery.utils import gen_unique_id
  4. from celery.worker.mediator import Mediator
  5. from celery.worker.state import revoked as revoked_tasks
  class MockTask(object):
      """Minimal stand-in for a task object placed on the ready queue.

      Carries a ``value`` payload, an ``acked`` flag that :meth:`on_ack`
      sets, and a :meth:`revoked` check against the ``revoked_tasks`` set
      imported at the top of this file.
      """

      hostname = "harness.com"
      task_id = 1234
      task_name = "mocktask"
      # Class-level default; on_ack() overrides it on the instance.
      acked = False

      def __init__(self, value, **kwargs):
          """Store *value*; extra keyword arguments are accepted and ignored."""
          self.value = value

      def on_ack(self):
          """Mark this task as acknowledged."""
          self.acked = True

      def revoked(self):
          """Return True, acking the task first, if its id is in ``revoked_tasks``.

          Returns False (and leaves ``acked`` untouched) otherwise.
          """
          if self.task_id in revoked_tasks:
              self.on_ack()
              return True
          return False
  class test_Mediator(unittest.TestCase):
      """Tests for ``Mediator``: thread start/stop, ``move()`` and ``run()``."""

      def test_mediator_start__stop(self):
          """Shutdown/stopped events are clear while running, set after stop()."""
          ready_queue = Queue()
          m = Mediator(ready_queue, lambda t: t)
          m.start()
          try:
              self.assertFalse(m._shutdown.isSet())
              self.assertFalse(m._stopped.isSet())
          finally:
              # Always stop and join the thread, even when an assertion above
              # fails, so a failing test cannot leave a mediator running.
              m.stop()
              m.join()
          self.assertTrue(m._shutdown.isSet())
          self.assertTrue(m._stopped.isSet())

      def test_mediator_move(self):
          """move() passes the queued task to the callback."""
          ready_queue = Queue()
          got = {}

          def mycallback(value):
              got["value"] = value.value

          m = Mediator(ready_queue, mycallback)
          ready_queue.put(MockTask("George Costanza"))

          m.move()

          self.assertEqual(got["value"], "George Costanza")

      def test_mediator_move_exception(self):
          """move() must not propagate an exception raised by the callback."""
          ready_queue = Queue()

          def mycallback(value):
              raise KeyError("foo")

          m = Mediator(ready_queue, mycallback)
          ready_queue.put(MockTask("Elaine M. Benes"))

          # No assertion needed: the test errors if the KeyError escapes move().
          m.move()

      def test_run(self):
          """run() returns with both events set once shutdown is signalled."""
          ready_queue = Queue()
          # One-element list so the callback can reach the event, which only
          # exists after the Mediator has been constructed.
          condition = [None]

          def mycallback(value):
              condition[0].set()

          m = Mediator(ready_queue, mycallback)
          condition[0] = m._shutdown
          ready_queue.put(MockTask("Elaine M. Benes"))

          # Called directly in the test thread; the callback sets the shutdown
          # event, which is expected to make run() return.
          m.run()

          self.assertTrue(m._shutdown.isSet())
          self.assertTrue(m._stopped.isSet())

      def test_mediator_move_revoked(self):
          """A revoked task is acked and never handed to the callback."""
          ready_queue = Queue()
          got = {}

          def mycallback(value):
              got["value"] = value.value

          m = Mediator(ready_queue, mycallback)
          t = MockTask("Jerry Seinfeld")
          # Use a freshly generated id rather than the class default 1234, so
          # only this task's id is added to the module-level revoked set.
          t.task_id = gen_unique_id()
          revoked_tasks.add(t.task_id)
          ready_queue.put(t)

          m.move()

          self.assertNotIn("value", got)
          self.assertTrue(t.acked)