test_task_control.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. import unittest2 as unittest
  2. from celery.task import control
  3. from celery.task.builtins import PingTask
  4. from celery.utils import gen_unique_id
  5. from celery.utils.functional import wraps
  6. class MockBroadcastPublisher(object):
  7. sent = []
  8. def __init__(self, *args, **kwargs):
  9. pass
  10. def send(self, command, *args, **kwargs):
  11. self.__class__.sent.append(command)
  12. def close(self):
  13. pass
  14. class MockControlReplyConsumer(object):
  15. def __init__(self, *args, **kwarg):
  16. pass
  17. def collect(self, *args, **kwargs):
  18. pass
  19. def close(self):
  20. pass
  21. def with_mock_broadcast(fun):
  22. @wraps(fun)
  23. def _mocked(*args, **kwargs):
  24. old_pub = control.BroadcastPublisher
  25. old_rep = control.ControlReplyConsumer
  26. control.BroadcastPublisher = MockBroadcastPublisher
  27. control.ControlReplyConsumer = MockControlReplyConsumer
  28. try:
  29. return fun(*args, **kwargs)
  30. finally:
  31. MockBroadcastPublisher.sent = []
  32. control.BroadcastPublisher = old_pub
  33. control.ControlReplyConsumer = old_rep
  34. return _mocked
  35. class test_Broadcast(unittest.TestCase):
  36. def test_discard_all(self):
  37. control.discard_all()
  38. @with_mock_broadcast
  39. def test_broadcast(self):
  40. control.broadcast("foobarbaz", arguments=[])
  41. self.assertIn("foobarbaz", MockBroadcastPublisher.sent)
  42. @with_mock_broadcast
  43. def test_broadcast_limit(self):
  44. control.broadcast("foobarbaz1", arguments=[], limit=None,
  45. destination=[1, 2, 3])
  46. self.assertIn("foobarbaz1", MockBroadcastPublisher.sent)
  47. @with_mock_broadcast
  48. def test_broadcast_validate(self):
  49. self.assertRaises(ValueError, control.broadcast, "foobarbaz2",
  50. destination="foo")
  51. @with_mock_broadcast
  52. def test_rate_limit(self):
  53. control.rate_limit(PingTask.name, "100/m")
  54. self.assertIn("rate_limit", MockBroadcastPublisher.sent)
  55. @with_mock_broadcast
  56. def test_revoke(self):
  57. control.revoke("foozbaaz")
  58. self.assertIn("revoke", MockBroadcastPublisher.sent)
  59. @with_mock_broadcast
  60. def test_ping(self):
  61. control.ping()
  62. self.assertIn("ping", MockBroadcastPublisher.sent)
  63. @with_mock_broadcast
  64. def test_revoke_from_result(self):
  65. from celery.result import AsyncResult
  66. AsyncResult("foozbazzbar").revoke()
  67. self.assertIn("revoke", MockBroadcastPublisher.sent)
  68. @with_mock_broadcast
  69. def test_revoke_from_resultset(self):
  70. from celery.result import TaskSetResult, AsyncResult
  71. r = TaskSetResult(gen_unique_id(), map(AsyncResult, [gen_unique_id()
  72. for i in range(10)]))
  73. r.revoke()
  74. self.assertIn("revoke", MockBroadcastPublisher.sent)