test_control.py 22 KB


  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. import sys
  4. import socket
  5. from collections import defaultdict
  6. from datetime import datetime, timedelta
  7. from case import Mock, call, patch
  8. from kombu import pidbox
  9. from kombu.utils.uuid import uuid
  10. from celery.five import Queue as FastQueue
  11. from celery.utils.timer2 import Timer
  12. from celery.worker import WorkController as _WC
  13. from celery.worker import consumer
  14. from celery.worker import control
  15. from celery.worker import state as worker_state
  16. from celery.worker.request import Request
  17. from celery.worker.state import revoked
  18. from celery.worker.pidbox import Pidbox, gPidbox
  19. from celery.utils.collections import AttributeDict
  20. hostname = socket.gethostname()
  21. class WorkController(object):
  22. def stats(self):
  23. return {'total': worker_state.total_count}
  24. class Consumer(consumer.Consumer):
  25. def __init__(self, app):
  26. self.app = app
  27. self.buffer = FastQueue()
  28. self.timer = Timer()
  29. self.event_dispatcher = Mock()
  30. self.controller = WorkController()
  31. self.task_consumer = Mock()
  32. self.prefetch_multiplier = 1
  33. self.initial_prefetch_count = 1
  34. from celery.concurrency.base import BasePool
  35. self.pool = BasePool(10)
  36. self.task_buckets = defaultdict(lambda: None)
  37. self.hub = None
  38. def call_soon(self, p, *args, **kwargs):
  39. return p(*args, **kwargs)
  40. class test_Pidbox:
  41. def test_shutdown(self):
  42. with patch('celery.worker.pidbox.ignore_errors') as eig:
  43. parent = Mock()
  44. pbox = Pidbox(parent)
  45. pbox._close_channel = Mock()
  46. assert pbox.c is parent
  47. pconsumer = pbox.consumer = Mock()
  48. cancel = pconsumer.cancel
  49. pbox.shutdown(parent)
  50. eig.assert_called_with(parent, cancel)
  51. pbox._close_channel.assert_called_with(parent)
  52. class test_Pidbox_green:
  53. def test_stop(self):
  54. parent = Mock()
  55. g = gPidbox(parent)
  56. stopped = g._node_stopped = Mock()
  57. shutdown = g._node_shutdown = Mock()
  58. close_chan = g._close_channel = Mock()
  59. g.stop(parent)
  60. shutdown.set.assert_called_with()
  61. stopped.wait.assert_called_with()
  62. close_chan.assert_called_with(parent)
  63. assert g._node_stopped is None
  64. assert g._node_shutdown is None
  65. close_chan.reset()
  66. g.stop(parent)
  67. close_chan.assert_called_with(parent)
  68. def test_resets(self):
  69. parent = Mock()
  70. g = gPidbox(parent)
  71. g._resets = 100
  72. g.reset()
  73. assert g._resets == 101
  74. def test_loop(self):
  75. parent = Mock()
  76. conn = parent.connect.return_value = self.app.connection_for_read()
  77. drain = conn.drain_events = Mock()
  78. g = gPidbox(parent)
  79. parent.connection = Mock()
  80. do_reset = g._do_reset = Mock()
  81. call_count = [0]
  82. def se(*args, **kwargs):
  83. if call_count[0] > 2:
  84. g._node_shutdown.set()
  85. g.reset()
  86. call_count[0] += 1
  87. drain.side_effect = se
  88. g.loop(parent)
  89. assert do_reset.call_count == 4
  90. class test_ControlPanel:
  91. def setup(self):
  92. self.panel = self.create_panel(consumer=Consumer(self.app))
  93. @self.app.task(name='c.unittest.mytask', rate_limit=200, shared=False)
  94. def mytask():
  95. pass
  96. self.mytask = mytask
  97. def create_state(self, **kwargs):
  98. kwargs.setdefault('app', self.app)
  99. kwargs.setdefault('hostname', hostname)
  100. kwargs.setdefault('tset', set)
  101. return AttributeDict(kwargs)
  102. def create_panel(self, **kwargs):
  103. return self.app.control.mailbox.Node(
  104. hostname=hostname,
  105. state=self.create_state(**kwargs),
  106. handlers=control.Panel.data,
  107. )
  108. def test_enable_events(self):
  109. consumer = Consumer(self.app)
  110. panel = self.create_panel(consumer=consumer)
  111. evd = consumer.event_dispatcher
  112. evd.groups = set()
  113. panel.handle('enable_events')
  114. assert not evd.groups
  115. evd.groups = {'worker'}
  116. panel.handle('enable_events')
  117. assert 'task' in evd.groups
  118. evd.groups = {'task'}
  119. assert 'already enabled' in panel.handle('enable_events')['ok']
  120. def test_disable_events(self):
  121. consumer = Consumer(self.app)
  122. panel = self.create_panel(consumer=consumer)
  123. evd = consumer.event_dispatcher
  124. evd.enabled = True
  125. evd.groups = {'task'}
  126. panel.handle('disable_events')
  127. assert 'task' not in evd.groups
  128. assert 'already disabled' in panel.handle('disable_events')['ok']
  129. def test_clock(self):
  130. consumer = Consumer(self.app)
  131. panel = self.create_panel(consumer=consumer)
  132. panel.state.app.clock.value = 313
  133. x = panel.handle('clock')
  134. assert x['clock'] == 313
  135. def test_hello(self):
  136. consumer = Consumer(self.app)
  137. panel = self.create_panel(consumer=consumer)
  138. panel.state.app.clock.value = 313
  139. panel.state.hostname = 'elaine@vandelay.com'
  140. worker_state.revoked.add('revoked1')
  141. try:
  142. assert panel.handle('hello', {
  143. 'from_node': 'elaine@vandelay.com',
  144. }) is None
  145. x = panel.handle('hello', {
  146. 'from_node': 'george@vandelay.com',
  147. })
  148. assert x['clock'] == 314 # incremented
  149. x = panel.handle('hello', {
  150. 'from_node': 'george@vandelay.com',
  151. 'revoked': {'1234', '4567', '891'}
  152. })
  153. assert 'revoked1' in x['revoked']
  154. assert '1234' in x['revoked']
  155. assert '4567' in x['revoked']
  156. assert '891' in x['revoked']
  157. assert x['clock'] == 315 # incremented
  158. finally:
  159. worker_state.revoked.discard('revoked1')
  160. def test_conf(self):
  161. consumer = Consumer(self.app)
  162. panel = self.create_panel(consumer=consumer)
  163. panel.app = self.app
  164. panel.app.finalize()
  165. self.app.conf.some_key6 = 'hello world'
  166. x = panel.handle('dump_conf')
  167. assert 'some_key6' in x
  168. def test_election(self):
  169. consumer = Consumer(self.app)
  170. panel = self.create_panel(consumer=consumer)
  171. consumer.gossip = Mock()
  172. panel.handle(
  173. 'election', {'id': 'id', 'topic': 'topic', 'action': 'action'},
  174. )
  175. consumer.gossip.election.assert_called_with('id', 'topic', 'action')
  176. def test_election__no_gossip(self):
  177. consumer = Mock(name='consumer')
  178. consumer.gossip = None
  179. panel = self.create_panel(consumer=consumer)
  180. panel.handle(
  181. 'election', {'id': 'id', 'topic': 'topic', 'action': 'action'},
  182. )
  183. def test_heartbeat(self):
  184. consumer = Consumer(self.app)
  185. panel = self.create_panel(consumer=consumer)
  186. event_dispatcher = consumer.event_dispatcher
  187. event_dispatcher.enabled = True
  188. panel.handle('heartbeat')
  189. assert ('worker-heartbeat',) in event_dispatcher.send.call_args
  190. def test_time_limit(self):
  191. panel = self.create_panel(consumer=Mock())
  192. r = panel.handle('time_limit', arguments=dict(
  193. task_name=self.mytask.name, hard=30, soft=10))
  194. assert self.mytask.time_limit == 30
  195. assert self.mytask.soft_time_limit == 10
  196. assert 'ok' in r
  197. r = panel.handle('time_limit', arguments=dict(
  198. task_name=self.mytask.name, hard=None, soft=None))
  199. assert self.mytask.time_limit is None
  200. assert self.mytask.soft_time_limit is None
  201. assert 'ok' in r
  202. r = panel.handle('time_limit', arguments=dict(
  203. task_name='248e8afya9s8dh921eh928', hard=30))
  204. assert 'error' in r
  205. def test_active_queues(self):
  206. import kombu
  207. x = kombu.Consumer(self.app.connection_for_read(),
  208. [kombu.Queue('foo', kombu.Exchange('foo'), 'foo'),
  209. kombu.Queue('bar', kombu.Exchange('bar'), 'bar')],
  210. auto_declare=False)
  211. consumer = Mock()
  212. consumer.task_consumer = x
  213. panel = self.create_panel(consumer=consumer)
  214. r = panel.handle('active_queues')
  215. assert list(sorted(q['name'] for q in r)) == ['bar', 'foo']
  216. def test_active_queues__empty(self):
  217. consumer = Mock(name='consumer')
  218. panel = self.create_panel(consumer=consumer)
  219. consumer.task_consumer = None
  220. assert not panel.handle('active_queues')
  221. def test_dump_tasks(self):
  222. info = '\n'.join(self.panel.handle('dump_tasks'))
  223. assert 'mytask' in info
  224. assert 'rate_limit=200' in info
  225. def test_dump_tasks2(self):
  226. prev, control.DEFAULT_TASK_INFO_ITEMS = (
  227. control.DEFAULT_TASK_INFO_ITEMS, [])
  228. try:
  229. info = '\n'.join(self.panel.handle('dump_tasks'))
  230. assert 'mytask' in info
  231. assert 'rate_limit=200' not in info
  232. finally:
  233. control.DEFAULT_TASK_INFO_ITEMS = prev
  234. def test_stats(self):
  235. prev_count, worker_state.total_count = worker_state.total_count, 100
  236. try:
  237. assert self.panel.handle('stats')['total'] == 100
  238. finally:
  239. worker_state.total_count = prev_count
  240. def test_report(self):
  241. self.panel.handle('report')
  242. def test_active(self):
  243. r = Request(
  244. self.TaskMessage(self.mytask.name, 'do re mi'),
  245. app=self.app,
  246. )
  247. worker_state.active_requests.add(r)
  248. try:
  249. assert self.panel.handle('dump_active')
  250. finally:
  251. worker_state.active_requests.discard(r)
  252. def test_pool_grow(self):
  253. class MockPool(object):
  254. def __init__(self, size=1):
  255. self.size = size
  256. def grow(self, n=1):
  257. self.size += n
  258. def shrink(self, n=1):
  259. self.size -= n
  260. @property
  261. def num_processes(self):
  262. return self.size
  263. consumer = Consumer(self.app)
  264. consumer.prefetch_multiplier = 8
  265. consumer.qos = Mock(name='qos')
  266. consumer.pool = MockPool(1)
  267. panel = self.create_panel(consumer=consumer)
  268. panel.handle('pool_grow')
  269. assert consumer.pool.size == 2
  270. consumer.qos.increment_eventually.assert_called_with(8)
  271. assert consumer.initial_prefetch_count == 16
  272. panel.handle('pool_shrink')
  273. assert consumer.pool.size == 1
  274. consumer.qos.decrement_eventually.assert_called_with(8)
  275. assert consumer.initial_prefetch_count == 8
  276. panel.state.consumer = Mock()
  277. panel.state.consumer.controller = Mock()
  278. def test_add__cancel_consumer(self):
  279. class MockConsumer(object):
  280. queues = []
  281. canceled = []
  282. consuming = False
  283. hub = Mock(name='hub')
  284. def add_queue(self, queue):
  285. self.queues.append(queue.name)
  286. def consume(self):
  287. self.consuming = True
  288. def cancel_by_queue(self, queue):
  289. self.canceled.append(queue)
  290. def consuming_from(self, queue):
  291. return queue in self.queues
  292. consumer = Consumer(self.app)
  293. consumer.task_consumer = MockConsumer()
  294. panel = self.create_panel(consumer=consumer)
  295. panel.handle('add_consumer', {'queue': 'MyQueue'})
  296. assert 'MyQueue' in consumer.task_consumer.queues
  297. assert consumer.task_consumer.consuming
  298. panel.handle('add_consumer', {'queue': 'MyQueue'})
  299. panel.handle('cancel_consumer', {'queue': 'MyQueue'})
  300. assert 'MyQueue' in consumer.task_consumer.canceled
  301. def test_revoked(self):
  302. worker_state.revoked.clear()
  303. worker_state.revoked.add('a1')
  304. worker_state.revoked.add('a2')
  305. try:
  306. assert sorted(self.panel.handle('dump_revoked')) == ['a1', 'a2']
  307. finally:
  308. worker_state.revoked.clear()
  309. def test_dump_schedule(self):
  310. consumer = Consumer(self.app)
  311. panel = self.create_panel(consumer=consumer)
  312. assert not panel.handle('dump_schedule')
  313. r = Request(
  314. self.TaskMessage(self.mytask.name, 'CAFEBABE'),
  315. app=self.app,
  316. )
  317. consumer.timer.schedule.enter_at(
  318. consumer.timer.Entry(lambda x: x, (r,)),
  319. datetime.now() + timedelta(seconds=10))
  320. consumer.timer.schedule.enter_at(
  321. consumer.timer.Entry(lambda x: x, (object(),)),
  322. datetime.now() + timedelta(seconds=10))
  323. assert panel.handle('dump_schedule')
  324. def test_dump_reserved(self):
  325. consumer = Consumer(self.app)
  326. req = Request(
  327. self.TaskMessage(self.mytask.name, args=(2, 2)), app=self.app,
  328. ) # ^ need to keep reference for reserved_tasks WeakSet.
  329. worker_state.task_reserved(req)
  330. try:
  331. panel = self.create_panel(consumer=consumer)
  332. response = panel.handle('dump_reserved', {'safe': True})
  333. assert response[0]['name'] == self.mytask.name
  334. assert response[0]['hostname'] == socket.gethostname()
  335. worker_state.reserved_requests.clear()
  336. assert not panel.handle('dump_reserved')
  337. finally:
  338. worker_state.reserved_requests.clear()
  339. def test_rate_limit_invalid_rate_limit_string(self):
  340. e = self.panel.handle('rate_limit', arguments=dict(
  341. task_name='tasks.add', rate_limit='x1240301#%!'))
  342. assert 'Invalid rate limit string' in e.get('error')
  343. def test_rate_limit(self):
  344. class xConsumer(object):
  345. reset = False
  346. def reset_rate_limits(self):
  347. self.reset = True
  348. consumer = xConsumer()
  349. panel = self.create_panel(app=self.app, consumer=consumer)
  350. task = self.app.tasks[self.mytask.name]
  351. panel.handle('rate_limit', arguments=dict(task_name=task.name,
  352. rate_limit='100/m'))
  353. assert task.rate_limit == '100/m'
  354. assert consumer.reset
  355. consumer.reset = False
  356. panel.handle('rate_limit', arguments=dict(
  357. task_name=task.name,
  358. rate_limit=0,
  359. ))
  360. assert task.rate_limit == 0
  361. assert consumer.reset
  362. def test_rate_limit_nonexistant_task(self):
  363. self.panel.handle('rate_limit', arguments={
  364. 'task_name': 'xxxx.does.not.exist',
  365. 'rate_limit': '1000/s'})
  366. def test_unexposed_command(self):
  367. with pytest.raises(KeyError):
  368. self.panel.handle('foo', arguments={})
  369. def test_revoke_with_name(self):
  370. tid = uuid()
  371. m = {
  372. 'method': 'revoke',
  373. 'destination': hostname,
  374. 'arguments': {
  375. 'task_id': tid,
  376. 'task_name': self.mytask.name,
  377. },
  378. }
  379. self.panel.handle_message(m, None)
  380. assert tid in revoked
  381. def test_revoke_with_name_not_in_registry(self):
  382. tid = uuid()
  383. m = {
  384. 'method': 'revoke',
  385. 'destination': hostname,
  386. 'arguments': {
  387. 'task_id': tid,
  388. 'task_name': 'xxxxxxxxx33333333388888',
  389. },
  390. }
  391. self.panel.handle_message(m, None)
  392. assert tid in revoked
  393. def test_revoke(self):
  394. tid = uuid()
  395. m = {
  396. 'method': 'revoke',
  397. 'destination': hostname,
  398. 'arguments': {
  399. 'task_id': tid,
  400. },
  401. }
  402. self.panel.handle_message(m, None)
  403. assert tid in revoked
  404. m = {
  405. 'method': 'revoke',
  406. 'destination': 'does.not.exist',
  407. 'arguments': {
  408. 'task_id': tid + 'xxx',
  409. },
  410. }
  411. self.panel.handle_message(m, None)
  412. assert tid + 'xxx' not in revoked
  413. def test_revoke_terminate(self):
  414. request = Mock()
  415. request.id = tid = uuid()
  416. state = self.create_state()
  417. state.consumer = Mock()
  418. worker_state.task_reserved(request)
  419. try:
  420. r = control.revoke(state, tid, terminate=True)
  421. assert tid in revoked
  422. assert request.terminate.call_count
  423. assert 'terminate:' in r['ok']
  424. # unknown task id only revokes
  425. r = control.revoke(state, uuid(), terminate=True)
  426. assert 'tasks unknown' in r['ok']
  427. finally:
  428. worker_state.task_ready(request)
  429. def test_ping(self):
  430. m = {'method': 'ping',
  431. 'destination': hostname}
  432. r = self.panel.handle_message(m, None)
  433. assert r == {'ok': 'pong'}
  434. def test_shutdown(self):
  435. m = {'method': 'shutdown',
  436. 'destination': hostname}
  437. with pytest.raises(SystemExit):
  438. self.panel.handle_message(m, None)
  439. def test_panel_reply(self):
  440. replies = []
  441. class _Node(pidbox.Node):
  442. def reply(self, data, exchange, routing_key, **kwargs):
  443. replies.append(data)
  444. panel = _Node(
  445. hostname=hostname,
  446. state=self.create_state(consumer=Consumer(self.app)),
  447. handlers=control.Panel.data,
  448. mailbox=self.app.control.mailbox,
  449. )
  450. r = panel.dispatch('ping', reply_to={
  451. 'exchange': 'x',
  452. 'routing_key': 'x',
  453. })
  454. assert r == {'ok': 'pong'}
  455. assert replies[0] == {panel.hostname: {'ok': 'pong'}}
  456. def test_pool_restart(self):
  457. consumer = Consumer(self.app)
  458. consumer.controller = _WC(app=self.app)
  459. consumer.controller.consumer = consumer
  460. consumer.controller.pool.restart = Mock()
  461. consumer.reset_rate_limits = Mock(name='reset_rate_limits()')
  462. consumer.update_strategies = Mock(name='update_strategies()')
  463. consumer.event_dispatcher = Mock(name='evd')
  464. panel = self.create_panel(consumer=consumer)
  465. assert panel.state.consumer.controller.consumer is consumer
  466. panel.app = self.app
  467. _import = panel.app.loader.import_from_cwd = Mock()
  468. _reload = Mock()
  469. with pytest.raises(ValueError):
  470. panel.handle('pool_restart', {'reloader': _reload})
  471. self.app.conf.worker_pool_restarts = True
  472. panel.handle('pool_restart', {'reloader': _reload})
  473. consumer.controller.pool.restart.assert_called()
  474. consumer.reset_rate_limits.assert_called_with()
  475. consumer.update_strategies.assert_called_with()
  476. _reload.assert_not_called()
  477. _import.assert_not_called()
  478. consumer.controller.pool.restart.side_effect = NotImplementedError()
  479. panel.handle('pool_restart', {'reloader': _reload})
  480. consumer.controller.consumer = None
  481. panel.handle('pool_restart', {'reloader': _reload})
  482. @patch('celery.worker.logger.debug')
  483. def test_pool_restart_import_modules(self, _debug):
  484. consumer = Consumer(self.app)
  485. consumer.controller = _WC(app=self.app)
  486. consumer.controller.consumer = consumer
  487. consumer.controller.pool.restart = Mock()
  488. consumer.reset_rate_limits = Mock(name='reset_rate_limits()')
  489. consumer.update_strategies = Mock(name='update_strategies()')
  490. panel = self.create_panel(consumer=consumer)
  491. panel.app = self.app
  492. assert panel.state.consumer.controller.consumer is consumer
  493. _import = consumer.controller.app.loader.import_from_cwd = Mock()
  494. _reload = Mock()
  495. self.app.conf.worker_pool_restarts = True
  496. with patch('sys.modules'):
  497. panel.handle('pool_restart', {
  498. 'modules': ['foo', 'bar'],
  499. 'reloader': _reload,
  500. })
  501. consumer.controller.pool.restart.assert_called()
  502. consumer.reset_rate_limits.assert_called_with()
  503. consumer.update_strategies.assert_called_with()
  504. _reload.assert_not_called()
  505. _import.assert_has_calls([call('bar'), call('foo')], any_order=True)
  506. assert _import.call_count == 2
  507. def test_pool_restart_reload_modules(self):
  508. consumer = Consumer(self.app)
  509. consumer.controller = _WC(app=self.app)
  510. consumer.controller.consumer = consumer
  511. consumer.controller.pool.restart = Mock()
  512. consumer.reset_rate_limits = Mock(name='reset_rate_limits()')
  513. consumer.update_strategies = Mock(name='update_strategies()')
  514. panel = self.create_panel(consumer=consumer)
  515. panel.app = self.app
  516. _import = panel.app.loader.import_from_cwd = Mock()
  517. _reload = Mock()
  518. self.app.conf.worker_pool_restarts = True
  519. with patch.dict(sys.modules, {'foo': None}):
  520. panel.handle('pool_restart', {
  521. 'modules': ['foo'],
  522. 'reload': False,
  523. 'reloader': _reload,
  524. })
  525. consumer.controller.pool.restart.assert_called()
  526. _reload.assert_not_called()
  527. _import.assert_not_called()
  528. _import.reset_mock()
  529. _reload.reset_mock()
  530. consumer.controller.pool.restart.reset_mock()
  531. panel.handle('pool_restart', {
  532. 'modules': ['foo'],
  533. 'reload': True,
  534. 'reloader': _reload,
  535. })
  536. consumer.controller.pool.restart.assert_called()
  537. _reload.assert_called()
  538. _import.assert_not_called()
  539. def test_query_task(self):
  540. consumer = Consumer(self.app)
  541. consumer.controller = _WC(app=self.app)
  542. consumer.controller.consumer = consumer
  543. panel = self.create_panel(consumer=consumer)
  544. panel.app = self.app
  545. req1 = Request(
  546. self.TaskMessage(self.mytask.name, args=(2, 2)),
  547. app=self.app,
  548. )
  549. worker_state.task_reserved(req1)
  550. try:
  551. assert not panel.handle('query_task', {'ids': {'1daa'}})
  552. ret = panel.handle('query_task', {'ids': {req1.id}})
  553. assert req1.id in ret
  554. assert ret[req1.id][0] == 'reserved'
  555. worker_state.active_requests.add(req1)
  556. try:
  557. ret = panel.handle('query_task', {'ids': {req1.id}})
  558. assert ret[req1.id][0] == 'active'
  559. finally:
  560. worker_state.active_requests.clear()
  561. ret = panel.handle('query_task', {'ids': {req1.id}})
  562. assert ret[req1.id][0] == 'reserved'
  563. finally:
  564. worker_state.reserved_requests.clear()