123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- from __future__ import absolute_import, unicode_literals
- from kombu.pidbox import Mailbox
- from vine.utils import wraps
- from celery.app import control
- from celery.exceptions import DuplicateNodenameWarning
- from celery.utils import uuid
- from celery.tests.case import AppCase
class MockMailbox(Mailbox):
    """Mailbox stand-in that records command names instead of publishing.

    Every command passed to :meth:`_publish` is appended to the class-level
    ``sent`` list, so tests can check which commands would have been
    broadcast.
    """

    # Shared across all instances; rebound to a fresh list by
    # ``with_mock_broadcast`` around each decorated test.
    sent = []

    def _publish(self, command, *args, **kwargs):
        # Only the command name is recorded; remaining arguments are ignored.
        type(self).sent.append(command)

    def close(self):
        # Nothing to release.
        pass

    def _collect(self, *args, **kwargs):
        # No replies are gathered; callers receive ``None``.
        pass
class Control(control.Control):
    """``control.Control`` variant whose mailbox class is ``MockMailbox``."""

    # Overrides the ``Mailbox`` class attribute of the parent.
    Mailbox = MockMailbox
def with_mock_broadcast(fun):
    """Decorate ``fun`` so ``MockMailbox.sent`` is fresh before and after.

    ``MockMailbox.sent`` is rebound to a new empty list before ``fun`` is
    called and again once it returns or raises.
    """

    @wraps(fun)
    def wrapper(*call_args, **call_kwargs):
        MockMailbox.sent = []
        try:
            outcome = fun(*call_args, **call_kwargs)
        finally:
            # Runs on both success and failure of ``fun``.
            MockMailbox.sent = []
        return outcome

    return wrapper
class test_flatten_reply(AppCase):
    """Exercises ``control.flatten_reply`` with a duplicated node name."""

    def test_flatten_reply(self):
        # Two replies share the node name 'foo@example.com'.
        replies = [
            {'foo@example.com': {'hello': 10}},
            {'foo@example.com': {'hello': 20}},
            {'bar@example.com': {'hello': 30}},
        ]

        with self.assertWarns(DuplicateNodenameWarning) as caught:
            flattened = control.flatten_reply(replies)

        expected_message = (
            'Received multiple replies from node name: foo@example.com.'
        )
        self.assertIn(expected_message, str(caught.warning))

        # Both node names must be present in the flattened result.
        for nodename in ('foo@example.com', 'bar@example.com'):
            self.assertIn(nodename, flattened)
class test_inspect(AppCase):
    """Checks which command each inspect call hands to the mailbox."""

    def setup(self):
        # Install the MockMailbox-backed Control on the app, keeping the
        # previous control object in ``self.prev``.
        self.c = Control(app=self.app)
        self.prev = self.app.control
        self.app.control = self.c
        self.i = self.c.inspect()

    def _assert_sent(self, command):
        # Passes when ``command`` was recorded by ``MockMailbox._publish``.
        self.assertIn(command, MockMailbox.sent)

    def test_prepare_reply(self):
        merged = self.i._prepare([{'w1': {'ok': 1}}, {'w2': {'ok': 1}}])
        self.assertDictEqual(merged, {'w1': {'ok': 1}, 'w2': {'ok': 1}})

        # With destination='w1' the bare reply for that node is expected.
        single = self.c.inspect(destination='w1')
        self.assertEqual(single._prepare([{'w1': {'ok': 1}}]), {'ok': 1})

    @with_mock_broadcast
    def test_active(self):
        self.i.active()
        self._assert_sent('dump_active')

    @with_mock_broadcast
    def test_clock(self):
        self.i.clock()
        self._assert_sent('clock')

    @with_mock_broadcast
    def test_conf(self):
        self.i.conf()
        self._assert_sent('dump_conf')

    @with_mock_broadcast
    def test_hello(self):
        self.i.hello('george@vandelay.com')
        self._assert_sent('hello')

    @with_mock_broadcast
    def test_memsample(self):
        self.i.memsample()
        self._assert_sent('memsample')

    @with_mock_broadcast
    def test_memdump(self):
        self.i.memdump()
        self._assert_sent('memdump')

    @with_mock_broadcast
    def test_objgraph(self):
        self.i.objgraph()
        self._assert_sent('objgraph')

    @with_mock_broadcast
    def test_scheduled(self):
        self.i.scheduled()
        self._assert_sent('dump_schedule')

    @with_mock_broadcast
    def test_reserved(self):
        self.i.reserved()
        self._assert_sent('dump_reserved')

    @with_mock_broadcast
    def test_stats(self):
        self.i.stats()
        self._assert_sent('stats')

    @with_mock_broadcast
    def test_revoked(self):
        self.i.revoked()
        self._assert_sent('dump_revoked')

    @with_mock_broadcast
    def test_tasks(self):
        # ``registered()`` is expected to send the 'dump_tasks' command.
        self.i.registered()
        self._assert_sent('dump_tasks')

    @with_mock_broadcast
    def test_ping(self):
        self.i.ping()
        self._assert_sent('ping')

    @with_mock_broadcast
    def test_active_queues(self):
        self.i.active_queues()
        self._assert_sent('active_queues')

    @with_mock_broadcast
    def test_report(self):
        self.i.report()
        self._assert_sent('report')
class test_Broadcast(AppCase):
    """Checks which command each control call hands to the mailbox."""

    def setup(self):
        # Install the MockMailbox-backed Control on the app.
        self.control = Control(app=self.app)
        self.app.control = self.control

        # Task registered on this app only (shared=False); its name is used
        # by the rate/time limit tests.
        @self.app.task(shared=False)
        def mytask():
            pass

        self.mytask = mytask

    def _assert_sent(self, command):
        # Passes when ``command`` was recorded by ``MockMailbox._publish``.
        self.assertIn(command, MockMailbox.sent)

    def test_purge(self):
        # Smoke test: only checks that the call completes.
        self.control.purge()

    @with_mock_broadcast
    def test_broadcast(self):
        self.control.broadcast('foobarbaz', arguments=[])
        self._assert_sent('foobarbaz')

    @with_mock_broadcast
    def test_broadcast_limit(self):
        self.control.broadcast(
            'foobarbaz1',
            arguments=[],
            limit=None,
            destination=[1, 2, 3],
        )
        self._assert_sent('foobarbaz1')

    @with_mock_broadcast
    def test_broadcast_validate(self):
        # A plain string destination is expected to be rejected.
        with self.assertRaises(ValueError):
            self.control.broadcast('foobarbaz2', destination='foo')

    @with_mock_broadcast
    def test_rate_limit(self):
        self.control.rate_limit(self.mytask.name, '100/m')
        self._assert_sent('rate_limit')

    @with_mock_broadcast
    def test_time_limit(self):
        self.control.time_limit(self.mytask.name, soft=10, hard=20)
        self._assert_sent('time_limit')

    @with_mock_broadcast
    def test_add_consumer(self):
        self.control.add_consumer('foo')
        self._assert_sent('add_consumer')

    @with_mock_broadcast
    def test_cancel_consumer(self):
        self.control.cancel_consumer('foo')
        self._assert_sent('cancel_consumer')

    @with_mock_broadcast
    def test_enable_events(self):
        self.control.enable_events()
        self._assert_sent('enable_events')

    @with_mock_broadcast
    def test_disable_events(self):
        self.control.disable_events()
        self._assert_sent('disable_events')

    @with_mock_broadcast
    def test_revoke(self):
        self.control.revoke('foozbaaz')
        self._assert_sent('revoke')

    @with_mock_broadcast
    def test_ping(self):
        self.control.ping()
        self._assert_sent('ping')

    @with_mock_broadcast
    def test_election(self):
        self.control.election('some_id', 'topic', 'action')
        self._assert_sent('election')

    @with_mock_broadcast
    def test_pool_grow(self):
        self.control.pool_grow(2)
        self._assert_sent('pool_grow')

    @with_mock_broadcast
    def test_pool_shrink(self):
        self.control.pool_shrink(2)
        self._assert_sent('pool_shrink')

    @with_mock_broadcast
    def test_revoke_from_result(self):
        # Revoking through a result object goes via ``self.app.control``.
        result = self.app.AsyncResult('foozbazzbar')
        result.revoke()
        self._assert_sent('revoke')

    @with_mock_broadcast
    def test_revoke_from_resultset(self):
        group_id = uuid()
        members = [self.app.AsyncResult(uuid()) for _ in range(10)]
        group = self.app.GroupResult(group_id, members)
        group.revoke()
        self._assert_sent('revoke')
|