Explorar o código

New remote control command: dump_reserved: Dump currently waiting tasks

Ask Solem %!s(int64=15) %!d(string=hai) anos
pai
achega
a270422862
Modificáronse 2 ficheiros con 31 adicións e 9 borrados
  1. 17 8
      celery/worker/buckets.py
  2. 14 1
      celery/worker/control/builtins.py

+ 17 - 8
celery/worker/buckets.py

@@ -1,5 +1,6 @@
 import time
 from Queue import Queue, Empty as QueueEmpty
+from itertools import chain, izip_longest
 
 from carrot.utils import partition
 
@@ -176,7 +177,6 @@ class TaskBucket(object):
         if task_name not in self.buckets:
             return self.update_bucket_for_type(task_name)
 
-
     def qsize(self):
         """Get the total size of all the queues."""
         return sum(bucket.qsize() for bucket in self.buckets.values())
@@ -186,13 +186,14 @@ class TaskBucket(object):
 
     def clear(self):
         for bucket in self.buckets.values():
-            try:
-                bucket.clear()
-            except AttributeError:
-                # Probably a Queue, not a TokenBucketQueue, so clear the
-                # underlying deque instead.
-                bucket.queue.clear()
+            bucket.clear()
 
+    @property
+    def items(self):
+        # for queues with contents [(1, 2), (3, 4), (5, 6), (7, 8)]
+        # zips and flattens to [1, 3, 5, 7, 2, 4, 6, 8]
+        return filter(None, chain.from_iterable(izip_longest(*[bucket.items
+                                    for bucket in self.buckets.values()])))
 
 class FastQueue(Queue):
     """:class:`Queue.Queue` supporting the interface of
@@ -210,6 +211,10 @@ class FastQueue(Queue):
     def wait(self, block=True):
         return self.get(block=block)
 
+    @property
+    def items(self):
+        return self.queue
+
 
 class TokenBucketQueue(object):
     """Queue with rate limited get operations.
@@ -307,7 +312,7 @@ class TokenBucketQueue(object):
         return self.queue.empty()
 
     def clear(self):
-        return self.queue.queue.clear()
+        return self.items.clear()
 
     def wait(self, block=False):
         """Wait until a token can be retrieved from the bucket and return
@@ -339,3 +344,7 @@ class TokenBucketQueue(object):
             self._tokens = min(self.capacity, self._tokens + delta)
             self.timestamp = now
         return self._tokens
+
+    @property
+    def items(self):
+        return self.queue.queue

+ 14 - 1
celery/worker/control/builtins.py

@@ -53,7 +53,6 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
 @Panel.register
 def dump_schedule(panel, **kwargs):
     schedule = panel.listener.eta_schedule
-    info = ["--Empty Schedule--"]
     if not schedule.queue:
         panel.logger.info("--Empty schedule--")
         return []
@@ -68,6 +67,20 @@ def dump_schedule(panel, **kwargs):
     return info
 
 
+@Panel.register
+def dump_reserved(panel, **kwargs):
+    ready_queue = panel.listener.ready_queue
+    reserved = ready_queue.items
+    if not reserved:
+        panel.logger.info("--Empty queue--")
+        return []
+    info = map(repr, reserved)
+    panel.logger.info("* Dump of currently reserved tasks:\n%s" % (
+                            "\n".join(info, )))
+    return info
+
+
+
 @Panel.register
 def dump_tasks(panel, **kwargs):