123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470 |
- from __future__ import absolute_import
- import sys
- import socket
- from collections import defaultdict
- from datetime import datetime, timedelta
- from kombu import pidbox
- from mock import Mock, patch, call
- from celery import current_app
- from celery.datastructures import AttributeDict
- from celery.task import task
- from celery.utils import uuid
- from celery.utils.timer2 import Timer
- from celery.worker import WorkController as _WC
- from celery.worker import consumer
- from celery.worker import control
- from celery.worker import state
- from celery.five import Queue as FastQueue
- from celery.worker.job import TaskRequest
- from celery.worker.state import revoked
- from celery.worker.control import Panel
- from celery.tests.utils import Case
- hostname = socket.gethostname()
- @task(rate_limit=200) # for extra info in dump_tasks
- def mytask():
- pass
- class WorkController(object):
- autoscaler = None
- def stats(self):
- return {'total': state.total_count}
- class Consumer(consumer.Consumer):
- def __init__(self):
- self.buffer = FastQueue()
- self.handle_task = self.buffer.put
- self.timer = Timer()
- self.app = current_app
- self.event_dispatcher = Mock()
- self.controller = WorkController()
- self.task_consumer = Mock()
- from celery.concurrency.base import BasePool
- self.pool = BasePool(10)
- self.task_buckets = defaultdict(lambda: None)
- class test_ControlPanel(Case):
- def setUp(self):
- self.app = current_app
- self.panel = self.create_panel(consumer=Consumer())
- def create_state(self, **kwargs):
- kwargs.setdefault('app', self.app)
- return AttributeDict(kwargs)
- def create_panel(self, **kwargs):
- return self.app.control.mailbox.Node(hostname=hostname,
- state=self.create_state(**kwargs),
- handlers=Panel.data)
- def test_enable_events(self):
- consumer = Consumer()
- panel = self.create_panel(consumer=consumer)
- evd = consumer.event_dispatcher
- evd.groups = set()
- panel.handle('enable_events')
- self.assertIn('task', evd.groups)
- evd.groups = set(['task'])
- self.assertIn('already enabled', panel.handle('enable_events')['ok'])
- def test_disable_events(self):
- consumer = Consumer()
- panel = self.create_panel(consumer=consumer)
- evd = consumer.event_dispatcher
- evd.enabled = True
- evd.groups = set(['task'])
- panel.handle('disable_events')
- self.assertNotIn('task', evd.groups)
- self.assertIn('already disabled', panel.handle('disable_events')['ok'])
- def test_heartbeat(self):
- consumer = Consumer()
- panel = self.create_panel(consumer=consumer)
- consumer.event_dispatcher.enabled = True
- panel.handle('heartbeat')
- self.assertIn(('worker-heartbeat', ),
- consumer.event_dispatcher.send.call_args)
- def test_time_limit(self):
- panel = self.create_panel(consumer=Mock())
- th, ts = mytask.time_limit, mytask.soft_time_limit
- try:
- r = panel.handle('time_limit', arguments=dict(
- task_name=mytask.name, hard=30, soft=10))
- self.assertEqual((mytask.time_limit, mytask.soft_time_limit),
- (30, 10))
- self.assertIn('ok', r)
- r = panel.handle('time_limit', arguments=dict(
- task_name=mytask.name, hard=None, soft=None))
- self.assertEqual((mytask.time_limit, mytask.soft_time_limit),
- (None, None))
- self.assertIn('ok', r)
- r = panel.handle('time_limit', arguments=dict(
- task_name='248e8afya9s8dh921eh928', hard=30))
- self.assertIn('error', r)
- finally:
- mytask.time_limit, mytask.soft_time_limit = th, ts
- def test_active_queues(self):
- import kombu
- x = kombu.Consumer(current_app.connection(),
- [kombu.Queue('foo', kombu.Exchange('foo'), 'foo'),
- kombu.Queue('bar', kombu.Exchange('bar'), 'bar')],
- auto_declare=False)
- consumer = Mock()
- consumer.task_consumer = x
- panel = self.create_panel(consumer=consumer)
- r = panel.handle('active_queues')
- self.assertListEqual(list(sorted(q['name'] for q in r)),
- ['bar', 'foo'])
- def test_dump_tasks(self):
- info = '\n'.join(self.panel.handle('dump_tasks'))
- self.assertIn('mytask', info)
- self.assertIn('rate_limit=200', info)
- def test_stats(self):
- prev_count, state.total_count = state.total_count, 100
- try:
- self.assertDictContainsSubset({'total': 100},
- self.panel.handle('stats'))
- finally:
- state.total_count = prev_count
- def test_report(self):
- self.panel.handle('report')
- def test_active(self):
- r = TaskRequest(mytask.name, 'do re mi', (), {})
- state.active_requests.add(r)
- try:
- self.assertTrue(self.panel.handle('dump_active'))
- finally:
- state.active_requests.discard(r)
- def test_pool_grow(self):
- class MockPool(object):
- def __init__(self, size=1):
- self.size = size
- def grow(self, n=1):
- self.size += n
- def shrink(self, n=1):
- self.size -= n
- consumer = Consumer()
- consumer.pool = MockPool()
- panel = self.create_panel(consumer=consumer)
- panel.handle('pool_grow')
- self.assertEqual(consumer.pool.size, 2)
- panel.handle('pool_shrink')
- self.assertEqual(consumer.pool.size, 1)
- panel.state.consumer = Mock()
- panel.state.consumer.controller = Mock()
- sc = panel.state.consumer.controller.autoscaler = Mock()
- panel.handle('pool_grow')
- self.assertTrue(sc.force_scale_up.called)
- panel.handle('pool_shrink')
- self.assertTrue(sc.force_scale_down.called)
- def test_add__cancel_consumer(self):
- class MockConsumer(object):
- queues = []
- cancelled = []
- consuming = False
- def add_queue(self, queue):
- self.queues.append(queue.name)
- def consume(self):
- self.consuming = True
- def cancel_by_queue(self, queue):
- self.cancelled.append(queue)
- def consuming_from(self, queue):
- return queue in self.queues
- consumer = Consumer()
- consumer.task_consumer = MockConsumer()
- panel = self.create_panel(consumer=consumer)
- panel.handle('add_consumer', {'queue': 'MyQueue'})
- self.assertIn('MyQueue', consumer.task_consumer.queues)
- self.assertTrue(consumer.task_consumer.consuming)
- panel.handle('add_consumer', {'queue': 'MyQueue'})
- panel.handle('cancel_consumer', {'queue': 'MyQueue'})
- self.assertIn('MyQueue', consumer.task_consumer.cancelled)
- def test_revoked(self):
- state.revoked.clear()
- state.revoked.add('a1')
- state.revoked.add('a2')
- try:
- self.assertEqual(sorted(self.panel.handle('dump_revoked')),
- ['a1', 'a2'])
- finally:
- state.revoked.clear()
- def test_dump_schedule(self):
- consumer = Consumer()
- panel = self.create_panel(consumer=consumer)
- self.assertFalse(panel.handle('dump_schedule'))
- r = TaskRequest(mytask.name, 'CAFEBABE', (), {})
- consumer.timer.schedule.enter(
- consumer.timer.Entry(lambda x: x, (r, )),
- datetime.now() + timedelta(seconds=10))
- self.assertTrue(panel.handle('dump_schedule'))
- def test_dump_reserved(self):
- from celery.worker import state
- consumer = Consumer()
- state.reserved_requests.add(
- TaskRequest(mytask.name, uuid(), args=(2, 2), kwargs={}),
- )
- try:
- panel = self.create_panel(consumer=consumer)
- response = panel.handle('dump_reserved', {'safe': True})
- self.assertDictContainsSubset(
- {'name': mytask.name,
- 'args': (2, 2),
- 'kwargs': {},
- 'hostname': socket.gethostname()},
- response[0],
- )
- state.reserved_requests.clear()
- self.assertFalse(panel.handle('dump_reserved'))
- finally:
- state.reserved_requests.clear()
- def test_rate_limit_invalid_rate_limit_string(self):
- e = self.panel.handle('rate_limit', arguments=dict(
- task_name='tasks.add', rate_limit='x1240301#%!'))
- self.assertIn('Invalid rate limit string', e.get('error'))
- def test_rate_limit(self):
- class Consumer(object):
- reset = False
- def reset_rate_limits(self):
- self.reset = True
- consumer = Consumer()
- panel = self.create_panel(app=current_app, consumer=consumer)
- task = current_app.tasks[mytask.name]
- old_rate_limit = task.rate_limit
- try:
- panel.handle('rate_limit', arguments=dict(task_name=task.name,
- rate_limit='100/m'))
- self.assertEqual(task.rate_limit, '100/m')
- self.assertTrue(consumer.reset)
- consumer.reset = False
- panel.handle('rate_limit', arguments=dict(task_name=task.name,
- rate_limit=0))
- self.assertEqual(task.rate_limit, 0)
- self.assertTrue(consumer.reset)
- finally:
- task.rate_limit = old_rate_limit
- def test_rate_limit_nonexistant_task(self):
- self.panel.handle('rate_limit', arguments={
- 'task_name': 'xxxx.does.not.exist',
- 'rate_limit': '1000/s'})
- def test_unexposed_command(self):
- with self.assertRaises(KeyError):
- self.panel.handle('foo', arguments={})
- def test_revoke_with_name(self):
- tid = uuid()
- m = {'method': 'revoke',
- 'destination': hostname,
- 'arguments': {'task_id': tid,
- 'task_name': mytask.name}}
- self.panel.handle_message(m, None)
- self.assertIn(tid, revoked)
- def test_revoke_with_name_not_in_registry(self):
- tid = uuid()
- m = {'method': 'revoke',
- 'destination': hostname,
- 'arguments': {'task_id': tid,
- 'task_name': 'xxxxxxxxx33333333388888'}}
- self.panel.handle_message(m, None)
- self.assertIn(tid, revoked)
- def test_revoke(self):
- tid = uuid()
- m = {'method': 'revoke',
- 'destination': hostname,
- 'arguments': {'task_id': tid}}
- self.panel.handle_message(m, None)
- self.assertIn(tid, revoked)
- m = {'method': 'revoke',
- 'destination': 'does.not.exist',
- 'arguments': {'task_id': tid + 'xxx'}}
- self.panel.handle_message(m, None)
- self.assertNotIn(tid + 'xxx', revoked)
- def test_revoke_terminate(self):
- request = Mock()
- request.id = tid = uuid()
- state.reserved_requests.add(request)
- try:
- r = control.revoke(Mock(), tid, terminate=True)
- self.assertIn(tid, revoked)
- self.assertTrue(request.terminate.call_count)
- self.assertIn('terminating', r['ok'])
- # unknown task id only revokes
- r = control.revoke(Mock(), uuid(), terminate=True)
- self.assertIn('not found', r['ok'])
- finally:
- state.reserved_requests.discard(request)
- def test_autoscale(self):
- self.panel.state.consumer = Mock()
- self.panel.state.consumer.controller = Mock()
- sc = self.panel.state.consumer.controller.autoscaler = Mock()
- sc.update.return_value = 10, 2
- m = {'method': 'autoscale',
- 'destination': hostname,
- 'arguments': {'max': '10', 'min': '2'}}
- r = self.panel.handle_message(m, None)
- self.assertIn('ok', r)
- self.panel.state.consumer.controller.autoscaler = None
- r = self.panel.handle_message(m, None)
- self.assertIn('error', r)
- def test_ping(self):
- m = {'method': 'ping',
- 'destination': hostname}
- r = self.panel.handle_message(m, None)
- self.assertEqual(r, {'ok': 'pong'})
- def test_shutdown(self):
- m = {'method': 'shutdown',
- 'destination': hostname}
- with self.assertRaises(SystemExit):
- self.panel.handle_message(m, None)
- def test_panel_reply(self):
- replies = []
- class _Node(pidbox.Node):
- def reply(self, data, exchange, routing_key, **kwargs):
- replies.append(data)
- panel = _Node(hostname=hostname,
- state=self.create_state(consumer=Consumer()),
- handlers=Panel.data,
- mailbox=self.app.control.mailbox)
- r = panel.dispatch('ping', reply_to={'exchange': 'x',
- 'routing_key': 'x'})
- self.assertEqual(r, {'ok': 'pong'})
- self.assertDictEqual(replies[0], {panel.hostname: {'ok': 'pong'}})
- def test_pool_restart(self):
- consumer = Consumer()
- consumer.controller = _WC(app=current_app)
- consumer.controller.pool.restart = Mock()
- panel = self.create_panel(consumer=consumer)
- panel.app = self.app
- _import = panel.app.loader.import_from_cwd = Mock()
- _reload = Mock()
- with self.assertRaises(ValueError):
- panel.handle('pool_restart', {'reloader': _reload})
- current_app.conf.CELERYD_POOL_RESTARTS = True
- try:
- panel.handle('pool_restart', {'reloader': _reload})
- self.assertTrue(consumer.controller.pool.restart.called)
- self.assertFalse(_reload.called)
- self.assertFalse(_import.called)
- finally:
- current_app.conf.CELERYD_POOL_RESTARTS = False
- def test_pool_restart_import_modules(self):
- consumer = Consumer()
- consumer.controller = _WC(app=current_app)
- consumer.controller.pool.restart = Mock()
- panel = self.create_panel(consumer=consumer)
- panel.app = self.app
- _import = consumer.controller.app.loader.import_from_cwd = Mock()
- _reload = Mock()
- current_app.conf.CELERYD_POOL_RESTARTS = True
- try:
- panel.handle('pool_restart', {'modules': ['foo', 'bar'],
- 'reloader': _reload})
- self.assertTrue(consumer.controller.pool.restart.called)
- self.assertFalse(_reload.called)
- self.assertItemsEqual(
- [call('bar'), call('foo')],
- _import.call_args_list,
- )
- finally:
- current_app.conf.CELERYD_POOL_RESTARTS = False
- def test_pool_restart_reload_modules(self):
- consumer = Consumer()
- consumer.controller = _WC(app=current_app)
- consumer.controller.pool.restart = Mock()
- panel = self.create_panel(consumer=consumer)
- panel.app = self.app
- _import = panel.app.loader.import_from_cwd = Mock()
- _reload = Mock()
- current_app.conf.CELERYD_POOL_RESTARTS = True
- try:
- with patch.dict(sys.modules, {'foo': None}):
- panel.handle('pool_restart', {'modules': ['foo'],
- 'reload': False,
- 'reloader': _reload})
- self.assertTrue(consumer.controller.pool.restart.called)
- self.assertFalse(_reload.called)
- self.assertFalse(_import.called)
- _import.reset_mock()
- _reload.reset_mock()
- consumer.controller.pool.restart.reset_mock()
- panel.handle('pool_restart', {'modules': ['foo'],
- 'reload': True,
- 'reloader': _reload})
- self.assertTrue(consumer.controller.pool.restart.called)
- self.assertTrue(_reload.called)
- self.assertFalse(_import.called)
- finally:
- current_app.conf.CELERYD_POOL_RESTARTS = False
|