# test_worker_control.py (6.7 KB)
  1. import socket
  2. import unittest2 as unittest
  3. from celery import conf
  4. from celery.decorators import task
  5. from celery.registry import tasks
  6. from celery.task.builtins import PingTask
  7. from celery.utils import gen_unique_id
  8. from celery.worker import control
  9. from celery.worker.buckets import FastQueue
  10. from celery.worker.state import revoked
  11. from celery.worker.scheduler import Scheduler
  12. hostname = socket.gethostname()
  13. @task(rate_limit=200) # for extra info in dump_tasks
  14. def mytask():
  15. pass
  16. class Dispatcher(object):
  17. enabled = None
  18. def __init__(self, *args, **kwargs):
  19. self.sent = []
  20. def enable(self):
  21. self.enabled = True
  22. def disable(self):
  23. self.enabled = False
  24. def send(self, event):
  25. self.sent.append(event)
  26. class Listener(object):
  27. def __init__(self):
  28. self.ready_queue = FastQueue()
  29. self.ready_queue.put("the quick brown fox")
  30. self.eta_schedule = Scheduler(self.ready_queue)
  31. self.event_dispatcher = Dispatcher()
  32. class test_ControlPanel(unittest.TestCase):
  33. def setUp(self):
  34. self.panel = self.create_panel(listener=Listener())
  35. def create_panel(self, **kwargs):
  36. return control.ControlDispatch(hostname=hostname, **kwargs)
  37. def test_disable_events(self):
  38. listener = Listener()
  39. panel = self.create_panel(listener=listener)
  40. listener.event_dispatcher.enabled = True
  41. panel.execute("disable_events")
  42. self.assertEqual(listener.event_dispatcher.enabled, False)
  43. self.assertIn("worker-offline", listener.event_dispatcher.sent)
  44. def test_enable_events(self):
  45. listener = Listener()
  46. panel = self.create_panel(listener=listener)
  47. listener.event_dispatcher.enabled = False
  48. panel.execute("enable_events")
  49. self.assertEqual(listener.event_dispatcher.enabled, True)
  50. self.assertIn("worker-online", listener.event_dispatcher.sent)
  51. def test_dump_tasks(self):
  52. info = "\n".join(self.panel.execute("dump_tasks"))
  53. self.assertIn("mytask", info)
  54. self.assertIn("rate_limit=200", info)
  55. def test_dump_schedule(self):
  56. listener = Listener()
  57. panel = self.create_panel(listener=listener)
  58. self.assertFalse(panel.execute("dump_schedule"))
  59. listener.eta_schedule.enter("foo", eta=100)
  60. self.assertTrue(panel.execute("dump_schedule"))
  61. def test_dump_reserved(self):
  62. listener = Listener()
  63. panel = self.create_panel(listener=listener)
  64. info = "\n".join(panel.execute("dump_reserved"))
  65. self.assertIn("the quick brown fox", info)
  66. listener.ready_queue = FastQueue()
  67. info = "\n".join(panel.execute("dump_reserved"))
  68. self.assertFalse(info)
  69. def test_rate_limit_when_disabled(self):
  70. conf.DISABLE_RATE_LIMITS = True
  71. try:
  72. e = self.panel.execute("rate_limit", kwargs=dict(
  73. task_name=mytask.name, rate_limit="100/m"))
  74. self.assertIn("rate limits disabled", e.get("error"))
  75. finally:
  76. conf.DISABLE_RATE_LIMITS = False
  77. def test_rate_limit_invalid_rate_limit_string(self):
  78. e = self.panel.execute("rate_limit", kwargs=dict(
  79. task_name="tasks.add", rate_limit="x1240301#%!"))
  80. self.assertIn("Invalid rate limit string", e.get("error"))
  81. def test_rate_limit(self):
  82. class Listener(object):
  83. class ReadyQueue(object):
  84. fresh = False
  85. def refresh(self):
  86. self.fresh = True
  87. def __init__(self):
  88. self.ready_queue = self.ReadyQueue()
  89. listener = Listener()
  90. panel = self.create_panel(listener=listener)
  91. task = tasks[PingTask.name]
  92. old_rate_limit = task.rate_limit
  93. try:
  94. panel.execute("rate_limit", kwargs=dict(task_name=task.name,
  95. rate_limit="100/m"))
  96. self.assertEqual(task.rate_limit, "100/m")
  97. self.assertTrue(listener.ready_queue.fresh)
  98. listener.ready_queue.fresh = False
  99. panel.execute("rate_limit", kwargs=dict(task_name=task.name,
  100. rate_limit=0))
  101. self.assertEqual(task.rate_limit, 0)
  102. self.assertTrue(listener.ready_queue.fresh)
  103. finally:
  104. task.rate_limit = old_rate_limit
  105. def test_rate_limit_nonexistant_task(self):
  106. self.panel.execute("rate_limit", kwargs={
  107. "task_name": "xxxx.does.not.exist",
  108. "rate_limit": "1000/s"})
  109. def test_unexposed_command(self):
  110. self.panel.execute("foo", kwargs={})
  111. def test_revoke_with_name(self):
  112. uuid = gen_unique_id()
  113. m = {"command": "revoke",
  114. "destination": hostname,
  115. "task_id": uuid,
  116. "task_name": mytask.name}
  117. self.panel.dispatch_from_message(m)
  118. self.assertIn(uuid, revoked)
  119. def test_revoke_with_name_not_in_registry(self):
  120. uuid = gen_unique_id()
  121. m = {"command": "revoke",
  122. "destination": hostname,
  123. "task_id": uuid,
  124. "task_name": "xxxxxxxxx33333333388888"}
  125. self.panel.dispatch_from_message(m)
  126. self.assertIn(uuid, revoked)
  127. def test_revoke(self):
  128. uuid = gen_unique_id()
  129. m = {"command": "revoke",
  130. "destination": hostname,
  131. "task_id": uuid}
  132. self.panel.dispatch_from_message(m)
  133. self.assertIn(uuid, revoked)
  134. m = {"command": "revoke",
  135. "destination": "does.not.exist",
  136. "task_id": uuid + "xxx"}
  137. self.panel.dispatch_from_message(m)
  138. self.assertNotIn(uuid + "xxx", revoked)
  139. def test_ping(self):
  140. m = {"command": "ping",
  141. "destination": hostname}
  142. r = self.panel.dispatch_from_message(m)
  143. self.assertEqual(r, "pong")
  144. def test_shutdown(self):
  145. m = {"command": "shutdown",
  146. "destination": hostname}
  147. self.assertRaises(SystemExit, self.panel.dispatch_from_message, m)
  148. def test_panel_reply(self):
  149. replies = []
  150. class MockReplyPublisher(object):
  151. def __init__(self, *args, **kwargs):
  152. pass
  153. def send(self, reply, **kwargs):
  154. replies.append(reply)
  155. def close(self):
  156. pass
  157. class _Dispatch(control.ControlDispatch):
  158. ReplyPublisher = MockReplyPublisher
  159. panel = _Dispatch(hostname, listener=Listener())
  160. r = panel.execute("ping", reply_to={"exchange": "x",
  161. "routing_key": "x"})
  162. self.assertEqual(r, "pong")
  163. self.assertDictEqual(replies[0], {panel.hostname: "pong"})