test_task_control.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. from functools import wraps
  4. from kombu.pidbox import Mailbox
  5. from celery.app import app_or_default
  6. from celery.task import control
  7. from celery.task import task
  8. from celery.utils import uuid
  9. from celery.tests.utils import Case
  @task
  def mytask():
      """No-op task; the tests in this module only refer to its ``name``."""
      return None
  class MockMailbox(Mailbox):
      """Mailbox subclass that records published command names.

      ``_publish`` appends the command to the class-level ``sent`` list and
      does nothing else; ``close`` and ``_collect`` are no-ops.
      """

      # Class-level log shared by every instance. ``with_mock_broadcast``
      # rebinds it to a fresh list around each decorated test.
      sent = []

      def close(self):
          """Do nothing."""
          return None

      def _collect(self, *args, **kwargs):
          """Ignore all arguments and return ``None``."""
          return None

      def _publish(self, command, *args, **kwargs):
          """Append ``command`` to the class-level ``sent`` list."""
          recorded = self.__class__.sent
          recorded.append(command)
  class Control(control.Control):
      """``control.Control`` with its ``Mailbox`` attribute set to MockMailbox."""

      # Class attribute override; the tests inspect ``MockMailbox.sent`` to
      # see which commands were published.
      Mailbox = MockMailbox
  def with_mock_broadcast(fun):
      """Wrap ``fun`` so ``MockMailbox.sent`` is reset around every call.

      ``MockMailbox.sent`` is rebound to a new empty list before ``fun`` runs
      and again afterwards, whether ``fun`` returns or raises. The wrapper
      returns whatever ``fun`` returns and carries its metadata via ``wraps``.
      """

      def _resets(*args, **kwargs):
          MockMailbox.sent = []
          try:
              outcome = fun(*args, **kwargs)
          finally:
              # Runs on both the normal and the exceptional path.
              MockMailbox.sent = []
          return outcome

      return wraps(fun)(_resets)
  class test_inspect(Case):
      """Check which command name each ``inspect`` method publishes."""

      def setUp(self):
          # The inspect object comes from the mock-backed Control, so
          # published commands are recorded in MockMailbox.sent.
          self.i = Control(app=app_or_default()).inspect()

      def _assert_published(self, command):
          """Assert that ``command`` is present in ``MockMailbox.sent``."""
          self.assertIn(command, MockMailbox.sent)

      def test_prepare_reply(self):
          # Without a destination the per-worker replies come back merged
          # into one dict.
          replies = [{"w1": {"ok": 1}}, {"w2": {"ok": 1}}]
          merged = self.i._prepare(replies)
          self.assertDictEqual(merged, {"w1": {"ok": 1}, "w2": {"ok": 1}})

          # With a single string destination only that worker's payload
          # is returned.
          single = control.inspect(destination="w1")
          payload = single._prepare([{"w1": {"ok": 1}}])
          self.assertEqual(payload, {"ok": 1})

      @with_mock_broadcast
      def test_active(self):
          self.i.active()
          self._assert_published("dump_active")

      @with_mock_broadcast
      def test_scheduled(self):
          self.i.scheduled()
          self._assert_published("dump_schedule")

      @with_mock_broadcast
      def test_reserved(self):
          self.i.reserved()
          self._assert_published("dump_reserved")

      @with_mock_broadcast
      def test_stats(self):
          self.i.stats()
          self._assert_published("stats")

      @with_mock_broadcast
      def test_revoked(self):
          self.i.revoked()
          self._assert_published("dump_revoked")

      @with_mock_broadcast
      def test_asks(self):
          # NOTE(review): the method name looks like a leftover; the body
          # exercises ``registered()``. Name kept as-is.
          self.i.registered()
          self._assert_published("dump_tasks")

      @with_mock_broadcast
      def test_enable_events(self):
          self.i.enable_events()
          self._assert_published("enable_events")

      @with_mock_broadcast
      def test_disable_events(self):
          self.i.disable_events()
          self._assert_published("disable_events")

      @with_mock_broadcast
      def test_ping(self):
          self.i.ping()
          self._assert_published("ping")

      @with_mock_broadcast
      def test_add_consumer(self):
          self.i.add_consumer("foo")
          self._assert_published("add_consumer")

      @with_mock_broadcast
      def test_cancel_consumer(self):
          self.i.cancel_consumer("foo")
          self._assert_published("cancel_consumer")
  class test_Broadcast(Case):
      """Exercise the ``Control`` broadcast helpers against MockMailbox."""

      def setUp(self):
          self.app = app_or_default()
          self.control = Control(app=self.app)
          # Presumably installed on the app so that result objects created
          # via ``self.app`` revoke through the mock -- verify in celery.
          self.app.control = self.control

      def tearDown(self):
          # Remove the attribute that setUp assigned on the app.
          del self.app.control

      def test_discard_all(self):
          # Only checks that the call completes; nothing is asserted.
          self.control.discard_all()

      @with_mock_broadcast
      def test_broadcast(self):
          self.control.broadcast("foobarbaz", arguments=[])
          self.assertIn("foobarbaz", MockMailbox.sent)

      @with_mock_broadcast
      def test_broadcast_limit(self):
          self.control.broadcast("foobarbaz1", arguments=[], limit=None,
                                 destination=[1, 2, 3])
          self.assertIn("foobarbaz1", MockMailbox.sent)

      @with_mock_broadcast
      def test_broadcast_validate(self):
          # A plain string destination is expected to be rejected.
          with self.assertRaises(ValueError):
              self.control.broadcast("foobarbaz2",
                                     destination="foo")

      @with_mock_broadcast
      def test_rate_limit(self):
          self.control.rate_limit(mytask.name, "100/m")
          self.assertIn("rate_limit", MockMailbox.sent)

      @with_mock_broadcast
      def test_revoke(self):
          self.control.revoke("foozbaaz")
          self.assertIn("revoke", MockMailbox.sent)

      @with_mock_broadcast
      def test_ping(self):
          self.control.ping()
          self.assertIn("ping", MockMailbox.sent)

      @with_mock_broadcast
      def test_revoke_from_result(self):
          self.app.AsyncResult("foozbazzbar").revoke()
          self.assertIn("revoke", MockMailbox.sent)

      @with_mock_broadcast
      def test_revoke_from_resultset(self):
          taskset_id = uuid()
          # Build a concrete list: ``map`` returns a one-shot iterator on
          # Python 3, which is fragile as a stored collection of results.
          results = [self.app.AsyncResult(uuid()) for _ in range(10)]
          r = self.app.TaskSetResult(taskset_id, results)
          r.revoke()
          self.assertIn("revoke", MockMailbox.sent)