Browse Source

event.state: Only keep max 4 heartbeats + add limit argument to tasks_by_timestamp, tasks_by_type, tasks_by_worker. Also sort the result of task_types.

Ask Solem 14 years ago
parent
commit
26e433d606
1 changed files with 12 additions and 8 deletions
  1. 12 8
      celery/events/state.py

+ 12 - 8
celery/events/state.py

@@ -20,6 +20,7 @@ class Element(AttributeDict):
 
 class Worker(Element):
     """Worker State."""
+    heartbeat_max = 4
 
     def __init__(self, **fields):
         super(Worker, self).__init__(**fields)
@@ -37,6 +38,8 @@ class Worker(Element):
     def _heartpush(self, timestamp):
         if timestamp:
             heapq.heappush(self.heartbeats, timestamp)
+            if len(self.heartbeats) > self.heartbeat_max:
+                self.heartbeats = self.heartbeats[:self.heartbeat_max]
 
     @property
     def alive(self):
@@ -179,41 +182,42 @@ class State(object):
         if self.event_callback:
             self.event_callback(self, event)
 
-    def tasks_by_timestamp(self):
+    def tasks_by_timestamp(self, limit=None):
         """Get tasks by timestamp.
 
         Returns a list of ``(uuid, task)`` tuples.
 
         """
-        return self._sort_tasks_by_time(self.tasks.items())
+        return self._sort_tasks_by_time(self.tasks.items()[:limit])
 
     def _sort_tasks_by_time(self, tasks):
         """Sort task items by time."""
-        return sorted(tasks, key=lambda t: t[1].timestamp, reverse=True)
+        return sorted(tasks, key=lambda t: t[1].timestamp,
+                      reverse=True)
 
-    def tasks_by_type(self, name):
+    def tasks_by_type(self, name, limit=None):
         """Get all tasks by type.
 
         Returns a list of ``(uuid, task)`` tuples.
 
         """
         return self._sort_tasks_by_time([(uuid, task)
-                for uuid, task in self.tasks.items()
+                for uuid, task in self.tasks.items()[:limit]
                     if task.name == name])
 
-    def tasks_by_worker(self, hostname):
+    def tasks_by_worker(self, hostname, limit=None):
         """Get all tasks by worker.
 
         Returns a list of ``(uuid, task)`` tuples.
 
         """
         return self._sort_tasks_by_time([(uuid, task)
-                for uuid, task in self.tasks.items()
+                for uuid, task in self.tasks.items()[:limit]
                     if task.worker.hostname == hostname])
 
     def task_types(self):
         """Returns a list of all seen task types."""
-        return list(set(task.name for task in self.tasks.values()))
+        return list(sorted(set(task.name for task in self.tasks.values())))
 
     def alive_workers(self):
         """Returns a list of (seemingly) alive workers."""