test_task_control.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. import unittest2 as unittest
  2. from celery.task import control
  3. from celery.task.builtins import PingTask
  4. from celery.utils import gen_unique_id
  5. from celery.utils.functional import wraps
  6. class MockBroadcastPublisher(object):
  7. sent = []
  8. def __init__(self, *args, **kwargs):
  9. pass
  10. def send(self, command, *args, **kwargs):
  11. self.__class__.sent.append(command)
  12. def close(self):
  13. pass
  14. class MockControlReplyConsumer(object):
  15. def __init__(self, *args, **kwarg):
  16. pass
  17. def collect(self, *args, **kwargs):
  18. pass
  19. def close(self):
  20. pass
  21. def with_mock_broadcast(fun):
  22. @wraps(fun)
  23. def _mocked(*args, **kwargs):
  24. old_pub = control.BroadcastPublisher
  25. old_rep = control.ControlReplyConsumer
  26. control.BroadcastPublisher = MockBroadcastPublisher
  27. control.ControlReplyConsumer = MockControlReplyConsumer
  28. try:
  29. return fun(*args, **kwargs)
  30. finally:
  31. MockBroadcastPublisher.sent = []
  32. control.BroadcastPublisher = old_pub
  33. control.ControlReplyConsumer = old_rep
  34. return _mocked
  35. class test_inspect(unittest.TestCase):
  36. def setUp(self):
  37. self.i = control.inspect()
  38. def test_prepare_reply(self):
  39. self.assertDictEqual(self.i._prepare([{"w1": {"ok": 1}},
  40. {"w2": {"ok": 1}}]),
  41. {"w1": {"ok": 1}, "w2": {"ok": 1}})
  42. i = control.inspect(destination="w1")
  43. self.assertEqual(i._prepare([{"w1": {"ok": 1}}]),
  44. {"ok": 1})
  45. @with_mock_broadcast
  46. def test_active(self):
  47. self.i.active()
  48. self.assertIn("dump_active", MockBroadcastPublisher.sent)
  49. @with_mock_broadcast
  50. def test_scheduled(self):
  51. self.i.scheduled()
  52. self.assertIn("dump_schedule", MockBroadcastPublisher.sent)
  53. @with_mock_broadcast
  54. def test_reserved(self):
  55. self.i.reserved()
  56. self.assertIn("dump_reserved", MockBroadcastPublisher.sent)
  57. @with_mock_broadcast
  58. def test_stats(self):
  59. self.i.stats()
  60. self.assertIn("stats", MockBroadcastPublisher.sent)
  61. @with_mock_broadcast
  62. def test_revoked(self):
  63. self.i.revoked()
  64. self.assertIn("dump_revoked", MockBroadcastPublisher.sent)
  65. @with_mock_broadcast
  66. def test_registered_tasks(self):
  67. self.i.registered_tasks()
  68. self.assertIn("dump_tasks", MockBroadcastPublisher.sent)
  69. @with_mock_broadcast
  70. def test_enable_events(self):
  71. self.i.enable_events()
  72. self.assertIn("enable_events", MockBroadcastPublisher.sent)
  73. @with_mock_broadcast
  74. def test_disable_events(self):
  75. self.i.disable_events()
  76. self.assertIn("disable_events", MockBroadcastPublisher.sent)
  77. @with_mock_broadcast
  78. def test_diagnose(self):
  79. self.i.diagnose()
  80. self.assertIn("diagnose", MockBroadcastPublisher.sent)
  81. @with_mock_broadcast
  82. def test_ping(self):
  83. self.i.ping()
  84. self.assertIn("ping", MockBroadcastPublisher.sent)
  85. class test_Broadcast(unittest.TestCase):
  86. def test_discard_all(self):
  87. control.discard_all()
  88. @with_mock_broadcast
  89. def test_broadcast(self):
  90. control.broadcast("foobarbaz", arguments=[])
  91. self.assertIn("foobarbaz", MockBroadcastPublisher.sent)
  92. @with_mock_broadcast
  93. def test_broadcast_limit(self):
  94. control.broadcast("foobarbaz1", arguments=[], limit=None,
  95. destination=[1, 2, 3])
  96. self.assertIn("foobarbaz1", MockBroadcastPublisher.sent)
  97. @with_mock_broadcast
  98. def test_broadcast_validate(self):
  99. self.assertRaises(ValueError, control.broadcast, "foobarbaz2",
  100. destination="foo")
  101. @with_mock_broadcast
  102. def test_rate_limit(self):
  103. control.rate_limit(PingTask.name, "100/m")
  104. self.assertIn("rate_limit", MockBroadcastPublisher.sent)
  105. @with_mock_broadcast
  106. def test_revoke(self):
  107. control.revoke("foozbaaz")
  108. self.assertIn("revoke", MockBroadcastPublisher.sent)
  109. @with_mock_broadcast
  110. def test_ping(self):
  111. control.ping()
  112. self.assertIn("ping", MockBroadcastPublisher.sent)
  113. @with_mock_broadcast
  114. def test_revoke_from_result(self):
  115. from celery.result import AsyncResult
  116. AsyncResult("foozbazzbar").revoke()
  117. self.assertIn("revoke", MockBroadcastPublisher.sent)
  118. @with_mock_broadcast
  119. def test_revoke_from_resultset(self):
  120. from celery.result import TaskSetResult, AsyncResult
  121. r = TaskSetResult(gen_unique_id(), map(AsyncResult, [gen_unique_id()
  122. for i in range(10)]))
  123. r.revoke()
  124. self.assertIn("revoke", MockBroadcastPublisher.sent)