"""Tests for ``celery.app.control``.

Covers ``control.flatten_reply``, the ``inspect()`` helper and the broadcast
commands exposed by ``Control``.  The tests swap in a mailbox stub that
records the name of every published command so that they can assert on what
was sent.
"""
from __future__ import absolute_import, unicode_literals

import pytest
from kombu.pidbox import Mailbox
from vine.utils import wraps

from celery import uuid
from celery.app import control
from celery.exceptions import DuplicateNodenameWarning


class MockMailbox(Mailbox):
    """Mailbox stub whose ``_publish`` records the command name in ``sent``."""

    # Class-level list shared by every instance; ``with_mock_broadcast``
    # rebinds it to a fresh list around each decorated test.
    sent = []

    def _publish(self, command, *args, **kwargs):
        # Record on the class (not the instance) so tests can inspect
        # ``MockMailbox.sent`` without holding a mailbox reference.
        self.__class__.sent.append(command)

    def close(self):
        pass

    def _collect(self, *args, **kwargs):
        pass


class Control(control.Control):
    """``control.Control`` variant with ``Mailbox`` set to ``MockMailbox``.

    NOTE(review): presumably ``control.Control`` instantiates ``self.Mailbox``
    for its broadcasts, which is what routes commands into
    ``MockMailbox.sent`` -- confirm against ``celery.app.control``.
    """

    Mailbox = MockMailbox


def with_mock_broadcast(fun):
    """Decorate ``fun`` so ``MockMailbox.sent`` is empty before and after it.

    The trailing reset runs in a ``finally`` clause, so it also happens when
    ``fun`` raises (for example on a failed assertion).
    """

    @wraps(fun)
    def _resets(*args, **kwargs):
        MockMailbox.sent = []
        try:
            return fun(*args, **kwargs)
        finally:
            MockMailbox.sent = []

    return _resets


class test_flatten_reply:
    """Tests for ``control.flatten_reply``."""

    def test_flatten_reply(self):
        # Two replies share the node name 'foo@example.com', which is
        # expected to trigger DuplicateNodenameWarning.
        reply = [
            {'foo@example.com': {'hello': 10}},
            {'foo@example.com': {'hello': 20}},
            {'bar@example.com': {'hello': 30}}
        ]
        with pytest.warns(DuplicateNodenameWarning) as w:
            nodes = control.flatten_reply(reply)

        # The warning text must name the duplicated node (the key of the
        # first reply).
        duplicated = next(iter(reply[0]))
        expected = 'Received multiple replies from node name: {0}.'.format(
            duplicated)
        assert expected in str(w[0].message.args[0])
        assert 'foo@example.com' in nodes
        assert 'bar@example.com' in nodes


class test_inspect:
    """Tests that each ``inspect()`` method publishes the matching command.

    NOTE(review): ``self.app`` is not defined in this file; it is presumably
    injected by the test suite's fixtures before ``setup`` runs -- confirm.
    """

    def setup(self):
        self.c = Control(app=self.app)
        # Install the mock-backed control on the app.  The replaced object is
        # kept in ``self.prev``; nothing in this file restores it.
        self.prev, self.app.control = self.app.control, self.c
        self.i = self.c.inspect()

    def test_prepare_reply(self):
        # Without a destination the replies are merged into one mapping
        # keyed by node name.
        reply = self.i._prepare([
            {'w1': {'ok': 1}},
            {'w2': {'ok': 1}},
        ])
        assert reply == {
            'w1': {'ok': 1},
            'w2': {'ok': 1},
        }

        # With a single string destination only that node's payload is
        # returned.
        i = self.c.inspect(destination='w1')
        assert i._prepare([{'w1': {'ok': 1}}]) == {'ok': 1}

    @with_mock_broadcast
    def test_active(self):
        self.i.active()
        assert 'active' in MockMailbox.sent

    @with_mock_broadcast
    def test_clock(self):
        self.i.clock()
        assert 'clock' in MockMailbox.sent

    @with_mock_broadcast
    def test_conf(self):
        self.i.conf()
        assert 'conf' in MockMailbox.sent

    @with_mock_broadcast
    def test_hello(self):
        self.i.hello('george@vandelay.com')
        assert 'hello' in MockMailbox.sent

    @with_mock_broadcast
    def test_memsample(self):
        self.i.memsample()
        assert 'memsample' in MockMailbox.sent

    @with_mock_broadcast
    def test_memdump(self):
        self.i.memdump()
        assert 'memdump' in MockMailbox.sent

    @with_mock_broadcast
    def test_objgraph(self):
        self.i.objgraph()
        assert 'objgraph' in MockMailbox.sent

    @with_mock_broadcast
    def test_scheduled(self):
        self.i.scheduled()
        assert 'scheduled' in MockMailbox.sent

    @with_mock_broadcast
    def test_reserved(self):
        self.i.reserved()
        assert 'reserved' in MockMailbox.sent

    @with_mock_broadcast
    def test_stats(self):
        self.i.stats()
        assert 'stats' in MockMailbox.sent

    @with_mock_broadcast
    def test_revoked(self):
        self.i.revoked()
        assert 'revoked' in MockMailbox.sent

    @with_mock_broadcast
    def test_tasks(self):
        # The test is named "tasks" but exercises ``registered()``.
        self.i.registered()
        assert 'registered' in MockMailbox.sent

    @with_mock_broadcast
    def test_ping(self):
        self.i.ping()
        assert 'ping' in MockMailbox.sent

    @with_mock_broadcast
    def test_active_queues(self):
        self.i.active_queues()
        assert 'active_queues' in MockMailbox.sent

    @with_mock_broadcast
    def test_report(self):
        self.i.report()
        assert 'report' in MockMailbox.sent


class test_Broadcast:
    """Tests that each ``Control`` command publishes the matching broadcast.

    NOTE(review): ``self.app`` is not defined in this file; it is presumably
    injected by the test suite's fixtures before ``setup`` runs -- confirm.
    """

    def setup(self):
        self.control = Control(app=self.app)
        self.app.control = self.control

        # A throwaway task registered on this app only (``shared=False``);
        # its name is used by the rate/time limit tests.
        @self.app.task(shared=False)
        def mytask():
            pass

        self.mytask = mytask

    def test_purge(self):
        # Smoke test: only checks that the call does not raise.
        self.control.purge()

    @with_mock_broadcast
    def test_broadcast(self):
        self.control.broadcast('foobarbaz', arguments=[])
        assert 'foobarbaz' in MockMailbox.sent

    @with_mock_broadcast
    def test_broadcast_limit(self):
        self.control.broadcast(
            'foobarbaz1', arguments=[], limit=None, destination=[1, 2, 3],
        )
        assert 'foobarbaz1' in MockMailbox.sent

    @with_mock_broadcast
    def test_broadcast_validate(self):
        # A plain string destination is expected to be rejected.
        with pytest.raises(ValueError):
            self.control.broadcast('foobarbaz2', destination='foo')

    @with_mock_broadcast
    def test_rate_limit(self):
        self.control.rate_limit(self.mytask.name, '100/m')
        assert 'rate_limit' in MockMailbox.sent

    @with_mock_broadcast
    def test_time_limit(self):
        self.control.time_limit(self.mytask.name, soft=10, hard=20)
        assert 'time_limit' in MockMailbox.sent

    @with_mock_broadcast
    def test_add_consumer(self):
        self.control.add_consumer('foo')
        assert 'add_consumer' in MockMailbox.sent

    @with_mock_broadcast
    def test_cancel_consumer(self):
        self.control.cancel_consumer('foo')
        assert 'cancel_consumer' in MockMailbox.sent

    @with_mock_broadcast
    def test_enable_events(self):
        self.control.enable_events()
        assert 'enable_events' in MockMailbox.sent

    @with_mock_broadcast
    def test_disable_events(self):
        self.control.disable_events()
        assert 'disable_events' in MockMailbox.sent

    @with_mock_broadcast
    def test_revoke(self):
        self.control.revoke('foozbaaz')
        assert 'revoke' in MockMailbox.sent

    @with_mock_broadcast
    def test_ping(self):
        self.control.ping()
        assert 'ping' in MockMailbox.sent

    @with_mock_broadcast
    def test_election(self):
        self.control.election('some_id', 'topic', 'action')
        assert 'election' in MockMailbox.sent

    @with_mock_broadcast
    def test_pool_grow(self):
        self.control.pool_grow(2)
        assert 'pool_grow' in MockMailbox.sent

    @with_mock_broadcast
    def test_pool_shrink(self):
        self.control.pool_shrink(2)
        assert 'pool_shrink' in MockMailbox.sent

    @with_mock_broadcast
    def test_revoke_from_result(self):
        # Revoking through a result object goes via ``self.app.control``,
        # which ``setup`` replaced with the mock-backed control.
        self.app.AsyncResult('foozbazzbar').revoke()
        assert 'revoke' in MockMailbox.sent

    @with_mock_broadcast
    def test_revoke_from_resultset(self):
        # Ten results with fresh ids, grouped under a fresh group id.
        results = [self.app.AsyncResult(uuid()) for _ in range(10)]
        r = self.app.GroupResult(uuid(), results)
        r.revoke()
        assert 'revoke' in MockMailbox.sent