test_control.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. from __future__ import absolute_import, unicode_literals
  2. from kombu.pidbox import Mailbox
  3. from vine.utils import wraps
  4. from celery.app import control
  5. from celery.exceptions import DuplicateNodenameWarning
  6. from celery.utils import uuid
  7. from celery.tests.case import AppCase
  8. class MockMailbox(Mailbox):
  9. sent = []
  10. def _publish(self, command, *args, **kwargs):
  11. self.__class__.sent.append(command)
  12. def close(self):
  13. pass
  14. def _collect(self, *args, **kwargs):
  15. pass
  16. class Control(control.Control):
  17. Mailbox = MockMailbox
  18. def with_mock_broadcast(fun):
  19. @wraps(fun)
  20. def _resets(*args, **kwargs):
  21. MockMailbox.sent = []
  22. try:
  23. return fun(*args, **kwargs)
  24. finally:
  25. MockMailbox.sent = []
  26. return _resets
  27. class test_flatten_reply(AppCase):
  28. def test_flatten_reply(self):
  29. reply = [
  30. {'foo@example.com': {'hello': 10}},
  31. {'foo@example.com': {'hello': 20}},
  32. {'bar@example.com': {'hello': 30}}
  33. ]
  34. with self.assertWarns(DuplicateNodenameWarning) as w:
  35. nodes = control.flatten_reply(reply)
  36. self.assertIn(
  37. 'Received multiple replies from node name: foo@example.com.',
  38. str(w.warning)
  39. )
  40. self.assertIn('foo@example.com', nodes)
  41. self.assertIn('bar@example.com', nodes)
  42. class test_inspect(AppCase):
  43. def setup(self):
  44. self.c = Control(app=self.app)
  45. self.prev, self.app.control = self.app.control, self.c
  46. self.i = self.c.inspect()
  47. def test_prepare_reply(self):
  48. self.assertDictEqual(self.i._prepare([{'w1': {'ok': 1}},
  49. {'w2': {'ok': 1}}]),
  50. {'w1': {'ok': 1}, 'w2': {'ok': 1}})
  51. i = self.c.inspect(destination='w1')
  52. self.assertEqual(i._prepare([{'w1': {'ok': 1}}]),
  53. {'ok': 1})
  54. @with_mock_broadcast
  55. def test_active(self):
  56. self.i.active()
  57. self.assertIn('dump_active', MockMailbox.sent)
  58. @with_mock_broadcast
  59. def test_clock(self):
  60. self.i.clock()
  61. self.assertIn('clock', MockMailbox.sent)
  62. @with_mock_broadcast
  63. def test_conf(self):
  64. self.i.conf()
  65. self.assertIn('dump_conf', MockMailbox.sent)
  66. @with_mock_broadcast
  67. def test_hello(self):
  68. self.i.hello('george@vandelay.com')
  69. self.assertIn('hello', MockMailbox.sent)
  70. @with_mock_broadcast
  71. def test_memsample(self):
  72. self.i.memsample()
  73. self.assertIn('memsample', MockMailbox.sent)
  74. @with_mock_broadcast
  75. def test_memdump(self):
  76. self.i.memdump()
  77. self.assertIn('memdump', MockMailbox.sent)
  78. @with_mock_broadcast
  79. def test_objgraph(self):
  80. self.i.objgraph()
  81. self.assertIn('objgraph', MockMailbox.sent)
  82. @with_mock_broadcast
  83. def test_scheduled(self):
  84. self.i.scheduled()
  85. self.assertIn('dump_schedule', MockMailbox.sent)
  86. @with_mock_broadcast
  87. def test_reserved(self):
  88. self.i.reserved()
  89. self.assertIn('dump_reserved', MockMailbox.sent)
  90. @with_mock_broadcast
  91. def test_stats(self):
  92. self.i.stats()
  93. self.assertIn('stats', MockMailbox.sent)
  94. @with_mock_broadcast
  95. def test_revoked(self):
  96. self.i.revoked()
  97. self.assertIn('dump_revoked', MockMailbox.sent)
  98. @with_mock_broadcast
  99. def test_tasks(self):
  100. self.i.registered()
  101. self.assertIn('dump_tasks', MockMailbox.sent)
  102. @with_mock_broadcast
  103. def test_ping(self):
  104. self.i.ping()
  105. self.assertIn('ping', MockMailbox.sent)
  106. @with_mock_broadcast
  107. def test_active_queues(self):
  108. self.i.active_queues()
  109. self.assertIn('active_queues', MockMailbox.sent)
  110. @with_mock_broadcast
  111. def test_report(self):
  112. self.i.report()
  113. self.assertIn('report', MockMailbox.sent)
  114. class test_Broadcast(AppCase):
  115. def setup(self):
  116. self.control = Control(app=self.app)
  117. self.app.control = self.control
  118. @self.app.task(shared=False)
  119. def mytask():
  120. pass
  121. self.mytask = mytask
  122. def test_purge(self):
  123. self.control.purge()
  124. @with_mock_broadcast
  125. def test_broadcast(self):
  126. self.control.broadcast('foobarbaz', arguments=[])
  127. self.assertIn('foobarbaz', MockMailbox.sent)
  128. @with_mock_broadcast
  129. def test_broadcast_limit(self):
  130. self.control.broadcast(
  131. 'foobarbaz1', arguments=[], limit=None, destination=[1, 2, 3],
  132. )
  133. self.assertIn('foobarbaz1', MockMailbox.sent)
  134. @with_mock_broadcast
  135. def test_broadcast_validate(self):
  136. with self.assertRaises(ValueError):
  137. self.control.broadcast('foobarbaz2',
  138. destination='foo')
  139. @with_mock_broadcast
  140. def test_rate_limit(self):
  141. self.control.rate_limit(self.mytask.name, '100/m')
  142. self.assertIn('rate_limit', MockMailbox.sent)
  143. @with_mock_broadcast
  144. def test_time_limit(self):
  145. self.control.time_limit(self.mytask.name, soft=10, hard=20)
  146. self.assertIn('time_limit', MockMailbox.sent)
  147. @with_mock_broadcast
  148. def test_add_consumer(self):
  149. self.control.add_consumer('foo')
  150. self.assertIn('add_consumer', MockMailbox.sent)
  151. @with_mock_broadcast
  152. def test_cancel_consumer(self):
  153. self.control.cancel_consumer('foo')
  154. self.assertIn('cancel_consumer', MockMailbox.sent)
  155. @with_mock_broadcast
  156. def test_enable_events(self):
  157. self.control.enable_events()
  158. self.assertIn('enable_events', MockMailbox.sent)
  159. @with_mock_broadcast
  160. def test_disable_events(self):
  161. self.control.disable_events()
  162. self.assertIn('disable_events', MockMailbox.sent)
  163. @with_mock_broadcast
  164. def test_revoke(self):
  165. self.control.revoke('foozbaaz')
  166. self.assertIn('revoke', MockMailbox.sent)
  167. @with_mock_broadcast
  168. def test_ping(self):
  169. self.control.ping()
  170. self.assertIn('ping', MockMailbox.sent)
  171. @with_mock_broadcast
  172. def test_election(self):
  173. self.control.election('some_id', 'topic', 'action')
  174. self.assertIn('election', MockMailbox.sent)
  175. @with_mock_broadcast
  176. def test_pool_grow(self):
  177. self.control.pool_grow(2)
  178. self.assertIn('pool_grow', MockMailbox.sent)
  179. @with_mock_broadcast
  180. def test_pool_shrink(self):
  181. self.control.pool_shrink(2)
  182. self.assertIn('pool_shrink', MockMailbox.sent)
  183. @with_mock_broadcast
  184. def test_revoke_from_result(self):
  185. self.app.AsyncResult('foozbazzbar').revoke()
  186. self.assertIn('revoke', MockMailbox.sent)
  187. @with_mock_broadcast
  188. def test_revoke_from_resultset(self):
  189. r = self.app.GroupResult(uuid(),
  190. [self.app.AsyncResult(x)
  191. for x in [uuid() for i in range(10)]])
  192. r.revoke()
  193. self.assertIn('revoke', MockMailbox.sent)