123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- import unittest2 as unittest
- from celery.task import control
- from celery.task.builtins import PingTask
- from celery.utils import gen_unique_id
- from celery.utils.functional import wraps
class MockBroadcastPublisher(object):
    """Test double used in place of ``control.BroadcastPublisher``.

    Nothing is published; the command name given to :meth:`send` is simply
    appended to the class-level :attr:`sent` list so tests can inspect it.
    """

    # Kept on the class (not the instance) so a test can read the record
    # without a reference to whichever instance did the sending.
    sent = []

    def __init__(self, *args, **kwargs):
        # Constructor arguments are accepted and ignored.
        pass

    def send(self, command, *args, **kwargs):
        """Record ``command``; all remaining arguments are ignored."""
        recorded = type(self).sent
        recorded.append(command)

    def close(self):
        """Do nothing."""
        pass
class MockControlReplyConsumer(object):
    """Test double used in place of ``control.ControlReplyConsumer``.

    Every method accepts its arguments, does nothing and returns ``None``.
    """

    def __init__(self, *args, **kwargs):
        # Constructor arguments are accepted and ignored.
        pass

    def collect(self, *args, **kwargs):
        """Return ``None``; no replies are ever gathered."""
        pass

    def close(self):
        """Do nothing."""
        pass
def with_mock_broadcast(fun):
    """Decorate ``fun`` so it runs with the broadcast classes mocked.

    For the duration of each call, ``control.BroadcastPublisher`` and
    ``control.ControlReplyConsumer`` are replaced by
    ``MockBroadcastPublisher`` and ``MockControlReplyConsumer``.  Whether
    ``fun`` returns or raises, the recorded command list is reset and the
    original classes are put back.
    """

    @wraps(fun)
    def _mocked(*args, **kwargs):
        # Remember the real classes before patching the module.
        originals = (control.BroadcastPublisher,
                     control.ControlReplyConsumer)
        control.BroadcastPublisher = MockBroadcastPublisher
        control.ControlReplyConsumer = MockControlReplyConsumer
        try:
            return fun(*args, **kwargs)
        finally:
            # Leave an empty record behind for the next decorated call.
            MockBroadcastPublisher.sent = []
            control.BroadcastPublisher, control.ControlReplyConsumer = originals
    return _mocked
class test_inspect(unittest.TestCase):
    """Tests for ``control.inspect``.

    Each command test calls one ``inspect`` method under
    ``with_mock_broadcast`` and checks that the expected command name was
    recorded in ``MockBroadcastPublisher.sent``.
    """

    def setUp(self):
        # Every test gets a fresh inspector created without a destination.
        self.i = control.inspect()

    def _assert_sent(self, command):
        # Shared check: ``command`` was recorded by the mock publisher.
        self.assertIn(command, MockBroadcastPublisher.sent)

    def test_prepare_reply(self):
        # Without a destination, the per-worker replies are expected to be
        # merged into one dict keyed by worker name.
        replies = [{"w1": {"ok": 1}}, {"w2": {"ok": 1}}]
        merged = self.i._prepare(replies)
        self.assertDictEqual(merged, {"w1": {"ok": 1}, "w2": {"ok": 1}})

        # With destination="w1", only that worker's payload is expected.
        single = control.inspect(destination="w1")
        self.assertEqual(single._prepare([{"w1": {"ok": 1}}]), {"ok": 1})

    @with_mock_broadcast
    def test_active(self):
        self.i.active()
        self._assert_sent("dump_active")

    @with_mock_broadcast
    def test_scheduled(self):
        self.i.scheduled()
        self._assert_sent("dump_schedule")

    @with_mock_broadcast
    def test_reserved(self):
        self.i.reserved()
        self._assert_sent("dump_reserved")

    @with_mock_broadcast
    def test_stats(self):
        self.i.stats()
        self._assert_sent("stats")

    @with_mock_broadcast
    def test_revoked(self):
        self.i.revoked()
        self._assert_sent("dump_revoked")

    @with_mock_broadcast
    def test_registered_tasks(self):
        self.i.registered_tasks()
        self._assert_sent("dump_tasks")

    @with_mock_broadcast
    def test_enable_events(self):
        self.i.enable_events()
        self._assert_sent("enable_events")

    @with_mock_broadcast
    def test_disable_events(self):
        self.i.disable_events()
        self._assert_sent("disable_events")

    @with_mock_broadcast
    def test_diagnose(self):
        self.i.diagnose()
        self._assert_sent("diagnose")

    @with_mock_broadcast
    def test_ping(self):
        self.i.ping()
        self._assert_sent("ping")
class test_Broadcast(unittest.TestCase):
    """Tests for the module-level helpers in ``control``.

    Apart from ``test_discard_all``, every test runs under
    ``with_mock_broadcast`` and inspects ``MockBroadcastPublisher.sent``
    (or, for the validation test, the exception raised).
    """

    def _assert_sent(self, command):
        # Shared check: ``command`` was recorded by the mock publisher.
        self.assertIn(command, MockBroadcastPublisher.sent)

    def test_discard_all(self):
        # Not decorated with the mock; only checks the call does not raise.
        control.discard_all()

    @with_mock_broadcast
    def test_broadcast(self):
        control.broadcast("foobarbaz", arguments=[])
        self._assert_sent("foobarbaz")

    @with_mock_broadcast
    def test_broadcast_limit(self):
        options = {"arguments": [], "limit": None, "destination": [1, 2, 3]}
        control.broadcast("foobarbaz1", **options)
        self._assert_sent("foobarbaz1")

    @with_mock_broadcast
    def test_broadcast_validate(self):
        # A plain string destination is expected to be rejected.
        self.assertRaises(
            ValueError,
            control.broadcast, "foobarbaz2", destination="foo")

    @with_mock_broadcast
    def test_rate_limit(self):
        task_name = PingTask.name
        control.rate_limit(task_name, "100/m")
        self._assert_sent("rate_limit")

    @with_mock_broadcast
    def test_revoke(self):
        control.revoke("foozbaaz")
        self._assert_sent("revoke")

    @with_mock_broadcast
    def test_ping(self):
        control.ping()
        self._assert_sent("ping")

    @with_mock_broadcast
    def test_revoke_from_result(self):
        from celery.result import AsyncResult
        result = AsyncResult("foozbazzbar")
        result.revoke()
        self._assert_sent("revoke")

    @with_mock_broadcast
    def test_revoke_from_resultset(self):
        from celery.result import TaskSetResult, AsyncResult
        # Same evaluation order as a single nested call: the taskset id is
        # generated first, then the ten subtask ids, then the results.
        taskset_id = gen_unique_id()
        subtask_ids = [gen_unique_id() for _ in range(10)]
        r = TaskSetResult(taskset_id, map(AsyncResult, subtask_ids))
        r.revoke()
        self._assert_sent("revoke")
|