Browse Source

New remote control command: dump_schedule

Dumps the currently waiting ETA tasks in the worker to the logs.

    >>> from celery.task.control import broadcast
    >>> broadcast("dump_schedule")

[2010-05-10 12:03:22,200: WARNING/MainProcess] * Dump of current schedule:
0. 2010-05-10 12:04:12 pri0 <TaskWrapper: {name:"tasks.add",
id:"c6a306f5-348a-415a-a740-a1f567f0a728", args:"[2, 2]", kwargs:"{}"}>
1. 2010-05-10 12:04:13 pri0 <TaskWrapper: {name:"tasks.add",
id:"28e1cea5-ab41-4ef6-a74a-1551d730c3c3", args:"[2, 2]", kwargs:"{}"}>
2. 2010-05-10 12:04:13 pri0 <TaskWrapper: {name:"tasks.add",
id:"c3cba1a0-159f-4dfd-adee-44d2edeff541", args:"[2, 2]", kwargs:"{}"}>
Ask Solem 15 years ago
parent
commit
392dc55f69
3 changed files with 19 additions and 0 deletions
  1. 14 0
      celery/worker/control.py
  2. 1 0
      celery/worker/listener.py
  3. 4 0
      celery/worker/scheduler.py

+ 14 - 0
celery/worker/control.py

@@ -1,4 +1,5 @@
 import socket
+from datetime import datetime
 
 from celery import log
 from celery.registry import tasks
@@ -6,6 +7,7 @@ from celery.worker.revoke import revoked
 
 TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
 
+
 def expose(fun):
     """Expose method as a celery worker control command, allowed to be called
     from a message."""
@@ -61,6 +63,18 @@ class Control(object):
         self.logger.critical("Got shutdown from remote.")
         raise SystemExit
 
+    @expose
+    def dump_schedule(self, **kwargs):
+        schedule = self.listener.eta_schedule
+        info = "--Empty Schedule--"
+        if schedule.queue:
+            formatitem = lambda (i, item): "%s. %s pri%s %r" % (i,
+                    datetime.fromtimestamp(item["eta"]),
+                    item["priority"],
+                    item["item"])
+            info = "\n".join(map(formatitem, enumerate(schedule.info())))
+        self.logger.warn("* Dump of current schedule:\n%s" % (info, ))
+
     @expose
     def dump_tasks(self, **kwargs):
         from celery import registry

+ 1 - 0
celery/worker/listener.py

@@ -1,4 +1,5 @@
 from __future__ import generators
+
 import socket
 import warnings
 from datetime import datetime

+ 4 - 0
celery/worker/scheduler.py

@@ -1,4 +1,5 @@
 from __future__ import generators
+
 import time
 import heapq
 
@@ -81,6 +82,9 @@ class Scheduler(object):
     def clear(self):
         self._queue = []
 
+    def info(self):
+        return ({"eta": eta, "priority": priority, "item": item}
+                    for eta, priority, item, _ in self.queue)
     @property
     def queue(self):
         events = list(self._queue)