test_control.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import Mock
  4. from celery import uuid
  5. from celery.app import control
  6. from celery.exceptions import DuplicateNodenameWarning
  7. from celery.five import items
  8. from celery.utils.collections import LimitedSet
  9. def _info_for_commandclass(type_):
  10. from celery.worker.control import Panel
  11. return [
  12. (name, info)
  13. for name, info in items(Panel.meta)
  14. if info.type == type_
  15. ]
  16. def test_client_implements_all_commands(app):
  17. commands = _info_for_commandclass('control')
  18. assert commands
  19. for name, info in commands:
  20. assert getattr(app.control, name)
  21. def test_inspect_implements_all_commands(app):
  22. inspect = app.control.inspect()
  23. commands = _info_for_commandclass('inspect')
  24. assert commands
  25. for name, info in commands:
  26. if info.type == 'inspect':
  27. assert getattr(inspect, name)
  28. class test_flatten_reply:
  29. def test_flatten_reply(self):
  30. reply = [
  31. {'foo@example.com': {'hello': 10}},
  32. {'foo@example.com': {'hello': 20}},
  33. {'bar@example.com': {'hello': 30}}
  34. ]
  35. with pytest.warns(DuplicateNodenameWarning) as w:
  36. nodes = control.flatten_reply(reply)
  37. assert 'Received multiple replies from node name: {0}.'.format(
  38. next(iter(reply[0]))) in str(w[0].message.args[0])
  39. assert 'foo@example.com' in nodes
  40. assert 'bar@example.com' in nodes
  41. class test_inspect:
  42. def setup(self):
  43. self.app.control.broadcast = Mock(name='broadcast')
  44. self.app.control.broadcast.return_value = {}
  45. self.inspect = self.app.control.inspect()
  46. def test_prepare_reply(self):
  47. reply = self.inspect._prepare([
  48. {'w1': {'ok': 1}},
  49. {'w2': {'ok': 1}},
  50. ])
  51. assert reply == {
  52. 'w1': {'ok': 1},
  53. 'w2': {'ok': 1},
  54. }
  55. i = self.app.control.inspect(destination='w1')
  56. assert i._prepare([{'w1': {'ok': 1}}]) == {'ok': 1}
  57. def assert_broadcast_called(self, command,
  58. destination=None,
  59. callback=None,
  60. connection=None,
  61. limit=None,
  62. timeout=None,
  63. reply=True,
  64. **arguments):
  65. self.app.control.broadcast.assert_called_with(
  66. command,
  67. arguments=arguments,
  68. destination=destination or self.inspect.destination,
  69. callback=callback or self.inspect.callback,
  70. connection=connection or self.inspect.connection,
  71. limit=limit if limit is not None else self.inspect.limit,
  72. timeout=timeout if timeout is not None else self.inspect.timeout,
  73. reply=reply,
  74. )
  75. def test_active(self):
  76. self.inspect.active()
  77. self.assert_broadcast_called('active')
  78. def test_clock(self):
  79. self.inspect.clock()
  80. self.assert_broadcast_called('clock')
  81. def test_conf(self):
  82. self.inspect.conf()
  83. self.assert_broadcast_called('conf', with_defaults=False)
  84. def test_conf__with_defaults(self):
  85. self.inspect.conf(with_defaults=True)
  86. self.assert_broadcast_called('conf', with_defaults=True)
  87. def test_hello(self):
  88. self.inspect.hello('george@vandelay.com')
  89. self.assert_broadcast_called(
  90. 'hello', from_node='george@vandelay.com', revoked=None)
  91. def test_hello__with_revoked(self):
  92. revoked = LimitedSet(100)
  93. for i in range(100):
  94. revoked.add('id{0}'.format(i))
  95. self.inspect.hello('george@vandelay.com', revoked=revoked._data)
  96. self.assert_broadcast_called(
  97. 'hello', from_node='george@vandelay.com', revoked=revoked._data)
  98. def test_memsample(self):
  99. self.inspect.memsample()
  100. self.assert_broadcast_called('memsample')
  101. def test_memdump(self):
  102. self.inspect.memdump()
  103. self.assert_broadcast_called('memdump', samples=10)
  104. def test_memdump__samples_specified(self):
  105. self.inspect.memdump(samples=303)
  106. self.assert_broadcast_called('memdump', samples=303)
  107. def test_objgraph(self):
  108. self.inspect.objgraph()
  109. self.assert_broadcast_called(
  110. 'objgraph', num=200, type='Request', max_depth=10)
  111. def test_scheduled(self):
  112. self.inspect.scheduled()
  113. self.assert_broadcast_called('scheduled')
  114. def test_reserved(self):
  115. self.inspect.reserved()
  116. self.assert_broadcast_called('reserved')
  117. def test_stats(self):
  118. self.inspect.stats()
  119. self.assert_broadcast_called('stats')
  120. def test_revoked(self):
  121. self.inspect.revoked()
  122. self.assert_broadcast_called('revoked')
  123. def test_registered(self):
  124. self.inspect.registered()
  125. self.assert_broadcast_called('registered', taskinfoitems=())
  126. def test_registered__taskinfoitems(self):
  127. self.inspect.registered('rate_limit', 'time_limit')
  128. self.assert_broadcast_called(
  129. 'registered',
  130. taskinfoitems=('rate_limit', 'time_limit'),
  131. )
  132. def test_ping(self):
  133. self.inspect.ping()
  134. self.assert_broadcast_called('ping')
  135. def test_active_queues(self):
  136. self.inspect.active_queues()
  137. self.assert_broadcast_called('active_queues')
  138. def test_query_task(self):
  139. self.inspect.query_task('foo', 'bar')
  140. self.assert_broadcast_called('query_task', ids=('foo', 'bar'))
  141. def test_query_task__compat_single_list_argument(self):
  142. self.inspect.query_task(['foo', 'bar'])
  143. self.assert_broadcast_called('query_task', ids=['foo', 'bar'])
  144. def test_query_task__scalar(self):
  145. self.inspect.query_task('foo')
  146. self.assert_broadcast_called('query_task', ids=('foo',))
  147. def test_report(self):
  148. self.inspect.report()
  149. self.assert_broadcast_called('report')
  150. class test_Control_broadcast:
  151. def setup(self):
  152. self.app.control.mailbox = Mock(name='mailbox')
  153. def test_broadcast(self):
  154. self.app.control.broadcast('foobarbaz', arguments={'foo': 2})
  155. self.app.control.mailbox.assert_called()
  156. self.app.control.mailbox()._broadcast.assert_called_with(
  157. 'foobarbaz', {'foo': 2}, None, False, 1.0, None, None,
  158. channel=None,
  159. )
  160. def test_broadcast_limit(self):
  161. self.app.control.broadcast(
  162. 'foobarbaz1', arguments=None, limit=None, destination=[1, 2, 3],
  163. )
  164. self.app.control.mailbox.assert_called()
  165. self.app.control.mailbox()._broadcast.assert_called_with(
  166. 'foobarbaz1', {}, [1, 2, 3], False, 1.0, None, None,
  167. channel=None,
  168. )
  169. class test_Control:
  170. def setup(self):
  171. self.app.control.broadcast = Mock(name='broadcast')
  172. self.app.control.broadcast.return_value = {}
  173. @self.app.task(shared=False)
  174. def mytask():
  175. pass
  176. self.mytask = mytask
  177. def assert_control_called_with_args(self, name, destination=None,
  178. _options=None, **args):
  179. self.app.control.broadcast.assert_called_with(
  180. name, destination=destination, arguments=args, **_options or {})
  181. def test_purge(self):
  182. self.app.amqp.TaskConsumer = Mock(name='TaskConsumer')
  183. self.app.control.purge()
  184. self.app.amqp.TaskConsumer().purge.assert_called_with()
  185. def test_rate_limit(self):
  186. self.app.control.rate_limit(self.mytask.name, '100/m')
  187. self.assert_control_called_with_args(
  188. 'rate_limit',
  189. destination=None,
  190. task_name=self.mytask.name,
  191. rate_limit='100/m',
  192. )
  193. def test_rate_limit__with_destination(self):
  194. self.app.control.rate_limit(
  195. self.mytask.name, '100/m', 'a@w.com', limit=100)
  196. self.assert_control_called_with_args(
  197. 'rate_limit',
  198. destination='a@w.com',
  199. task_name=self.mytask.name,
  200. rate_limit='100/m',
  201. _options={'limit': 100},
  202. )
  203. def test_time_limit(self):
  204. self.app.control.time_limit(self.mytask.name, soft=10, hard=20)
  205. self.assert_control_called_with_args(
  206. 'time_limit',
  207. destination=None,
  208. task_name=self.mytask.name,
  209. soft=10,
  210. hard=20,
  211. )
  212. def test_time_limit__with_destination(self):
  213. self.app.control.time_limit(
  214. self.mytask.name, soft=10, hard=20,
  215. destination='a@q.com', limit=99,
  216. )
  217. self.assert_control_called_with_args(
  218. 'time_limit',
  219. destination='a@q.com',
  220. task_name=self.mytask.name,
  221. soft=10,
  222. hard=20,
  223. _options={'limit': 99},
  224. )
  225. def test_add_consumer(self):
  226. self.app.control.add_consumer('foo')
  227. self.assert_control_called_with_args(
  228. 'add_consumer',
  229. destination=None,
  230. queue='foo',
  231. exchange=None,
  232. exchange_type='direct',
  233. routing_key=None,
  234. )
  235. def test_add_consumer__with_options_and_dest(self):
  236. self.app.control.add_consumer(
  237. 'foo', 'ex', 'topic', 'rkey', destination='a@q.com', limit=78)
  238. self.assert_control_called_with_args(
  239. 'add_consumer',
  240. destination='a@q.com',
  241. queue='foo',
  242. exchange='ex',
  243. exchange_type='topic',
  244. routing_key='rkey',
  245. _options={'limit': 78},
  246. )
  247. def test_cancel_consumer(self):
  248. self.app.control.cancel_consumer('foo')
  249. self.assert_control_called_with_args(
  250. 'cancel_consumer',
  251. destination=None,
  252. queue='foo',
  253. )
  254. def test_cancel_consumer__with_destination(self):
  255. self.app.control.cancel_consumer(
  256. 'foo', destination='w1@q.com', limit=3)
  257. self.assert_control_called_with_args(
  258. 'cancel_consumer',
  259. destination='w1@q.com',
  260. queue='foo',
  261. _options={'limit': 3},
  262. )
  263. def test_shutdown(self):
  264. self.app.control.shutdown()
  265. self.assert_control_called_with_args('shutdown', destination=None)
  266. def test_shutdown__with_destination(self):
  267. self.app.control.shutdown(destination='a@q.com', limit=3)
  268. self.assert_control_called_with_args(
  269. 'shutdown', destination='a@q.com', _options={'limit': 3})
  270. def test_heartbeat(self):
  271. self.app.control.heartbeat()
  272. self.assert_control_called_with_args('heartbeat', destination=None)
  273. def test_heartbeat__with_destination(self):
  274. self.app.control.heartbeat(destination='a@q.com', limit=3)
  275. self.assert_control_called_with_args(
  276. 'heartbeat', destination='a@q.com', _options={'limit': 3})
  277. def test_pool_restart(self):
  278. self.app.control.pool_restart()
  279. self.assert_control_called_with_args(
  280. 'pool_restart',
  281. destination=None,
  282. modules=None,
  283. reload=False,
  284. reloader=None)
  285. def test_terminate(self):
  286. self.app.control.revoke = Mock(name='revoke')
  287. self.app.control.terminate('124')
  288. self.app.control.revoke.assert_called_with(
  289. '124', destination=None,
  290. terminate=True,
  291. signal=control.TERM_SIGNAME,
  292. )
  293. def test_enable_events(self):
  294. self.app.control.enable_events()
  295. self.assert_control_called_with_args('enable_events', destination=None)
  296. def test_enable_events_with_destination(self):
  297. self.app.control.enable_events(destination='a@q.com', limit=3)
  298. self.assert_control_called_with_args(
  299. 'enable_events', destination='a@q.com', _options={'limit': 3})
  300. def test_disable_events(self):
  301. self.app.control.disable_events()
  302. self.assert_control_called_with_args(
  303. 'disable_events', destination=None)
  304. def test_disable_events_with_destination(self):
  305. self.app.control.disable_events(destination='a@q.com', limit=3)
  306. self.assert_control_called_with_args(
  307. 'disable_events', destination='a@q.com', _options={'limit': 3})
  308. def test_ping(self):
  309. self.app.control.ping()
  310. self.assert_control_called_with_args(
  311. 'ping', destination=None,
  312. _options={'timeout': 1.0, 'reply': True})
  313. def test_ping_with_destination(self):
  314. self.app.control.ping(destination='a@q.com', limit=3)
  315. self.assert_control_called_with_args(
  316. 'ping',
  317. destination='a@q.com',
  318. _options={
  319. 'limit': 3,
  320. 'timeout': 1.0,
  321. 'reply': True,
  322. })
  323. def test_revoke(self):
  324. self.app.control.revoke('foozbaaz')
  325. self.assert_control_called_with_args(
  326. 'revoke',
  327. destination=None,
  328. task_id='foozbaaz',
  329. signal=control.TERM_SIGNAME,
  330. terminate=False,
  331. )
  332. def test_revoke__with_options(self):
  333. self.app.control.revoke(
  334. 'foozbaaz',
  335. destination='a@q.com',
  336. terminate=True,
  337. signal='KILL',
  338. limit=404,
  339. )
  340. self.assert_control_called_with_args(
  341. 'revoke',
  342. destination='a@q.com',
  343. task_id='foozbaaz',
  344. signal='KILL',
  345. terminate=True,
  346. _options={'limit': 404},
  347. )
  348. def test_election(self):
  349. self.app.control.election('some_id', 'topic', 'action')
  350. self.assert_control_called_with_args(
  351. 'election',
  352. destination=None,
  353. topic='topic',
  354. action='action',
  355. id='some_id',
  356. _options={'connection': None},
  357. )
  358. def test_autoscale(self):
  359. self.app.control.autoscale(300, 10)
  360. self.assert_control_called_with_args(
  361. 'autoscale', max=300, min=10, destination=None)
  362. def test_autoscale__with_options(self):
  363. self.app.control.autoscale(300, 10, destination='a@q.com', limit=39)
  364. self.assert_control_called_with_args(
  365. 'autoscale', max=300, min=10,
  366. destination='a@q.com',
  367. _options={'limit': 39}
  368. )
  369. def test_pool_grow(self):
  370. self.app.control.pool_grow(2)
  371. self.assert_control_called_with_args(
  372. 'pool_grow', n=2, destination=None)
  373. def test_pool_grow__with_options(self):
  374. self.app.control.pool_grow(2, destination='a@q.com', limit=39)
  375. self.assert_control_called_with_args(
  376. 'pool_grow', n=2,
  377. destination='a@q.com',
  378. _options={'limit': 39}
  379. )
  380. def test_pool_shrink(self):
  381. self.app.control.pool_shrink(2)
  382. self.assert_control_called_with_args(
  383. 'pool_shrink', n=2, destination=None)
  384. def test_pool_shrink__with_options(self):
  385. self.app.control.pool_shrink(2, destination='a@q.com', limit=39)
  386. self.assert_control_called_with_args(
  387. 'pool_shrink', n=2,
  388. destination='a@q.com',
  389. _options={'limit': 39}
  390. )
  391. def test_revoke_from_result(self):
  392. self.app.control.revoke = Mock(name='revoke')
  393. self.app.AsyncResult('foozbazzbar').revoke()
  394. self.app.control.revoke.assert_called_with(
  395. 'foozbazzbar',
  396. connection=None, reply=False, signal=None,
  397. terminate=False, timeout=None)
  398. def test_revoke_from_resultset(self):
  399. self.app.control.revoke = Mock(name='revoke')
  400. uuids = [uuid() for _ in range(10)]
  401. r = self.app.GroupResult(
  402. uuid(), [self.app.AsyncResult(x) for x in uuids])
  403. r.revoke()
  404. self.app.control.revoke.assert_called_with(
  405. uuids,
  406. connection=None, reply=False, signal=None,
  407. terminate=False, timeout=None)
  408. def test_after_fork_clears_mailbox_pool(self):
  409. amqp = Mock(name='amqp')
  410. self.app.amqp = amqp
  411. closed_pool = Mock(name='closed pool')
  412. amqp.producer_pool = closed_pool
  413. assert closed_pool is self.app.control.mailbox.producer_pool
  414. self.app.control._after_fork()
  415. new_pool = Mock(name='new pool')
  416. amqp.producer_pool = new_pool
  417. assert new_pool is self.app.control.mailbox.producer_pool