test_control.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from kombu.pidbox import Mailbox
  4. from vine.utils import wraps
  5. from celery import uuid
  6. from celery.app import control
  7. from celery.exceptions import DuplicateNodenameWarning
  8. from celery.five import items
  9. def _info_for_commandclass(type_):
  10. from celery.worker.control import Panel
  11. return [
  12. (name, info)
  13. for name, info in items(Panel.meta)
  14. if info.type == type_
  15. ]
  16. def test_client_implements_all_commands(app):
  17. commands = _info_for_commandclass('control')
  18. assert commands
  19. for name, info in commands:
  20. assert getattr(app.control, name)
  21. def test_inspect_implements_all_commands(app):
  22. inspect = app.control.inspect()
  23. commands = _info_for_commandclass('inspect')
  24. assert commands
  25. for name, info in commands:
  26. if info.type == 'inspect':
  27. assert getattr(inspect, name)
  28. class MockMailbox(Mailbox):
  29. sent = []
  30. def _publish(self, command, *args, **kwargs):
  31. self.__class__.sent.append(command)
  32. def close(self):
  33. pass
  34. def _collect(self, *args, **kwargs):
  35. pass
  36. class Control(control.Control):
  37. Mailbox = MockMailbox
  38. def with_mock_broadcast(fun):
  39. @wraps(fun)
  40. def _resets(*args, **kwargs):
  41. MockMailbox.sent = []
  42. try:
  43. return fun(*args, **kwargs)
  44. finally:
  45. MockMailbox.sent = []
  46. return _resets
  47. class test_flatten_reply:
  48. def test_flatten_reply(self):
  49. reply = [
  50. {'foo@example.com': {'hello': 10}},
  51. {'foo@example.com': {'hello': 20}},
  52. {'bar@example.com': {'hello': 30}}
  53. ]
  54. with pytest.warns(DuplicateNodenameWarning) as w:
  55. nodes = control.flatten_reply(reply)
  56. assert 'Received multiple replies from node name: {0}.'.format(
  57. next(iter(reply[0]))) in str(w[0].message.args[0])
  58. assert 'foo@example.com' in nodes
  59. assert 'bar@example.com' in nodes
  60. class test_inspect:
  61. def setup(self):
  62. self.c = Control(app=self.app)
  63. self.prev, self.app.control = self.app.control, self.c
  64. self.i = self.c.inspect()
  65. def test_prepare_reply(self):
  66. reply = self.i._prepare([
  67. {'w1': {'ok': 1}},
  68. {'w2': {'ok': 1}},
  69. ])
  70. assert reply == {
  71. 'w1': {'ok': 1},
  72. 'w2': {'ok': 1},
  73. }
  74. i = self.c.inspect(destination='w1')
  75. assert i._prepare([{'w1': {'ok': 1}}]) == {'ok': 1}
  76. @with_mock_broadcast
  77. def test_active(self):
  78. self.i.active()
  79. assert 'active' in MockMailbox.sent
  80. @with_mock_broadcast
  81. def test_clock(self):
  82. self.i.clock()
  83. assert 'clock' in MockMailbox.sent
  84. @with_mock_broadcast
  85. def test_conf(self):
  86. self.i.conf()
  87. assert 'conf' in MockMailbox.sent
  88. @with_mock_broadcast
  89. def test_hello(self):
  90. self.i.hello('george@vandelay.com')
  91. assert 'hello' in MockMailbox.sent
  92. @with_mock_broadcast
  93. def test_memsample(self):
  94. self.i.memsample()
  95. assert 'memsample' in MockMailbox.sent
  96. @with_mock_broadcast
  97. def test_memdump(self):
  98. self.i.memdump()
  99. assert 'memdump' in MockMailbox.sent
  100. @with_mock_broadcast
  101. def test_objgraph(self):
  102. self.i.objgraph()
  103. assert 'objgraph' in MockMailbox.sent
  104. @with_mock_broadcast
  105. def test_scheduled(self):
  106. self.i.scheduled()
  107. assert 'scheduled' in MockMailbox.sent
  108. @with_mock_broadcast
  109. def test_reserved(self):
  110. self.i.reserved()
  111. assert 'reserved' in MockMailbox.sent
  112. @with_mock_broadcast
  113. def test_stats(self):
  114. self.i.stats()
  115. assert 'stats' in MockMailbox.sent
  116. @with_mock_broadcast
  117. def test_revoked(self):
  118. self.i.revoked()
  119. assert 'revoked' in MockMailbox.sent
  120. @with_mock_broadcast
  121. def test_tasks(self):
  122. self.i.registered()
  123. assert 'registered' in MockMailbox.sent
  124. @with_mock_broadcast
  125. def test_ping(self):
  126. self.i.ping()
  127. assert 'ping' in MockMailbox.sent
  128. @with_mock_broadcast
  129. def test_active_queues(self):
  130. self.i.active_queues()
  131. assert 'active_queues' in MockMailbox.sent
  132. @with_mock_broadcast
  133. def test_report(self):
  134. self.i.report()
  135. assert 'report' in MockMailbox.sent
  136. class test_Broadcast:
  137. def setup(self):
  138. self.control = Control(app=self.app)
  139. self.app.control = self.control
  140. @self.app.task(shared=False)
  141. def mytask():
  142. pass
  143. self.mytask = mytask
  144. def test_purge(self):
  145. self.control.purge()
  146. @with_mock_broadcast
  147. def test_broadcast(self):
  148. self.control.broadcast('foobarbaz', arguments=[])
  149. assert 'foobarbaz' in MockMailbox.sent
  150. @with_mock_broadcast
  151. def test_broadcast_limit(self):
  152. self.control.broadcast(
  153. 'foobarbaz1', arguments=[], limit=None, destination=[1, 2, 3],
  154. )
  155. assert 'foobarbaz1' in MockMailbox.sent
  156. @with_mock_broadcast
  157. def test_broadcast_validate(self):
  158. with pytest.raises(ValueError):
  159. self.control.broadcast('foobarbaz2',
  160. destination='foo')
  161. @with_mock_broadcast
  162. def test_rate_limit(self):
  163. self.control.rate_limit(self.mytask.name, '100/m')
  164. assert 'rate_limit' in MockMailbox.sent
  165. @with_mock_broadcast
  166. def test_time_limit(self):
  167. self.control.time_limit(self.mytask.name, soft=10, hard=20)
  168. assert 'time_limit' in MockMailbox.sent
  169. @with_mock_broadcast
  170. def test_add_consumer(self):
  171. self.control.add_consumer('foo')
  172. assert 'add_consumer' in MockMailbox.sent
  173. @with_mock_broadcast
  174. def test_cancel_consumer(self):
  175. self.control.cancel_consumer('foo')
  176. assert 'cancel_consumer' in MockMailbox.sent
  177. @with_mock_broadcast
  178. def test_shutdown(self):
  179. self.control.shutdown()
  180. assert 'shutdown' in MockMailbox.sent
  181. @with_mock_broadcast
  182. def test_heartbeat(self):
  183. self.control.heartbeat()
  184. assert 'heartbeat' in MockMailbox.sent
  185. @with_mock_broadcast
  186. def test_pool_restart(self):
  187. self.control.pool_restart()
  188. assert 'pool_restart' in MockMailbox.sent
  189. @with_mock_broadcast
  190. def test_terminate(self):
  191. self.control.terminate('124')
  192. assert 'revoke' in MockMailbox.sent
  193. @with_mock_broadcast
  194. def test_enable_events(self):
  195. self.control.enable_events()
  196. assert 'enable_events' in MockMailbox.sent
  197. @with_mock_broadcast
  198. def test_disable_events(self):
  199. self.control.disable_events()
  200. assert 'disable_events' in MockMailbox.sent
  201. @with_mock_broadcast
  202. def test_revoke(self):
  203. self.control.revoke('foozbaaz')
  204. assert 'revoke' in MockMailbox.sent
  205. @with_mock_broadcast
  206. def test_ping(self):
  207. self.control.ping()
  208. assert 'ping' in MockMailbox.sent
  209. @with_mock_broadcast
  210. def test_election(self):
  211. self.control.election('some_id', 'topic', 'action')
  212. assert 'election' in MockMailbox.sent
  213. @with_mock_broadcast
  214. def test_pool_grow(self):
  215. self.control.pool_grow(2)
  216. assert 'pool_grow' in MockMailbox.sent
  217. @with_mock_broadcast
  218. def test_pool_shrink(self):
  219. self.control.pool_shrink(2)
  220. assert 'pool_shrink' in MockMailbox.sent
  221. @with_mock_broadcast
  222. def test_revoke_from_result(self):
  223. self.app.AsyncResult('foozbazzbar').revoke()
  224. assert 'revoke' in MockMailbox.sent
  225. @with_mock_broadcast
  226. def test_revoke_from_resultset(self):
  227. r = self.app.GroupResult(uuid(),
  228. [self.app.AsyncResult(x)
  229. for x in [uuid() for i in range(10)]])
  230. r.revoke()
  231. assert 'revoke' in MockMailbox.sent