test_control.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from kombu.pidbox import Mailbox
  4. from vine.utils import wraps
  5. from celery import uuid
  6. from celery.app import control
  7. from celery.exceptions import DuplicateNodenameWarning
  8. class MockMailbox(Mailbox):
  9. sent = []
  10. def _publish(self, command, *args, **kwargs):
  11. self.__class__.sent.append(command)
  12. def close(self):
  13. pass
  14. def _collect(self, *args, **kwargs):
  15. pass
  16. class Control(control.Control):
  17. Mailbox = MockMailbox
  18. def with_mock_broadcast(fun):
  19. @wraps(fun)
  20. def _resets(*args, **kwargs):
  21. MockMailbox.sent = []
  22. try:
  23. return fun(*args, **kwargs)
  24. finally:
  25. MockMailbox.sent = []
  26. return _resets
  27. class test_flatten_reply:
  28. def test_flatten_reply(self):
  29. reply = [
  30. {'foo@example.com': {'hello': 10}},
  31. {'foo@example.com': {'hello': 20}},
  32. {'bar@example.com': {'hello': 30}}
  33. ]
  34. with pytest.warns(DuplicateNodenameWarning) as w:
  35. nodes = control.flatten_reply(reply)
  36. assert 'Received multiple replies from node name: {0}.'.format(
  37. next(iter(reply[0]))) in str(w[0].message.args[0])
  38. assert 'foo@example.com' in nodes
  39. assert 'bar@example.com' in nodes
  40. class test_inspect:
  41. def setup(self):
  42. self.c = Control(app=self.app)
  43. self.prev, self.app.control = self.app.control, self.c
  44. self.i = self.c.inspect()
  45. def test_prepare_reply(self):
  46. reply = self.i._prepare([
  47. {'w1': {'ok': 1}},
  48. {'w2': {'ok': 1}},
  49. ])
  50. assert reply == {
  51. 'w1': {'ok': 1},
  52. 'w2': {'ok': 1},
  53. }
  54. i = self.c.inspect(destination='w1')
  55. assert i._prepare([{'w1': {'ok': 1}}]) == {'ok': 1}
  56. @with_mock_broadcast
  57. def test_active(self):
  58. self.i.active()
  59. assert 'active' in MockMailbox.sent
  60. @with_mock_broadcast
  61. def test_clock(self):
  62. self.i.clock()
  63. assert 'clock' in MockMailbox.sent
  64. @with_mock_broadcast
  65. def test_conf(self):
  66. self.i.conf()
  67. assert 'conf' in MockMailbox.sent
  68. @with_mock_broadcast
  69. def test_hello(self):
  70. self.i.hello('george@vandelay.com')
  71. assert 'hello' in MockMailbox.sent
  72. @with_mock_broadcast
  73. def test_memsample(self):
  74. self.i.memsample()
  75. assert 'memsample' in MockMailbox.sent
  76. @with_mock_broadcast
  77. def test_memdump(self):
  78. self.i.memdump()
  79. assert 'memdump' in MockMailbox.sent
  80. @with_mock_broadcast
  81. def test_objgraph(self):
  82. self.i.objgraph()
  83. assert 'objgraph' in MockMailbox.sent
  84. @with_mock_broadcast
  85. def test_scheduled(self):
  86. self.i.scheduled()
  87. assert 'scheduled' in MockMailbox.sent
  88. @with_mock_broadcast
  89. def test_reserved(self):
  90. self.i.reserved()
  91. assert 'reserved' in MockMailbox.sent
  92. @with_mock_broadcast
  93. def test_stats(self):
  94. self.i.stats()
  95. assert 'stats' in MockMailbox.sent
  96. @with_mock_broadcast
  97. def test_revoked(self):
  98. self.i.revoked()
  99. assert 'revoked' in MockMailbox.sent
  100. @with_mock_broadcast
  101. def test_tasks(self):
  102. self.i.registered()
  103. assert 'registered' in MockMailbox.sent
  104. @with_mock_broadcast
  105. def test_ping(self):
  106. self.i.ping()
  107. assert 'ping' in MockMailbox.sent
  108. @with_mock_broadcast
  109. def test_active_queues(self):
  110. self.i.active_queues()
  111. assert 'active_queues' in MockMailbox.sent
  112. @with_mock_broadcast
  113. def test_report(self):
  114. self.i.report()
  115. assert 'report' in MockMailbox.sent
  116. class test_Broadcast:
  117. def setup(self):
  118. self.control = Control(app=self.app)
  119. self.app.control = self.control
  120. @self.app.task(shared=False)
  121. def mytask():
  122. pass
  123. self.mytask = mytask
  124. def test_purge(self):
  125. self.control.purge()
  126. @with_mock_broadcast
  127. def test_broadcast(self):
  128. self.control.broadcast('foobarbaz', arguments=[])
  129. assert 'foobarbaz' in MockMailbox.sent
  130. @with_mock_broadcast
  131. def test_broadcast_limit(self):
  132. self.control.broadcast(
  133. 'foobarbaz1', arguments=[], limit=None, destination=[1, 2, 3],
  134. )
  135. assert 'foobarbaz1' in MockMailbox.sent
  136. @with_mock_broadcast
  137. def test_broadcast_validate(self):
  138. with pytest.raises(ValueError):
  139. self.control.broadcast('foobarbaz2',
  140. destination='foo')
  141. @with_mock_broadcast
  142. def test_rate_limit(self):
  143. self.control.rate_limit(self.mytask.name, '100/m')
  144. assert 'rate_limit' in MockMailbox.sent
  145. @with_mock_broadcast
  146. def test_time_limit(self):
  147. self.control.time_limit(self.mytask.name, soft=10, hard=20)
  148. assert 'time_limit' in MockMailbox.sent
  149. @with_mock_broadcast
  150. def test_add_consumer(self):
  151. self.control.add_consumer('foo')
  152. assert 'add_consumer' in MockMailbox.sent
  153. @with_mock_broadcast
  154. def test_cancel_consumer(self):
  155. self.control.cancel_consumer('foo')
  156. assert 'cancel_consumer' in MockMailbox.sent
  157. @with_mock_broadcast
  158. def test_enable_events(self):
  159. self.control.enable_events()
  160. assert 'enable_events' in MockMailbox.sent
  161. @with_mock_broadcast
  162. def test_disable_events(self):
  163. self.control.disable_events()
  164. assert 'disable_events' in MockMailbox.sent
  165. @with_mock_broadcast
  166. def test_revoke(self):
  167. self.control.revoke('foozbaaz')
  168. assert 'revoke' in MockMailbox.sent
  169. @with_mock_broadcast
  170. def test_ping(self):
  171. self.control.ping()
  172. assert 'ping' in MockMailbox.sent
  173. @with_mock_broadcast
  174. def test_election(self):
  175. self.control.election('some_id', 'topic', 'action')
  176. assert 'election' in MockMailbox.sent
  177. @with_mock_broadcast
  178. def test_pool_grow(self):
  179. self.control.pool_grow(2)
  180. assert 'pool_grow' in MockMailbox.sent
  181. @with_mock_broadcast
  182. def test_pool_shrink(self):
  183. self.control.pool_shrink(2)
  184. assert 'pool_shrink' in MockMailbox.sent
  185. @with_mock_broadcast
  186. def test_revoke_from_result(self):
  187. self.app.AsyncResult('foozbazzbar').revoke()
  188. assert 'revoke' in MockMailbox.sent
  189. @with_mock_broadcast
  190. def test_revoke_from_resultset(self):
  191. r = self.app.GroupResult(uuid(),
  192. [self.app.AsyncResult(x)
  193. for x in [uuid() for i in range(10)]])
  194. r.revoke()
  195. assert 'revoke' in MockMailbox.sent