test_task_control.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import unittest
  2. from celery.task import control
  3. from celery.task.builtins import PingTask
  4. from celery.utils import gen_unique_id
  5. class MockBroadcastPublisher(object):
  6. sent = []
  7. def __init__(self, *args, **kwargs):
  8. pass
  9. def send(self, command, *args, **kwargs):
  10. self.__class__.sent.append(command)
  11. def close(self):
  12. pass
  13. def with_mock_broadcast(fun):
  14. def _mocked(*args, **kwargs):
  15. old_pub = control.BroadcastPublisher
  16. control.BroadcastPublisher = MockBroadcastPublisher
  17. try:
  18. return fun(*args, **kwargs)
  19. finally:
  20. MockBroadcastPublisher.sent = []
  21. control.BroadcastPublisher = old_pub
  22. return _mocked
  23. class TestBroadcast(unittest.TestCase):
  24. @with_mock_broadcast
  25. def test_broadcast(self):
  26. control.broadcast("foobarbaz", arguments=[])
  27. self.assertTrue("foobarbaz" in MockBroadcastPublisher.sent)
  28. @with_mock_broadcast
  29. def test_rate_limit(self):
  30. control.rate_limit(PingTask.name, "100/m")
  31. self.assertTrue("rate_limit" in MockBroadcastPublisher.sent)
  32. @with_mock_broadcast
  33. def test_revoke(self):
  34. control.revoke("foozbaaz")
  35. self.assertTrue("revoke" in MockBroadcastPublisher.sent)
  36. @with_mock_broadcast
  37. def test_revoke_from_result(self):
  38. from celery.result import AsyncResult
  39. AsyncResult("foozbazzbar").revoke()
  40. self.assertTrue("revoke" in MockBroadcastPublisher.sent)
  41. @with_mock_broadcast
  42. def test_revoke_from_resultset(self):
  43. from celery.result import TaskSetResult, AsyncResult
  44. r = TaskSetResult(gen_unique_id(), map(AsyncResult, [gen_unique_id()
  45. for i in range(10)]))
  46. r.revoke()
  47. self.assertTrue("revoke" in MockBroadcastPublisher.sent)