test_control.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. from __future__ import absolute_import
  2. import warnings
  3. from functools import wraps
  4. from kombu.pidbox import Mailbox
  5. from celery.app import control
  6. from celery.utils import uuid
  7. from celery.tests.case import AppCase, Case
  8. class MockMailbox(Mailbox):
  9. sent = []
  10. def _publish(self, command, *args, **kwargs):
  11. self.__class__.sent.append(command)
  12. def close(self):
  13. pass
  14. def _collect(self, *args, **kwargs):
  15. pass
  16. class Control(control.Control):
  17. Mailbox = MockMailbox
  18. def with_mock_broadcast(fun):
  19. @wraps(fun)
  20. def _resets(*args, **kwargs):
  21. MockMailbox.sent = []
  22. try:
  23. return fun(*args, **kwargs)
  24. finally:
  25. MockMailbox.sent = []
  26. return _resets
  27. class test_flatten_reply(Case):
  28. def test_flatten_reply(self):
  29. reply = [
  30. {'foo@example.com': {'hello': 10}},
  31. {'foo@example.com': {'hello': 20}},
  32. {'bar@example.com': {'hello': 30}}
  33. ]
  34. with warnings.catch_warnings(record=True) as w:
  35. nodes = control.flatten_reply(reply)
  36. self.assertIn(
  37. 'multiple replies',
  38. str(w[-1].message),
  39. )
  40. self.assertIn('foo@example.com', nodes)
  41. self.assertIn('bar@example.com', nodes)
  42. class test_inspect(AppCase):
  43. def setup(self):
  44. self.c = Control(app=self.app)
  45. self.prev, self.app.control = self.app.control, self.c
  46. self.i = self.c.inspect()
  47. def tearDown(self):
  48. self.app.control = self.prev
  49. def test_prepare_reply(self):
  50. self.assertDictEqual(self.i._prepare([{'w1': {'ok': 1}},
  51. {'w2': {'ok': 1}}]),
  52. {'w1': {'ok': 1}, 'w2': {'ok': 1}})
  53. i = self.c.inspect(destination='w1')
  54. self.assertEqual(i._prepare([{'w1': {'ok': 1}}]),
  55. {'ok': 1})
  56. @with_mock_broadcast
  57. def test_active(self):
  58. self.i.active()
  59. self.assertIn('dump_active', MockMailbox.sent)
  60. @with_mock_broadcast
  61. def test_clock(self):
  62. self.i.clock()
  63. self.assertIn('clock', MockMailbox.sent)
  64. @with_mock_broadcast
  65. def test_conf(self):
  66. self.i.conf()
  67. self.assertIn('dump_conf', MockMailbox.sent)
  68. @with_mock_broadcast
  69. def test_hello(self):
  70. self.i.hello()
  71. self.assertIn('hello', MockMailbox.sent)
  72. @with_mock_broadcast
  73. def test_memsample(self):
  74. self.i.memsample()
  75. self.assertIn('memsample', MockMailbox.sent)
  76. @with_mock_broadcast
  77. def test_memdump(self):
  78. self.i.memdump()
  79. self.assertIn('memdump', MockMailbox.sent)
  80. @with_mock_broadcast
  81. def test_objgraph(self):
  82. self.i.objgraph()
  83. self.assertIn('objgraph', MockMailbox.sent)
  84. @with_mock_broadcast
  85. def test_scheduled(self):
  86. self.i.scheduled()
  87. self.assertIn('dump_schedule', MockMailbox.sent)
  88. @with_mock_broadcast
  89. def test_reserved(self):
  90. self.i.reserved()
  91. self.assertIn('dump_reserved', MockMailbox.sent)
  92. @with_mock_broadcast
  93. def test_stats(self):
  94. self.i.stats()
  95. self.assertIn('stats', MockMailbox.sent)
  96. @with_mock_broadcast
  97. def test_revoked(self):
  98. self.i.revoked()
  99. self.assertIn('dump_revoked', MockMailbox.sent)
  100. @with_mock_broadcast
  101. def test_tasks(self):
  102. self.i.registered()
  103. self.assertIn('dump_tasks', MockMailbox.sent)
  104. @with_mock_broadcast
  105. def test_ping(self):
  106. self.i.ping()
  107. self.assertIn('ping', MockMailbox.sent)
  108. @with_mock_broadcast
  109. def test_active_queues(self):
  110. self.i.active_queues()
  111. self.assertIn('active_queues', MockMailbox.sent)
  112. @with_mock_broadcast
  113. def test_report(self):
  114. self.i.report()
  115. self.assertIn('report', MockMailbox.sent)
  116. class test_Broadcast(AppCase):
  117. def setup(self):
  118. self.control = Control(app=self.app)
  119. self.app.control = self.control
  120. @self.app.task()
  121. def mytask():
  122. pass
  123. self.mytask = mytask
  124. def tearDown(self):
  125. del(self.app.control)
  126. def test_purge(self):
  127. self.control.purge()
  128. @with_mock_broadcast
  129. def test_broadcast(self):
  130. self.control.broadcast('foobarbaz', arguments=[])
  131. self.assertIn('foobarbaz', MockMailbox.sent)
  132. @with_mock_broadcast
  133. def test_broadcast_limit(self):
  134. self.control.broadcast(
  135. 'foobarbaz1', arguments=[], limit=None, destination=[1, 2, 3],
  136. )
  137. self.assertIn('foobarbaz1', MockMailbox.sent)
  138. @with_mock_broadcast
  139. def test_broadcast_validate(self):
  140. with self.assertRaises(ValueError):
  141. self.control.broadcast('foobarbaz2',
  142. destination='foo')
  143. @with_mock_broadcast
  144. def test_rate_limit(self):
  145. self.control.rate_limit(self.mytask.name, '100/m')
  146. self.assertIn('rate_limit', MockMailbox.sent)
  147. @with_mock_broadcast
  148. def test_time_limit(self):
  149. self.control.time_limit(self.mytask.name, soft=10, hard=20)
  150. self.assertIn('time_limit', MockMailbox.sent)
  151. @with_mock_broadcast
  152. def test_add_consumer(self):
  153. self.control.add_consumer('foo')
  154. self.assertIn('add_consumer', MockMailbox.sent)
  155. @with_mock_broadcast
  156. def test_cancel_consumer(self):
  157. self.control.cancel_consumer('foo')
  158. self.assertIn('cancel_consumer', MockMailbox.sent)
  159. @with_mock_broadcast
  160. def test_enable_events(self):
  161. self.control.enable_events()
  162. self.assertIn('enable_events', MockMailbox.sent)
  163. @with_mock_broadcast
  164. def test_disable_events(self):
  165. self.control.disable_events()
  166. self.assertIn('disable_events', MockMailbox.sent)
  167. @with_mock_broadcast
  168. def test_revoke(self):
  169. self.control.revoke('foozbaaz')
  170. self.assertIn('revoke', MockMailbox.sent)
  171. @with_mock_broadcast
  172. def test_ping(self):
  173. self.control.ping()
  174. self.assertIn('ping', MockMailbox.sent)
  175. @with_mock_broadcast
  176. def test_election(self):
  177. self.control.election('some_id', 'topic', 'action')
  178. self.assertIn('election', MockMailbox.sent)
  179. @with_mock_broadcast
  180. def test_pool_grow(self):
  181. self.control.pool_grow(2)
  182. self.assertIn('pool_grow', MockMailbox.sent)
  183. @with_mock_broadcast
  184. def test_pool_shrink(self):
  185. self.control.pool_shrink(2)
  186. self.assertIn('pool_shrink', MockMailbox.sent)
  187. @with_mock_broadcast
  188. def test_revoke_from_result(self):
  189. self.app.AsyncResult('foozbazzbar').revoke()
  190. self.assertIn('revoke', MockMailbox.sent)
  191. @with_mock_broadcast
  192. def test_revoke_from_resultset(self):
  193. r = self.app.GroupResult(uuid(),
  194. [self.app.AsyncResult(x)
  195. for x in [uuid() for i in range(10)]])
  196. r.revoke()
  197. self.assertIn('revoke', MockMailbox.sent)