Browse Source

Merge branch '3.0'

Ask Solem 12 years ago
parent
commit
3153042116
2 changed files with 16 additions and 15 deletions
  1. 15 13
      celery/tests/worker/test_control.py
  2. 1 2
      celery/worker/control.py

+ 15 - 13
celery/tests/worker/test_control.py

@@ -39,10 +39,6 @@ class Consumer(consumer.Consumer):
 
     def __init__(self):
         self.ready_queue = FastQueue()
-        self.ready_queue.put(TaskRequest(mytask.name,
-                                         uuid(),
-                                         args=(2, 2),
-                                         kwargs={}))
         self.timer = Timer()
         self.app = current_app
         self.event_dispatcher = Mock()
@@ -250,16 +246,22 @@ class test_ControlPanel(Case):
         self.assertTrue(panel.handle('dump_schedule'))
 
     def test_dump_reserved(self):
+        from celery.worker import state
         consumer = Consumer()
-        panel = self.create_panel(consumer=consumer)
-        response = panel.handle('dump_reserved', {'safe': True})
-        self.assertDictContainsSubset({'name': mytask.name,
-                                       'args': (2, 2),
-                                       'kwargs': {},
-                                       'hostname': socket.gethostname()},
-                                       response[0])
-        consumer.ready_queue = FastQueue()
-        self.assertFalse(panel.handle('dump_reserved'))
+        state.reserved_requests.add(TaskRequest(mytask.name,
+                uuid(), args=(2, 2), kwargs={}))
+        try:
+            panel = self.create_panel(consumer=consumer)
+            response = panel.handle('dump_reserved', {'safe': True})
+            self.assertDictContainsSubset({'name': mytask.name,
+                                        'args': (2, 2),
+                                        'kwargs': {},
+                                        'hostname': socket.gethostname()},
+                                        response[0])
+            state.reserved_requests.clear()
+            self.assertFalse(panel.handle('dump_reserved'))
+        finally:
+            state.reserved_requests.clear()
 
     def test_rate_limit_when_disabled(self):
         app = current_app

+ 1 - 2
celery/worker/control.py

@@ -161,8 +161,7 @@ def dump_schedule(panel, safe=False, **kwargs):
 
 @Panel.register
 def dump_reserved(panel, safe=False, **kwargs):
-    ready_queue = panel.consumer.ready_queue
-    reserved = ready_queue.items
+    reserved = state.reserved_requests
     if not reserved:
         logger.debug('--Empty queue--')
         return []